4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
19 * Copyright (c) 2011 Intel Corporation
21 * Copyright 2012 Xyratex Technology Limited
26 * Network Request Scheduler (NRS)
28 * Allows the handling of RPCs to be reordered at servers.
30 * Author: Liang Zhen <liang@whamcloud.com>
31 * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
38 #define DEBUG_SUBSYSTEM S_RPC
39 #include "../include/obd_support.h"
40 #include "../include/obd_class.h"
41 #include "../include/lustre_net.h"
42 #include "../include/lprocfs_status.h"
43 #include "../../include/linux/libcfs/libcfs.h"
44 #include "ptlrpc_internal.h"
/* Global NRS core object; its nrs_mutex and nrs_policies members are used throughout this file. */
49 struct nrs_core nrs_core;
51 static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
53 return policy->pol_desc->pd_ops->op_policy_init ?
54 policy->pol_desc->pd_ops->op_policy_init(policy) : 0;
57 static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
59 LASSERT(policy->pol_ref == 0);
60 LASSERT(policy->pol_req_queued == 0);
62 if (policy->pol_desc->pd_ops->op_policy_fini)
63 policy->pol_desc->pd_ops->op_policy_fini(policy);
/*
 * Forwards control operation \a opc to the op_policy_ctl() hook of \a policy,
 * when the policy provides one. A policy in NRS_POL_STATE_STOPPED is checked
 * for first, since pol_private is NULL for a stopped policy.
 *
 * NOTE(review): the statement executed for a stopped policy and the value
 * used when op_policy_ctl is unset (embedded source lines 76 and 80) are
 * absent from this listing -- confirm both against the complete file.
 */
66 static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
67 enum ptlrpc_nrs_ctl opc, void *arg)
70 * The policy may be stopped, but the lprocfs files and
71 * ptlrpc_nrs_policy instances remain present until unregistration time.
72 * Do not perform the ctl operation if the policy is stopped, as
73 * policy->pol_private will be NULL in such a case.
75 if (policy->pol_state == NRS_POL_STATE_STOPPED)
78 return policy->pol_desc->pd_ops->op_policy_ctl ?
79 policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg) :
83 static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
85 struct ptlrpc_nrs *nrs = policy->pol_nrs;
87 if (policy->pol_desc->pd_ops->op_policy_stop) {
88 spin_unlock(&nrs->nrs_lock);
90 policy->pol_desc->pd_ops->op_policy_stop(policy);
92 spin_lock(&nrs->nrs_lock);
95 LASSERT(list_empty(&policy->pol_list_queued));
96 LASSERT(policy->pol_req_queued == 0 &&
97 policy->pol_req_started == 0);
99 policy->pol_private = NULL;
101 policy->pol_state = NRS_POL_STATE_STOPPED;
103 if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
104 module_put(policy->pol_desc->pd_owner);
/*
 * Begins stopping \a policy: a policy in NRS_POL_STATE_STARTED is moved to
 * NRS_POL_STATE_STOPPING, removed from the primary or fallback slot of its
 * NRS head, and stopped right away through nrs_policy_stop0() when the caller
 * holds the only reference (pol_ref == 1).
 *
 * NOTE(review): the statements taken after the fallback, STARTING and
 * not-STARTED checks, the line between the two slot-clearing branches, and
 * the final return are absent from this listing, so the return values cannot
 * be confirmed here.
 */
107 static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
109 struct ptlrpc_nrs *nrs = policy->pol_nrs;
/* Special case: \a policy is the fallback and the head is not stopping. */
111 if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping)
114 if (policy->pol_state == NRS_POL_STATE_STARTING)
117 /* In progress or already stopped */
118 if (policy->pol_state != NRS_POL_STATE_STARTED)
121 policy->pol_state = NRS_POL_STATE_STOPPING;
123 /* Immediately make it invisible */
124 if (nrs->nrs_policy_primary == policy) {
125 nrs->nrs_policy_primary = NULL;
128 LASSERT(nrs->nrs_policy_fallback == policy);
129 nrs->nrs_policy_fallback = NULL;
132 /* I have the only refcount */
133 if (policy->pol_ref == 1)
134 nrs_policy_stop0(policy);
140 * Transitions the \a nrs NRS head's primary policy to
141 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
142 * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
144 * \param[in] nrs the NRS head to carry out this operation on
/*
 * NOTE(review): embedded source lines 150-151 are absent from this listing;
 * presumably they return early when the head has no primary policy, since
 * tmp is dereferenced below -- confirm against the complete file.
 */
146 static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
148 struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
/* Detach the primary from the head before changing its state. */
153 nrs->nrs_policy_primary = NULL;
155 LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
156 tmp->pol_state = NRS_POL_STATE_STOPPING;
/* With no usage references left, the policy is stopped immediately. */
158 if (tmp->pol_ref == 0)
159 nrs_policy_stop0(tmp);
163 * Transitions a policy across the ptlrpc_nrs_pol_state range of values, in
164 * response to an lprocfs command to start a policy.
166 * If a primary policy different to the current one is specified, this function
167 * will transition the new policy to the
168 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTING and then to
169 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED, and will then transition
170 * the old primary policy (if there is one) to
171 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
172 * references on the policy to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
174 * If the fallback policy is specified, this is taken to indicate an instruction
175 * to stop the current primary policy, without substituting it with another
176 * primary policy, so the primary policy (if any) is transitioned to
177 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
178 * references on the policy to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED. In
179 * this case, the fallback policy is only left active in the NRS head.
/*
 * Starts \a policy on its NRS head. Steps visible in this listing:
 *  - checks on nrs_policy_starting and on the STOPPING and STARTED states
 *    (the statements they guard are not visible);
 *  - for a PTLRPC_NRS_FL_FALLBACK policy that already is the head fallback,
 *    nrs_policy_stop_primary() is called; otherwise the head is asserted to
 *    have no fallback yet;
 *  - a module reference is taken with try_module_get() when pd_refs goes
 *    from 0 to 1, and pd_refs is dropped again if that fails;
 *  - op_policy_start() runs with nrs_lock released, bracketed by
 *    nrs_policy_starting being set to 1 and later back to 0;
 *  - the policy is marked STARTED and installed either as the fallback or,
 *    after nrs_policy_stop_primary(), as the primary policy.
 *
 * NOTE(review): the lines that drop pd_refs and reset the state to STOPPED
 * after op_policy_start() presumably form its failure path, but the guarding
 * condition is absent. Declarations of rc, all return statements, labels and
 * closing braces are absent as well -- confirm against the complete file.
 */
181 static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
183 struct ptlrpc_nrs *nrs = policy->pol_nrs;
187 * Don't allow multiple starting which is too complex, and has no real
190 if (nrs->nrs_policy_starting)
193 LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
195 if (policy->pol_state == NRS_POL_STATE_STOPPING)
198 if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
200 * This is for cases in which the user sets the policy to the
201 * fallback policy (currently fifo for all services); i.e. the
202 * user is resetting the policy to the default; so we stop the
203 * primary policy, if any.
205 if (policy == nrs->nrs_policy_fallback) {
206 nrs_policy_stop_primary(nrs);
211 * If we reach here, we must be setting up the fallback policy
212 * at service startup time, and only a single policy with the
213 * nrs_policy_flags::PTLRPC_NRS_FL_FALLBACK flag set can
214 * register with NRS core.
216 LASSERT(!nrs->nrs_policy_fallback);
219 * Shouldn't start primary policy if w/o fallback policy.
221 if (!nrs->nrs_policy_fallback)
224 if (policy->pol_state == NRS_POL_STATE_STARTED)
229 * Increase the module usage count for policies registering from other
232 if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
233 !try_module_get(policy->pol_desc->pd_owner)) {
234 atomic_dec(&policy->pol_desc->pd_refs);
235 CERROR("NRS: cannot get module for policy %s; is it alive?\n",
236 policy->pol_desc->pd_name);
241 * Serialize policy starting across the NRS head
243 nrs->nrs_policy_starting = 1;
245 policy->pol_state = NRS_POL_STATE_STARTING;
247 if (policy->pol_desc->pd_ops->op_policy_start) {
248 spin_unlock(&nrs->nrs_lock);
250 rc = policy->pol_desc->pd_ops->op_policy_start(policy);
252 spin_lock(&nrs->nrs_lock);
254 if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
255 module_put(policy->pol_desc->pd_owner);
257 policy->pol_state = NRS_POL_STATE_STOPPED;
262 policy->pol_state = NRS_POL_STATE_STARTED;
264 if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
266 * This path is only used at PTLRPC service setup time.
268 nrs->nrs_policy_fallback = policy;
271 * Try to stop the current primary policy if there is one.
273 nrs_policy_stop_primary(nrs);
276 * And set the newly-started policy as the primary one.
278 nrs->nrs_policy_primary = policy;
282 nrs->nrs_policy_starting = 0;
288 * Increases the policy's usage reference count.
/* NOTE(review): only the signature is visible in this listing; the body (embedded source lines 291-293) is absent. */
290 static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
296 * Decreases the policy's usage reference count, and stops the policy in case it
297 * was already stopping and has no more outstanding usage references (which
298 * indicates it has no more queued or started requests, and can be safely
/*
 * Asserts that \a policy still has usage references and, once pol_ref is
 * zero on a policy in NRS_POL_STATE_STOPPING, completes the stop through
 * nrs_policy_stop0().
 *
 * NOTE(review): the statement between the assertion and the check (embedded
 * source line 305, presumably the pol_ref decrement) is absent from this
 * listing -- confirm against the complete file.
 */
301 static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
303 LASSERT(policy->pol_ref > 0);
306 if (unlikely(policy->pol_ref == 0 &&
307 policy->pol_state == NRS_POL_STATE_STOPPING))
308 nrs_policy_stop0(policy);
311 static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
313 spin_lock(&policy->pol_nrs->nrs_lock);
314 nrs_policy_put_locked(policy);
315 spin_unlock(&policy->pol_nrs->nrs_lock);
319 * Find and return a policy by name.
/*
 * Walks nrs->nrs_policy_list and compares the descriptor name of each policy
 * with \a name, looking at no more than NRS_POL_NAME_MAX characters. A usage
 * reference is taken on the matching policy via nrs_policy_get_locked().
 *
 * NOTE(review): the declaration of the second parameter (name) and the
 * return statements are absent from this listing.
 */
321 static struct ptlrpc_nrs_policy *nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
324 struct ptlrpc_nrs_policy *tmp;
326 list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
327 if (strncmp(tmp->pol_desc->pd_name, name,
328 NRS_POL_NAME_MAX) == 0) {
329 nrs_policy_get_locked(tmp);
337 * Release references for the resource hierarchy moving upwards towards the
338 * policy instance resource.
340 static void nrs_resource_put(struct ptlrpc_nrs_resource *res)
342 struct ptlrpc_nrs_policy *policy = res->res_policy;
344 if (policy->pol_desc->pd_ops->op_res_put) {
345 struct ptlrpc_nrs_resource *parent;
347 for (; res; res = parent) {
348 parent = res->res_parent;
349 policy->pol_desc->pd_ops->op_res_put(policy, res);
355 * Obtains references for each resource in the resource hierarchy for request
356 * \a nrq if it is to be handled by \a policy.
358 * \param[in] policy the policy
359 * \param[in] nrq the request
360 * \param[in] moving_req denotes whether this is a call to the function by
361 * ldlm_lock_reorder_req(), in order to move \a nrq to
362 * the high-priority NRS head; we should not sleep when
365 * \retval NULL resource hierarchy references not obtained
366 * \retval valid-pointer the bottom level of the resource hierarchy
368 * \see ptlrpc_nrs_pol_ops::op_res_get()
/*
 * Calls the op_res_get() hook of \a policy for request \a nrq. The resource
 * held in tmp is linked to the previously obtained one through res_parent
 * and tagged with \a policy; nrs_resource_put(res) releases what has been
 * obtained so far (presumably on an error path -- its condition is not
 * visible).
 *
 * NOTE(review): embedded source line 370 (directly before the signature),
 * the third parameter declaration, the declaration of rc, the loop
 * construct, the remaining op_res_get() arguments and all return statements
 * are absent from this listing.
 */
371 struct ptlrpc_nrs_resource *nrs_resource_get(struct ptlrpc_nrs_policy *policy,
372 struct ptlrpc_nrs_request *nrq,
376 * Set to NULL to traverse the resource hierarchy from the top.
378 struct ptlrpc_nrs_resource *res = NULL;
379 struct ptlrpc_nrs_resource *tmp = NULL;
383 rc = policy->pol_desc->pd_ops->op_res_get(policy, nrq, res,
387 nrs_resource_put(res);
391 tmp->res_parent = res;
392 tmp->res_policy = policy;
396 * Return once we have obtained a reference to the bottom level
397 * of the resource hierarchy.
405 * Obtains resources for the resource hierarchies and policy references for
406 * the fallback and current primary policy (if any), that will later be used
407 * to handle request \a nrq.
409 * \param[in] nrs the NRS head instance that will be handling request \a nrq.
410 * \param[in] nrq the request that is being handled.
411 * \param[out] resp the array where references to the resource hierarchy are
413 * \param[in] moving_req is set when obtaining resources while moving a
414 * request from a policy on the regular NRS head to a
415 * policy on the HP NRS head (via
416 * ldlm_lock_reorder_req()). It signifies that
417 * allocations to get resources should be atomic; for
418 * a full explanation, see comment in
419 * ptlrpc_nrs_pol_ops::op_res_get().
/*
 * Clears \a resp, takes usage references on the fallback and primary
 * policies of \a nrs under nrs_lock, then obtains their resource hierarchies
 * into resp[NRS_RES_FALLBACK] and resp[NRS_RES_PRIMARY]. The fallback
 * hierarchy is asserted to be non-NULL; the primary policy reference is
 * dropped again when no primary hierarchy was obtained.
 *
 * NOTE(review): the fourth parameter declaration (moving_req is used below),
 * the conditions guarding the primary-policy statements and the last
 * argument of the second nrs_resource_get() call are absent from this
 * listing -- confirm against the complete file.
 */
421 static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
422 struct ptlrpc_nrs_request *nrq,
423 struct ptlrpc_nrs_resource **resp,
426 struct ptlrpc_nrs_policy *primary = NULL;
427 struct ptlrpc_nrs_policy *fallback = NULL;
429 memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX);
432 * Obtain policy references.
434 spin_lock(&nrs->nrs_lock);
436 fallback = nrs->nrs_policy_fallback;
437 nrs_policy_get_locked(fallback);
439 primary = nrs->nrs_policy_primary;
441 nrs_policy_get_locked(primary);
443 spin_unlock(&nrs->nrs_lock);
446 * Obtain resource hierarchy references.
448 resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req);
449 LASSERT(resp[NRS_RES_FALLBACK]);
452 resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq,
455 * A primary policy may exist which may not wish to serve a
456 * particular request for different reasons; release the
457 * reference on the policy as it will not be used for this
460 if (!resp[NRS_RES_PRIMARY])
461 nrs_policy_put(primary);
466 * Releases references to resource hierarchies and policies, because they are no
467 * longer required; used when request handling has been completed, or the
468 * request is moving to the high priority NRS head.
470 * \param resp the resource hierarchy that is being released
472 * \see ptlrpc_nrs_req_finalize()
/*
 * First loop: remembers the owning policy of each entry of \a resp in pols[]
 * and releases the resource hierarchy through nrs_resource_put(). Second
 * loop: drops the policy references through nrs_policy_put(), which takes
 * the NRS head lock itself.
 *
 * NOTE(review): the declaration of i and the checks that guard empty slots
 * in both loops are absent from this listing.
 */
474 static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
476 struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
479 for (i = 0; i < NRS_RES_MAX; i++) {
481 pols[i] = resp[i]->res_policy;
482 nrs_resource_put(resp[i]);
489 for (i = 0; i < NRS_RES_MAX; i++) {
491 nrs_policy_put(pols[i]);
496 * Obtains an NRS request from \a policy for handling or examination; the
497 * request should be removed in the 'handling' case.
499 * Calling into this function implies we already know the policy has a request
500 * waiting to be handled.
502 * \param[in] policy the policy from which a request is obtained
503 * \param[in] peek when set, signifies that we just want to examine the
504 * request, and not handle it, so the request is not removed
506 * \param[in] force when set, it will force a policy to return a request if it
509 * \retval the NRS request to be handled
/*
 * Asks the op_req_get() hook of \a policy for a request, after asserting
 * that the policy has queued requests. Any request handed back must belong
 * to \a policy, which is asserted as well.
 *
 * NOTE(review): embedded source line 511 (directly before the signature) and
 * the return statement are absent from this listing.
 */
512 struct ptlrpc_nrs_request *nrs_request_get(struct ptlrpc_nrs_policy *policy,
513 bool peek, bool force)
515 struct ptlrpc_nrs_request *nrq;
517 LASSERT(policy->pol_req_queued > 0);
519 nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
521 LASSERT(ergo(nrq, nrs_request_policy(nrq) == policy));
527 * Enqueues request \a nrq for later handling, via one of the policies for
528 * which resources were earlier obtained via nrs_resource_get_safe(). The
529 * function attempts to enqueue the request first on the primary policy
530 * (if any), since this is the preferred choice.
532 * \param nrq the request being enqueued
534 * \see nrs_resource_get_safe()
/*
 * Offers \a nrq to the policies referenced by nr_res_ptrs[], from index
 * NRS_RES_MAX - 1 downwards, through their op_req_enqueue() hook. The
 * queued-request counters of the policy and of its NRS head are incremented
 * (presumably only when the enqueue succeeded -- the check on rc is not
 * visible).
 *
 * NOTE(review): the declarations of i and rc, the statement taken for an
 * empty nr_res_ptrs[] slot, the check on rc, the return after a successful
 * enqueue and the statement reached after the loop are absent from this
 * listing.
 */
536 static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
538 struct ptlrpc_nrs_policy *policy;
543 * Try in descending order, because the primary policy (if any) is
544 * the preferred choice.
546 for (i = NRS_RES_MAX - 1; i >= 0; i--) {
547 if (!nrq->nr_res_ptrs[i])
551 policy = nrq->nr_res_ptrs[i]->res_policy;
553 rc = policy->pol_desc->pd_ops->op_req_enqueue(policy, nrq);
555 policy->pol_nrs->nrs_req_queued++;
556 policy->pol_req_queued++;
561 * Should never get here, as at least the primary policy's
562 * ptlrpc_nrs_pol_ops::op_req_enqueue() implementation should always
569 * Called when a request has been handled
571 * \param[in] nrs the request that has been handled; can be used for
572 * job/resource control.
574 * \see ptlrpc_nrs_req_stop_nolock()
576 static inline void nrs_request_stop(struct ptlrpc_nrs_request *nrq)
578 struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
580 if (policy->pol_desc->pd_ops->op_req_stop)
581 policy->pol_desc->pd_ops->op_req_stop(policy, nrq);
583 LASSERT(policy->pol_nrs->nrs_req_started > 0);
584 LASSERT(policy->pol_req_started > 0);
586 policy->pol_nrs->nrs_req_started--;
587 policy->pol_req_started--;
591 * Handler for operations that can be carried out on policies.
593 * Handles opcodes that are common to all policy types within NRS core, and
594 * passes any unknown opcodes to the policy-specific control function.
596 * \param[in] nrs the NRS head this policy belongs to.
597 * \param[in] name the human-readable policy name; should be the same as
598 * ptlrpc_nrs_pol_desc::pd_name.
599 * \param[in] opc the opcode of the operation being carried out.
600 * \param[in,out] arg can be used to pass information in and out between when
601 * carrying an operation; usually data that is private to
602 * the policy at some level, or generic policy status
605 * \retval -ve error condition
606 * \retval 0 operation was carried out successfully
/*
 * Looks up the policy called \a name under nrs_lock and performs \a opc on
 * it: PTLRPC_NRS_CTL_START is handled by nrs_policy_start_locked(), while
 * opcodes unknown to NRS core are passed to nrs_policy_ctl_locked(). The
 * reference taken by the lookup is dropped through nrs_policy_put_locked()
 * before the lock is released.
 *
 * NOTE(review): the declaration of rc, the handling of a failed lookup, the
 * switch statement around the two calls, labels and the return statement are
 * absent from this listing.
 */
608 static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
609 enum ptlrpc_nrs_ctl opc, void *arg)
611 struct ptlrpc_nrs_policy *policy;
614 spin_lock(&nrs->nrs_lock);
616 policy = nrs_policy_find_locked(nrs, name);
624 * Unknown opcode, pass it down to the policy-specific control
625 * function for handling.
628 rc = nrs_policy_ctl_locked(policy, opc, arg);
634 case PTLRPC_NRS_CTL_START:
635 rc = nrs_policy_start_locked(policy);
640 nrs_policy_put_locked(policy);
642 spin_unlock(&nrs->nrs_lock);
648 * Unregisters a policy by name.
650 * \param[in] nrs the NRS head this policy belongs to.
651 * \param[in] name the human-readable policy name; should be the same as
652 * ptlrpc_nrs_pol_desc::pd_name
/*
 * Removes the policy called \a name from \a nrs. Under nrs_lock the policy
 * is looked up (which takes a reference); a policy with more than that one
 * reference is reported as busy. Otherwise the policy must have no queued or
 * started requests, is stopped through nrs_policy_stop_locked() unless it
 * already is in NRS_POL_STATE_STOPPED, is unlinked from the policy list, and
 * the lookup reference is dropped. nrs_policy_fini() then runs outside the
 * lock, after which pol_private is asserted to be NULL.
 *
 * NOTE(review): the check for a failed lookup, all return statements and
 * whatever follows the final assertion (embedded source lines 698 onwards)
 * are absent from this listing -- confirm against the complete file.
 */
657 static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
659 struct ptlrpc_nrs_policy *policy = NULL;
661 spin_lock(&nrs->nrs_lock);
663 policy = nrs_policy_find_locked(nrs, name);
665 spin_unlock(&nrs->nrs_lock);
667 CERROR("Can't find NRS policy %s\n", name);
671 if (policy->pol_ref > 1) {
672 CERROR("Policy %s is busy with %d references\n", name,
673 (int)policy->pol_ref);
674 nrs_policy_put_locked(policy);
676 spin_unlock(&nrs->nrs_lock);
680 LASSERT(policy->pol_req_queued == 0);
681 LASSERT(policy->pol_req_started == 0);
683 if (policy->pol_state != NRS_POL_STATE_STOPPED) {
684 nrs_policy_stop_locked(policy);
685 LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
688 list_del(&policy->pol_list);
691 nrs_policy_put_locked(policy);
693 spin_unlock(&nrs->nrs_lock);
695 nrs_policy_fini(policy);
697 LASSERT(!policy->pol_private);
704 * Register a policy from policy descriptor \a desc with NRS head \a nrs.
706 * \param[in] nrs the NRS head on which the policy will be registered.
707 * \param[in] desc the policy descriptor from which the information will be
708 * obtained to register the policy.
/*
 * Creates a policy instance for descriptor \a desc on NRS head \a nrs.
 * Steps visible in this listing: the mandatory hooks (op_res_get,
 * op_req_get, op_req_enqueue, op_req_dequeue) and pd_compat are asserted;
 * the instance is allocated with kzalloc_node(), filled in from \a desc and
 * left in NRS_POL_STATE_STOPPED; nrs_policy_init() runs before nrs_lock is
 * taken; under the lock a policy with the same name is looked up and
 * reported as already registered; otherwise the new policy is appended to
 * nrs_policy_list and, with PTLRPC_NRS_FL_REG_START set, started through
 * nrs_policy_start_locked(). nrs_policy_unregister() is called at the end
 * (presumably only when starting failed -- the condition is not visible).
 *
 * NOTE(review): the declaration of rc, the allocation and error checks, the
 * remaining kzalloc_node() arguments, the release of the instance on error
 * paths and all return statements are absent from this listing.
 */
713 static int nrs_policy_register(struct ptlrpc_nrs *nrs,
714 struct ptlrpc_nrs_pol_desc *desc)
716 struct ptlrpc_nrs_policy *policy;
717 struct ptlrpc_nrs_policy *tmp;
718 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
721 LASSERT(desc->pd_ops->op_res_get);
722 LASSERT(desc->pd_ops->op_req_get);
723 LASSERT(desc->pd_ops->op_req_enqueue);
724 LASSERT(desc->pd_ops->op_req_dequeue);
725 LASSERT(desc->pd_compat);
727 policy = kzalloc_node(sizeof(*policy), GFP_NOFS,
728 cfs_cpt_spread_node(svcpt->scp_service->srv_cptable,
733 policy->pol_nrs = nrs;
734 policy->pol_desc = desc;
735 policy->pol_state = NRS_POL_STATE_STOPPED;
736 policy->pol_flags = desc->pd_flags;
738 INIT_LIST_HEAD(&policy->pol_list);
739 INIT_LIST_HEAD(&policy->pol_list_queued);
741 rc = nrs_policy_init(policy);
747 spin_lock(&nrs->nrs_lock);
749 tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
751 CERROR("NRS policy %s has been registered, can't register it for %s\n",
752 policy->pol_desc->pd_name,
753 svcpt->scp_service->srv_name);
754 nrs_policy_put_locked(tmp);
756 spin_unlock(&nrs->nrs_lock);
757 nrs_policy_fini(policy);
763 list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
766 if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
767 rc = nrs_policy_start_locked(policy);
769 spin_unlock(&nrs->nrs_lock);
772 (void) nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
778 * Enqueue request \a req using one of the policies its resources are referring
781 * \param[in] req the request to enqueue.
783 static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
785 struct ptlrpc_nrs_policy *policy;
787 LASSERT(req->rq_nrq.nr_initialized);
788 LASSERT(!req->rq_nrq.nr_enqueued);
790 nrs_request_enqueue(&req->rq_nrq);
791 req->rq_nrq.nr_enqueued = 1;
793 policy = nrs_request_policy(&req->rq_nrq);
795 * Add the policy to the NRS head's list of policies with enqueued
796 * requests, if it has not been added there.
798 if (unlikely(list_empty(&policy->pol_list_queued)))
799 list_add_tail(&policy->pol_list_queued,
800 &policy->pol_nrs->nrs_policy_queued);
804 * Enqueue a request on the high priority NRS head.
806 * \param req the request to enqueue.
/*
 * With rq_lock held, enqueues \a req through ptlrpc_nrs_req_add_nolock() and
 * emits a D_NET debug message for it.
 *
 * NOTE(review): embedded source lines 813 and 815 are absent from this
 * listing; opc is computed below but its use is not visible, so it
 * presumably guards the debug message -- confirm against the complete file.
 */
808 static void ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
810 int opc = lustre_msg_get_opc(req->rq_reqmsg);
812 spin_lock(&req->rq_lock);
814 ptlrpc_nrs_req_add_nolock(req);
816 DEBUG_REQ(D_NET, req, "high priority req");
817 spin_unlock(&req->rq_lock);
821 * Returns a boolean predicate indicating whether the policy described by
822 * \a desc is adequate for use with service \a svc.
824 * \param[in] svc the service
825 * \param[in] desc the policy descriptor
827 * \retval false the policy is not compatible with the service
828 * \retval true the policy is compatible with the service
830 static inline bool nrs_policy_compatible(const struct ptlrpc_service *svc,
831 const struct ptlrpc_nrs_pol_desc *desc)
833 return desc->pd_compat(svc, desc);
837 * Registers all compatible policies in nrs_core.nrs_policies, for NRS head
840 * \param[in] nrs the NRS head
845 * \pre mutex_is_locked(&nrs_core.nrs_mutex)
847 * \see ptlrpc_service_nrs_setup()
/*
 * Walks nrs_core.nrs_policies with nrs_core.nrs_mutex held (asserted) and
 * registers every descriptor that is compatible with the service owning
 * \a nrs through nrs_policy_register(); a failure is logged with CERROR.
 *
 * NOTE(review): the declaration of rc, the check on rc, the last CERROR
 * arguments and the return statements are absent from this listing.
 */
849 static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
851 struct ptlrpc_nrs_pol_desc *desc;
852 /* for convenience */
853 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
854 struct ptlrpc_service *svc = svcpt->scp_service;
857 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
859 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
860 if (nrs_policy_compatible(svc, desc)) {
861 rc = nrs_policy_register(nrs, desc);
863 CERROR("Failed to register NRS policy %s for partition %d of service %s: %d\n",
864 desc->pd_name, svcpt->scp_cpt,
867 * Fail registration if any of the policies'
868 * registration fails.
879 * Initializes NRS head \a nrs of service partition \a svcpt, and registers all
880 * compatible policies in NRS core, with the NRS head.
882 * \param[in] nrs the NRS head
883 * \param[in] svcpt the PTLRPC service partition to setup
888 * \pre mutex_is_locked(&nrs_core.nrs_mutex)
/*
 * Initializes NRS head \a nrs: the queue type is PTLRPC_NRS_QUEUE_REG when
 * \a nrs is &svcpt->scp_nrs_reg and PTLRPC_NRS_QUEUE_HP when it is
 * svcpt->scp_nrs_hp; the head then records \a svcpt and the queue type, has
 * its lock and both policy lists initialized, and gets all compatible
 * policies registered through nrs_register_policies_locked().
 *
 * NOTE(review): the branch taken when \a nrs matches neither head (embedded
 * source lines 901-902) is absent from this listing.
 */
890 static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
891 struct ptlrpc_service_part *svcpt)
893 enum ptlrpc_nrs_queue_type queue;
895 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
897 if (nrs == &svcpt->scp_nrs_reg)
898 queue = PTLRPC_NRS_QUEUE_REG;
899 else if (nrs == svcpt->scp_nrs_hp)
900 queue = PTLRPC_NRS_QUEUE_HP;
904 nrs->nrs_svcpt = svcpt;
905 nrs->nrs_queue_type = queue;
906 spin_lock_init(&nrs->nrs_lock);
907 INIT_LIST_HEAD(&nrs->nrs_policy_list);
908 INIT_LIST_HEAD(&nrs->nrs_policy_queued);
910 return nrs_register_policies_locked(nrs);
914 * Allocates a regular and optionally a high-priority NRS head (if the service
915 * handles high-priority RPCs), and then registers all available compatible
916 * policies on those NRS heads.
918 * \param[in,out] svcpt the PTLRPC service partition to setup
920 * \pre mutex_is_locked(&nrs_core.nrs_mutex)
/*
 * Sets up the regular NRS head of \a svcpt through
 * nrs_svcpt_setup_locked0() and then, depending on the so_hpreq_handler
 * check below, allocates svcpt->scp_nrs_hp with kzalloc_node() and sets up
 * that high-priority head the same way.
 *
 * NOTE(review): the declaration of rc, the checks on rc, the statement
 * guarded by the so_hpreq_handler test, the assignment target and last
 * arguments of kzalloc_node(), the allocation-failure handling and the
 * return statements are absent from this listing.
 */
922 static int nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
924 struct ptlrpc_nrs *nrs;
927 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
930 * Initialize the regular NRS head.
932 nrs = nrs_svcpt2nrs(svcpt, false);
933 rc = nrs_svcpt_setup_locked0(nrs, svcpt);
938 * Optionally allocate a high-priority NRS head.
940 if (!svcpt->scp_service->srv_ops.so_hpreq_handler)
944 kzalloc_node(sizeof(*svcpt->scp_nrs_hp), GFP_NOFS,
945 cfs_cpt_spread_node(svcpt->scp_service->srv_cptable,
947 if (!svcpt->scp_nrs_hp) {
952 nrs = nrs_svcpt2nrs(svcpt, true);
953 rc = nrs_svcpt_setup_locked0(nrs, svcpt);
960 * Unregisters all policies on all available NRS heads in a service partition;
961 * called at PTLRPC service unregistration time.
963 * \param[in] svcpt the PTLRPC service partition
965 * \pre mutex_is_locked(&nrs_core.nrs_mutex)
/*
 * Marks the NRS head selected by hp as stopping and unregisters every policy
 * on its nrs_policy_list by name; the safe list iterator is used because
 * nrs_policy_unregister() unlinks the entry. The final check covers the
 * high-priority head of the partition.
 *
 * NOTE(review): the declarations of rc and hp, the handling of rc, the body
 * of the last if statement (presumably a second pass with hp set) and any
 * labels are absent from this listing.
 */
967 static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
969 struct ptlrpc_nrs *nrs;
970 struct ptlrpc_nrs_policy *policy;
971 struct ptlrpc_nrs_policy *tmp;
975 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
978 nrs = nrs_svcpt2nrs(svcpt, hp);
979 nrs->nrs_stopping = 1;
981 list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list, pol_list) {
982 rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
987 * If the service partition has an HP NRS head, clean that up as well.
989 if (!hp && nrs_svcpt_has_hp(svcpt)) {
999 * Returns the descriptor for a policy as identified by \a name.
1001 * \param[in] name the policy name
1003 * \retval the policy descriptor
/*
 * Walks nrs_core.nrs_policies and compares pd_name of each descriptor with
 * \a name, looking at no more than NRS_POL_NAME_MAX characters.
 *
 * NOTE(review): the return statements (for a match and for no match) are
 * absent from this listing.
 */
1006 static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
1008 struct ptlrpc_nrs_pol_desc *tmp;
1010 list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
1011 if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
1018 * Removes the policy from all supported NRS heads of all partitions of all
1021 * \param[in] desc the policy descriptor to unregister
1024 * \retval 0 successfully unregistered policy on all supported NRS heads
1026 * \pre mutex_is_locked(&nrs_core.nrs_mutex)
1027 * \pre mutex_is_locked(&ptlrpc_all_services_mutex)
/*
 * With nrs_core.nrs_mutex and ptlrpc_all_services_mutex held (both
 * asserted), walks ptlrpc_all_services and, for each partition of every
 * service that passes the compatibility and srv_is_stopping test,
 * unregisters the policy named desc->pd_name from the NRS head selected by
 * hp. -ENOENT from nrs_policy_unregister() is treated separately from other
 * errors, which are logged with CERROR. The op_lprocfs_fini() hook of the
 * descriptor, when present, is called with the service.
 *
 * NOTE(review): the declarations of i, rc and hp, the statements taken in
 * the skip, -ENOENT and error branches, the body of the HP-head check
 * (presumably a second pass with hp set) and the return statements are
 * absent from this listing.
 */
1029 static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
1031 struct ptlrpc_nrs *nrs;
1032 struct ptlrpc_service *svc;
1033 struct ptlrpc_service_part *svcpt;
1037 LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
1038 LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
1040 list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1041 if (!nrs_policy_compatible(svc, desc) ||
1042 unlikely(svc->srv_is_stopping))
1045 ptlrpc_service_for_each_part(svcpt, i, svc) {
1049 nrs = nrs_svcpt2nrs(svcpt, hp);
1050 rc = nrs_policy_unregister(nrs, desc->pd_name);
1052 * Ignore -ENOENT as the policy may not have registered
1053 * successfully on all service partitions.
1055 if (rc == -ENOENT) {
1057 } else if (rc != 0) {
1058 CERROR("Failed to unregister NRS policy %s for partition %d of service %s: %d\n",
1059 desc->pd_name, svcpt->scp_cpt,
1060 svcpt->scp_service->srv_name, rc);
1064 if (!hp && nrs_svc_has_hp(svc)) {
1070 if (desc->pd_ops->op_lprocfs_fini)
1071 desc->pd_ops->op_lprocfs_fini(svc);
1078 * Registers a new policy with NRS core.
1080 * The function will only succeed if policy registration with all compatible
1081 * service partitions (if any) is successful.
1083 * N.B. This function should be called either at ptlrpc module initialization
1084 * time when registering a policy that ships with NRS core, or in a
1085 * module's init() function for policies registering from other modules.
1087 * \param[in] conf configuration information for the new policy to register
/*
 * Registers the policy described by \a conf with NRS core. Steps visible in
 * this listing:
 *  - sanity assertions on \a conf and forced NUL termination of nc_name;
 *  - an external policy (PTLRPC_NRS_FL_REG_EXTERN) that also carries
 *    PTLRPC_NRS_FL_FALLBACK or PTLRPC_NRS_FL_REG_START is reported with
 *    CERROR;
 *  - under nrs_core.nrs_mutex: a duplicate-name check, allocation of a
 *    descriptor with kzalloc(), a strlcpy() of the name with a truncation
 *    check, and a copy of the remaining fields from \a conf (pd_owner only
 *    for external policies);
 *  - a test for non-external policies, which per the comment above it are
 *    not registered with services here;
 *  - under ptlrpc_all_services_mutex: nrs_policy_register() on the NRS head
 *    selected by hp for each partition of every service that passes the
 *    compatibility and srv_is_stopping test, followed by the
 *    op_lprocfs_init() hook; nrs_policy_unregister_locked() appears next to
 *    both the registration error message and the hook call;
 *  - the descriptor is appended to nrs_core.nrs_policies.
 *
 * NOTE(review): the declarations of rc, rc2, len, i and hp, every check on
 * rc, all goto/return statements and labels, the release of the descriptor
 * on error paths and several call arguments are absent from this listing --
 * confirm the control flow against the complete file.
 */
1092 static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
1094 struct ptlrpc_service *svc;
1095 struct ptlrpc_nrs_pol_desc *desc;
1099 LASSERT(conf->nc_ops);
1100 LASSERT(conf->nc_compat);
1101 LASSERT(ergo(conf->nc_compat == nrs_policy_compat_one,
1102 conf->nc_compat_svc_name));
1103 LASSERT(ergo((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0,
1106 conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
1109 * External policies are not allowed to start immediately upon
1110 * registration, as there is a relatively higher chance that their
1111 * registration might fail. In such a case, some policy instances may
1112 * already have requests queued wen unregistration needs to happen as
1113 * part o cleanup; since there is currently no way to drain requests
1114 * from a policy unless the service is unregistering, we just disallow
1117 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
1118 (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
1119 PTLRPC_NRS_FL_REG_START))) {
1120 CERROR("NRS: failing to register policy %s. Please check policy flags; external policies cannot act as fallback policies, or be started immediately upon registration without interaction with lprocfs\n",
1125 mutex_lock(&nrs_core.nrs_mutex);
1127 if (nrs_policy_find_desc_locked(conf->nc_name)) {
1128 CERROR("NRS: failing to register policy %s which has already been registered with NRS core!\n",
1134 desc = kzalloc(sizeof(*desc), GFP_NOFS);
1140 len = strlcpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name));
1141 if (len >= sizeof(desc->pd_name)) {
1146 desc->pd_ops = conf->nc_ops;
1147 desc->pd_compat = conf->nc_compat;
1148 desc->pd_compat_svc_name = conf->nc_compat_svc_name;
1149 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
1150 desc->pd_owner = conf->nc_owner;
1151 desc->pd_flags = conf->nc_flags;
1152 atomic_set(&desc->pd_refs, 0);
1155 * For policies that are held in the same module as NRS (currently
1156 * ptlrpc), do not register the policy with all compatible services,
1157 * as the services will not have started at this point, since we are
1158 * calling from ptlrpc module initialization code. In such cases each
1159 * service will register all compatible policies later, via
1160 * ptlrpc_service_nrs_setup().
1162 if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) == 0)
1166 * Register the new policy on all compatible services
1168 mutex_lock(&ptlrpc_all_services_mutex);
1170 list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
1171 struct ptlrpc_service_part *svcpt;
1175 if (!nrs_policy_compatible(svc, desc) ||
1176 unlikely(svc->srv_is_stopping))
1179 ptlrpc_service_for_each_part(svcpt, i, svc) {
1180 struct ptlrpc_nrs *nrs;
1183 nrs = nrs_svcpt2nrs(svcpt, hp);
1184 rc = nrs_policy_register(nrs, desc);
1186 CERROR("Failed to register NRS policy %s for partition %d of service %s: %d\n",
1187 desc->pd_name, svcpt->scp_cpt,
1188 svcpt->scp_service->srv_name, rc);
1190 rc2 = nrs_policy_unregister_locked(desc);
1192 * Should not fail at this point
1195 mutex_unlock(&ptlrpc_all_services_mutex);
1200 if (!hp && nrs_svc_has_hp(svc)) {
1207 * No need to take a reference to other modules here, as we
1208 * will be calling from the module's init() function.
1210 if (desc->pd_ops->op_lprocfs_init) {
1211 rc = desc->pd_ops->op_lprocfs_init(svc);
1213 rc2 = nrs_policy_unregister_locked(desc);
1215 * Should not fail at this point
1218 mutex_unlock(&ptlrpc_all_services_mutex);
1225 mutex_unlock(&ptlrpc_all_services_mutex);
1227 list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
1229 mutex_unlock(&nrs_core.nrs_mutex);
1235 * Setup NRS heads on all service partitions of service \a svc, and register
1236 * all compatible policies on those NRS heads.
1238 * To be called from within ptlrpc_register_service().
1239 * \param[in] svc the service to setup
1241 * \retval -ve error, the calling logic should eventually call
1242 * ptlrpc_service_nrs_cleanup() to undo any work performed
1245 * \see ptlrpc_register_service()
1246 * \see ptlrpc_service_nrs_cleanup()
/*
 * With nrs_core.nrs_mutex held, sets up the NRS heads of every partition of
 * \a svc through nrs_svcpt_setup_locked(), then walks
 * nrs_core.nrs_policies and calls the op_lprocfs_init() hook, when present,
 * of each descriptor that passes the compatibility test.
 *
 * NOTE(review): the declarations of i and rc, the checks on rc, the
 * statement taken for an incompatible descriptor, labels and the return
 * statement are absent from this listing.
 */
1248 int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
1250 struct ptlrpc_service_part *svcpt;
1251 const struct ptlrpc_nrs_pol_desc *desc;
1255 mutex_lock(&nrs_core.nrs_mutex);
1258 * Initialize NRS heads on all service CPTs.
1260 ptlrpc_service_for_each_part(svcpt, i, svc) {
1261 rc = nrs_svcpt_setup_locked(svcpt);
1267 * Set up lprocfs interfaces for all supported policies for the
1270 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1271 if (!nrs_policy_compatible(svc, desc))
1274 if (desc->pd_ops->op_lprocfs_init) {
1275 rc = desc->pd_ops->op_lprocfs_init(svc);
1283 mutex_unlock(&nrs_core.nrs_mutex);
1289 * Unregisters all policies on all service partitions of service \a svc.
1291 * \param[in] svc the PTLRPC service to unregister
/*
 * With nrs_core.nrs_mutex held, cleans up the NRS heads of every partition
 * of \a svc through nrs_svcpt_cleanup_locked(), then walks
 * nrs_core.nrs_policies and calls the op_lprocfs_fini() hook, when present,
 * of each descriptor that passes the compatibility test.
 *
 * NOTE(review): the declaration of i and the statement taken for an
 * incompatible descriptor (embedded source line 1313) are absent from this
 * listing.
 */
1293 void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
1295 struct ptlrpc_service_part *svcpt;
1296 const struct ptlrpc_nrs_pol_desc *desc;
1299 mutex_lock(&nrs_core.nrs_mutex);
1302 * Clean up NRS heads on all service partitions
1304 ptlrpc_service_for_each_part(svcpt, i, svc)
1305 nrs_svcpt_cleanup_locked(svcpt);
1308 * Clean up lprocfs interfaces for all supported policies for the
1311 list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
1312 if (!nrs_policy_compatible(svc, desc))
1315 if (desc->pd_ops->op_lprocfs_fini)
1316 desc->pd_ops->op_lprocfs_fini(svc);
1319 mutex_unlock(&nrs_core.nrs_mutex);
1323 * Obtains NRS head resources for request \a req.
1325 * These could be either on the regular or HP NRS head of \a svcpt; resources
1326 * taken on the regular head can later be swapped for HP head resources by
1327 * ldlm_lock_reorder_req().
1329 * \param[in] svcpt the service partition
1330 * \param[in] req the request
1331 * \param[in] hp which NRS head of \a svcpt to use
/*
 * Zeroes req->rq_nrq, obtains policy and resource references from the NRS
 * head selected by \a hp through nrs_resource_get_safe(), and then marks the
 * NRS request as initialized.
 *
 * NOTE(review): the last argument of the nrs_resource_get_safe() call
 * (embedded source line 1340) is absent from this listing.
 */
1333 void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
1334 struct ptlrpc_request *req, bool hp)
1336 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1338 memset(&req->rq_nrq, 0, sizeof(req->rq_nrq));
1339 nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs,
1343 * It is fine to access \e nr_initialized without locking as there is
1344 * no contention at this early stage.
1346 req->rq_nrq.nr_initialized = 1;
1350 * Releases resources for a request; is called after the request has been
1353 * \param[in] req the request
1355 * \see ptlrpc_server_finish_request()
1357 void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
1359 if (req->rq_nrq.nr_initialized) {
1360 nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
1361 /* no protection on bit nr_initialized because no
1362 * contention at this late stage
1364 req->rq_nrq.nr_finalized = 1;
1368 void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
1370 if (req->rq_nrq.nr_started)
1371 nrs_request_stop(&req->rq_nrq);
1375 * Enqueues request \a req on either the regular or high-priority NRS head
1376 * of service partition \a svcpt.
1378 * \param[in] svcpt the service partition
1379 * \param[in] req the request to be enqueued
1380 * \param[in] hp whether to enqueue the request on the regular or
1381 * high-priority NRS head.
/*
 * Enqueues req on an NRS head of svcpt. The enqueue helpers are called
 * between spin_lock() and spin_unlock() on svcpt->scp_req_lock, i.e. with
 * that lock held.
 */
1383 void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
1384 struct ptlrpc_request *req, bool hp)
1386 spin_lock(&svcpt->scp_req_lock);
/* NOTE(review): the lines that choose between the two calls below are not visible in this listing; presumably hp selects the high-priority helper and the regular helper is used otherwise - confirm against the full source. */
1389 ptlrpc_nrs_hpreq_add_nolock(req);
1391 ptlrpc_nrs_req_add_nolock(req);
1393 spin_unlock(&svcpt->scp_req_lock);
1396 static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
1398 LASSERT(policy->pol_nrs->nrs_req_queued > 0);
1399 LASSERT(policy->pol_req_queued > 0);
1401 policy->pol_nrs->nrs_req_queued--;
1402 policy->pol_req_queued--;
1405 * If the policy has no more requests queued, remove it from
1406 * ptlrpc_nrs::nrs_policy_queued.
1408 if (unlikely(policy->pol_req_queued == 0)) {
1409 list_del_init(&policy->pol_list_queued);
1412 * If there are other policies with queued requests, move the
1413 * current policy to the end so that we can round robin over
1414 * all policies and drain the requests.
1416 } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
1417 LASSERT(policy->pol_req_queued <
1418 policy->pol_nrs->nrs_req_queued);
1420 list_move_tail(&policy->pol_list_queued,
1421 &policy->pol_nrs->nrs_policy_queued);
1426 * Obtains a request for handling from an NRS head of service partition \a svcpt.
1429 * \param[in] svcpt the service partition
1430 * \param[in] hp whether to obtain a request from the regular or
1431 * high-priority NRS head.
1432 * \param[in] peek when set, signifies that we just want to examine the
1433 * request, and not handle it, so the request is not removed from its policy.
1435 * \param[in] force when set, it will force a policy to return a request if it has one.
1438 * \retval the request to be handled
1439 * \retval NULL the head has no requests to serve
/*
 * Looks for a request on the NRS head of svcpt selected by hp. The policies
 * linked on nrs->nrs_policy_queued are visited in list order and each is
 * asked for a request through nrs_request_get(). When peek is false, the
 * request is marked as started, the started counters of the policy and of
 * its NRS head are incremented, and nrs_request_removed() updates the queued
 * accounting. The value returned is the struct ptlrpc_request that embeds
 * the NRS request as its rq_nrq member.
 *
 * NOTE(review): this listing has gaps; the check on the value returned by
 * nrs_request_get() and the return statement reached when no policy yields a
 * request are not visible here - confirm against the full source.
 */
1441 struct ptlrpc_request *
1442 ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
1443 bool peek, bool force)
1445 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1446 struct ptlrpc_nrs_policy *policy;
1447 struct ptlrpc_nrs_request *nrq;
1450 * Always try to drain requests from all NRS polices even if they are
1451 * inactive, because the user can change policy status at runtime.
1453 list_for_each_entry(policy, &nrs->nrs_policy_queued, pol_list_queued) {
/* peek and force are passed through to the policy unchanged. */
1454 nrq = nrs_request_get(policy, peek, force);
/* Bookkeeping is only done when the request is actually being taken, not when peeking. */
1456 if (likely(!peek)) {
1457 nrq->nr_started = 1;
1459 policy->pol_req_started++;
1460 policy->pol_nrs->nrs_req_started++;
/* One fewer request queued on this policy and on its NRS head. */
1462 nrs_request_removed(policy);
/* Map the embedded NRS request back to its enclosing ptlrpc_request. */
1465 return container_of(nrq, struct ptlrpc_request, rq_nrq);
1473 * Returns whether there are any requests currently enqueued on any of the
1474 * policies of service partition's \a svcpt NRS head specified by \a hp. Should
1475 * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
1478 * \param[in] svcpt the service partition to enquire.
1479 * \param[in] hp whether the regular or high-priority NRS head is to be
1482 * \retval false the indicated NRS head has no enqueued requests.
1483 * \retval true the indicated NRS head has some enqueued requests.
1485 bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
1487 struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
1489 return nrs->nrs_req_queued > 0;
1493 * Carries out a control operation \a opc on the policy identified by the
1494 * human-readable \a name, on either all partitions, or only on the first
1495 * partition of service \a svc.
1497 * \param[in] svc the service the policy belongs to.
1498 * \param[in] queue whether to carry out the command on the policy which
1499 * belongs to the regular, high-priority, or both NRS
1500 * heads of service partitions of \a svc.
1501 * \param[in] name the policy to act upon, by human-readable name
1502 * \param[in] opc the opcode of the operation to carry out
1503 * \param[in] single when set, the operation will only be carried out on the
1504 * NRS heads of the first service partition of \a svc.
1505 * This is useful for some policies which e.g. share
1506 * identical values on the same parameters of different
1507 * service partitions; when reading these parameters via
1508 * lprocfs, these policies may just want to obtain and
1509 * print out the values from the first service partition.
1510 * Storing these values centrally elsewhere then could be
1511 * another solution for this.
1512 * \param[in,out] arg can be used as a generic in/out buffer between control
1513 * operations and the user environment.
1515 *\retval -ve error condition
1516 *\retval 0 operation was carried out successfully
/*
 * Runs control operation opc on the policy called name, through
 * nrs_policy_ctl(), on the NRS heads of the service partitions of svc. For
 * each partition the regular head is handled first (when queue has
 * PTLRPC_NRS_QUEUE_REG set) and then the high-priority head (when queue has
 * PTLRPC_NRS_QUEUE_HP set). A non-zero rc from nrs_policy_ctl() and the
 * single flag both take part in the conditions checked after each call.
 *
 * NOTE(review): this listing has gaps; the declarations of i and rc, the
 * trailing arguments of both nrs_policy_ctl() calls, the statements guarded
 * by the if conditions and the final return are not visible here - confirm
 * against the full source.
 */
1518 int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
1519 enum ptlrpc_nrs_queue_type queue, char *name,
1520 enum ptlrpc_nrs_ctl opc, bool single, void *arg)
1522 struct ptlrpc_service_part *svcpt;
/* Callers must never pass the invalid opcode. */
1526 LASSERT(opc != PTLRPC_NRS_CTL_INVALID);
/* True when queue has none of the bits in PTLRPC_NRS_QUEUE_BOTH set; the statement taken in that case is not visible in this listing. */
1528 if ((queue & PTLRPC_NRS_QUEUE_BOTH) == 0)
1531 ptlrpc_service_for_each_part(svcpt, i, svc) {
/* Regular head of this partition. */
1532 if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
1533 rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
/* The rest of this condition and the guarded statement are not visible in this listing. */
1535 if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
/* High-priority head of this partition. */
1540 if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
1542 * XXX: We could optionally check for
1543 * nrs_svc_has_hp(svc) here, and return an error if it
1544 * is false. Right now we rely on the policies' lprocfs
1545 * handlers that call the present function to make this
1546 * check; if they fail to do so, they might hit the
1547 * assertion inside nrs_svcpt2nrs() below.
1549 rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name,
/* The statement guarded by this condition is not visible in this listing. */
1551 if (rc != 0 || single)
1559 /* Policy configuration from ptlrpc/nrs_fifo.c; it is the configuration that ptlrpc_nrs_init() passes to ptlrpc_nrs_policy_register(). */
1560 extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
1563 * Adds all policies that ship with the ptlrpc module, to NRS core's list of
1564 * policies \e nrs_core.nrs_policies.
1566 * \retval 0 all policies have been registered successfully
/*
 * Initializes the global NRS core state (nrs_core.nrs_mutex and the
 * nrs_core.nrs_policies list head) and then registers the nrs_conf_fifo
 * policy configuration through ptlrpc_nrs_policy_register().
 *
 * NOTE(review): this listing has gaps; the declaration of rc, the handling of
 * the value returned by ptlrpc_nrs_policy_register() and the cleanup that the
 * trailing comment lines refer to are not visible here - confirm against the
 * full source.
 */
1569 int ptlrpc_nrs_init(void)
/* The mutex and the list head are set up before the registration call below. */
1573 mutex_init(&nrs_core.nrs_mutex);
1574 INIT_LIST_HEAD(&nrs_core.nrs_policies);
1576 rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
1583 * Since no PTLRPC services have been started at this point, all we need
1584 * to do for cleanup is to free the descriptors.
1592 * Removes all policy descriptors from nrs_core::nrs_policies, and frees the
1593 * policy descriptors.
1595 * Since all PTLRPC services are stopped at this point, there are no more
1596 * instances of any policies, because each service will have stopped its policy
1597 * instances in ptlrpc_service_nrs_cleanup(), so we just need to free the descriptors.
1600 void ptlrpc_nrs_fini(void)
1602 struct ptlrpc_nrs_pol_desc *desc;
1603 struct ptlrpc_nrs_pol_desc *tmp;
1605 list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies, pd_list) {
1606 list_del_init(&desc->pd_list);