1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lockd.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 #include "../../include/linux/libcfs/libcfs.h"
45 #include "../include/lustre_dlm.h"
46 #include "../include/obd_class.h"
47 #include <linux/list.h>
48 #include "ldlm_internal.h"
49
50 static int ldlm_num_threads;
51 module_param(ldlm_num_threads, int, 0444);
52 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
53
54 static char *ldlm_cpts;
55 module_param(ldlm_cpts, charp, 0444);
56 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
57
58 static struct mutex     ldlm_ref_mutex;
59 static int ldlm_refcount;
60
61 static struct kobject *ldlm_kobj;
62 struct kset *ldlm_ns_kset;
63 static struct kset *ldlm_svc_kset;
64
65 struct ldlm_cb_async_args {
66         struct ldlm_cb_set_arg *ca_set_arg;
67         struct ldlm_lock       *ca_lock;
68 };
69
70 /* LDLM state */
71
72 static struct ldlm_state *ldlm_state;
73
74 #define ELT_STOPPED   0
75 #define ELT_READY     1
76 #define ELT_TERMINATE 2
77
78 struct ldlm_bl_pool {
79         spinlock_t              blp_lock;
80
81         /*
82          * blp_prio_list is used for callbacks that should be handled
83          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
84          * see bug 13843
85          */
86         struct list_head              blp_prio_list;
87
88         /*
89          * blp_list is used for all other callbacks which are likely
90          * to take longer to process.
91          */
92         struct list_head              blp_list;
93
94         wait_queue_head_t            blp_waitq;       /* blocking threads wait here for work */
95         struct completion       blp_comp;        /* completed by each thread as it exits */
96         atomic_t            blp_num_threads; /* current number of threads in the pool */
97         atomic_t            blp_busy_threads; /* threads currently handling a work item */
98         int                  blp_min_threads; /* lower bound on the pool size */
99         int                  blp_max_threads; /* upper bound on the pool size */
100 };
101
102 struct ldlm_bl_work_item {
103         struct list_head              blwi_entry;   /* linkage on blp_list/blp_prio_list */
104         struct ldlm_namespace  *blwi_ns;     /* namespace the lock(s) belong to */
105         struct ldlm_lock_desc   blwi_ld;     /* lock descriptor from the blocking AST */
106         struct ldlm_lock       *blwi_lock;   /* single lock to handle (blwi_count == 0) */
107         struct list_head              blwi_head;    /* locks to cancel (blwi_count > 0) */
108         int                  blwi_count;     /* number of locks on blwi_head */
109         struct completion       blwi_comp;   /* completed when work is done (sync mode) */
110         enum ldlm_cancel_flags  blwi_flags;  /* LCF_* flags for the cancellation */
111         int                  blwi_mem_pressure; /* item was queued under memory pressure */
112 };
113
114 /**
115  * Callback handler for receiving incoming blocking ASTs.
116  *
117  * This can only happen on the client side.
118  */
119 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
120                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
121 {
122         int do_ast;
123
124         LDLM_DEBUG(lock, "client blocking AST callback handler");
125
126         lock_res_and_lock(lock);
127         ldlm_set_cbpending(lock);
128
129         if (ldlm_is_cancel_on_block(lock))
130                 ldlm_set_cancel(lock);
131
132         do_ast = !lock->l_readers && !lock->l_writers;
133         unlock_res_and_lock(lock);
134
135         if (do_ast) {
136                 CDEBUG(D_DLMTRACE,
137                        "Lock %p already unused, calling callback (%p)\n", lock,
138                        lock->l_blocking_ast);
139                 if (lock->l_blocking_ast)
140                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
141                                              LDLM_CB_BLOCKING);
142         } else {
143                 CDEBUG(D_DLMTRACE,
144                        "Lock %p is referenced, will be cancelled later\n",
145                        lock);
146         }
147
148         LDLM_DEBUG(lock, "client blocking callback handler END");
149         LDLM_LOCK_RELEASE(lock);
150 }
151
152 /**
153  * Callback handler for receiving incoming completion ASTs.
154  *
155  * This can only happen on the client side.
156  */
157 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
158                                     struct ldlm_namespace *ns,
159                                     struct ldlm_request *dlm_req,
160                                     struct ldlm_lock *lock)
161 {
162         int lvb_len;
163         LIST_HEAD(ast_list);
164         int rc = 0;
165
166         LDLM_DEBUG(lock, "client completion callback handler START");
167
168         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
169                 int to = cfs_time_seconds(1);
170
171                 while (to > 0) {
172                         set_current_state(TASK_INTERRUPTIBLE);
173                         schedule_timeout(to);
174                         if (lock->l_granted_mode == lock->l_req_mode ||
175                             ldlm_is_destroyed(lock))
176                                 break;
177                 }
178         }
179
180         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
181         if (lvb_len < 0) {
182                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
183                 rc = lvb_len;
184                 goto out;
185         } else if (lvb_len > 0) {
186                 if (lock->l_lvb_len > 0) {
187                         /* for extent lock, lvb contains ost_lvb{}. */
188                         LASSERT(lock->l_lvb_data);
189
190                         if (unlikely(lock->l_lvb_len < lvb_len)) {
191                                 LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
192                                            lock->l_lvb_len, lvb_len);
193                                 rc = -EINVAL;
194                                 goto out;
195                         }
196                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
197                                                      * variable length
198                                                      */
199                         void *lvb_data;
200
201                         lvb_data = kzalloc(lvb_len, GFP_NOFS);
202                         if (!lvb_data) {
203                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
204                                 rc = -ENOMEM;
205                                 goto out;
206                         }
207
208                         lock_res_and_lock(lock);
209                         LASSERT(!lock->l_lvb_data);
210                         lock->l_lvb_type = LVB_T_LAYOUT;
211                         lock->l_lvb_data = lvb_data;
212                         lock->l_lvb_len = lvb_len;
213                         unlock_res_and_lock(lock);
214                 }
215         }
216
217         lock_res_and_lock(lock);
218         if (ldlm_is_destroyed(lock) ||
219             lock->l_granted_mode == lock->l_req_mode) {
220                 /* bug 11300: the lock has already been granted */
221                 unlock_res_and_lock(lock);
222                 LDLM_DEBUG(lock, "Double grant race happened");
223                 rc = 0;
224                 goto out;
225         }
226
227         /* If we receive the completion AST before the actual enqueue returned,
228          * then we might need to switch lock modes, resources, or extents.
229          */
230         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
231                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
232                 LDLM_DEBUG(lock, "completion AST, new lock mode");
233         }
234
235         if (lock->l_resource->lr_type != LDLM_PLAIN) {
236                 ldlm_convert_policy_to_local(req->rq_export,
237                                           dlm_req->lock_desc.l_resource.lr_type,
238                                           &dlm_req->lock_desc.l_policy_data,
239                                           &lock->l_policy_data);
240                 LDLM_DEBUG(lock, "completion AST, new policy data");
241         }
242
243         ldlm_resource_unlink_lock(lock);
244         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
245                    &lock->l_resource->lr_name,
246                    sizeof(lock->l_resource->lr_name)) != 0) {
247                 unlock_res_and_lock(lock);
248                 rc = ldlm_lock_change_resource(ns, lock,
249                                 &dlm_req->lock_desc.l_resource.lr_name);
250                 if (rc < 0) {
251                         LDLM_ERROR(lock, "Failed to allocate resource");
252                         goto out;
253                 }
254                 LDLM_DEBUG(lock, "completion AST, new resource");
255                 CERROR("change resource!\n");
256                 lock_res_and_lock(lock);
257         }
258
259         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
260                 /* BL_AST locks are not needed in LRU.
261                  * Let ldlm_cancel_lru() be fast.
262                  */
263                 ldlm_lock_remove_from_lru(lock);
264                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
265                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
266         }
267
268         if (lock->l_lvb_len > 0) {
269                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
270                                    lock->l_lvb_data, lvb_len);
271                 if (rc < 0) {
272                         unlock_res_and_lock(lock);
273                         goto out;
274                 }
275         }
276
277         ldlm_grant_lock(lock, &ast_list);
278         unlock_res_and_lock(lock);
279
280         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
281
282         /* Let Enqueue call osc_lock_upcall() and initialize l_ast_data */
283         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
284
285         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
286
287         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
288                           lock);
289         goto out;
290
291 out:
292         if (rc < 0) {
293                 lock_res_and_lock(lock);
294                 ldlm_set_failed(lock);
295                 unlock_res_and_lock(lock);
296                 wake_up(&lock->l_waitq);
297         }
298         LDLM_LOCK_RELEASE(lock);
299 }
300
301 /**
302  * Callback handler for receiving incoming glimpse ASTs.
303  *
304  * This can only happen on the client side.  After handling the glimpse AST
305  * we also consider dropping the lock here if it is unused locally for a
306  * long time.
307  */
308 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
309                                     struct ldlm_namespace *ns,
310                                     struct ldlm_request *dlm_req,
311                                     struct ldlm_lock *lock)
312 {
313         int rc = -ENOSYS;
314
315         LDLM_DEBUG(lock, "client glimpse AST callback handler");
316
317         if (lock->l_glimpse_ast)
318                 rc = lock->l_glimpse_ast(lock, req);
319
320         if (req->rq_repmsg) {
321                 ptlrpc_reply(req);
322         } else {
323                 req->rq_status = rc;
324                 ptlrpc_error(req);
325         }
326
327         lock_res_and_lock(lock);
328         if (lock->l_granted_mode == LCK_PW &&
329             !lock->l_readers && !lock->l_writers &&
330             cfs_time_after(cfs_time_current(),
331                            cfs_time_add(lock->l_last_used,
332                                         cfs_time_seconds(10)))) {
333                 unlock_res_and_lock(lock);
334                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
335                         ldlm_handle_bl_callback(ns, NULL, lock);
336
337                 return;
338         }
339         unlock_res_and_lock(lock);
340         LDLM_LOCK_RELEASE(lock);
341 }
342
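/*
 * Send a reply with status \a rc for a callback request, packing the
 * reply buffer first if it has not been packed yet.  Nothing is sent
 * if the request was marked no-reply.
 */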
343 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
344 {
345         if (req->rq_no_reply)
346                 return 0;
347
348         req->rq_status = rc;
349         if (!req->rq_packed_final) {
350                 rc = lustre_pack_reply(req, 1, NULL, NULL);
351                 if (rc)
352                         return rc;
353         }
354         return ptlrpc_reply(req);
355 }
356
357 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
358                                enum ldlm_cancel_flags cancel_flags)
359 {
360         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
361
362         spin_lock(&blp->blp_lock);
363         if (blwi->blwi_lock && ldlm_is_discard_data(blwi->blwi_lock)) {
364                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
365                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
366         } else {
367                 /* other blocking callbacks are added to the regular list */
368                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
369         }
370         spin_unlock(&blp->blp_lock);
371
372         wake_up(&blp->blp_waitq);
373
374         /* cannot check blwi->blwi_flags as blwi may already have been freed
375          * in LCF_ASYNC mode
376          */
377         if (!(cancel_flags & LCF_ASYNC))
378                 wait_for_completion(&blwi->blwi_comp);
379
380         return 0;
381 }
382
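/*
 * Fill in a blocking work item: either splice the \a count locks on
 * \a cancels onto blwi_head or, when \a count is zero, record the
 * single lock \a lock.
 */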
383 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
384                              struct ldlm_namespace *ns,
385                              struct ldlm_lock_desc *ld,
386                              struct list_head *cancels, int count,
387                              struct ldlm_lock *lock,
388                              enum ldlm_cancel_flags cancel_flags)
389 {
390         init_completion(&blwi->blwi_comp);
391         INIT_LIST_HEAD(&blwi->blwi_head);
392
393         if (memory_pressure_get())
394                 blwi->blwi_mem_pressure = 1;
395
396         blwi->blwi_ns = ns;
397         blwi->blwi_flags = cancel_flags;
398         if (ld)
399                 blwi->blwi_ld = *ld;
400         if (count) {
401                 list_add(&blwi->blwi_head, cancels);
402                 list_del_init(cancels);
403                 blwi->blwi_count = count;
404         } else {
405                 blwi->blwi_lock = lock;
406         }
407 }
408
409 /**
410  * Queues a list of locks \a cancels containing \a count locks
411  * for later processing by a blocking thread.  If \a count is zero,
412  * then the lock referenced as \a lock is queued instead.
413  *
414  * The blocking thread will then call the lock's ->l_blocking_ast callback.
415  * If queueing fails, an error is returned and the caller is expected to
416  * call ->l_blocking_ast itself.
417  */
418 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
419                              struct ldlm_lock_desc *ld,
420                              struct ldlm_lock *lock,
421                              struct list_head *cancels, int count,
422                              enum ldlm_cancel_flags cancel_flags)
423 {
424         if (cancels && count == 0)
425                 return 0;
426
427         if (cancel_flags & LCF_ASYNC) {
428                 struct ldlm_bl_work_item *blwi;
429
430                 blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
431                 if (!blwi)
432                         return -ENOMEM;
433                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
434
435                 return __ldlm_bl_to_thread(blwi, cancel_flags);
436         } else {
437                 /* if it is a synchronous call, do minimal memory allocation,
438                  * as it could be triggered from the kernel shrinker
439                  */
440                 struct ldlm_bl_work_item blwi;
441
442                 memset(&blwi, 0, sizeof(blwi));
443                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
444                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
445         }
446 }
447
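/*
 * Queue a single blocking AST for \a lock to be handled by a blocking
 * thread.  Callers in this file fall back to handling the callback
 * inline if queueing fails:
 *
 *	if (ldlm_bl_to_thread_lock(ns, ld, lock))
 *		ldlm_handle_bl_callback(ns, ld, lock);
 */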
448 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
449                            struct ldlm_lock *lock)
450 {
451         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
452 }
453
454 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
455                            struct list_head *cancels, int count,
456                            enum ldlm_cancel_flags cancel_flags)
457 {
458         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
459 }
460
461 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
462 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
463 {
464         struct obd_device *obd = req->rq_export->exp_obd;
465         char *key;
466         void *val;
467         int keylen, vallen;
468         int rc = -ENOSYS;
469
470         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
471
472         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
473
474         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
475         if (!key) {
476                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
477                 return -EFAULT;
478         }
479         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
480                                       RCL_CLIENT);
481         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
482         if (!val) {
483                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
484                 return -EFAULT;
485         }
486         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
487                                       RCL_CLIENT);
488
489         /* We are responsible for swabbing contents of val */
490
491         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
492                 /* Pass it on to mdc (the "export" in this case) */
493                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
494                                         req->rq_export,
495                                         sizeof(KEY_HSM_COPYTOOL_SEND),
496                                         KEY_HSM_COPYTOOL_SEND,
497                                         vallen, val, NULL);
498         else
499                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
500
501         return rc;
502 }
503
504 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
505                                         const char *msg, int rc,
506                                         struct lustre_handle *handle)
507 {
508         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
509                   "%s: [nid %s] [rc %d] [lock %#llx]",
510                   msg, libcfs_id2str(req->rq_peer), rc,
511                   handle ? handle->cookie : 0);
512         if (req->rq_no_reply)
513                 CWARN("No reply was sent, maybe cause bug 21636.\n");
514         else if (rc)
515                 CWARN("Send reply failed, maybe cause bug 21636.\n");
516 }
517
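/*
 * Quota check callback: unpack the obd_quotactl from the request and
 * record its status in the client's cl_qchk_stat.
 */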
518 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
519 {
520         struct obd_quotactl *oqctl;
521         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
522
523         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
524         if (!oqctl) {
525                 CERROR("Can't unpack obd_quotactl\n");
526                 return -EPROTO;
527         }
528
529         oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
530
531         cli->cl_qchk_stat = oqctl->qc_stat;
532         return 0;
533 }
534
535 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
536 static int ldlm_callback_handler(struct ptlrpc_request *req)
537 {
538         struct ldlm_namespace *ns;
539         struct ldlm_request *dlm_req;
540         struct ldlm_lock *lock;
541         int rc;
542
543         /* Requests arrive in sender's byte order.  The ptlrpc service
544          * handler has already checked and, if necessary, byte-swapped the
545          * incoming request message body, but I am responsible for the
546          * message buffers.
547          */
548
549         /* do nothing for sec context finalize */
550         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
551                 return 0;
552
553         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
554
555         if (!req->rq_export) {
556                 rc = ldlm_callback_reply(req, -ENOTCONN);
557                 ldlm_callback_errmsg(req, "Operate on unconnected server",
558                                      rc, NULL);
559                 return 0;
560         }
561
562         LASSERT(req->rq_export->exp_obd);
563
564         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
565         case LDLM_BL_CALLBACK:
566                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
567                         return 0;
568                 break;
569         case LDLM_CP_CALLBACK:
570                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
571                         return 0;
572                 break;
573         case LDLM_GL_CALLBACK:
574                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
575                         return 0;
576                 break;
577         case LDLM_SET_INFO:
578                 rc = ldlm_handle_setinfo(req);
579                 ldlm_callback_reply(req, rc);
580                 return 0;
581         case OBD_QC_CALLBACK:
582                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
583                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
584                         return 0;
585                 rc = ldlm_handle_qc_callback(req);
586                 ldlm_callback_reply(req, rc);
587                 return 0;
588         default:
589                 CERROR("unknown opcode %u\n",
590                        lustre_msg_get_opc(req->rq_reqmsg));
591                 ldlm_callback_reply(req, -EPROTO);
592                 return 0;
593         }
594
595         ns = req->rq_export->exp_obd->obd_namespace;
596         LASSERT(ns);
597
598         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
599
600         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
601         if (!dlm_req) {
602                 rc = ldlm_callback_reply(req, -EPROTO);
603                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
604                                      NULL);
605                 return 0;
606         }
607
608         /* Force a known safe race, send a cancel to the server for a lock
609          * which the server has already started a blocking callback on.
610          */
611         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
612             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
613                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
614                 if (rc < 0)
615                         CERROR("ldlm_cli_cancel: %d\n", rc);
616         }
617
618         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
619         if (!lock) {
620                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
621                        dlm_req->lock_handle[0].cookie);
622                 rc = ldlm_callback_reply(req, -EINVAL);
623                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
624                                      &dlm_req->lock_handle[0]);
625                 return 0;
626         }
627
628         if (ldlm_is_fail_loc(lock) &&
629             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
630                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
631
632         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
633         lock_res_and_lock(lock);
634         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
635                                               LDLM_FL_AST_MASK);
636         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
637                 /* If somebody cancels the lock and the cache has already been
638                  * dropped, or the lock failed before the cp_ast was received
639                  * on the client, we can tell the server we have no lock.
640                  * Otherwise, we should send the cancel after dropping the cache.
641                  */
642                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
643                     ldlm_is_failed(lock)) {
644                         LDLM_DEBUG(lock, "callback on lock %#llx - lock disappeared\n",
645                                    dlm_req->lock_handle[0].cookie);
646                         unlock_res_and_lock(lock);
647                         LDLM_LOCK_RELEASE(lock);
648                         rc = ldlm_callback_reply(req, -EINVAL);
649                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
650                                              &dlm_req->lock_handle[0]);
651                         return 0;
652                 }
653                 /* BL_AST locks are not needed in LRU.
654                  * Let ldlm_cancel_lru() be fast.
655                  */
656                 ldlm_lock_remove_from_lru(lock);
657                 ldlm_set_bl_ast(lock);
658         }
659         unlock_res_and_lock(lock);
660
661         /* We want the ost thread to get this reply so that it can respond
662          * to ost requests (write cache writeback) that might be triggered
663          * in the callback.
664          *
665          * But we'd also like to be able to indicate in the reply that we're
666          * cancelling right now, because it's unused, or have an intent result
667          * in the reply, so we might have to push the responsibility for sending
668          * the reply down into the AST handlers, alas.
669          */
670
671         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
672         case LDLM_BL_CALLBACK:
673                 CDEBUG(D_INODE, "blocking ast\n");
674                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
675                 if (!ldlm_is_cancel_on_block(lock)) {
676                         rc = ldlm_callback_reply(req, 0);
677                         if (req->rq_no_reply || rc)
678                                 ldlm_callback_errmsg(req, "Normal process", rc,
679                                                      &dlm_req->lock_handle[0]);
680                 }
681                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
682                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
683                 break;
684         case LDLM_CP_CALLBACK:
685                 CDEBUG(D_INODE, "completion ast\n");
686                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
687                 ldlm_callback_reply(req, 0);
688                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
689                 break;
690         case LDLM_GL_CALLBACK:
691                 CDEBUG(D_INODE, "glimpse ast\n");
692                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
693                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
694                 break;
695         default:
696                 LBUG();                  /* checked above */
697         }
698
699         return 0;
700 }
701
702 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
703 {
704         struct ldlm_bl_work_item *blwi = NULL;
705         static unsigned int num_bl;
706
707         spin_lock(&blp->blp_lock);
708         /* serve blp_list at least once every blp_num_threads dequeues so priority work cannot starve it */
709         if (!list_empty(&blp->blp_list) &&
710             (list_empty(&blp->blp_prio_list) || num_bl == 0))
711                 blwi = list_entry(blp->blp_list.next,
712                                       struct ldlm_bl_work_item, blwi_entry);
713         else
714                 if (!list_empty(&blp->blp_prio_list))
715                         blwi = list_entry(blp->blp_prio_list.next,
716                                               struct ldlm_bl_work_item,
717                                               blwi_entry);
718
719         if (blwi) {
720                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
721                         num_bl = 0;
722                 list_del(&blwi->blwi_entry);
723         }
724         spin_unlock(&blp->blp_lock);
725
726         return blwi;
727 }
728
729 /* This only contains temporary data until the thread starts */
730 struct ldlm_bl_thread_data {
731         char                    bltd_name[CFS_CURPROC_COMM_MAX];
732         struct ldlm_bl_pool     *bltd_blp;
733         struct completion       bltd_comp;
734         int                     bltd_num;
735 };
736
737 static int ldlm_bl_thread_main(void *arg);
738
739 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
740 {
741         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
742         struct task_struct *task;
743
744         init_completion(&bltd.bltd_comp);
745         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
746         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
747                 "ldlm_bl_%02d", bltd.bltd_num);
748         task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
749         if (IS_ERR(task)) {
750                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
751                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
752                 return PTR_ERR(task);
753         }
754         wait_for_completion(&bltd.bltd_comp);
755
756         return 0;
757 }
758
759 /**
760  * Main blocking requests processing thread.
761  *
762  * Callers put locks into its queue by calling ldlm_bl_to_thread.
763  * This thread ultimately makes the actual call to ->l_blocking_ast
764  * for the queued locks.
765  */
766 static int ldlm_bl_thread_main(void *arg)
767 {
768         struct ldlm_bl_pool *blp;
769
770         {
771                 struct ldlm_bl_thread_data *bltd = arg;
772
773                 blp = bltd->bltd_blp;
774
775                 atomic_inc(&blp->blp_num_threads);
776                 atomic_inc(&blp->blp_busy_threads);
777
778                 complete(&bltd->bltd_comp);
779                 /* cannot use bltd after this, it is only on caller's stack */
780         }
781
782         while (1) {
783                 struct l_wait_info lwi = { 0 };
784                 struct ldlm_bl_work_item *blwi = NULL;
785                 int busy;
786
787                 blwi = ldlm_bl_get_work(blp);
788
789                 if (!blwi) {
790                         atomic_dec(&blp->blp_busy_threads);
791                         l_wait_event_exclusive(blp->blp_waitq,
792                                          (blwi = ldlm_bl_get_work(blp)),
793                                          &lwi);
794                         busy = atomic_inc_return(&blp->blp_busy_threads);
795                 } else {
796                         busy = atomic_read(&blp->blp_busy_threads);
797                 }
798
799                 if (!blwi->blwi_ns)
800                         /* added by ldlm_cleanup() */
801                         break;
802
803                 /* Not fatal if this races and we start a few too many threads */
804                 if (unlikely(busy < blp->blp_max_threads &&
805                              busy >= atomic_read(&blp->blp_num_threads) &&
806                              !blwi->blwi_mem_pressure))
807                         /* discard the return value, we tried */
808                         ldlm_bl_thread_start(blp);
809
810                 if (blwi->blwi_mem_pressure)
811                         memory_pressure_set();
812
813                 if (blwi->blwi_count) {
814                         int count;
815                         /* In the special case where we cancel LRU locks
816                          * asynchronously, we are passed the list of locks.
817                          * The locks are marked LDLM_FL_CANCELING but are
818                          * NOT yet cancelled locally.
819                          */
820                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
821                                                            blwi->blwi_count,
822                                                            LCF_BL_AST);
823                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
824                                              blwi->blwi_flags);
825                 } else {
826                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
827                                                 blwi->blwi_lock);
828                 }
829                 if (blwi->blwi_mem_pressure)
830                         memory_pressure_clr();
831
832                 if (blwi->blwi_flags & LCF_ASYNC)
833                         kfree(blwi);
834                 else
835                         complete(&blwi->blwi_comp);
836         }
837
838         atomic_dec(&blp->blp_busy_threads);
839         atomic_dec(&blp->blp_num_threads);
840         complete(&blp->blp_comp);
841         return 0;
842 }
843
844 static int ldlm_setup(void);
845 static int ldlm_cleanup(void);
846
847 int ldlm_get_ref(void)
848 {
849         int rc = 0;
850
851         mutex_lock(&ldlm_ref_mutex);
852         if (++ldlm_refcount == 1) {
853                 rc = ldlm_setup();
854                 if (rc)
855                         ldlm_refcount--;
856         }
857         mutex_unlock(&ldlm_ref_mutex);
858
859         return rc;
860 }
861 EXPORT_SYMBOL(ldlm_get_ref);
862
863 void ldlm_put_ref(void)
864 {
865         mutex_lock(&ldlm_ref_mutex);
866         if (ldlm_refcount == 1) {
867                 int rc = ldlm_cleanup();
868
869                 if (rc)
870                         CERROR("ldlm_cleanup failed: %d\n", rc);
871                 else
872                         ldlm_refcount--;
873         } else {
874                 ldlm_refcount--;
875         }
876         mutex_unlock(&ldlm_ref_mutex);
877 }
878 EXPORT_SYMBOL(ldlm_put_ref);
879
880 extern unsigned int ldlm_cancel_unused_locks_before_replay;
881
882 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
883                                                       struct attribute *attr,
884                                                       char *buf)
885 {
886         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
887 }
888
889 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
890                                                        struct attribute *attr,
891                                                        const char *buffer,
892                                                        size_t count)
893 {
894         int rc;
895         unsigned long val;
896
897         rc = kstrtoul(buffer, 10, &val);
898         if (rc)
899                 return rc;
900
901         ldlm_cancel_unused_locks_before_replay = val;
902
903         return count;
904 }
905 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
906
907 /* These are for root of /sys/fs/lustre/ldlm */
908 static struct attribute *ldlm_attrs[] = {
909         &lustre_attr_cancel_unused_locks_before_replay.attr,
910         NULL,
911 };
912
913 static struct attribute_group ldlm_attr_group = {
914         .attrs = ldlm_attrs,
915 };
916
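/*
 * Bring up the client-side LDLM state: sysfs and debugfs entries, the
 * "ldlm_cbd" callback service, the blocking-thread pool and the LDLM
 * pools.  On any failure, everything set up so far is torn down by
 * ldlm_cleanup().
 */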
917 static int ldlm_setup(void)
918 {
919         static struct ptlrpc_service_conf       conf;
920         struct ldlm_bl_pool                     *blp = NULL;
921         int rc = 0;
922         int i;
923
924         if (ldlm_state)
925                 return -EALREADY;
926
927         ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
928         if (!ldlm_state)
929                 return -ENOMEM;
930
931         ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
932         if (!ldlm_kobj) {
933                 rc = -ENOMEM;
934                 goto out;
935         }
936
937         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
938         if (rc)
939                 goto out;
940
941         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
942         if (!ldlm_ns_kset) {
943                 rc = -ENOMEM;
944                 goto out;
945         }
946
947         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
948         if (!ldlm_svc_kset) {
949                 rc = -ENOMEM;
950                 goto out;
951         }
952
953         rc = ldlm_debugfs_setup();
954         if (rc != 0)
955                 goto out;
956
957         memset(&conf, 0, sizeof(conf));
958         conf = (typeof(conf)) {
959                 .psc_name               = "ldlm_cbd",
960                 .psc_watchdog_factor    = 2,
961                 .psc_buf                = {
962                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
963                         .bc_buf_size            = LDLM_BUFSIZE,
964                         .bc_req_max_size        = LDLM_MAXREQSIZE,
965                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
966                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
967                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
968                 },
969                 .psc_thr                = {
970                         .tc_thr_name            = "ldlm_cb",
971                         .tc_thr_factor          = LDLM_THR_FACTOR,
972                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
973                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
974                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
975                         .tc_nthrs_user          = ldlm_num_threads,
976                         .tc_cpu_affinity        = 1,
977                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
978                 },
979                 .psc_cpt                = {
980                         .cc_pattern             = ldlm_cpts,
981                 },
982                 .psc_ops                = {
983                         .so_req_handler         = ldlm_callback_handler,
984                 },
985         };
986         ldlm_state->ldlm_cb_service =
987                         ptlrpc_register_service(&conf, ldlm_svc_kset,
988                                                 ldlm_svc_debugfs_dir);
989         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
990                 CERROR("failed to start service\n");
991                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
992                 ldlm_state->ldlm_cb_service = NULL;
993                 goto out;
994         }
995
996         blp = kzalloc(sizeof(*blp), GFP_NOFS);
997         if (!blp) {
998                 rc = -ENOMEM;
999                 goto out;
1000         }
1001         ldlm_state->ldlm_bl_pool = blp;
1002
1003         spin_lock_init(&blp->blp_lock);
1004         INIT_LIST_HEAD(&blp->blp_list);
1005         INIT_LIST_HEAD(&blp->blp_prio_list);
1006         init_waitqueue_head(&blp->blp_waitq);
1007         atomic_set(&blp->blp_num_threads, 0);
1008         atomic_set(&blp->blp_busy_threads, 0);
1009
1010         if (ldlm_num_threads == 0) {
1011                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1012                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1013         } else {
1014                 blp->blp_min_threads = blp->blp_max_threads =
1015                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
1016                                                          ldlm_num_threads));
1017         }
1018
1019         for (i = 0; i < blp->blp_min_threads; i++) {
1020                 rc = ldlm_bl_thread_start(blp);
1021                 if (rc < 0)
1022                         goto out;
1023         }
1024
1025         rc = ldlm_pools_init();
1026         if (rc) {
1027                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1028                 goto out;
1029         }
1030         return 0;
1031
1032  out:
1033         ldlm_cleanup();
1034         return rc;
1035 }
1036
1037 static int ldlm_cleanup(void)
1038 {
1039         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1040             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1041                 CERROR("ldlm still has namespaces; clean these up first.\n");
1042                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1043                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1044                 return -EBUSY;
1045         }
1046
1047         ldlm_pools_fini();
1048
1049         if (ldlm_state->ldlm_bl_pool) {
1050                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1051
1052                 while (atomic_read(&blp->blp_num_threads) > 0) {
1053                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1054
1055                         init_completion(&blp->blp_comp);
1056
1057                         spin_lock(&blp->blp_lock);
1058                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1059                         wake_up(&blp->blp_waitq);
1060                         spin_unlock(&blp->blp_lock);
1061
1062                         wait_for_completion(&blp->blp_comp);
1063                 }
1064
1065                 kfree(blp);
1066         }
1067
1068         if (ldlm_state->ldlm_cb_service)
1069                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1070
1071         if (ldlm_ns_kset)
1072                 kset_unregister(ldlm_ns_kset);
1073         if (ldlm_svc_kset)
1074                 kset_unregister(ldlm_svc_kset);
1075         if (ldlm_kobj)
1076                 kobject_put(ldlm_kobj);
1077
1078         ldlm_debugfs_cleanup();
1079
1080         kfree(ldlm_state);
1081         ldlm_state = NULL;
1082
1083         return 0;
1084 }
1085
1086 int ldlm_init(void)
1087 {
1088         mutex_init(&ldlm_ref_mutex);
1089         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1090         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1091         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1092                                                sizeof(struct ldlm_resource), 0,
1093                                                SLAB_HWCACHE_ALIGN, NULL);
1094         if (!ldlm_resource_slab)
1095                 return -ENOMEM;
1096
1097         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1098                               sizeof(struct ldlm_lock), 0,
1099                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1100         if (!ldlm_lock_slab) {
1101                 kmem_cache_destroy(ldlm_resource_slab);
1102                 return -ENOMEM;
1103         }
1104
1105         ldlm_interval_slab = kmem_cache_create("interval_node",
1106                                         sizeof(struct ldlm_interval),
1107                                         0, SLAB_HWCACHE_ALIGN, NULL);
1108         if (!ldlm_interval_slab) {
1109                 kmem_cache_destroy(ldlm_resource_slab);
1110                 kmem_cache_destroy(ldlm_lock_slab);
1111                 return -ENOMEM;
1112         }
1113 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1114         class_export_dump_hook = ldlm_dump_export_locks;
1115 #endif
1116         return 0;
1117 }
1118
1119 void ldlm_exit(void)
1120 {
1121         if (ldlm_refcount)
1122                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1123         kmem_cache_destroy(ldlm_resource_slab);
1124         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
1125          * synchronize_rcu() to wait for a grace period to elapse and give
1126          * ldlm_lock_free() a chance to run.
1127          */
1128         synchronize_rcu();
1129         kmem_cache_destroy(ldlm_lock_slab);
1130         kmem_cache_destroy(ldlm_interval_slab);
1131 }