Merge 4.7-rc6 into staging-next
[cascardo/linux.git] / drivers / staging / lustre / lustre / ptlrpc / ptlrpcd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ptlrpc/ptlrpcd.c
33  */
34
35 /** \defgroup ptlrpcd PortalRPC daemon
36  *
37  * ptlrpcd is a special thread with its own set where other user might add
38  * requests when they don't want to wait for their completion.
39  * PtlRPCD will take care of sending such requests and then processing their
40  * replies and calling completion callbacks as necessary.
41  * The callbacks are called directly from ptlrpcd context.
42  * It is important to never significantly block (esp. on RPCs!) within such
43  * completion handler or a deadlock might occur where ptlrpcd enters some
44  * callback that attempts to send another RPC and wait for it to return,
45  * during which time ptlrpcd is completely blocked, so e.g. if import
46  * fails, recovery cannot progress because connection requests are also
47  * sent by ptlrpcd.
48  *
49  * @{
50  */
51
52 #define DEBUG_SUBSYSTEM S_RPC
53
54 #include "../../include/linux/libcfs/libcfs.h"
55
56 #include "../include/lustre_net.h"
57 #include "../include/lustre_lib.h"
58 #include "../include/lustre_ha.h"
59 #include "../include/obd_class.h"       /* for obd_zombie */
60 #include "../include/obd_support.h"     /* for OBD_FAIL_CHECK */
61 #include "../include/cl_object.h"       /* cl_env_{get,put}() */
62 #include "../include/lprocfs_status.h"
63
64 #include "ptlrpc_internal.h"
65
66 /* One of these per CPT. */
67 struct ptlrpcd {
68         int pd_size;
69         int pd_index;
70         int pd_cpt;
71         int pd_cursor;
72         int pd_nthreads;
73         int pd_groupsize;
74         struct ptlrpcd_ctl pd_threads[0];
75 };
76
77 /*
78  * max_ptlrpcds is obsolete, but retained to ensure that the kernel
79  * module will load on a system where it has been tuned.
80  * A value other than 0 implies it was tuned, in which case the value
81  * is used to derive a setting for ptlrpcd_per_cpt_max.
82  */
83 static int max_ptlrpcds;
84 module_param(max_ptlrpcds, int, 0644);
85 MODULE_PARM_DESC(max_ptlrpcds, "Max ptlrpcd thread count to be started.");
86
87 /*
88  * ptlrpcd_bind_policy is obsolete, but retained to ensure that
89  * the kernel module will load on a system where it has been tuned.
90  * A value other than 0 implies it was tuned, in which case the value
91  * is used to derive a setting for ptlrpcd_partner_group_size.
92  */
93 static int ptlrpcd_bind_policy;
94 module_param(ptlrpcd_bind_policy, int, 0644);
95 MODULE_PARM_DESC(ptlrpcd_bind_policy,
96                  "Ptlrpcd threads binding mode (obsolete).");
97
98 /*
99  * ptlrpcd_per_cpt_max: The maximum number of ptlrpcd threads to run
100  * in a CPT.
101  */
102 static int ptlrpcd_per_cpt_max;
103 module_param(ptlrpcd_per_cpt_max, int, 0644);
104 MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
105                  "Max ptlrpcd thread count to be started per cpt.");
106
107 /*
108  * ptlrpcd_partner_group_size: The desired number of threads in each
109  * ptlrpcd partner thread group. Default is 2, corresponding to the
110  * old PDB_POLICY_PAIR. A negative value makes all ptlrpcd threads in
111  * a CPT partners of each other.
112  */
113 static int ptlrpcd_partner_group_size;
114 module_param(ptlrpcd_partner_group_size, int, 0644);
115 MODULE_PARM_DESC(ptlrpcd_partner_group_size,
116                  "Number of ptlrpcd threads in a partner group.");
117
118 /*
119  * ptlrpcd_cpts: A CPT string describing the CPU partitions that
120  * ptlrpcd threads should run on. Used to make ptlrpcd threads run on
121  * a subset of all CPTs.
122  *
123  * ptlrpcd_cpts=2
124  * ptlrpcd_cpts=[2]
125  *   run ptlrpcd threads only on CPT 2.
126  *
127  * ptlrpcd_cpts=0-3
128  * ptlrpcd_cpts=[0-3]
129  *   run ptlrpcd threads on CPTs 0, 1, 2, and 3.
130  *
131  * ptlrpcd_cpts=[0-3,5,7]
132  *   run ptlrpcd threads on CPTS 0, 1, 2, 3, 5, and 7.
133  */
134 static char *ptlrpcd_cpts;
135 module_param(ptlrpcd_cpts, charp, 0644);
136 MODULE_PARM_DESC(ptlrpcd_cpts,
137                  "CPU partitions ptlrpcd threads should run in");
138
139 /* ptlrpcds_cpt_idx maps cpt numbers to an index in the ptlrpcds array. */
140 static int              *ptlrpcds_cpt_idx;
141
142 /* ptlrpcds_num is the number of entries in the ptlrpcds array. */
143 static int              ptlrpcds_num;
144 static struct ptlrpcd   **ptlrpcds;
145
146 /*
147  * In addition to the regular thread pool above, there is a single
148  * global recovery thread. Recovery isn't critical for performance,
149  * and doesn't block, but must always be able to proceed, and it is
150  * possible that all normal ptlrpcd threads are blocked. Hence the
151  * need for a dedicated thread.
152  */
153 static struct ptlrpcd_ctl ptlrpcd_rcv;
154
155 struct mutex ptlrpcd_mutex;
156 static int ptlrpcd_users;
157
158 void ptlrpcd_wake(struct ptlrpc_request *req)
159 {
160         struct ptlrpc_request_set *set = req->rq_set;
161
162         wake_up(&set->set_waitq);
163 }
164 EXPORT_SYMBOL(ptlrpcd_wake);
165
166 static struct ptlrpcd_ctl *
167 ptlrpcd_select_pc(struct ptlrpc_request *req)
168 {
169         struct ptlrpcd  *pd;
170         int             cpt;
171         int             idx;
172
173         if (req && req->rq_send_state != LUSTRE_IMP_FULL)
174                 return &ptlrpcd_rcv;
175
176         cpt = cfs_cpt_current(cfs_cpt_table, 1);
177         if (!ptlrpcds_cpt_idx)
178                 idx = cpt;
179         else
180                 idx = ptlrpcds_cpt_idx[cpt];
181         pd = ptlrpcds[idx];
182
183                 /* We do not care whether it is strict load balance. */
184         idx = pd->pd_cursor;
185         if (++idx == pd->pd_nthreads)
186                 idx = 0;
187         pd->pd_cursor = idx;
188
189         return &pd->pd_threads[idx];
190 }
191
192 /**
193  * Return transferred RPCs count.
194  */
195 static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
196                                struct ptlrpc_request_set *src)
197 {
198         struct list_head *tmp, *pos;
199         struct ptlrpc_request *req;
200         int rc = 0;
201
202         spin_lock(&src->set_new_req_lock);
203         if (likely(!list_empty(&src->set_new_requests))) {
204                 list_for_each_safe(pos, tmp, &src->set_new_requests) {
205                         req = list_entry(pos, struct ptlrpc_request,
206                                          rq_set_chain);
207                         req->rq_set = des;
208                 }
209                 list_splice_init(&src->set_new_requests, &des->set_requests);
210                 rc = atomic_read(&src->set_new_count);
211                 atomic_add(rc, &des->set_remaining);
212                 atomic_set(&src->set_new_count, 0);
213         }
214         spin_unlock(&src->set_new_req_lock);
215         return rc;
216 }
217
218 /**
219  * Requests that are added to the ptlrpcd queue are sent via
220  * ptlrpcd_check->ptlrpc_check_set().
221  */
222 void ptlrpcd_add_req(struct ptlrpc_request *req)
223 {
224         struct ptlrpcd_ctl *pc;
225
226         if (req->rq_reqmsg)
227                 lustre_msg_set_jobid(req->rq_reqmsg, NULL);
228
229         spin_lock(&req->rq_lock);
230         if (req->rq_invalid_rqset) {
231                 struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
232                                                      back_to_sleep, NULL);
233
234                 req->rq_invalid_rqset = 0;
235                 spin_unlock(&req->rq_lock);
236                 l_wait_event(req->rq_set_waitq, !req->rq_set, &lwi);
237         } else if (req->rq_set) {
238                 /* If we have a valid "rq_set", just reuse it to avoid double
239                  * linked.
240                  */
241                 LASSERT(req->rq_phase == RQ_PHASE_NEW);
242                 LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
243
244                 /* ptlrpc_check_set will decrease the count */
245                 atomic_inc(&req->rq_set->set_remaining);
246                 spin_unlock(&req->rq_lock);
247                 wake_up(&req->rq_set->set_waitq);
248                 return;
249         } else {
250                 spin_unlock(&req->rq_lock);
251         }
252
253         pc = ptlrpcd_select_pc(req);
254
255         DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
256                   req, pc->pc_name, pc->pc_index);
257
258         ptlrpc_set_add_new_req(pc, req);
259 }
260 EXPORT_SYMBOL(ptlrpcd_add_req);
261
262 static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
263 {
264         atomic_inc(&set->set_refcount);
265 }
266
267 /**
268  * Check if there is more work to do on ptlrpcd set.
269  * Returns 1 if yes.
270  */
271 static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
272 {
273         struct list_head *tmp, *pos;
274         struct ptlrpc_request *req;
275         struct ptlrpc_request_set *set = pc->pc_set;
276         int rc = 0;
277         int rc2;
278
279         if (atomic_read(&set->set_new_count)) {
280                 spin_lock(&set->set_new_req_lock);
281                 if (likely(!list_empty(&set->set_new_requests))) {
282                         list_splice_init(&set->set_new_requests,
283                                          &set->set_requests);
284                         atomic_add(atomic_read(&set->set_new_count),
285                                    &set->set_remaining);
286                         atomic_set(&set->set_new_count, 0);
287                         /*
288                          * Need to calculate its timeout.
289                          */
290                         rc = 1;
291                 }
292                 spin_unlock(&set->set_new_req_lock);
293         }
294
295         /* We should call lu_env_refill() before handling new requests to make
296          * sure that env key the requests depending on really exists.
297          */
298         rc2 = lu_env_refill(env);
299         if (rc2 != 0) {
300                 /*
301                  * XXX This is very awkward situation, because
302                  * execution can neither continue (request
303                  * interpreters assume that env is set up), nor repeat
304                  * the loop (as this potentially results in a tight
305                  * loop of -ENOMEM's).
306                  *
307                  * Fortunately, refill only ever does something when
308                  * new modules are loaded, i.e., early during boot up.
309                  */
310                 CERROR("Failure to refill session: %d\n", rc2);
311                 return rc;
312         }
313
314         if (atomic_read(&set->set_remaining))
315                 rc |= ptlrpc_check_set(env, set);
316
317         /* NB: ptlrpc_check_set has already moved completed request at the
318          * head of seq::set_requests
319          */
320         list_for_each_safe(pos, tmp, &set->set_requests) {
321                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
322                 if (req->rq_phase != RQ_PHASE_COMPLETE)
323                         break;
324
325                 list_del_init(&req->rq_set_chain);
326                 req->rq_set = NULL;
327                 ptlrpc_req_finished(req);
328         }
329
330         if (rc == 0) {
331                 /*
332                  * If new requests have been added, make sure to wake up.
333                  */
334                 rc = atomic_read(&set->set_new_count);
335
336                 /* If we have nothing to do, check whether we can take some
337                  * work from our partner threads.
338                  */
339                 if (rc == 0 && pc->pc_npartners > 0) {
340                         struct ptlrpcd_ctl *partner;
341                         struct ptlrpc_request_set *ps;
342                         int first = pc->pc_cursor;
343
344                         do {
345                                 partner = pc->pc_partners[pc->pc_cursor++];
346                                 if (pc->pc_cursor >= pc->pc_npartners)
347                                         pc->pc_cursor = 0;
348                                 if (!partner)
349                                         continue;
350
351                                 spin_lock(&partner->pc_lock);
352                                 ps = partner->pc_set;
353                                 if (!ps) {
354                                         spin_unlock(&partner->pc_lock);
355                                         continue;
356                                 }
357
358                                 ptlrpc_reqset_get(ps);
359                                 spin_unlock(&partner->pc_lock);
360
361                                 if (atomic_read(&ps->set_new_count)) {
362                                         rc = ptlrpcd_steal_rqset(set, ps);
363                                         if (rc > 0)
364                                                 CDEBUG(D_RPCTRACE, "transfer %d async RPCs [%d->%d]\n",
365                                                        rc, partner->pc_index,
366                                                        pc->pc_index);
367                                 }
368                                 ptlrpc_reqset_put(ps);
369                         } while (rc == 0 && pc->pc_cursor != first);
370                 }
371         }
372
373         return rc;
374 }
375
376 /**
377  * Main ptlrpcd thread.
378  * ptlrpc's code paths like to execute in process context, so we have this
379  * thread which spins on a set which contains the rpcs and sends them.
380  *
381  */
382 static int ptlrpcd(void *arg)
383 {
384         struct ptlrpcd_ctl *pc = arg;
385         struct ptlrpc_request_set *set;
386         struct lu_context ses = { 0 };
387         struct lu_env env = { .le_ses = &ses };
388         int rc = 0;
389         int exit = 0;
390
391         unshare_fs_struct();
392         if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0)
393                 CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
394
395         /*
396          * Allocate the request set after the thread has been bound
397          * above. This is safe because no requests will be queued
398          * until all ptlrpcd threads have confirmed that they have
399          * successfully started.
400          */
401         set = ptlrpc_prep_set();
402         if (!set) {
403                 rc = -ENOMEM;
404                 goto failed;
405         }
406         spin_lock(&pc->pc_lock);
407         pc->pc_set = set;
408         spin_unlock(&pc->pc_lock);
409         /*
410          * XXX So far only "client" ptlrpcd uses an environment. In
411          * the future, ptlrpcd thread (or a thread-set) has to given
412          * an argument, describing its "scope".
413          */
414         rc = lu_context_init(&env.le_ctx,
415                              LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
416         if (rc == 0) {
417                 rc = lu_context_init(env.le_ses,
418                                      LCT_SESSION | LCT_REMEMBER | LCT_NOREF);
419                 if (rc != 0)
420                         lu_context_fini(&env.le_ctx);
421         }
422
423         if (rc != 0)
424                 goto failed;
425
426         complete(&pc->pc_starting);
427
428         /*
429          * This mainloop strongly resembles ptlrpc_set_wait() except that our
430          * set never completes.  ptlrpcd_check() calls ptlrpc_check_set() when
431          * there are requests in the set. New requests come in on the set's
432          * new_req_list and ptlrpcd_check() moves them into the set.
433          */
434         do {
435                 struct l_wait_info lwi;
436                 int timeout;
437
438                 timeout = ptlrpc_set_next_timeout(set);
439                 lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
440                                   ptlrpc_expired_set, set);
441
442                 lu_context_enter(&env.le_ctx);
443                 lu_context_enter(env.le_ses);
444                 l_wait_event(set->set_waitq, ptlrpcd_check(&env, pc), &lwi);
445                 lu_context_exit(&env.le_ctx);
446                 lu_context_exit(env.le_ses);
447
448                 /*
449                  * Abort inflight rpcs for forced stop case.
450                  */
451                 if (test_bit(LIOD_STOP, &pc->pc_flags)) {
452                         if (test_bit(LIOD_FORCE, &pc->pc_flags))
453                                 ptlrpc_abort_set(set);
454                         exit++;
455                 }
456
457                 /*
458                  * Let's make one more loop to make sure that ptlrpcd_check()
459                  * copied all raced new rpcs into the set so we can kill them.
460                  */
461         } while (exit < 2);
462
463         /*
464          * Wait for inflight requests to drain.
465          */
466         if (!list_empty(&set->set_requests))
467                 ptlrpc_set_wait(set);
468         lu_context_fini(&env.le_ctx);
469         lu_context_fini(env.le_ses);
470
471         complete(&pc->pc_finishing);
472
473         return 0;
474 failed:
475         pc->pc_error = rc;
476         complete(&pc->pc_starting);
477         return rc;
478 }
479
480 static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt)
481 {
482         pc->pc_index = index;
483         pc->pc_cpt = cpt;
484         init_completion(&pc->pc_starting);
485         init_completion(&pc->pc_finishing);
486         spin_lock_init(&pc->pc_lock);
487
488         if (index < 0) {
489                 /* Recovery thread. */
490                 snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv");
491         } else {
492                 /* Regular thread. */
493                 snprintf(pc->pc_name, sizeof(pc->pc_name),
494                          "ptlrpcd_%02d_%02d", cpt, index);
495         }
496 }
497
498 /* XXX: We want multiple CPU cores to share the async RPC load. So we
499  *      start many ptlrpcd threads. We also want to reduce the ptlrpcd
500  *      overhead caused by data transfer cross-CPU cores. So we bind
501  *      all ptlrpcd threads to a CPT, in the expectation that CPTs
502  *      will be defined in a way that matches these boundaries. Within
503  *      a CPT a ptlrpcd thread can be scheduled on any available core.
504  *
505  *      Each ptlrpcd thread has its own request queue. This can cause
506  *      response delay if the thread is already busy. To help with
507  *      this we define partner threads: these are other threads bound
508  *      to the same CPT which will check for work in each other's
509  *      request queues if they have no work to do.
510  *
511  *      The desired number of partner threads can be tuned by setting
512  *      ptlrpcd_partner_group_size. The default is to create pairs of
513  *      partner threads.
514  */
515 static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
516 {
517         struct ptlrpcd_ctl *pc;
518         struct ptlrpcd_ctl **ppc;
519         int first;
520         int i;
521         int rc = 0;
522         int size;
523
524         LASSERT(index >= 0 && index < pd->pd_nthreads);
525         pc = &pd->pd_threads[index];
526         pc->pc_npartners = pd->pd_groupsize - 1;
527
528         if (pc->pc_npartners <= 0)
529                 goto out;
530
531         size = sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners;
532         pc->pc_partners = kzalloc_node(size, GFP_NOFS,
533                                        cfs_cpt_spread_node(cfs_cpt_table,
534                                                            pc->pc_cpt));
535         if (!pc->pc_partners) {
536                 pc->pc_npartners = 0;
537                 rc = -ENOMEM;
538                 goto out;
539         }
540
541         first = index - index % pd->pd_groupsize;
542         ppc = pc->pc_partners;
543         for (i = first; i < first + pd->pd_groupsize; i++) {
544                 if (i != index)
545                         *ppc++ = &pd->pd_threads[i];
546         }
547 out:
548         return rc;
549 }
550
551 int ptlrpcd_start(struct ptlrpcd_ctl *pc)
552 {
553         struct task_struct *task;
554         int rc = 0;
555
556         /*
557          * Do not allow start second thread for one pc.
558          */
559         if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
560                 CWARN("Starting second thread (%s) for same pc %p\n",
561                       pc->pc_name, pc);
562                 return 0;
563         }
564
565         /*
566          * So far only "client" ptlrpcd uses an environment. In the future,
567          * ptlrpcd thread (or a thread-set) has to be given an argument,
568          * describing its "scope".
569          */
570         rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
571         if (rc != 0)
572                 goto out;
573
574         task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
575         if (IS_ERR(task)) {
576                 rc = PTR_ERR(task);
577                 goto out_set;
578         }
579
580         wait_for_completion(&pc->pc_starting);
581         rc = pc->pc_error;
582         if (rc != 0)
583                 goto out_set;
584
585         return 0;
586
587 out_set:
588         if (pc->pc_set) {
589                 struct ptlrpc_request_set *set = pc->pc_set;
590
591                 spin_lock(&pc->pc_lock);
592                 pc->pc_set = NULL;
593                 spin_unlock(&pc->pc_lock);
594                 ptlrpc_set_destroy(set);
595         }
596         lu_context_fini(&pc->pc_env.le_ctx);
597
598 out:
599         clear_bit(LIOD_START, &pc->pc_flags);
600         return rc;
601 }
602
603 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
604 {
605         if (!test_bit(LIOD_START, &pc->pc_flags)) {
606                 CWARN("Thread for pc %p was not started\n", pc);
607                 return;
608         }
609
610         set_bit(LIOD_STOP, &pc->pc_flags);
611         if (force)
612                 set_bit(LIOD_FORCE, &pc->pc_flags);
613         wake_up(&pc->pc_set->set_waitq);
614 }
615
616 void ptlrpcd_free(struct ptlrpcd_ctl *pc)
617 {
618         struct ptlrpc_request_set *set = pc->pc_set;
619
620         if (!test_bit(LIOD_START, &pc->pc_flags)) {
621                 CWARN("Thread for pc %p was not started\n", pc);
622                 goto out;
623         }
624
625         wait_for_completion(&pc->pc_finishing);
626         lu_context_fini(&pc->pc_env.le_ctx);
627
628         spin_lock(&pc->pc_lock);
629         pc->pc_set = NULL;
630         spin_unlock(&pc->pc_lock);
631         ptlrpc_set_destroy(set);
632
633         clear_bit(LIOD_START, &pc->pc_flags);
634         clear_bit(LIOD_STOP, &pc->pc_flags);
635         clear_bit(LIOD_FORCE, &pc->pc_flags);
636
637 out:
638         if (pc->pc_npartners > 0) {
639                 LASSERT(pc->pc_partners);
640
641                 kfree(pc->pc_partners);
642                 pc->pc_partners = NULL;
643         }
644         pc->pc_npartners = 0;
645         pc->pc_error = 0;
646 }
647
648 static void ptlrpcd_fini(void)
649 {
650         int i;
651         int j;
652
653         if (ptlrpcds) {
654                 for (i = 0; i < ptlrpcds_num; i++) {
655                         if (!ptlrpcds[i])
656                                 break;
657                         for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
658                                 ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
659                         for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
660                                 ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
661                         kfree(ptlrpcds[i]);
662                         ptlrpcds[i] = NULL;
663                 }
664                 kfree(ptlrpcds);
665         }
666         ptlrpcds_num = 0;
667
668         ptlrpcd_stop(&ptlrpcd_rcv, 0);
669         ptlrpcd_free(&ptlrpcd_rcv);
670
671         kfree(ptlrpcds_cpt_idx);
672         ptlrpcds_cpt_idx = NULL;
673 }
674
675 static int ptlrpcd_init(void)
676 {
677         int nthreads;
678         int groupsize;
679         int size;
680         int i;
681         int j;
682         int rc = 0;
683         struct cfs_cpt_table *cptable;
684         __u32 *cpts = NULL;
685         int ncpts;
686         int cpt;
687         struct ptlrpcd *pd;
688
689         /*
690          * Determine the CPTs that ptlrpcd threads will run on.
691          */
692         cptable = cfs_cpt_table;
693         ncpts = cfs_cpt_number(cptable);
694         if (ptlrpcd_cpts) {
695                 struct cfs_expr_list *el;
696
697                 size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
698                 ptlrpcds_cpt_idx = kzalloc(size, GFP_KERNEL);
699                 if (!ptlrpcds_cpt_idx) {
700                         rc = -ENOMEM;
701                         goto out;
702                 }
703
704                 rc = cfs_expr_list_parse(ptlrpcd_cpts,
705                                          strlen(ptlrpcd_cpts),
706                                          0, ncpts - 1, &el);
707
708                 if (rc != 0) {
709                         CERROR("ptlrpcd_cpts: invalid CPT pattern string: %s",
710                                ptlrpcd_cpts);
711                         rc = -EINVAL;
712                         goto out;
713                 }
714
715                 rc = cfs_expr_list_values(el, ncpts, &cpts);
716                 cfs_expr_list_free(el);
717                 if (rc <= 0) {
718                         CERROR("ptlrpcd_cpts: failed to parse CPT array %s: %d\n",
719                                ptlrpcd_cpts, rc);
720                         if (rc == 0)
721                                 rc = -EINVAL;
722                         goto out;
723                 }
724
725                 /*
726                  * Create the cpt-to-index map. When there is no match
727                  * in the cpt table, pick a cpt at random. This could
728                  * be changed to take the topology of the system into
729                  * account.
730                  */
731                 for (cpt = 0; cpt < ncpts; cpt++) {
732                         for (i = 0; i < rc; i++)
733                                 if (cpts[i] == cpt)
734                                         break;
735                         if (i >= rc)
736                                 i = cpt % rc;
737                         ptlrpcds_cpt_idx[cpt] = i;
738                 }
739
740                 cfs_expr_list_values_free(cpts, rc);
741                 ncpts = rc;
742         }
743         ptlrpcds_num = ncpts;
744
745         size = ncpts * sizeof(ptlrpcds[0]);
746         ptlrpcds = kzalloc(size, GFP_KERNEL);
747         if (!ptlrpcds) {
748                 rc = -ENOMEM;
749                 goto out;
750         }
751
752         /*
753          * The max_ptlrpcds parameter is obsolete, but do something
754          * sane if it has been tuned, and complain if
755          * ptlrpcd_per_cpt_max has also been tuned.
756          */
757         if (max_ptlrpcds != 0) {
758                 CWARN("max_ptlrpcds is obsolete.\n");
759                 if (ptlrpcd_per_cpt_max == 0) {
760                         ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
761                         /* Round up if there is a remainder. */
762                         if (max_ptlrpcds % ncpts != 0)
763                                 ptlrpcd_per_cpt_max++;
764                         CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
765                               ptlrpcd_per_cpt_max);
766                 } else {
767                         CWARN("ptlrpd_per_cpt_max is also set!\n");
768                 }
769         }
770
771         /*
772          * The ptlrpcd_bind_policy parameter is obsolete, but do
773          * something sane if it has been tuned, and complain if
774          * ptlrpcd_partner_group_size is also tuned.
775          */
776         if (ptlrpcd_bind_policy != 0) {
777                 CWARN("ptlrpcd_bind_policy is obsolete.\n");
778                 if (ptlrpcd_partner_group_size == 0) {
779                         switch (ptlrpcd_bind_policy) {
780                         case 1: /* PDB_POLICY_NONE */
781                         case 2: /* PDB_POLICY_FULL */
782                                 ptlrpcd_partner_group_size = 1;
783                                 break;
784                         case 3: /* PDB_POLICY_PAIR */
785                                 ptlrpcd_partner_group_size = 2;
786                                 break;
787                         case 4: /* PDB_POLICY_NEIGHBOR */
788 #ifdef CONFIG_NUMA
789                                 ptlrpcd_partner_group_size = -1; /* CPT */
790 #else
791                                 ptlrpcd_partner_group_size = 3; /* Triplets */
792 #endif
793                                 break;
794                         default: /* Illegal value, use the default. */
795                                 ptlrpcd_partner_group_size = 2;
796                                 break;
797                         }
798                         CWARN("Setting ptlrpcd_partner_group_size = %d\n",
799                               ptlrpcd_partner_group_size);
800                 } else {
801                         CWARN("ptlrpcd_partner_group_size is also set!\n");
802                 }
803         }
804
805         if (ptlrpcd_partner_group_size == 0)
806                 ptlrpcd_partner_group_size = 2;
807         else if (ptlrpcd_partner_group_size < 0)
808                 ptlrpcd_partner_group_size = -1;
809         else if (ptlrpcd_per_cpt_max > 0 &&
810                  ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
811                 ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
812
813         /*
814          * Start the recovery thread first.
815          */
816         set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
817         ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
818         rc = ptlrpcd_start(&ptlrpcd_rcv);
819         if (rc < 0)
820                 goto out;
821
822         for (i = 0; i < ncpts; i++) {
823                 if (!cpts)
824                         cpt = i;
825                 else
826                         cpt = cpts[i];
827
828                 nthreads = cfs_cpt_weight(cptable, cpt);
829                 if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
830                         nthreads = ptlrpcd_per_cpt_max;
831                 if (nthreads < 2)
832                         nthreads = 2;
833
834                 if (ptlrpcd_partner_group_size <= 0) {
835                         groupsize = nthreads;
836                 } else if (nthreads <= ptlrpcd_partner_group_size) {
837                         groupsize = nthreads;
838                 } else {
839                         groupsize = ptlrpcd_partner_group_size;
840                         if (nthreads % groupsize != 0)
841                                 nthreads += groupsize - (nthreads % groupsize);
842                 }
843
844                 size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
845                 pd = kzalloc_node(size, GFP_NOFS,
846                                   cfs_cpt_spread_node(cfs_cpt_table, cpt));
847                 if (!pd) {
848                         rc = -ENOMEM;
849                         goto out;
850                 }
851                 pd->pd_size = size;
852                 pd->pd_index = i;
853                 pd->pd_cpt = cpt;
854                 pd->pd_cursor = 0;
855                 pd->pd_nthreads = nthreads;
856                 pd->pd_groupsize = groupsize;
857                 ptlrpcds[i] = pd;
858
859                 /*
860                  * The ptlrpcd threads in a partner group can access
861                  * each other's struct ptlrpcd_ctl, so these must be
862                  * initialized before any thread is started.
863                  */
864                 for (j = 0; j < nthreads; j++) {
865                         ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
866                         rc = ptlrpcd_partners(pd, j);
867                         if (rc < 0)
868                                 goto out;
869                 }
870
871                 /* XXX: We start nthreads ptlrpc daemons.
872                  *      Each of them can process any non-recovery
873                  *      async RPC to improve overall async RPC
874                  *      efficiency.
875                  *
876                  *      But there are some issues with async I/O RPCs
877                  *      and async non-I/O RPCs processed in the same
878                  *      set under some cases. The ptlrpcd may be
879                  *      blocked by some async I/O RPC(s), then will
880                  *      cause other async non-I/O RPC(s) can not be
881                  *      processed in time.
882                  *
883                  *      Maybe we should distinguish blocked async RPCs
884                  *      from non-blocked async RPCs, and process them
885                  *      in different ptlrpcd sets to avoid unnecessary
886                  *      dependency. But how to distribute async RPCs
887                  *      load among all the ptlrpc daemons becomes
888                  *      another trouble.
889                  */
890                 for (j = 0; j < nthreads; j++) {
891                         rc = ptlrpcd_start(&pd->pd_threads[j]);
892                         if (rc < 0)
893                                 goto out;
894                 }
895         }
896 out:
897         if (rc != 0)
898                 ptlrpcd_fini();
899
900         return rc;
901 }
902
903 int ptlrpcd_addref(void)
904 {
905         int rc = 0;
906
907         mutex_lock(&ptlrpcd_mutex);
908         if (++ptlrpcd_users == 1) {
909                 rc = ptlrpcd_init();
910                 if (rc < 0)
911                         ptlrpcd_users--;
912         }
913         mutex_unlock(&ptlrpcd_mutex);
914         return rc;
915 }
916 EXPORT_SYMBOL(ptlrpcd_addref);
917
918 void ptlrpcd_decref(void)
919 {
920         mutex_lock(&ptlrpcd_mutex);
921         if (--ptlrpcd_users == 0)
922                 ptlrpcd_fini();
923         mutex_unlock(&ptlrpcd_mutex);
924 }
925 EXPORT_SYMBOL(ptlrpcd_decref);
926 /** @} ptlrpcd */