1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_lockd.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LDLM
39
40 #include "../../include/linux/libcfs/libcfs.h"
41 #include "../include/lustre_dlm.h"
42 #include "../include/obd_class.h"
43 #include <linux/list.h>
44 #include "ldlm_internal.h"
45
46 static int ldlm_num_threads;
47 module_param(ldlm_num_threads, int, 0444);
48 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
49
50 static char *ldlm_cpts;
51 module_param(ldlm_cpts, charp, 0444);
52 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
53
54 static struct mutex     ldlm_ref_mutex;
55 static int ldlm_refcount;
56
57 static struct kobject *ldlm_kobj;
58 struct kset *ldlm_ns_kset;
59 static struct kset *ldlm_svc_kset;
60
61 struct ldlm_cb_async_args {
62         struct ldlm_cb_set_arg *ca_set_arg;
63         struct ldlm_lock       *ca_lock;
64 };
65
66 /* LDLM state */
67
68 static struct ldlm_state *ldlm_state;
69
70 #define ELT_STOPPED   0
71 #define ELT_READY     1
72 #define ELT_TERMINATE 2
73
74 struct ldlm_bl_pool {
75         spinlock_t              blp_lock;
76
77         /*
78          * blp_prio_list is used for callbacks that should be handled
79          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests;
80          * see bug 13843.
81          */
82         struct list_head              blp_prio_list;
83
84         /*
85          * blp_list is used for all other callbacks which are likely
86          * to take longer to process.
87          */
88         struct list_head              blp_list;
89
90         wait_queue_head_t            blp_waitq;
91         struct completion       blp_comp;
92         atomic_t            blp_num_threads;
93         atomic_t            blp_busy_threads;
94         int                  blp_min_threads;
95         int                  blp_max_threads;
96 };
97
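/*
 * A single unit of work for the blocking threads: either one lock (blwi_lock)
 * on which to run the blocking AST, or a list of blwi_count locks (blwi_head)
 * to be cancelled.
 */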
98 struct ldlm_bl_work_item {
99         struct list_head              blwi_entry;
100         struct ldlm_namespace  *blwi_ns;
101         struct ldlm_lock_desc   blwi_ld;
102         struct ldlm_lock       *blwi_lock;
103         struct list_head              blwi_head;
104         int                  blwi_count;
105         struct completion       blwi_comp;
106         enum ldlm_cancel_flags  blwi_flags;
107         int                  blwi_mem_pressure;
108 };
109
110 /**
111  * Callback handler for receiving incoming blocking ASTs.
112  *
113  * This can only happen on the client side.
114  */
115 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
116                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
117 {
118         int do_ast;
119
120         LDLM_DEBUG(lock, "client blocking AST callback handler");
121
122         lock_res_and_lock(lock);
123         ldlm_set_cbpending(lock);
124
125         if (ldlm_is_cancel_on_block(lock))
126                 ldlm_set_cancel(lock);
127
128         do_ast = !lock->l_readers && !lock->l_writers;
129         unlock_res_and_lock(lock);
130
131         if (do_ast) {
132                 CDEBUG(D_DLMTRACE,
133                        "Lock %p already unused, calling callback (%p)\n", lock,
134                        lock->l_blocking_ast);
135                 if (lock->l_blocking_ast)
136                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
137                                              LDLM_CB_BLOCKING);
138         } else {
139                 CDEBUG(D_DLMTRACE,
140                        "Lock %p is referenced, will be cancelled later\n",
141                        lock);
142         }
143
144         LDLM_DEBUG(lock, "client blocking callback handler END");
145         LDLM_LOCK_RELEASE(lock);
146 }
147
148 /**
149  * Callback handler for receiving incoming completion ASTs.
150  *
151  * This can only happen on the client side.
152  */
153 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
154                                     struct ldlm_namespace *ns,
155                                     struct ldlm_request *dlm_req,
156                                     struct ldlm_lock *lock)
157 {
158         int lvb_len;
159         LIST_HEAD(ast_list);
160         int rc = 0;
161
162         LDLM_DEBUG(lock, "client completion callback handler START");
163
164         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
165                 int to = cfs_time_seconds(1);
166
167                 while (to > 0) {
168                         set_current_state(TASK_INTERRUPTIBLE);
169                         schedule_timeout(to);
170                         if (lock->l_granted_mode == lock->l_req_mode ||
171                             ldlm_is_destroyed(lock))
172                                 break;
173                 }
174         }
175
176         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
177         if (lvb_len < 0) {
178                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
179                 rc = lvb_len;
180                 goto out;
181         } else if (lvb_len > 0) {
182                 if (lock->l_lvb_len > 0) {
183                         /* for extent lock, lvb contains ost_lvb{}. */
184                         LASSERT(lock->l_lvb_data);
185
186                         if (unlikely(lock->l_lvb_len < lvb_len)) {
187                                 LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
188                                            lock->l_lvb_len, lvb_len);
189                                 rc = -EINVAL;
190                                 goto out;
191                         }
192                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
193                                                      * variable length
194                                                      */
195                         void *lvb_data;
196
197                         lvb_data = kzalloc(lvb_len, GFP_NOFS);
198                         if (!lvb_data) {
199                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
200                                 rc = -ENOMEM;
201                                 goto out;
202                         }
203
204                         lock_res_and_lock(lock);
205                         LASSERT(!lock->l_lvb_data);
206                         lock->l_lvb_type = LVB_T_LAYOUT;
207                         lock->l_lvb_data = lvb_data;
208                         lock->l_lvb_len = lvb_len;
209                         unlock_res_and_lock(lock);
210                 }
211         }
212
213         lock_res_and_lock(lock);
214         if (ldlm_is_destroyed(lock) ||
215             lock->l_granted_mode == lock->l_req_mode) {
216                 /* bug 11300: the lock has already been granted */
217                 unlock_res_and_lock(lock);
218                 LDLM_DEBUG(lock, "Double grant race happened");
219                 rc = 0;
220                 goto out;
221         }
222
223         /* If we receive the completion AST before the actual enqueue returned,
224          * then we might need to switch lock modes, resources, or extents.
225          */
226         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
227                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
228                 LDLM_DEBUG(lock, "completion AST, new lock mode");
229         }
230
231         if (lock->l_resource->lr_type != LDLM_PLAIN) {
232                 ldlm_convert_policy_to_local(req->rq_export,
233                                           dlm_req->lock_desc.l_resource.lr_type,
234                                           &dlm_req->lock_desc.l_policy_data,
235                                           &lock->l_policy_data);
236                 LDLM_DEBUG(lock, "completion AST, new policy data");
237         }
238
239         ldlm_resource_unlink_lock(lock);
240         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
241                    &lock->l_resource->lr_name,
242                    sizeof(lock->l_resource->lr_name)) != 0) {
243                 unlock_res_and_lock(lock);
244                 rc = ldlm_lock_change_resource(ns, lock,
245                                 &dlm_req->lock_desc.l_resource.lr_name);
246                 if (rc < 0) {
247                         LDLM_ERROR(lock, "Failed to allocate resource");
248                         goto out;
249                 }
250                 LDLM_DEBUG(lock, "completion AST, new resource");
251                 CERROR("change resource!\n");
252                 lock_res_and_lock(lock);
253         }
254
255         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
256                 /* BL_AST locks are not needed in LRU.
257                  * Let ldlm_cancel_lru() be fast.
258                  */
259                 ldlm_lock_remove_from_lru(lock);
260                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
261                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
262         }
263
264         if (lock->l_lvb_len > 0) {
265                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
266                                    lock->l_lvb_data, lvb_len);
267                 if (rc < 0) {
268                         unlock_res_and_lock(lock);
269                         goto out;
270                 }
271         }
272
273         ldlm_grant_lock(lock, &ast_list);
274         unlock_res_and_lock(lock);
275
276         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
277
278         /* Let Enqueue call osc_lock_upcall() and initialize l_ast_data */
279         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
280
281         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
282
283         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
284                           lock);
285         goto out;
286
287 out:
288         if (rc < 0) {
289                 lock_res_and_lock(lock);
290                 ldlm_set_failed(lock);
291                 unlock_res_and_lock(lock);
292                 wake_up(&lock->l_waitq);
293         }
294         LDLM_LOCK_RELEASE(lock);
295 }
296
297 /**
298  * Callback handler for receiving incoming glimpse ASTs.
299  *
300  * This can only happen on the client side.  After handling the glimpse AST
301  * we also consider dropping the lock here if it is unused locally for a
302  * long time.
303  */
304 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
305                                     struct ldlm_namespace *ns,
306                                     struct ldlm_request *dlm_req,
307                                     struct ldlm_lock *lock)
308 {
309         int rc = -ENOSYS;
310
311         LDLM_DEBUG(lock, "client glimpse AST callback handler");
312
313         if (lock->l_glimpse_ast)
314                 rc = lock->l_glimpse_ast(lock, req);
315
316         if (req->rq_repmsg) {
317                 ptlrpc_reply(req);
318         } else {
319                 req->rq_status = rc;
320                 ptlrpc_error(req);
321         }
322
323         lock_res_and_lock(lock);
324         if (lock->l_granted_mode == LCK_PW &&
325             !lock->l_readers && !lock->l_writers &&
326             cfs_time_after(cfs_time_current(),
327                            cfs_time_add(lock->l_last_used,
328                                         cfs_time_seconds(10)))) {
329                 unlock_res_and_lock(lock);
330                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
331                         ldlm_handle_bl_callback(ns, NULL, lock);
332
333                 return;
334         }
335         unlock_res_and_lock(lock);
336         LDLM_LOCK_RELEASE(lock);
337 }
338
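/*
 * Send a reply with status \a rc, packing the reply buffer first if it has
 * not been packed yet.  Does nothing if the request wants no reply.
 */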
339 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
340 {
341         if (req->rq_no_reply)
342                 return 0;
343
344         req->rq_status = rc;
345         if (!req->rq_packed_final) {
346                 rc = lustre_pack_reply(req, 1, NULL, NULL);
347                 if (rc)
348                         return rc;
349         }
350         return ptlrpc_reply(req);
351 }
352
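/*
 * Queue a blocking work item on the thread pool.  LDLM_FL_DISCARD_DATA locks
 * go on the priority list, everything else on the regular list.  A thread is
 * woken up and, unless LCF_ASYNC was requested, we wait for the item to be
 * processed before returning.
 */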
353 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
354                                enum ldlm_cancel_flags cancel_flags)
355 {
356         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
357
358         spin_lock(&blp->blp_lock);
359         if (blwi->blwi_lock && ldlm_is_discard_data(blwi->blwi_lock)) {
360                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
361                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
362         } else {
363                 /* other blocking callbacks are added to the regular list */
364                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
365         }
366         spin_unlock(&blp->blp_lock);
367
368         wake_up(&blp->blp_waitq);
369
370         /* cannot check blwi->blwi_flags, as blwi could already have been freed
371          * in LCF_ASYNC mode
372          */
373         if (!(cancel_flags & LCF_ASYNC))
374                 wait_for_completion(&blwi->blwi_comp);
375
376         return 0;
377 }
378
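/*
 * Fill in a blocking work item: take over the \a cancels list of \a count
 * locks, or, if \a count is zero, record the single \a lock instead.
 */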
379 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
380                              struct ldlm_namespace *ns,
381                              struct ldlm_lock_desc *ld,
382                              struct list_head *cancels, int count,
383                              struct ldlm_lock *lock,
384                              enum ldlm_cancel_flags cancel_flags)
385 {
386         init_completion(&blwi->blwi_comp);
387         INIT_LIST_HEAD(&blwi->blwi_head);
388
389         if (memory_pressure_get())
390                 blwi->blwi_mem_pressure = 1;
391
392         blwi->blwi_ns = ns;
393         blwi->blwi_flags = cancel_flags;
394         if (ld)
395                 blwi->blwi_ld = *ld;
396         if (count) {
397                 list_add(&blwi->blwi_head, cancels);
398                 list_del_init(cancels);
399                 blwi->blwi_count = count;
400         } else {
401                 blwi->blwi_lock = lock;
402         }
403 }
404
405 /**
406  * Queues a list of locks \a cancels containing \a count locks
407  * for later processing by a blocking thread.  If \a count is zero,
408  * then the lock referenced as \a lock is queued instead.
409  *
410  * The blocking thread will then call the lock's ->l_blocking_ast callback.
411  * If list addition fails, an error is returned and the caller is expected to
412  * call ->l_blocking_ast itself.
413  */
414 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
415                              struct ldlm_lock_desc *ld,
416                              struct ldlm_lock *lock,
417                              struct list_head *cancels, int count,
418                              enum ldlm_cancel_flags cancel_flags)
419 {
420         if (cancels && count == 0)
421                 return 0;
422
423         if (cancel_flags & LCF_ASYNC) {
424                 struct ldlm_bl_work_item *blwi;
425
426                 blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
427                 if (!blwi)
428                         return -ENOMEM;
429                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
430
431                 return __ldlm_bl_to_thread(blwi, cancel_flags);
432         } else {
433                 /* if it is a synchronous call, do minimal memory allocation, as it
434                  * could be triggered from the kernel shrinker
435                  */
436                 struct ldlm_bl_work_item blwi;
437
438                 memset(&blwi, 0, sizeof(blwi));
439                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
440                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
441         }
442 }
443
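/* Hand a single lock over to the blocking threads; always asynchronous. */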
444 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
445                            struct ldlm_lock *lock)
446 {
447         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
448 }
449
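/* Hand a list of \a count locks over to the blocking threads for cancellation. */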
450 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
451                            struct list_head *cancels, int count,
452                            enum ldlm_cancel_flags cancel_flags)
453 {
454         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
455 }
456
457 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC) */
458 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
459 {
460         struct obd_device *obd = req->rq_export->exp_obd;
461         char *key;
462         void *val;
463         int keylen, vallen;
464         int rc = -ENOSYS;
465
466         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
467
468         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
469
470         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
471         if (!key) {
472                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
473                 return -EFAULT;
474         }
475         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
476                                       RCL_CLIENT);
477         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
478         if (!val) {
479                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
480                 return -EFAULT;
481         }
482         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
483                                       RCL_CLIENT);
484
485         /* We are responsible for swabbing contents of val */
486
487         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
488                 /* Pass it on to mdc (the "export" in this case) */
489                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
490                                         req->rq_export,
491                                         sizeof(KEY_HSM_COPYTOOL_SEND),
492                                         KEY_HSM_COPYTOOL_SEND,
493                                         vallen, val, NULL);
494         else
495                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
496
497         return rc;
498 }
499
500 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
501                                         const char *msg, int rc,
502                                         const struct lustre_handle *handle)
503 {
504         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
505                   "%s: [nid %s] [rc %d] [lock %#llx]",
506                   msg, libcfs_id2str(req->rq_peer), rc,
507                   handle ? handle->cookie : 0);
508         if (req->rq_no_reply)
509                 CWARN("No reply was sent, maybe cause bug 21636.\n");
510         else if (rc)
511                 CWARN("Send reply failed, maybe cause bug 21636.\n");
512 }
513
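/*
 * Quota check callback: unpack the obd_quotactl from the request and record
 * its status in the client obd.
 */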
514 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
515 {
516         struct obd_quotactl *oqctl;
517         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
518
519         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
520         if (!oqctl) {
521                 CERROR("Can't unpack obd_quotactl\n");
522                 return -EPROTO;
523         }
524
525         oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
526
527         cli->cl_qchk_stat = oqctl->qc_stat;
528         return 0;
529 }
530
531 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
532 static int ldlm_callback_handler(struct ptlrpc_request *req)
533 {
534         struct ldlm_namespace *ns;
535         struct ldlm_request *dlm_req;
536         struct ldlm_lock *lock;
537         int rc;
538
539         /* Requests arrive in sender's byte order.  The ptlrpc service
540          * handler has already checked and, if necessary, byte-swapped the
541          * incoming request message body, but we are responsible for the
542          * message buffers.
543          */
544
545         /* do nothing for sec context finalize */
546         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
547                 return 0;
548
549         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
550
551         if (!req->rq_export) {
552                 rc = ldlm_callback_reply(req, -ENOTCONN);
553                 ldlm_callback_errmsg(req, "Operate on unconnected server",
554                                      rc, NULL);
555                 return 0;
556         }
557
558         LASSERT(req->rq_export->exp_obd);
559
560         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
561         case LDLM_BL_CALLBACK:
562                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
563                         return 0;
564                 break;
565         case LDLM_CP_CALLBACK:
566                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
567                         return 0;
568                 break;
569         case LDLM_GL_CALLBACK:
570                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
571                         return 0;
572                 break;
573         case LDLM_SET_INFO:
574                 rc = ldlm_handle_setinfo(req);
575                 ldlm_callback_reply(req, rc);
576                 return 0;
577         case OBD_QC_CALLBACK:
578                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
579                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
580                         return 0;
581                 rc = ldlm_handle_qc_callback(req);
582                 ldlm_callback_reply(req, rc);
583                 return 0;
584         default:
585                 CERROR("unknown opcode %u\n",
586                        lustre_msg_get_opc(req->rq_reqmsg));
587                 ldlm_callback_reply(req, -EPROTO);
588                 return 0;
589         }
590
591         ns = req->rq_export->exp_obd->obd_namespace;
592         LASSERT(ns);
593
594         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
595
596         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
597         if (!dlm_req) {
598                 rc = ldlm_callback_reply(req, -EPROTO);
599                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
600                                      NULL);
601                 return 0;
602         }
603
604         /* Force a known safe race: send a cancel to the server for a lock
605          * on which the server has already started a blocking callback.
606          */
607         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
608             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
609                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
610                 if (rc < 0)
611                         CERROR("ldlm_cli_cancel: %d\n", rc);
612         }
613
614         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
615         if (!lock) {
616                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
617                        dlm_req->lock_handle[0].cookie);
618                 rc = ldlm_callback_reply(req, -EINVAL);
619                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
620                                      &dlm_req->lock_handle[0]);
621                 return 0;
622         }
623
624         if (ldlm_is_fail_loc(lock) &&
625             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
626                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
627
628         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
629         lock_res_and_lock(lock);
630         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
631                                               LDLM_FL_AST_MASK);
632         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
633                 /* If somebody cancels the lock and the cache is already dropped,
634                  * or the lock failed before the cp_ast was received on the client,
635                  * we can tell the server we have no such lock. Otherwise, we
636                  * should send the cancel after dropping the cache.
637                  */
638                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
639                     ldlm_is_failed(lock)) {
640                         LDLM_DEBUG(lock,
641                                    "callback on lock %#llx - lock disappeared",
642                                    dlm_req->lock_handle[0].cookie);
643                         unlock_res_and_lock(lock);
644                         LDLM_LOCK_RELEASE(lock);
645                         rc = ldlm_callback_reply(req, -EINVAL);
646                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
647                                              &dlm_req->lock_handle[0]);
648                         return 0;
649                 }
650                 /* BL_AST locks are not needed in LRU.
651                  * Let ldlm_cancel_lru() be fast.
652                  */
653                 ldlm_lock_remove_from_lru(lock);
654                 ldlm_set_bl_ast(lock);
655         }
656         unlock_res_and_lock(lock);
657
658         /* We want the ost thread to get this reply so that it can respond
659          * to ost requests (write cache writeback) that might be triggered
660          * in the callback.
661          *
662          * But we'd also like to be able to indicate in the reply that we're
663          * cancelling right now, because it's unused, or have an intent result
664          * in the reply, so we might have to push the responsibility for sending
665          * the reply down into the AST handlers, alas.
666          */
667
668         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
669         case LDLM_BL_CALLBACK:
670                 CDEBUG(D_INODE, "blocking ast\n");
671                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
672                 if (!ldlm_is_cancel_on_block(lock)) {
673                         rc = ldlm_callback_reply(req, 0);
674                         if (req->rq_no_reply || rc)
675                                 ldlm_callback_errmsg(req, "Normal process", rc,
676                                                      &dlm_req->lock_handle[0]);
677                 }
678                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
679                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
680                 break;
681         case LDLM_CP_CALLBACK:
682                 CDEBUG(D_INODE, "completion ast\n");
683                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
684                 ldlm_callback_reply(req, 0);
685                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
686                 break;
687         case LDLM_GL_CALLBACK:
688                 CDEBUG(D_INODE, "glimpse ast\n");
689                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
690                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
691                 break;
692         default:
693                 LBUG();                  /* checked above */
694         }
695
696         return 0;
697 }
698
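/*
 * Pick the next work item for a blocking thread.  Priority items are
 * preferred, but an item from the regular list is taken at least once every
 * blp_num_threads picks so that it is not starved.
 */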
699 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
700 {
701         struct ldlm_bl_work_item *blwi = NULL;
702         static unsigned int num_bl;
703
704         spin_lock(&blp->blp_lock);
705         /* process a request from the blp_list at least once every blp_num_threads items */
706         if (!list_empty(&blp->blp_list) &&
707             (list_empty(&blp->blp_prio_list) || num_bl == 0))
708                 blwi = list_entry(blp->blp_list.next,
709                                       struct ldlm_bl_work_item, blwi_entry);
710         else
711                 if (!list_empty(&blp->blp_prio_list))
712                         blwi = list_entry(blp->blp_prio_list.next,
713                                               struct ldlm_bl_work_item,
714                                               blwi_entry);
715
716         if (blwi) {
717                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
718                         num_bl = 0;
719                 list_del(&blwi->blwi_entry);
720         }
721         spin_unlock(&blp->blp_lock);
722
723         return blwi;
724 }
725
726 /* This only contains temporary data until the thread starts */
727 struct ldlm_bl_thread_data {
728         char                    bltd_name[CFS_CURPROC_COMM_MAX];
729         struct ldlm_bl_pool     *bltd_blp;
730         struct completion       bltd_comp;
731         int                     bltd_num;
732 };
733
734 static int ldlm_bl_thread_main(void *arg);
735
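/*
 * Start one more blocking thread for the pool and wait until it has
 * registered itself; bltd lives on this stack and must not be used by the
 * new thread after that point.
 */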
736 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
737 {
738         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
739         struct task_struct *task;
740
741         init_completion(&bltd.bltd_comp);
742         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
743         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
744                 "ldlm_bl_%02d", bltd.bltd_num);
745         task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
746         if (IS_ERR(task)) {
747                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
748                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
749                 return PTR_ERR(task);
750         }
751         wait_for_completion(&bltd.bltd_comp);
752
753         return 0;
754 }
755
756 /**
757  * Main blocking requests processing thread.
758  *
759  * Callers put locks into its queue by calling ldlm_bl_to_thread.
760  * This thread eventually makes the actual call to ->l_blocking_ast
761  * for the queued locks.
762  */
763 static int ldlm_bl_thread_main(void *arg)
764 {
765         struct ldlm_bl_pool *blp;
766
767         {
768                 struct ldlm_bl_thread_data *bltd = arg;
769
770                 blp = bltd->bltd_blp;
771
772                 atomic_inc(&blp->blp_num_threads);
773                 atomic_inc(&blp->blp_busy_threads);
774
775                 complete(&bltd->bltd_comp);
776                 /* cannot use bltd after this, it is only on caller's stack */
777         }
778
779         while (1) {
780                 struct l_wait_info lwi = { 0 };
781                 struct ldlm_bl_work_item *blwi = NULL;
782                 int busy;
783
784                 blwi = ldlm_bl_get_work(blp);
785
786                 if (!blwi) {
787                         atomic_dec(&blp->blp_busy_threads);
788                         l_wait_event_exclusive(blp->blp_waitq,
789                                          (blwi = ldlm_bl_get_work(blp)),
790                                          &lwi);
791                         busy = atomic_inc_return(&blp->blp_busy_threads);
792                 } else {
793                         busy = atomic_read(&blp->blp_busy_threads);
794                 }
795
796                 if (!blwi->blwi_ns)
797                         /* added by ldlm_cleanup() */
798                         break;
799
800                 /* Not fatal if this races and we end up with a few too many threads */
801                 if (unlikely(busy < blp->blp_max_threads &&
802                              busy >= atomic_read(&blp->blp_num_threads) &&
803                              !blwi->blwi_mem_pressure))
804                         /* discard the return value, we tried */
805                         ldlm_bl_thread_start(blp);
806
807                 if (blwi->blwi_mem_pressure)
808                         memory_pressure_set();
809
810                 if (blwi->blwi_count) {
811                         int count;
812                         /* In the special case when we cancel locks in the LRU
813                          * asynchronously, we pass the list of locks here.
814                          * Thus the locks are marked LDLM_FL_CANCELING, but are
815                          * NOT yet canceled locally.
816                          */
817                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
818                                                            blwi->blwi_count,
819                                                            LCF_BL_AST);
820                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
821                                              blwi->blwi_flags);
822                 } else {
823                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
824                                                 blwi->blwi_lock);
825                 }
826                 if (blwi->blwi_mem_pressure)
827                         memory_pressure_clr();
828
829                 if (blwi->blwi_flags & LCF_ASYNC)
830                         kfree(blwi);
831                 else
832                         complete(&blwi->blwi_comp);
833         }
834
835         atomic_dec(&blp->blp_busy_threads);
836         atomic_dec(&blp->blp_num_threads);
837         complete(&blp->blp_comp);
838         return 0;
839 }
840
841 static int ldlm_setup(void);
842 static int ldlm_cleanup(void);
843
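/*
 * Take a reference on the LDLM state; the first reference triggers
 * ldlm_setup().
 */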
844 int ldlm_get_ref(void)
845 {
846         int rc = 0;
847
848         mutex_lock(&ldlm_ref_mutex);
849         if (++ldlm_refcount == 1) {
850                 rc = ldlm_setup();
851                 if (rc)
852                         ldlm_refcount--;
853         }
854         mutex_unlock(&ldlm_ref_mutex);
855
856         return rc;
857 }
858 EXPORT_SYMBOL(ldlm_get_ref);
859
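/*
 * Drop a reference on the LDLM state; the last reference runs ldlm_cleanup(),
 * and the reference is kept if the cleanup fails.
 */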
860 void ldlm_put_ref(void)
861 {
862         mutex_lock(&ldlm_ref_mutex);
863         if (ldlm_refcount == 1) {
864                 int rc = ldlm_cleanup();
865
866                 if (rc)
867                         CERROR("ldlm_cleanup failed: %d\n", rc);
868                 else
869                         ldlm_refcount--;
870         } else {
871                 ldlm_refcount--;
872         }
873         mutex_unlock(&ldlm_ref_mutex);
874 }
875 EXPORT_SYMBOL(ldlm_put_ref);
876
877 extern unsigned int ldlm_cancel_unused_locks_before_replay;
878
879 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
880                                                       struct attribute *attr,
881                                                       char *buf)
882 {
883         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
884 }
885
886 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
887                                                        struct attribute *attr,
888                                                        const char *buffer,
889                                                        size_t count)
890 {
891         int rc;
892         unsigned long val;
893
894         rc = kstrtoul(buffer, 10, &val);
895         if (rc)
896                 return rc;
897
898         ldlm_cancel_unused_locks_before_replay = val;
899
900         return count;
901 }
902 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
903
904 /* These are for root of /sys/fs/lustre/ldlm */
905 static struct attribute *ldlm_attrs[] = {
906         &lustre_attr_cancel_unused_locks_before_replay.attr,
907         NULL,
908 };
909
910 static struct attribute_group ldlm_attr_group = {
911         .attrs = ldlm_attrs,
912 };
913
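/*
 * Bring up the client-side LDLM: create the sysfs/debugfs entries, register
 * the "ldlm_cbd" callback service with ptlrpc, set up the blocking thread
 * pool and start its initial threads, then initialize the LDLM pools.
 */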
914 static int ldlm_setup(void)
915 {
916         static struct ptlrpc_service_conf       conf;
917         struct ldlm_bl_pool                     *blp = NULL;
918         int rc = 0;
919         int i;
920
921         if (ldlm_state)
922                 return -EALREADY;
923
924         ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
925         if (!ldlm_state)
926                 return -ENOMEM;
927
928         ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
929         if (!ldlm_kobj) {
930                 rc = -ENOMEM;
931                 goto out;
932         }
933
934         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
935         if (rc)
936                 goto out;
937
938         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
939         if (!ldlm_ns_kset) {
940                 rc = -ENOMEM;
941                 goto out;
942         }
943
944         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
945         if (!ldlm_svc_kset) {
946                 rc = -ENOMEM;
947                 goto out;
948         }
949
950         rc = ldlm_debugfs_setup();
951         if (rc != 0)
952                 goto out;
953
954         memset(&conf, 0, sizeof(conf));
955         conf = (typeof(conf)) {
956                 .psc_name               = "ldlm_cbd",
957                 .psc_watchdog_factor    = 2,
958                 .psc_buf                = {
959                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
960                         .bc_buf_size            = LDLM_BUFSIZE,
961                         .bc_req_max_size        = LDLM_MAXREQSIZE,
962                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
963                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
964                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
965                 },
966                 .psc_thr                = {
967                         .tc_thr_name            = "ldlm_cb",
968                         .tc_thr_factor          = LDLM_THR_FACTOR,
969                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
970                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
971                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
972                         .tc_nthrs_user          = ldlm_num_threads,
973                         .tc_cpu_affinity        = 1,
974                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
975                 },
976                 .psc_cpt                = {
977                         .cc_pattern             = ldlm_cpts,
978                 },
979                 .psc_ops                = {
980                         .so_req_handler         = ldlm_callback_handler,
981                 },
982         };
983         ldlm_state->ldlm_cb_service =
984                         ptlrpc_register_service(&conf, ldlm_svc_kset,
985                                                 ldlm_svc_debugfs_dir);
986         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
987                 CERROR("failed to start service\n");
988                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
989                 ldlm_state->ldlm_cb_service = NULL;
990                 goto out;
991         }
992
993         blp = kzalloc(sizeof(*blp), GFP_NOFS);
994         if (!blp) {
995                 rc = -ENOMEM;
996                 goto out;
997         }
998         ldlm_state->ldlm_bl_pool = blp;
999
1000         spin_lock_init(&blp->blp_lock);
1001         INIT_LIST_HEAD(&blp->blp_list);
1002         INIT_LIST_HEAD(&blp->blp_prio_list);
1003         init_waitqueue_head(&blp->blp_waitq);
1004         atomic_set(&blp->blp_num_threads, 0);
1005         atomic_set(&blp->blp_busy_threads, 0);
1006
1007         if (ldlm_num_threads == 0) {
1008                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1009                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1010         } else {
1011                 blp->blp_min_threads = min_t(int, LDLM_NTHRS_MAX,
1012                                              max_t(int, LDLM_NTHRS_INIT,
1013                                                    ldlm_num_threads));
1014
1015                 blp->blp_max_threads = blp->blp_min_threads;
1016         }
1017
1018         for (i = 0; i < blp->blp_min_threads; i++) {
1019                 rc = ldlm_bl_thread_start(blp);
1020                 if (rc < 0)
1021                         goto out;
1022         }
1023
1024         rc = ldlm_pools_init();
1025         if (rc) {
1026                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1027                 goto out;
1028         }
1029         return 0;
1030
1031  out:
1032         ldlm_cleanup();
1033         return rc;
1034 }
1035
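/*
 * Tear down the LDLM: refuse while namespaces still exist, then shut down
 * the pools, stop every blocking thread by queueing a work item with a NULL
 * blwi_ns, unregister the callback service and remove the sysfs/debugfs
 * entries.
 */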
1036 static int ldlm_cleanup(void)
1037 {
1038         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1039             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1040                 CERROR("ldlm still has namespaces; clean these up first.\n");
1041                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1042                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1043                 return -EBUSY;
1044         }
1045
1046         ldlm_pools_fini();
1047
1048         if (ldlm_state->ldlm_bl_pool) {
1049                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1050
1051                 while (atomic_read(&blp->blp_num_threads) > 0) {
1052                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1053
1054                         init_completion(&blp->blp_comp);
1055
1056                         spin_lock(&blp->blp_lock);
1057                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1058                         wake_up(&blp->blp_waitq);
1059                         spin_unlock(&blp->blp_lock);
1060
1061                         wait_for_completion(&blp->blp_comp);
1062                 }
1063
1064                 kfree(blp);
1065         }
1066
1067         if (ldlm_state->ldlm_cb_service)
1068                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1069
1070         if (ldlm_ns_kset)
1071                 kset_unregister(ldlm_ns_kset);
1072         if (ldlm_svc_kset)
1073                 kset_unregister(ldlm_svc_kset);
1074         if (ldlm_kobj)
1075                 kobject_put(ldlm_kobj);
1076
1077         ldlm_debugfs_cleanup();
1078
1079         kfree(ldlm_state);
1080         ldlm_state = NULL;
1081
1082         return 0;
1083 }
1084
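/*
 * Module initialization: set up the namespace locks and create the slab
 * caches for resources, locks (freed via RCU) and interval nodes.
 */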
1085 int ldlm_init(void)
1086 {
1087         mutex_init(&ldlm_ref_mutex);
1088         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1089         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1090         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1091                                                sizeof(struct ldlm_resource), 0,
1092                                                SLAB_HWCACHE_ALIGN, NULL);
1093         if (!ldlm_resource_slab)
1094                 return -ENOMEM;
1095
1096         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1097                               sizeof(struct ldlm_lock), 0,
1098                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1099         if (!ldlm_lock_slab) {
1100                 kmem_cache_destroy(ldlm_resource_slab);
1101                 return -ENOMEM;
1102         }
1103
1104         ldlm_interval_slab = kmem_cache_create("interval_node",
1105                                         sizeof(struct ldlm_interval),
1106                                         0, SLAB_HWCACHE_ALIGN, NULL);
1107         if (!ldlm_interval_slab) {
1108                 kmem_cache_destroy(ldlm_resource_slab);
1109                 kmem_cache_destroy(ldlm_lock_slab);
1110                 return -ENOMEM;
1111         }
1112 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1113         class_export_dump_hook = ldlm_dump_export_locks;
1114 #endif
1115         return 0;
1116 }
1117
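/*
 * Module cleanup: destroy the slab caches created in ldlm_init(), waiting
 * for an RCU grace period before destroying the lock slab.
 */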
1118 void ldlm_exit(void)
1119 {
1120         if (ldlm_refcount)
1121                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1122         kmem_cache_destroy(ldlm_resource_slab);
1123         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to call
1124          * synchronize_rcu() to wait for a grace period to elapse, so that
1125          * ldlm_lock_free() gets a chance to be called.
1126          */
1127         synchronize_rcu();
1128         kmem_cache_destroy(ldlm_lock_slab);
1129         kmem_cache_destroy(ldlm_interval_slab);
1130 }