1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lockd.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 #include <linux/libcfs/libcfs.h>
45
46 #include <lustre_dlm.h>
47 #include <obd_class.h>
48 #include <linux/list.h>
49 #include "ldlm_internal.h"
50
51 static int ldlm_num_threads;
52 CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
53                 "number of DLM service threads to start");
54
55 static char *ldlm_cpts;
56 CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
57                 "CPU partitions ldlm threads should run on");
58
59 extern struct kmem_cache *ldlm_resource_slab;
60 extern struct kmem_cache *ldlm_lock_slab;
61 static struct mutex     ldlm_ref_mutex;
62 static int ldlm_refcount;
63
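/* Arguments attached to a callback RPC: the AST set argument the request
 * belongs to and the lock the callback is for. */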
64 struct ldlm_cb_async_args {
65         struct ldlm_cb_set_arg *ca_set_arg;
66         struct ldlm_lock       *ca_lock;
67 };
68
69 /* LDLM state */
70
71 static struct ldlm_state *ldlm_state;
72
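/* Round a timeout (in jiffies) up to the next whole second. */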
73 inline cfs_time_t round_timeout(cfs_time_t timeout)
74 {
75         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
76 }
77
78 /* timeout for initial callback (AST) reply (bz10399) */
79 static inline unsigned int ldlm_get_rq_timeout(void)
80 {
81         /* Non-AT value */
82         unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
83
84         return timeout < 1 ? 1 : timeout;
85 }
86
87 #define ELT_STOPPED   0
88 #define ELT_READY     1
89 #define ELT_TERMINATE 2
90
91 struct ldlm_bl_pool {
92         spinlock_t              blp_lock;
93
94         /*
95          * blp_prio_list is used for callbacks that should be handled
96          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
97          * see bug 13843
98          */
99         struct list_head              blp_prio_list;
100
101         /*
102          * blp_list is used for all other callbacks which are likely
103          * to take longer to process.
104          */
105         struct list_head              blp_list;
106
107         wait_queue_head_t            blp_waitq;
108         struct completion       blp_comp;
109         atomic_t            blp_num_threads;
110         atomic_t            blp_busy_threads;
111         int                  blp_min_threads;
112         int                  blp_max_threads;
113 };
114
115 struct ldlm_bl_work_item {
116         struct list_head              blwi_entry;
117         struct ldlm_namespace  *blwi_ns;
118         struct ldlm_lock_desc   blwi_ld;
119         struct ldlm_lock       *blwi_lock;
120         struct list_head              blwi_head;
121         int                  blwi_count;
122         struct completion       blwi_comp;
123         ldlm_cancel_flags_t     blwi_flags;
124         int                  blwi_mem_pressure;
125 };
126
127
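/*
 * Tracking of locks waiting for blocking-AST replies is a server-side
 * feature; in this client-only code the two helpers below are no-op stubs.
 */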
128 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
129 {
130         return 0;
131 }
132
133 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
134 {
135         return 0;
136 }
137
138
139
140 /**
141  * Callback handler for receiving incoming blocking ASTs.
142  *
143  * This can only happen on the client side.
144  */
145 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
146                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
147 {
148         int do_ast;
149
150         LDLM_DEBUG(lock, "client blocking AST callback handler");
151
152         lock_res_and_lock(lock);
153         lock->l_flags |= LDLM_FL_CBPENDING;
154
155         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
156                 lock->l_flags |= LDLM_FL_CANCEL;
157
158         do_ast = (!lock->l_readers && !lock->l_writers);
159         unlock_res_and_lock(lock);
160
161         if (do_ast) {
162                 CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
163                        lock, lock->l_blocking_ast);
164                 if (lock->l_blocking_ast != NULL)
165                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
166                                              LDLM_CB_BLOCKING);
167         } else {
168                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
169                        lock);
170         }
171
172         LDLM_DEBUG(lock, "client blocking callback handler END");
173         LDLM_LOCK_RELEASE(lock);
174 }
175
176 /**
177  * Callback handler for receiving incoming completion ASTs.
178  *
179  * This can only happen on the client side.
180  */
181 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
182                                     struct ldlm_namespace *ns,
183                                     struct ldlm_request *dlm_req,
184                                     struct ldlm_lock *lock)
185 {
186         int lvb_len;
187         LIST_HEAD(ast_list);
188         int rc = 0;
189
190         LDLM_DEBUG(lock, "client completion callback handler START");
191
192         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
193                 int to = cfs_time_seconds(1);
194                 while (to > 0) {
195                         schedule_timeout_and_set_state(
196                                 TASK_INTERRUPTIBLE, to);
197                         if (lock->l_granted_mode == lock->l_req_mode ||
198                             lock->l_flags & LDLM_FL_DESTROYED)
199                                 break;
200                 }
201         }
202
203         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
204         if (lvb_len < 0) {
205                 LDLM_ERROR(lock, "Failed to get lvb_len, rc = %d", lvb_len);
206                 GOTO(out, rc = lvb_len);
207         } else if (lvb_len > 0) {
208                 if (lock->l_lvb_len > 0) {
209                         /* for extent lock, lvb contains ost_lvb{}. */
210                         LASSERT(lock->l_lvb_data != NULL);
211
212                         if (unlikely(lock->l_lvb_len < lvb_len)) {
213                                 LDLM_ERROR(lock, "Replied LVB is larger than "
214                                            "expected, expected = %d, "
215                                            "replied = %d",
216                                            lock->l_lvb_len, lvb_len);
217                                 GOTO(out, rc = -EINVAL);
218                         }
219                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
220                                                      * variable length */
221                         void *lvb_data;
222
223                         OBD_ALLOC(lvb_data, lvb_len);
224                         if (lvb_data == NULL) {
225                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
226                                 GOTO(out, rc = -ENOMEM);
227                         }
228
229                         lock_res_and_lock(lock);
230                         LASSERT(lock->l_lvb_data == NULL);
231                         lock->l_lvb_data = lvb_data;
232                         lock->l_lvb_len = lvb_len;
233                         unlock_res_and_lock(lock);
234                 }
235         }
236
237         lock_res_and_lock(lock);
238         if ((lock->l_flags & LDLM_FL_DESTROYED) ||
239             lock->l_granted_mode == lock->l_req_mode) {
240                 /* bug 11300: the lock has already been granted */
241                 unlock_res_and_lock(lock);
242                 LDLM_DEBUG(lock, "Double grant race happened");
243                 GOTO(out, rc = 0);
244         }
245
246         /* If we receive the completion AST before the actual enqueue returned,
247          * then we might need to switch lock modes, resources, or extents. */
248         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
249                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
250                 LDLM_DEBUG(lock, "completion AST, new lock mode");
251         }
252
253         if (lock->l_resource->lr_type != LDLM_PLAIN) {
254                 ldlm_convert_policy_to_local(req->rq_export,
255                                           dlm_req->lock_desc.l_resource.lr_type,
256                                           &dlm_req->lock_desc.l_policy_data,
257                                           &lock->l_policy_data);
258                 LDLM_DEBUG(lock, "completion AST, new policy data");
259         }
260
261         ldlm_resource_unlink_lock(lock);
262         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
263                    &lock->l_resource->lr_name,
264                    sizeof(lock->l_resource->lr_name)) != 0) {
265                 unlock_res_and_lock(lock);
266                 rc = ldlm_lock_change_resource(ns, lock,
267                                 &dlm_req->lock_desc.l_resource.lr_name);
268                 if (rc < 0) {
269                         LDLM_ERROR(lock, "Failed to allocate resource");
270                         GOTO(out, rc);
271                 }
272                 LDLM_DEBUG(lock, "completion AST, new resource");
273                 CERROR("change resource!\n");
274                 lock_res_and_lock(lock);
275         }
276
277         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
278                 /* BL_AST locks are not needed in LRU.
279                  * Let ldlm_cancel_lru() be fast. */
280                 ldlm_lock_remove_from_lru(lock);
281                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
282                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
283         }
284
285         if (lock->l_lvb_len > 0) {
286                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
287                                    lock->l_lvb_data, lvb_len);
288                 if (rc < 0) {
289                         unlock_res_and_lock(lock);
290                         GOTO(out, rc);
291                 }
292         }
293
294         ldlm_grant_lock(lock, &ast_list);
295         unlock_res_and_lock(lock);
296
297         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
298
299         /* Let the enqueue path call osc_lock_upcall() and initialize
300          * l_ast_data */
301         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
302
303         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
304
305         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
306                           lock);
307         GOTO(out, rc);
308
309 out:
310         if (rc < 0) {
311                 lock_res_and_lock(lock);
312                 lock->l_flags |= LDLM_FL_FAILED;
313                 unlock_res_and_lock(lock);
314                 wake_up(&lock->l_waitq);
315         }
316         LDLM_LOCK_RELEASE(lock);
317 }
318
319 /**
320  * Callback handler for receiving incoming glimpse ASTs.
321  *
322  * This can only happen on the client side.  After handling the glimpse AST
323  * we also consider dropping the lock here if it is unused locally for a
324  * long time.
325  */
326 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
327                                     struct ldlm_namespace *ns,
328                                     struct ldlm_request *dlm_req,
329                                     struct ldlm_lock *lock)
330 {
331         int rc = -ENOSYS;
332
333         LDLM_DEBUG(lock, "client glimpse AST callback handler");
334
335         if (lock->l_glimpse_ast != NULL)
336                 rc = lock->l_glimpse_ast(lock, req);
337
338         if (req->rq_repmsg != NULL) {
339                 ptlrpc_reply(req);
340         } else {
341                 req->rq_status = rc;
342                 ptlrpc_error(req);
343         }
344
345         lock_res_and_lock(lock);
346         if (lock->l_granted_mode == LCK_PW &&
347             !lock->l_readers && !lock->l_writers &&
348             cfs_time_after(cfs_time_current(),
349                            cfs_time_add(lock->l_last_used,
350                                         cfs_time_seconds(10)))) {
351                 unlock_res_and_lock(lock);
352                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
353                         ldlm_handle_bl_callback(ns, NULL, lock);
354
355                 return;
356         }
357         unlock_res_and_lock(lock);
358         LDLM_LOCK_RELEASE(lock);
359 }
360
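/* Send a reply with status \a rc for a callback request, packing the reply
 * first if that has not been done yet; no-op if the request expects no reply. */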
361 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
362 {
363         if (req->rq_no_reply)
364                 return 0;
365
366         req->rq_status = rc;
367         if (!req->rq_packed_final) {
368                 rc = lustre_pack_reply(req, 1, NULL, NULL);
369                 if (rc)
370                         return rc;
371         }
372         return ptlrpc_reply(req);
373 }
374
375 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
376                                ldlm_cancel_flags_t cancel_flags)
377 {
378         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
379
380         spin_lock(&blp->blp_lock);
381         if (blwi->blwi_lock &&
382             blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
383                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
384                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
385         } else {
386                 /* other blocking callbacks are added to the regular list */
387                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
388         }
389         spin_unlock(&blp->blp_lock);
390
391         wake_up(&blp->blp_waitq);
392
393         /* cannot check blwi->blwi_flags here, as blwi may already have been
394          * freed in LCF_ASYNC mode */
395         if (!(cancel_flags & LCF_ASYNC))
396                 wait_for_completion(&blwi->blwi_comp);
397
398         return 0;
399 }
400
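/* Fill in a blocking work item, either for a single \a lock or, when
 * \a count is non-zero, for the list of \a cancels to be processed. */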
401 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
402                              struct ldlm_namespace *ns,
403                              struct ldlm_lock_desc *ld,
404                              struct list_head *cancels, int count,
405                              struct ldlm_lock *lock,
406                              ldlm_cancel_flags_t cancel_flags)
407 {
408         init_completion(&blwi->blwi_comp);
409         INIT_LIST_HEAD(&blwi->blwi_head);
410
411         if (memory_pressure_get())
412                 blwi->blwi_mem_pressure = 1;
413
414         blwi->blwi_ns = ns;
415         blwi->blwi_flags = cancel_flags;
416         if (ld != NULL)
417                 blwi->blwi_ld = *ld;
418         if (count) {
419                 list_add(&blwi->blwi_head, cancels);
420                 list_del_init(cancels);
421                 blwi->blwi_count = count;
422         } else {
423                 blwi->blwi_lock = lock;
424         }
425 }
426
427 /**
428  * Queues a list of locks \a cancels containing \a count locks
429  * for later processing by a blocking thread.  If \a count is zero,
430  * then the lock referenced as \a lock is queued instead.
431  *
432  * The blocking thread then calls the lock's ->l_blocking_ast callback.
433  * If list addition fails, an error is returned and the caller is expected
434  * to call ->l_blocking_ast itself.
435  */
436 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
437                              struct ldlm_lock_desc *ld,
438                              struct ldlm_lock *lock,
439                              struct list_head *cancels, int count,
440                              ldlm_cancel_flags_t cancel_flags)
441 {
442         if (cancels && count == 0)
443                 return 0;
444
445         if (cancel_flags & LCF_ASYNC) {
446                 struct ldlm_bl_work_item *blwi;
447
448                 OBD_ALLOC(blwi, sizeof(*blwi));
449                 if (blwi == NULL)
450                         return -ENOMEM;
451                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
452
453                 return __ldlm_bl_to_thread(blwi, cancel_flags);
454         } else {
455                 /* for a synchronous call do minimal memory allocation, as it
456                  * could be triggered from the kernel shrinker
457                  */
458                 struct ldlm_bl_work_item blwi;
459
460                 memset(&blwi, 0, sizeof(blwi));
461                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
462                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
463         }
464 }
465
466
467 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
468                            struct ldlm_lock *lock)
469 {
470         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
471 }
472
473 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
474                            struct list_head *cancels, int count,
475                            ldlm_cancel_flags_t cancel_flags)
476 {
477         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
478 }
479
480 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC). */
481 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
482 {
483         struct obd_device *obd = req->rq_export->exp_obd;
484         char *key;
485         void *val;
486         int keylen, vallen;
487         int rc = -ENOSYS;
488
489         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
490
491         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
492
493         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
494         if (key == NULL) {
495                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
496                 return -EFAULT;
497         }
498         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
499                                       RCL_CLIENT);
500         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
501         if (val == NULL) {
502                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
503                 return -EFAULT;
504         }
505         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
506                                       RCL_CLIENT);
507
508         /* We are responsible for swabbing contents of val */
509
510         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
511                 /* Pass it on to mdc (the "export" in this case) */
512                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
513                                         req->rq_export,
514                                         sizeof(KEY_HSM_COPYTOOL_SEND),
515                                         KEY_HSM_COPYTOOL_SEND,
516                                         vallen, val, NULL);
517         else
518                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
519
520         return rc;
521 }
522
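/* Log a warning (or trace message) for a callback request that could not be
 * processed or replied to normally. */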
523 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
524                                         const char *msg, int rc,
525                                         struct lustre_handle *handle)
526 {
527         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
528                   "%s: [nid %s] [rc %d] [lock "LPX64"]",
529                   msg, libcfs_id2str(req->rq_peer), rc,
530                   handle ? handle->cookie : 0);
531         if (req->rq_no_reply)
532                 CWARN("No reply was sent, maybe causing bug 21636.\n");
533         else if (rc)
534                 CWARN("Sending the reply failed, maybe causing bug 21636.\n");
535 }
536
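/* Handle a quota check callback: unpack the obd_quotactl from the request and
 * record the returned quota check status in the client obd. */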
537 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
538 {
539         struct obd_quotactl *oqctl;
540         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
541
542         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
543         if (oqctl == NULL) {
544                 CERROR("Can't unpack obd_quotactl\n");
545                 return -EPROTO;
546         }
547
548         oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
549
550         cli->cl_qchk_stat = oqctl->qc_stat;
551         return 0;
552 }
553
554 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
555 static int ldlm_callback_handler(struct ptlrpc_request *req)
556 {
557         struct ldlm_namespace *ns;
558         struct ldlm_request *dlm_req;
559         struct ldlm_lock *lock;
560         int rc;
561
562         /* Requests arrive in sender's byte order.  The ptlrpc service
563          * handler has already checked and, if necessary, byte-swapped the
564          * incoming request message body, but I am responsible for the
565          * message buffers. */
566
567         /* do nothing for sec context finalize */
568         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
569                 return 0;
570
571         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
572
573         if (req->rq_export == NULL) {
574                 rc = ldlm_callback_reply(req, -ENOTCONN);
575                 ldlm_callback_errmsg(req, "Operate on unconnected server",
576                                      rc, NULL);
577                 return 0;
578         }
579
580         LASSERT(req->rq_export != NULL);
581         LASSERT(req->rq_export->exp_obd != NULL);
582
583         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
584         case LDLM_BL_CALLBACK:
585                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
586                         return 0;
587                 break;
588         case LDLM_CP_CALLBACK:
589                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
590                         return 0;
591                 break;
592         case LDLM_GL_CALLBACK:
593                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
594                         return 0;
595                 break;
596         case LDLM_SET_INFO:
597                 rc = ldlm_handle_setinfo(req);
598                 ldlm_callback_reply(req, rc);
599                 return 0;
600         case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
601                 CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
602                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
603                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
604                         return 0;
605                 rc = llog_origin_handle_cancel(req);
606                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
607                         return 0;
608                 ldlm_callback_reply(req, rc);
609                 return 0;
610         case LLOG_ORIGIN_HANDLE_CREATE:
611                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
612                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
613                         return 0;
614                 rc = llog_origin_handle_open(req);
615                 ldlm_callback_reply(req, rc);
616                 return 0;
617         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
618                 req_capsule_set(&req->rq_pill,
619                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
620                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
621                         return 0;
622                 rc = llog_origin_handle_next_block(req);
623                 ldlm_callback_reply(req, rc);
624                 return 0;
625         case LLOG_ORIGIN_HANDLE_READ_HEADER:
626                 req_capsule_set(&req->rq_pill,
627                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
628                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
629                         return 0;
630                 rc = llog_origin_handle_read_header(req);
631                 ldlm_callback_reply(req, rc);
632                 return 0;
633         case LLOG_ORIGIN_HANDLE_CLOSE:
634                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
635                         return 0;
636                 rc = llog_origin_handle_close(req);
637                 ldlm_callback_reply(req, rc);
638                 return 0;
639         case OBD_QC_CALLBACK:
640                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
641                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
642                         return 0;
643                 rc = ldlm_handle_qc_callback(req);
644                 ldlm_callback_reply(req, rc);
645                 return 0;
646         default:
647                 CERROR("unknown opcode %u\n",
648                        lustre_msg_get_opc(req->rq_reqmsg));
649                 ldlm_callback_reply(req, -EPROTO);
650                 return 0;
651         }
652
653         ns = req->rq_export->exp_obd->obd_namespace;
654         LASSERT(ns != NULL);
655
656         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
657
658         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
659         if (dlm_req == NULL) {
660                 rc = ldlm_callback_reply(req, -EPROTO);
661                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
662                                      NULL);
663                 return 0;
664         }
665
666         /* Force a known safe race, send a cancel to the server for a lock
667          * which the server has already started a blocking callback on. */
668         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
669             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
670                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
671                 if (rc < 0)
672                         CERROR("ldlm_cli_cancel: %d\n", rc);
673         }
674
675         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
676         if (!lock) {
677                 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
678                        "disappeared\n", dlm_req->lock_handle[0].cookie);
679                 rc = ldlm_callback_reply(req, -EINVAL);
680                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
681                                      &dlm_req->lock_handle[0]);
682                 return 0;
683         }
684
685         if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
686             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
687                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
688
689         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
690         lock_res_and_lock(lock);
691         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
692                                               LDLM_AST_FLAGS);
693         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
694                 /* If somebody cancels the lock and the cache is already dropped,
695                  * or the lock failed before the cp_ast was received on the client,
696                  * we can tell the server we have no lock. Otherwise, we
697                  * should send the cancel after dropping the cache. */
698                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
699                     (lock->l_flags & LDLM_FL_BL_DONE)) ||
700                     (lock->l_flags & LDLM_FL_FAILED)) {
701                         LDLM_DEBUG(lock, "callback on lock "
702                                    LPX64" - lock disappeared\n",
703                                    dlm_req->lock_handle[0].cookie);
704                         unlock_res_and_lock(lock);
705                         LDLM_LOCK_RELEASE(lock);
706                         rc = ldlm_callback_reply(req, -EINVAL);
707                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
708                                              &dlm_req->lock_handle[0]);
709                         return 0;
710                 }
711                 /* BL_AST locks are not needed in LRU.
712                  * Let ldlm_cancel_lru() be fast. */
713                 ldlm_lock_remove_from_lru(lock);
714                 lock->l_flags |= LDLM_FL_BL_AST;
715         }
716         unlock_res_and_lock(lock);
717
718         /* We want the ost thread to get this reply so that it can respond
719          * to ost requests (write cache writeback) that might be triggered
720          * in the callback.
721          *
722          * But we'd also like to be able to indicate in the reply that we're
723          * cancelling right now, because it's unused, or have an intent result
724          * in the reply, so we might have to push the responsibility for sending
725          * the reply down into the AST handlers, alas. */
726
727         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
728         case LDLM_BL_CALLBACK:
729                 CDEBUG(D_INODE, "blocking ast\n");
730                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
731                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
732                         rc = ldlm_callback_reply(req, 0);
733                         if (req->rq_no_reply || rc)
734                                 ldlm_callback_errmsg(req, "Normal process", rc,
735                                                      &dlm_req->lock_handle[0]);
736                 }
737                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
738                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
739                 break;
740         case LDLM_CP_CALLBACK:
741                 CDEBUG(D_INODE, "completion ast\n");
742                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
743                 ldlm_callback_reply(req, 0);
744                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
745                 break;
746         case LDLM_GL_CALLBACK:
747                 CDEBUG(D_INODE, "glimpse ast\n");
748                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
749                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
750                 break;
751         default:
752                 LBUG();                  /* checked above */
753         }
754
755         return 0;
756 }
757
758
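/* Take the next blocking work item off the pool.  The priority list is
 * preferred, but the regular blp_list is serviced at least once every
 * blp_num_threads items so that it cannot be starved. */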
759 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
760 {
761         struct ldlm_bl_work_item *blwi = NULL;
762         static unsigned int num_bl;
763
764         spin_lock(&blp->blp_lock);
765         /* process a request from blp_list at least once per blp_num_threads requests */
766         if (!list_empty(&blp->blp_list) &&
767             (list_empty(&blp->blp_prio_list) || num_bl == 0))
768                 blwi = list_entry(blp->blp_list.next,
769                                       struct ldlm_bl_work_item, blwi_entry);
770         else
771                 if (!list_empty(&blp->blp_prio_list))
772                         blwi = list_entry(blp->blp_prio_list.next,
773                                               struct ldlm_bl_work_item,
774                                               blwi_entry);
775
776         if (blwi) {
777                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
778                         num_bl = 0;
779                 list_del(&blwi->blwi_entry);
780         }
781         spin_unlock(&blp->blp_lock);
782
783         return blwi;
784 }
785
786 /* This only contains temporary data until the thread starts */
787 struct ldlm_bl_thread_data {
788         char                    bltd_name[CFS_CURPROC_COMM_MAX];
789         struct ldlm_bl_pool     *bltd_blp;
790         struct completion       bltd_comp;
791         int                     bltd_num;
792 };
793
794 static int ldlm_bl_thread_main(void *arg);
795
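/* Spawn one more blocking-AST thread for \a blp and wait until it has
 * registered itself with the pool before returning. */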
796 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
797 {
798         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
799         struct task_struct *task;
800
801         init_completion(&bltd.bltd_comp);
802         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
803         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
804                 "ldlm_bl_%02d", bltd.bltd_num);
805         task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
806         if (IS_ERR(task)) {
807                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
808                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
809                 return PTR_ERR(task);
810         }
811         wait_for_completion(&bltd.bltd_comp);
812
813         return 0;
814 }
815
816 /**
817  * Main blocking requests processing thread.
818  *
819  * Callers put locks into its queue by calling ldlm_bl_to_thread.
820  * This thread in the end ends up doing actual call to ->l_blocking_ast
821  * for queued locks.
822  */
823 static int ldlm_bl_thread_main(void *arg)
824 {
825         struct ldlm_bl_pool *blp;
826
827         {
828                 struct ldlm_bl_thread_data *bltd = arg;
829
830                 blp = bltd->bltd_blp;
831
832                 atomic_inc(&blp->blp_num_threads);
833                 atomic_inc(&blp->blp_busy_threads);
834
835                 complete(&bltd->bltd_comp);
836                 /* cannot use bltd after this, it is only on caller's stack */
837         }
838
839         while (1) {
840                 struct l_wait_info lwi = { 0 };
841                 struct ldlm_bl_work_item *blwi = NULL;
842                 int busy;
843
844                 blwi = ldlm_bl_get_work(blp);
845
846                 if (blwi == NULL) {
847                         atomic_dec(&blp->blp_busy_threads);
848                         l_wait_event_exclusive(blp->blp_waitq,
849                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
850                                          &lwi);
851                         busy = atomic_inc_return(&blp->blp_busy_threads);
852                 } else {
853                         busy = atomic_read(&blp->blp_busy_threads);
854                 }
855
856                 if (blwi->blwi_ns == NULL)
857                         /* added by ldlm_cleanup() */
858                         break;
859
860                 /* Not fatal if this races and we end up with a few too many threads */
861                 if (unlikely(busy < blp->blp_max_threads &&
862                              busy >= atomic_read(&blp->blp_num_threads) &&
863                              !blwi->blwi_mem_pressure))
864                         /* discard the return value, we tried */
865                         ldlm_bl_thread_start(blp);
866
867                 if (blwi->blwi_mem_pressure)
868                         memory_pressure_set();
869
870                 if (blwi->blwi_count) {
871                         int count;
872                         /* This is the special case when we cancel LRU locks
873                          * asynchronously: the list of locks is passed in here.
874                          * The locks are marked LDLM_FL_CANCELING, but NOT
875                          * canceled locally yet. */
876                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
877                                                            blwi->blwi_count,
878                                                            LCF_BL_AST);
879                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
880                                              blwi->blwi_flags);
881                 } else {
882                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
883                                                 blwi->blwi_lock);
884                 }
885                 if (blwi->blwi_mem_pressure)
886                         memory_pressure_clr();
887
888                 if (blwi->blwi_flags & LCF_ASYNC)
889                         OBD_FREE(blwi, sizeof(*blwi));
890                 else
891                         complete(&blwi->blwi_comp);
892         }
893
894         atomic_dec(&blp->blp_busy_threads);
895         atomic_dec(&blp->blp_num_threads);
896         complete(&blp->blp_comp);
897         return 0;
898 }
899
900
901 static int ldlm_setup(void);
902 static int ldlm_cleanup(void);
903
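/* Take a reference on the LDLM subsystem, setting it up on first use.
 * Typical usage (sketch):
 *
 *      rc = ldlm_get_ref();
 *      if (rc)
 *              return rc;
 *      ...use the LDLM...
 *      ldlm_put_ref();
 */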
904 int ldlm_get_ref(void)
905 {
906         int rc = 0;
907
908         mutex_lock(&ldlm_ref_mutex);
909         if (++ldlm_refcount == 1) {
910                 rc = ldlm_setup();
911                 if (rc)
912                         ldlm_refcount--;
913         }
914         mutex_unlock(&ldlm_ref_mutex);
915
916         return rc;
917 }
918 EXPORT_SYMBOL(ldlm_get_ref);
919
920 void ldlm_put_ref(void)
921 {
922         mutex_lock(&ldlm_ref_mutex);
923         if (ldlm_refcount == 1) {
924                 int rc = ldlm_cleanup();
925                 if (rc)
926                         CERROR("ldlm_cleanup failed: %d\n", rc);
927                 else
928                         ldlm_refcount--;
929         } else {
930                 ldlm_refcount--;
931         }
932         mutex_unlock(&ldlm_ref_mutex);
933 }
934 EXPORT_SYMBOL(ldlm_put_ref);
935
936 /*
937  * Export handle<->lock hash operations.
938  */
939 static unsigned
940 ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
941 {
942         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
943 }
944
945 static void *
946 ldlm_export_lock_key(struct hlist_node *hnode)
947 {
948         struct ldlm_lock *lock;
949
950         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
951         return &lock->l_remote_handle;
952 }
953
954 static void
955 ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
956 {
957         struct ldlm_lock     *lock;
958
959         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
960         lock->l_remote_handle = *(struct lustre_handle *)key;
961 }
962
963 static int
964 ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
965 {
966         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
967 }
968
969 static void *
970 ldlm_export_lock_object(struct hlist_node *hnode)
971 {
972         return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
973 }
974
975 static void
976 ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
977 {
978         struct ldlm_lock *lock;
979
980         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
981         LDLM_LOCK_GET(lock);
982 }
983
984 static void
985 ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
986 {
987         struct ldlm_lock *lock;
988
989         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
990         LDLM_LOCK_RELEASE(lock);
991 }
992
993 static cfs_hash_ops_t ldlm_export_lock_ops = {
994         .hs_hash        = ldlm_export_lock_hash,
995         .hs_key  = ldlm_export_lock_key,
996         .hs_keycmp      = ldlm_export_lock_keycmp,
997         .hs_keycpy      = ldlm_export_lock_keycpy,
998         .hs_object      = ldlm_export_lock_object,
999         .hs_get  = ldlm_export_lock_get,
1000         .hs_put  = ldlm_export_lock_put,
1001         .hs_put_locked  = ldlm_export_lock_put,
1002 };
1003
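/* Create the per-export hash table that maps remote lock handles to
 * ldlm_lock structures. */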
1004 int ldlm_init_export(struct obd_export *exp)
1005 {
1006         exp->exp_lock_hash =
1007                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
1008                                 HASH_EXP_LOCK_CUR_BITS,
1009                                 HASH_EXP_LOCK_MAX_BITS,
1010                                 HASH_EXP_LOCK_BKT_BITS, 0,
1011                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
1012                                 &ldlm_export_lock_ops,
1013                                 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
1014                                 CFS_HASH_NBLK_CHANGE);
1015
1016         if (!exp->exp_lock_hash)
1017                 return -ENOMEM;
1018
1019         return 0;
1020 }
1021 EXPORT_SYMBOL(ldlm_init_export);
1022
1023 void ldlm_destroy_export(struct obd_export *exp)
1024 {
1025         cfs_hash_putref(exp->exp_lock_hash);
1026         exp->exp_lock_hash = NULL;
1027
1028         ldlm_destroy_flock_export(exp);
1029 }
1030 EXPORT_SYMBOL(ldlm_destroy_export);
1031
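/* Bring up the global LDLM state: register the callback service, create the
 * blocking-AST thread pool with its initial threads, and initialize the LDLM
 * pools.  On failure, everything set up so far is torn down again. */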
1032 static int ldlm_setup(void)
1033 {
1034         static struct ptlrpc_service_conf       conf;
1035         struct ldlm_bl_pool                     *blp = NULL;
1036         int rc = 0;
1037         int i;
1038
1039         if (ldlm_state != NULL)
1040                 return -EALREADY;
1041
1042         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1043         if (ldlm_state == NULL)
1044                 return -ENOMEM;
1045
1046 #ifdef LPROCFS
1047         rc = ldlm_proc_setup();
1048         if (rc != 0)
1049                 GOTO(out, rc);
1050 #endif
1051
1052         memset(&conf, 0, sizeof(conf));
1053         conf = (typeof(conf)) {
1054                 .psc_name               = "ldlm_cbd",
1055                 .psc_watchdog_factor    = 2,
1056                 .psc_buf                = {
1057                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
1058                         .bc_buf_size            = LDLM_BUFSIZE,
1059                         .bc_req_max_size        = LDLM_MAXREQSIZE,
1060                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
1061                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
1062                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
1063                 },
1064                 .psc_thr                = {
1065                         .tc_thr_name            = "ldlm_cb",
1066                         .tc_thr_factor          = LDLM_THR_FACTOR,
1067                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
1068                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
1069                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
1070                         .tc_nthrs_user          = ldlm_num_threads,
1071                         .tc_cpu_affinity        = 1,
1072                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
1073                 },
1074                 .psc_cpt                = {
1075                         .cc_pattern             = ldlm_cpts,
1076                 },
1077                 .psc_ops                = {
1078                         .so_req_handler         = ldlm_callback_handler,
1079                 },
1080         };
1081         ldlm_state->ldlm_cb_service =
1082                         ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
1083         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
1084                 CERROR("failed to start service\n");
1085                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
1086                 ldlm_state->ldlm_cb_service = NULL;
1087                 GOTO(out, rc);
1088         }
1089
1090
1091         OBD_ALLOC(blp, sizeof(*blp));
1092         if (blp == NULL)
1093                 GOTO(out, rc = -ENOMEM);
1094         ldlm_state->ldlm_bl_pool = blp;
1095
1096         spin_lock_init(&blp->blp_lock);
1097         INIT_LIST_HEAD(&blp->blp_list);
1098         INIT_LIST_HEAD(&blp->blp_prio_list);
1099         init_waitqueue_head(&blp->blp_waitq);
1100         atomic_set(&blp->blp_num_threads, 0);
1101         atomic_set(&blp->blp_busy_threads, 0);
1102
1103         if (ldlm_num_threads == 0) {
1104                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1105                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1106         } else {
1107                 blp->blp_min_threads = blp->blp_max_threads =
1108                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
1109                                                          ldlm_num_threads));
1110         }
1111
1112         for (i = 0; i < blp->blp_min_threads; i++) {
1113                 rc = ldlm_bl_thread_start(blp);
1114                 if (rc < 0)
1115                         GOTO(out, rc);
1116         }
1117
1118
1119         rc = ldlm_pools_init();
1120         if (rc) {
1121                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1122                 GOTO(out, rc);
1123         }
1124         return 0;
1125
1126  out:
1127         ldlm_cleanup();
1128         return rc;
1129 }
1130
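/* Tear down what ldlm_setup() created: stop the blocking-AST threads by
 * queueing work items with a NULL namespace, unregister the callback service
 * and free the global state.  Fails with -EBUSY while any namespace exists. */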
1131 static int ldlm_cleanup(void)
1132 {
1133         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1134             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1135                 CERROR("ldlm still has namespaces; clean these up first.\n");
1136                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1137                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1138                 return -EBUSY;
1139         }
1140
1141         ldlm_pools_fini();
1142
1143         if (ldlm_state->ldlm_bl_pool != NULL) {
1144                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1145
1146                 while (atomic_read(&blp->blp_num_threads) > 0) {
1147                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1148
1149                         init_completion(&blp->blp_comp);
1150
1151                         spin_lock(&blp->blp_lock);
1152                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1153                         wake_up(&blp->blp_waitq);
1154                         spin_unlock(&blp->blp_lock);
1155
1156                         wait_for_completion(&blp->blp_comp);
1157                 }
1158
1159                 OBD_FREE(blp, sizeof(*blp));
1160         }
1161
1162         if (ldlm_state->ldlm_cb_service != NULL)
1163                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1164
1165         ldlm_proc_cleanup();
1166
1167
1168         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
1169         ldlm_state = NULL;
1170
1171         return 0;
1172 }
1173
1174 int ldlm_init(void)
1175 {
1176         mutex_init(&ldlm_ref_mutex);
1177         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1178         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1179         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1180                                                sizeof(struct ldlm_resource), 0,
1181                                                SLAB_HWCACHE_ALIGN, NULL);
1182         if (ldlm_resource_slab == NULL)
1183                 return -ENOMEM;
1184
1185         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1186                               sizeof(struct ldlm_lock), 0,
1187                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1188         if (ldlm_lock_slab == NULL) {
1189                 kmem_cache_destroy(ldlm_resource_slab);
1190                 return -ENOMEM;
1191         }
1192
1193         ldlm_interval_slab = kmem_cache_create("interval_node",
1194                                         sizeof(struct ldlm_interval),
1195                                         0, SLAB_HWCACHE_ALIGN, NULL);
1196         if (ldlm_interval_slab == NULL) {
1197                 kmem_cache_destroy(ldlm_resource_slab);
1198                 kmem_cache_destroy(ldlm_lock_slab);
1199                 return -ENOMEM;
1200         }
1201 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1202         class_export_dump_hook = ldlm_dump_export_locks;
1203 #endif
1204         return 0;
1205 }
1206
1207 void ldlm_exit(void)
1208 {
1209         if (ldlm_refcount)
1210                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1211         kmem_cache_destroy(ldlm_resource_slab);
1212         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
1213          * call synchronize_rcu() to wait for a grace period to elapse, so
1214          * that ldlm_lock_free() gets a chance to be called. */
1215         synchronize_rcu();
1216         kmem_cache_destroy(ldlm_lock_slab);
1217         kmem_cache_destroy(ldlm_interval_slab);
1218 }