Merge tag 'arc-v3.11-rc1-part2' of git://git.kernel.org/pub/scm/linux/kernel/git...
[cascardo/linux.git] / drivers / staging / lustre / lustre / ptlrpc / sec.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/sec.c
37  *
38  * Author: Eric Mei <ericm@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_SEC
42
43 #include <linux/libcfs/libcfs.h>
44 #include <linux/crypto.h>
45 #include <linux/key.h>
46
47 #include <obd.h>
48 #include <obd_class.h>
49 #include <obd_support.h>
50 #include <lustre_net.h>
51 #include <lustre_import.h>
52 #include <lustre_dlm.h>
53 #include <lustre_sec.h>
54
55 #include "ptlrpc_internal.h"
56
57 /***********************************************
58  * policy registers                         *
59  ***********************************************/
60
61 static rwlock_t policy_lock;
62 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
63         NULL,
64 };
65
66 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy)
67 {
68         __u16 number = policy->sp_policy;
69
70         LASSERT(policy->sp_name);
71         LASSERT(policy->sp_cops);
72         LASSERT(policy->sp_sops);
73
74         if (number >= SPTLRPC_POLICY_MAX)
75                 return -EINVAL;
76
77         write_lock(&policy_lock);
78         if (unlikely(policies[number])) {
79                 write_unlock(&policy_lock);
80                 return -EALREADY;
81         }
82         policies[number] = policy;
83         write_unlock(&policy_lock);
84
85         CDEBUG(D_SEC, "%s: registered\n", policy->sp_name);
86         return 0;
87 }
88 EXPORT_SYMBOL(sptlrpc_register_policy);
89
90 int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
91 {
92         __u16 number = policy->sp_policy;
93
94         LASSERT(number < SPTLRPC_POLICY_MAX);
95
96         write_lock(&policy_lock);
97         if (unlikely(policies[number] == NULL)) {
98                 write_unlock(&policy_lock);
99                 CERROR("%s: already unregistered\n", policy->sp_name);
100                 return -EINVAL;
101         }
102
103         LASSERT(policies[number] == policy);
104         policies[number] = NULL;
105         write_unlock(&policy_lock);
106
107         CDEBUG(D_SEC, "%s: unregistered\n", policy->sp_name);
108         return 0;
109 }
110 EXPORT_SYMBOL(sptlrpc_unregister_policy);
111
112 static
113 struct ptlrpc_sec_policy * sptlrpc_wireflavor2policy(__u32 flavor)
114 {
115         static DEFINE_MUTEX(load_mutex);
116         static atomic_t       loaded = ATOMIC_INIT(0);
117         struct ptlrpc_sec_policy *policy;
118         __u16                number = SPTLRPC_FLVR_POLICY(flavor);
119         __u16                flag = 0;
120
121         if (number >= SPTLRPC_POLICY_MAX)
122                 return NULL;
123
124         while (1) {
125                 read_lock(&policy_lock);
126                 policy = policies[number];
127                 if (policy && !try_module_get(policy->sp_owner))
128                         policy = NULL;
129                 if (policy == NULL)
130                         flag = atomic_read(&loaded);
131                 read_unlock(&policy_lock);
132
133                 if (policy != NULL || flag != 0 ||
134                     number != SPTLRPC_POLICY_GSS)
135                         break;
136
137                 /* try to load gss module, once */
138                 mutex_lock(&load_mutex);
139                 if (atomic_read(&loaded) == 0) {
140                         if (request_module("ptlrpc_gss") == 0)
141                                 CDEBUG(D_SEC,
142                                        "module ptlrpc_gss loaded on demand\n");
143                         else
144                                 CERROR("Unable to load module ptlrpc_gss\n");
145
146                         atomic_set(&loaded, 1);
147                 }
148                 mutex_unlock(&load_mutex);
149         }
150
151         return policy;
152 }
153
154 __u32 sptlrpc_name2flavor_base(const char *name)
155 {
156         if (!strcmp(name, "null"))
157                 return SPTLRPC_FLVR_NULL;
158         if (!strcmp(name, "plain"))
159                 return SPTLRPC_FLVR_PLAIN;
160         if (!strcmp(name, "krb5n"))
161                 return SPTLRPC_FLVR_KRB5N;
162         if (!strcmp(name, "krb5a"))
163                 return SPTLRPC_FLVR_KRB5A;
164         if (!strcmp(name, "krb5i"))
165                 return SPTLRPC_FLVR_KRB5I;
166         if (!strcmp(name, "krb5p"))
167                 return SPTLRPC_FLVR_KRB5P;
168
169         return SPTLRPC_FLVR_INVALID;
170 }
171 EXPORT_SYMBOL(sptlrpc_name2flavor_base);
172
173 const char *sptlrpc_flavor2name_base(__u32 flvr)
174 {
175         __u32   base = SPTLRPC_FLVR_BASE(flvr);
176
177         if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL))
178                 return "null";
179         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_PLAIN))
180                 return "plain";
181         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5N))
182                 return "krb5n";
183         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5A))
184                 return "krb5a";
185         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5I))
186                 return "krb5i";
187         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5P))
188                 return "krb5p";
189
190         CERROR("invalid wire flavor 0x%x\n", flvr);
191         return "invalid";
192 }
193 EXPORT_SYMBOL(sptlrpc_flavor2name_base);
194
195 char *sptlrpc_flavor2name_bulk(struct sptlrpc_flavor *sf,
196                                char *buf, int bufsize)
197 {
198         if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN)
199                 snprintf(buf, bufsize, "hash:%s",
200                          sptlrpc_get_hash_name(sf->u_bulk.hash.hash_alg));
201         else
202                 snprintf(buf, bufsize, "%s",
203                          sptlrpc_flavor2name_base(sf->sf_rpc));
204
205         buf[bufsize - 1] = '\0';
206         return buf;
207 }
208 EXPORT_SYMBOL(sptlrpc_flavor2name_bulk);
209
210 char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
211 {
212         snprintf(buf, bufsize, "%s", sptlrpc_flavor2name_base(sf->sf_rpc));
213
214         /*
215          * currently we don't support customized bulk specification for
216          * flavors other than plain
217          */
218         if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN) {
219                 char bspec[16];
220
221                 bspec[0] = '-';
222                 sptlrpc_flavor2name_bulk(sf, &bspec[1], sizeof(bspec) - 1);
223                 strncat(buf, bspec, bufsize);
224         }
225
226         buf[bufsize - 1] = '\0';
227         return buf;
228 }
229 EXPORT_SYMBOL(sptlrpc_flavor2name);
230
231 char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
232 {
233         buf[0] = '\0';
234
235         if (flags & PTLRPC_SEC_FL_REVERSE)
236                 strlcat(buf, "reverse,", bufsize);
237         if (flags & PTLRPC_SEC_FL_ROOTONLY)
238                 strlcat(buf, "rootonly,", bufsize);
239         if (flags & PTLRPC_SEC_FL_UDESC)
240                 strlcat(buf, "udesc,", bufsize);
241         if (flags & PTLRPC_SEC_FL_BULK)
242                 strlcat(buf, "bulk,", bufsize);
243         if (buf[0] == '\0')
244                 strlcat(buf, "-,", bufsize);
245
246         return buf;
247 }
248 EXPORT_SYMBOL(sptlrpc_secflags2str);
249
250 /**************************************************
251  * client context APIs                      *
252  **************************************************/
253
254 static
255 struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
256 {
257         struct vfs_cred vcred;
258         int create = 1, remove_dead = 1;
259
260         LASSERT(sec);
261         LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
262
263         if (sec->ps_flvr.sf_flags & (PTLRPC_SEC_FL_REVERSE |
264                                      PTLRPC_SEC_FL_ROOTONLY)) {
265                 vcred.vc_uid = 0;
266                 vcred.vc_gid = 0;
267                 if (sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_REVERSE) {
268                         create = 0;
269                         remove_dead = 0;
270                 }
271         } else {
272                 vcred.vc_uid = current_uid();
273                 vcred.vc_gid = current_gid();
274         }
275
276         return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred,
277                                                    create, remove_dead);
278 }
279
280 struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
281 {
282         atomic_inc(&ctx->cc_refcount);
283         return ctx;
284 }
285 EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
286
287 void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
288 {
289         struct ptlrpc_sec *sec = ctx->cc_sec;
290
291         LASSERT(sec);
292         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
293
294         if (!atomic_dec_and_test(&ctx->cc_refcount))
295                 return;
296
297         sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
298 }
299 EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
300
301 /**
302  * Expire the client context immediately.
303  *
304  * \pre Caller must hold at least 1 reference on the \a ctx.
305  */
306 void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
307 {
308         LASSERT(ctx->cc_ops->die);
309         ctx->cc_ops->die(ctx, 0);
310 }
311 EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
312
313 /**
314  * To wake up the threads who are waiting for this client context. Called
315  * after some status change happened on \a ctx.
316  */
317 void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
318 {
319         struct ptlrpc_request *req, *next;
320
321         spin_lock(&ctx->cc_lock);
322         list_for_each_entry_safe(req, next, &ctx->cc_req_list,
323                                      rq_ctx_chain) {
324                 list_del_init(&req->rq_ctx_chain);
325                 ptlrpc_client_wake_req(req);
326         }
327         spin_unlock(&ctx->cc_lock);
328 }
329 EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
330
331 int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
332 {
333         LASSERT(ctx->cc_ops);
334
335         if (ctx->cc_ops->display == NULL)
336                 return 0;
337
338         return ctx->cc_ops->display(ctx, buf, bufsize);
339 }
340
341 static int import_sec_check_expire(struct obd_import *imp)
342 {
343         int     adapt = 0;
344
345         spin_lock(&imp->imp_lock);
346         if (imp->imp_sec_expire &&
347             imp->imp_sec_expire < cfs_time_current_sec()) {
348                 adapt = 1;
349                 imp->imp_sec_expire = 0;
350         }
351         spin_unlock(&imp->imp_lock);
352
353         if (!adapt)
354                 return 0;
355
356         CDEBUG(D_SEC, "found delayed sec adapt expired, do it now\n");
357         return sptlrpc_import_sec_adapt(imp, NULL, 0);
358 }
359
360 static int import_sec_validate_get(struct obd_import *imp,
361                                    struct ptlrpc_sec **sec)
362 {
363         int     rc;
364
365         if (unlikely(imp->imp_sec_expire)) {
366                 rc = import_sec_check_expire(imp);
367                 if (rc)
368                         return rc;
369         }
370
371         *sec = sptlrpc_import_sec_ref(imp);
372         if (*sec == NULL) {
373                 CERROR("import %p (%s) with no sec\n",
374                        imp, ptlrpc_import_state_name(imp->imp_state));
375                 return -EACCES;
376         }
377
378         if (unlikely((*sec)->ps_dying)) {
379                 CERROR("attempt to use dying sec %p\n", sec);
380                 sptlrpc_sec_put(*sec);
381                 return -EACCES;
382         }
383
384         return 0;
385 }
386
387 /**
388  * Given a \a req, find or allocate a appropriate context for it.
389  * \pre req->rq_cli_ctx == NULL.
390  *
391  * \retval 0 succeed, and req->rq_cli_ctx is set.
392  * \retval -ev error number, and req->rq_cli_ctx == NULL.
393  */
394 int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
395 {
396         struct obd_import *imp = req->rq_import;
397         struct ptlrpc_sec *sec;
398         int             rc;
399         ENTRY;
400
401         LASSERT(!req->rq_cli_ctx);
402         LASSERT(imp);
403
404         rc = import_sec_validate_get(imp, &sec);
405         if (rc)
406                 RETURN(rc);
407
408         req->rq_cli_ctx = get_my_ctx(sec);
409
410         sptlrpc_sec_put(sec);
411
412         if (!req->rq_cli_ctx) {
413                 CERROR("req %p: fail to get context\n", req);
414                 RETURN(-ENOMEM);
415         }
416
417         RETURN(0);
418 }
419
420 /**
421  * Drop the context for \a req.
422  * \pre req->rq_cli_ctx != NULL.
423  * \post req->rq_cli_ctx == NULL.
424  *
425  * If \a sync == 0, this function should return quickly without sleep;
426  * otherwise it might trigger and wait for the whole process of sending
427  * an context-destroying rpc to server.
428  */
429 void sptlrpc_req_put_ctx(struct ptlrpc_request *req, int sync)
430 {
431         ENTRY;
432
433         LASSERT(req);
434         LASSERT(req->rq_cli_ctx);
435
436         /* request might be asked to release earlier while still
437          * in the context waiting list.
438          */
439         if (!list_empty(&req->rq_ctx_chain)) {
440                 spin_lock(&req->rq_cli_ctx->cc_lock);
441                 list_del_init(&req->rq_ctx_chain);
442                 spin_unlock(&req->rq_cli_ctx->cc_lock);
443         }
444
445         sptlrpc_cli_ctx_put(req->rq_cli_ctx, sync);
446         req->rq_cli_ctx = NULL;
447         EXIT;
448 }
449
450 static
451 int sptlrpc_req_ctx_switch(struct ptlrpc_request *req,
452                            struct ptlrpc_cli_ctx *oldctx,
453                            struct ptlrpc_cli_ctx *newctx)
454 {
455         struct sptlrpc_flavor   old_flvr;
456         char               *reqmsg = NULL; /* to workaround old gcc */
457         int                  reqmsg_size;
458         int                  rc = 0;
459
460         LASSERT(req->rq_reqmsg);
461         LASSERT(req->rq_reqlen);
462         LASSERT(req->rq_replen);
463
464         CDEBUG(D_SEC, "req %p: switch ctx %p(%u->%s) -> %p(%u->%s), "
465                "switch sec %p(%s) -> %p(%s)\n", req,
466                oldctx, oldctx->cc_vcred.vc_uid, sec2target_str(oldctx->cc_sec),
467                newctx, newctx->cc_vcred.vc_uid, sec2target_str(newctx->cc_sec),
468                oldctx->cc_sec, oldctx->cc_sec->ps_policy->sp_name,
469                newctx->cc_sec, newctx->cc_sec->ps_policy->sp_name);
470
471         /* save flavor */
472         old_flvr = req->rq_flvr;
473
474         /* save request message */
475         reqmsg_size = req->rq_reqlen;
476         if (reqmsg_size != 0) {
477                 OBD_ALLOC_LARGE(reqmsg, reqmsg_size);
478                 if (reqmsg == NULL)
479                         return -ENOMEM;
480                 memcpy(reqmsg, req->rq_reqmsg, reqmsg_size);
481         }
482
483         /* release old req/rep buf */
484         req->rq_cli_ctx = oldctx;
485         sptlrpc_cli_free_reqbuf(req);
486         sptlrpc_cli_free_repbuf(req);
487         req->rq_cli_ctx = newctx;
488
489         /* recalculate the flavor */
490         sptlrpc_req_set_flavor(req, 0);
491
492         /* alloc new request buffer
493          * we don't need to alloc reply buffer here, leave it to the
494          * rest procedure of ptlrpc */
495         if (reqmsg_size != 0) {
496                 rc = sptlrpc_cli_alloc_reqbuf(req, reqmsg_size);
497                 if (!rc) {
498                         LASSERT(req->rq_reqmsg);
499                         memcpy(req->rq_reqmsg, reqmsg, reqmsg_size);
500                 } else {
501                         CWARN("failed to alloc reqbuf: %d\n", rc);
502                         req->rq_flvr = old_flvr;
503                 }
504
505                 OBD_FREE_LARGE(reqmsg, reqmsg_size);
506         }
507         return rc;
508 }
509
510 /**
511  * If current context of \a req is dead somehow, e.g. we just switched flavor
512  * thus marked original contexts dead, we'll find a new context for it. if
513  * no switch is needed, \a req will end up with the same context.
514  *
515  * \note a request must have a context, to keep other parts of code happy.
516  * In any case of failure during the switching, we must restore the old one.
517  */
518 int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
519 {
520         struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
521         struct ptlrpc_cli_ctx *newctx;
522         int                 rc;
523         ENTRY;
524
525         LASSERT(oldctx);
526
527         sptlrpc_cli_ctx_get(oldctx);
528         sptlrpc_req_put_ctx(req, 0);
529
530         rc = sptlrpc_req_get_ctx(req);
531         if (unlikely(rc)) {
532                 LASSERT(!req->rq_cli_ctx);
533
534                 /* restore old ctx */
535                 req->rq_cli_ctx = oldctx;
536                 RETURN(rc);
537         }
538
539         newctx = req->rq_cli_ctx;
540         LASSERT(newctx);
541
542         if (unlikely(newctx == oldctx &&
543                      test_bit(PTLRPC_CTX_DEAD_BIT, &oldctx->cc_flags))) {
544                 /*
545                  * still get the old dead ctx, usually means system too busy
546                  */
547                 CDEBUG(D_SEC,
548                        "ctx (%p, fl %lx) doesn't switch, relax a little bit\n",
549                        newctx, newctx->cc_flags);
550
551                 schedule_timeout_and_set_state(TASK_INTERRUPTIBLE,
552                                                    HZ);
553         } else {
554                 /*
555                  * it's possible newctx == oldctx if we're switching
556                  * subflavor with the same sec.
557                  */
558                 rc = sptlrpc_req_ctx_switch(req, oldctx, newctx);
559                 if (rc) {
560                         /* restore old ctx */
561                         sptlrpc_req_put_ctx(req, 0);
562                         req->rq_cli_ctx = oldctx;
563                         RETURN(rc);
564                 }
565
566                 LASSERT(req->rq_cli_ctx == newctx);
567         }
568
569         sptlrpc_cli_ctx_put(oldctx, 1);
570         RETURN(0);
571 }
572 EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
573
574 static
575 int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
576 {
577         if (cli_ctx_is_refreshed(ctx))
578                 return 1;
579         return 0;
580 }
581
582 static
583 int ctx_refresh_timeout(void *data)
584 {
585         struct ptlrpc_request *req = data;
586         int rc;
587
588         /* conn_cnt is needed in expire_one_request */
589         lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
590
591         rc = ptlrpc_expire_one_request(req, 1);
592         /* if we started recovery, we should mark this ctx dead; otherwise
593          * in case of lgssd died nobody would retire this ctx, following
594          * connecting will still find the same ctx thus cause deadlock.
595          * there's an assumption that expire time of the request should be
596          * later than the context refresh expire time.
597          */
598         if (rc == 0)
599                 req->rq_cli_ctx->cc_ops->die(req->rq_cli_ctx, 0);
600         return rc;
601 }
602
603 static
604 void ctx_refresh_interrupt(void *data)
605 {
606         struct ptlrpc_request *req = data;
607
608         spin_lock(&req->rq_lock);
609         req->rq_intr = 1;
610         spin_unlock(&req->rq_lock);
611 }
612
613 static
614 void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
615 {
616         spin_lock(&ctx->cc_lock);
617         if (!list_empty(&req->rq_ctx_chain))
618                 list_del_init(&req->rq_ctx_chain);
619         spin_unlock(&ctx->cc_lock);
620 }
621
622 /**
623  * To refresh the context of \req, if it's not up-to-date.
624  * \param timeout
625  * - < 0: don't wait
626  * - = 0: wait until success or fatal error occur
627  * - > 0: timeout value (in seconds)
628  *
629  * The status of the context could be subject to be changed by other threads
630  * at any time. We allow this race, but once we return with 0, the caller will
631  * suppose it's uptodated and keep using it until the owning rpc is done.
632  *
633  * \retval 0 only if the context is uptodated.
634  * \retval -ev error number.
635  */
636 int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
637 {
638         struct ptlrpc_cli_ctx  *ctx = req->rq_cli_ctx;
639         struct ptlrpc_sec      *sec;
640         struct l_wait_info      lwi;
641         int                  rc;
642         ENTRY;
643
644         LASSERT(ctx);
645
646         if (req->rq_ctx_init || req->rq_ctx_fini)
647                 RETURN(0);
648
649         /*
650          * during the process a request's context might change type even
651          * (e.g. from gss ctx to null ctx), so each loop we need to re-check
652          * everything
653          */
654 again:
655         rc = import_sec_validate_get(req->rq_import, &sec);
656         if (rc)
657                 RETURN(rc);
658
659         if (sec->ps_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
660                 CDEBUG(D_SEC, "req %p: flavor has changed %x -> %x\n",
661                       req, req->rq_flvr.sf_rpc, sec->ps_flvr.sf_rpc);
662                 req_off_ctx_list(req, ctx);
663                 sptlrpc_req_replace_dead_ctx(req);
664                 ctx = req->rq_cli_ctx;
665         }
666         sptlrpc_sec_put(sec);
667
668         if (cli_ctx_is_eternal(ctx))
669                 RETURN(0);
670
671         if (unlikely(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags))) {
672                 LASSERT(ctx->cc_ops->refresh);
673                 ctx->cc_ops->refresh(ctx);
674         }
675         LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
676
677         LASSERT(ctx->cc_ops->validate);
678         if (ctx->cc_ops->validate(ctx) == 0) {
679                 req_off_ctx_list(req, ctx);
680                 RETURN(0);
681         }
682
683         if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
684                 spin_lock(&req->rq_lock);
685                 req->rq_err = 1;
686                 spin_unlock(&req->rq_lock);
687                 req_off_ctx_list(req, ctx);
688                 RETURN(-EPERM);
689         }
690
691         /*
692          * There's a subtle issue for resending RPCs, suppose following
693          * situation:
694          *  1. the request was sent to server.
695          *  2. recovery was kicked start, after finished the request was
696          *     marked as resent.
697          *  3. resend the request.
698          *  4. old reply from server received, we accept and verify the reply.
699          *     this has to be success, otherwise the error will be aware
700          *     by application.
701          *  5. new reply from server received, dropped by LNet.
702          *
703          * Note the xid of old & new request is the same. We can't simply
704          * change xid for the resent request because the server replies on
705          * it for reply reconstruction.
706          *
707          * Commonly the original context should be uptodate because we
708          * have a expiry nice time; server will keep its context because
709          * we at least hold a ref of old context which prevent context
710          * destroying RPC being sent. So server still can accept the request
711          * and finish the RPC. But if that's not the case:
712          *  1. If server side context has been trimmed, a NO_CONTEXT will
713          *     be returned, gss_cli_ctx_verify/unseal will switch to new
714          *     context by force.
715          *  2. Current context never be refreshed, then we are fine: we
716          *     never really send request with old context before.
717          */
718         if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
719             unlikely(req->rq_reqmsg) &&
720             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
721                 req_off_ctx_list(req, ctx);
722                 RETURN(0);
723         }
724
725         if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
726                 req_off_ctx_list(req, ctx);
727                 /*
728                  * don't switch ctx if import was deactivated
729                  */
730                 if (req->rq_import->imp_deactive) {
731                         spin_lock(&req->rq_lock);
732                         req->rq_err = 1;
733                         spin_unlock(&req->rq_lock);
734                         RETURN(-EINTR);
735                 }
736
737                 rc = sptlrpc_req_replace_dead_ctx(req);
738                 if (rc) {
739                         LASSERT(ctx == req->rq_cli_ctx);
740                         CERROR("req %p: failed to replace dead ctx %p: %d\n",
741                                req, ctx, rc);
742                         spin_lock(&req->rq_lock);
743                         req->rq_err = 1;
744                         spin_unlock(&req->rq_lock);
745                         RETURN(rc);
746                 }
747
748                 ctx = req->rq_cli_ctx;
749                 goto again;
750         }
751
752         /*
753          * Now we're sure this context is during upcall, add myself into
754          * waiting list
755          */
756         spin_lock(&ctx->cc_lock);
757         if (list_empty(&req->rq_ctx_chain))
758                 list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
759         spin_unlock(&ctx->cc_lock);
760
761         if (timeout < 0)
762                 RETURN(-EWOULDBLOCK);
763
764         /* Clear any flags that may be present from previous sends */
765         LASSERT(req->rq_receiving_reply == 0);
766         spin_lock(&req->rq_lock);
767         req->rq_err = 0;
768         req->rq_timedout = 0;
769         req->rq_resend = 0;
770         req->rq_restart = 0;
771         spin_unlock(&req->rq_lock);
772
773         lwi = LWI_TIMEOUT_INTR(timeout * HZ, ctx_refresh_timeout,
774                                ctx_refresh_interrupt, req);
775         rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
776
777         /*
778          * following cases could lead us here:
779          * - successfully refreshed;
780          * - interrupted;
781          * - timedout, and we don't want recover from the failure;
782          * - timedout, and waked up upon recovery finished;
783          * - someone else mark this ctx dead by force;
784          * - someone invalidate the req and call ptlrpc_client_wake_req(),
785          *   e.g. ptlrpc_abort_inflight();
786          */
787         if (!cli_ctx_is_refreshed(ctx)) {
788                 /* timed out or interruptted */
789                 req_off_ctx_list(req, ctx);
790
791                 LASSERT(rc != 0);
792                 RETURN(rc);
793         }
794
795         goto again;
796 }
797
798 /**
799  * Initialize flavor settings for \a req, according to \a opcode.
800  *
801  * \note this could be called in two situations:
802  * - new request from ptlrpc_pre_req(), with proper @opcode
803  * - old request which changed ctx in the middle, with @opcode == 0
804  */
805 void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
806 {
807         struct ptlrpc_sec *sec;
808
809         LASSERT(req->rq_import);
810         LASSERT(req->rq_cli_ctx);
811         LASSERT(req->rq_cli_ctx->cc_sec);
812         LASSERT(req->rq_bulk_read == 0 || req->rq_bulk_write == 0);
813
814         /* special security flags accoding to opcode */
815         switch (opcode) {
816         case OST_READ:
817         case MDS_READPAGE:
818         case MGS_CONFIG_READ:
819         case OBD_IDX_READ:
820                 req->rq_bulk_read = 1;
821                 break;
822         case OST_WRITE:
823         case MDS_WRITEPAGE:
824                 req->rq_bulk_write = 1;
825                 break;
826         case SEC_CTX_INIT:
827                 req->rq_ctx_init = 1;
828                 break;
829         case SEC_CTX_FINI:
830                 req->rq_ctx_fini = 1;
831                 break;
832         case 0:
833                 /* init/fini rpc won't be resend, so can't be here */
834                 LASSERT(req->rq_ctx_init == 0);
835                 LASSERT(req->rq_ctx_fini == 0);
836
837                 /* cleanup flags, which should be recalculated */
838                 req->rq_pack_udesc = 0;
839                 req->rq_pack_bulk = 0;
840                 break;
841         }
842
843         sec = req->rq_cli_ctx->cc_sec;
844
845         spin_lock(&sec->ps_lock);
846         req->rq_flvr = sec->ps_flvr;
847         spin_unlock(&sec->ps_lock);
848
849         /* force SVC_NULL for context initiation rpc, SVC_INTG for context
850          * destruction rpc */
851         if (unlikely(req->rq_ctx_init))
852                 flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
853         else if (unlikely(req->rq_ctx_fini))
854                 flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
855
856         /* user descriptor flag, null security can't do it anyway */
857         if ((sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC) &&
858             (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_NULL))
859                 req->rq_pack_udesc = 1;
860
861         /* bulk security flag */
862         if ((req->rq_bulk_read || req->rq_bulk_write) &&
863             sptlrpc_flavor_has_bulk(&req->rq_flvr))
864                 req->rq_pack_bulk = 1;
865 }
866
867 void sptlrpc_request_out_callback(struct ptlrpc_request *req)
868 {
869         if (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
870                 return;
871
872         LASSERT(req->rq_clrbuf);
873         if (req->rq_pool || !req->rq_reqbuf)
874                 return;
875
876         OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
877         req->rq_reqbuf = NULL;
878         req->rq_reqbuf_len = 0;
879 }
880
881 /**
882  * Given an import \a imp, check whether current user has a valid context
883  * or not. We may create a new context and try to refresh it, and try
884  * repeatedly try in case of non-fatal errors. Return 0 means success.
885  */
886 int sptlrpc_import_check_ctx(struct obd_import *imp)
887 {
888         struct ptlrpc_sec     *sec;
889         struct ptlrpc_cli_ctx *ctx;
890         struct ptlrpc_request *req = NULL;
891         int rc;
892         ENTRY;
893
894         might_sleep();
895
896         sec = sptlrpc_import_sec_ref(imp);
897         ctx = get_my_ctx(sec);
898         sptlrpc_sec_put(sec);
899
900         if (!ctx)
901                 RETURN(-ENOMEM);
902
903         if (cli_ctx_is_eternal(ctx) ||
904             ctx->cc_ops->validate(ctx) == 0) {
905                 sptlrpc_cli_ctx_put(ctx, 1);
906                 RETURN(0);
907         }
908
909         if (cli_ctx_is_error(ctx)) {
910                 sptlrpc_cli_ctx_put(ctx, 1);
911                 RETURN(-EACCES);
912         }
913
914         OBD_ALLOC_PTR(req);
915         if (!req)
916                 RETURN(-ENOMEM);
917
918         spin_lock_init(&req->rq_lock);
919         atomic_set(&req->rq_refcount, 10000);
920         INIT_LIST_HEAD(&req->rq_ctx_chain);
921         init_waitqueue_head(&req->rq_reply_waitq);
922         init_waitqueue_head(&req->rq_set_waitq);
923         req->rq_import = imp;
924         req->rq_flvr = sec->ps_flvr;
925         req->rq_cli_ctx = ctx;
926
927         rc = sptlrpc_req_refresh_ctx(req, 0);
928         LASSERT(list_empty(&req->rq_ctx_chain));
929         sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
930         OBD_FREE_PTR(req);
931
932         RETURN(rc);
933 }
934
935 /**
936  * Used by ptlrpc client, to perform the pre-defined security transformation
937  * upon the request message of \a req. After this function called,
938  * req->rq_reqmsg is still accessible as clear text.
939  */
940 int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
941 {
942         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
943         int rc = 0;
944         ENTRY;
945
946         LASSERT(ctx);
947         LASSERT(ctx->cc_sec);
948         LASSERT(req->rq_reqbuf || req->rq_clrbuf);
949
950         /* we wrap bulk request here because now we can be sure
951          * the context is uptodate.
952          */
953         if (req->rq_bulk) {
954                 rc = sptlrpc_cli_wrap_bulk(req, req->rq_bulk);
955                 if (rc)
956                         RETURN(rc);
957         }
958
959         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
960         case SPTLRPC_SVC_NULL:
961         case SPTLRPC_SVC_AUTH:
962         case SPTLRPC_SVC_INTG:
963                 LASSERT(ctx->cc_ops->sign);
964                 rc = ctx->cc_ops->sign(ctx, req);
965                 break;
966         case SPTLRPC_SVC_PRIV:
967                 LASSERT(ctx->cc_ops->seal);
968                 rc = ctx->cc_ops->seal(ctx, req);
969                 break;
970         default:
971                 LBUG();
972         }
973
974         if (rc == 0) {
975                 LASSERT(req->rq_reqdata_len);
976                 LASSERT(req->rq_reqdata_len % 8 == 0);
977                 LASSERT(req->rq_reqdata_len <= req->rq_reqbuf_len);
978         }
979
980         RETURN(rc);
981 }
982
983 static int do_cli_unwrap_reply(struct ptlrpc_request *req)
984 {
985         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
986         int                 rc;
987         ENTRY;
988
989         LASSERT(ctx);
990         LASSERT(ctx->cc_sec);
991         LASSERT(req->rq_repbuf);
992         LASSERT(req->rq_repdata);
993         LASSERT(req->rq_repmsg == NULL);
994
995         req->rq_rep_swab_mask = 0;
996
997         rc = __lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len);
998         switch (rc) {
999         case 1:
1000                 lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
1001         case 0:
1002                 break;
1003         default:
1004                 CERROR("failed unpack reply: x"LPU64"\n", req->rq_xid);
1005                 RETURN(-EPROTO);
1006         }
1007
1008         if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
1009                 CERROR("replied data length %d too small\n",
1010                        req->rq_repdata_len);
1011                 RETURN(-EPROTO);
1012         }
1013
1014         if (SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr) !=
1015             SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
1016                 CERROR("reply policy %u doesn't match request policy %u\n",
1017                        SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr),
1018                        SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc));
1019                 RETURN(-EPROTO);
1020         }
1021
1022         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
1023         case SPTLRPC_SVC_NULL:
1024         case SPTLRPC_SVC_AUTH:
1025         case SPTLRPC_SVC_INTG:
1026                 LASSERT(ctx->cc_ops->verify);
1027                 rc = ctx->cc_ops->verify(ctx, req);
1028                 break;
1029         case SPTLRPC_SVC_PRIV:
1030                 LASSERT(ctx->cc_ops->unseal);
1031                 rc = ctx->cc_ops->unseal(ctx, req);
1032                 break;
1033         default:
1034                 LBUG();
1035         }
1036         LASSERT(rc || req->rq_repmsg || req->rq_resend);
1037
1038         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL &&
1039             !req->rq_ctx_init)
1040                 req->rq_rep_swab_mask = 0;
1041         RETURN(rc);
1042 }
1043
1044 /**
1045  * Used by ptlrpc client, to perform security transformation upon the reply
1046  * message of \a req. After return successfully, req->rq_repmsg points to
1047  * the reply message in clear text.
1048  *
1049  * \pre the reply buffer should have been un-posted from LNet, so nothing is
1050  * going to change.
1051  */
1052 int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
1053 {
1054         LASSERT(req->rq_repbuf);
1055         LASSERT(req->rq_repdata == NULL);
1056         LASSERT(req->rq_repmsg == NULL);
1057         LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
1058
1059         if (req->rq_reply_off == 0 &&
1060             (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
1061                 CERROR("real reply with offset 0\n");
1062                 return -EPROTO;
1063         }
1064
1065         if (req->rq_reply_off % 8 != 0) {
1066                 CERROR("reply at odd offset %u\n", req->rq_reply_off);
1067                 return -EPROTO;
1068         }
1069
1070         req->rq_repdata = (struct lustre_msg *)
1071                                 (req->rq_repbuf + req->rq_reply_off);
1072         req->rq_repdata_len = req->rq_nob_received;
1073
1074         return do_cli_unwrap_reply(req);
1075 }
1076
1077 /**
1078  * Used by ptlrpc client, to perform security transformation upon the early
1079  * reply message of \a req. We expect the rq_reply_off is 0, and
1080  * rq_nob_received is the early reply size.
1081  *
1082  * Because the receive buffer might be still posted, the reply data might be
1083  * changed at any time, no matter we're holding rq_lock or not. For this reason
1084  * we allocate a separate ptlrpc_request and reply buffer for early reply
1085  * processing.
1086  *
1087  * \retval 0 success, \a req_ret is filled with a duplicated ptlrpc_request.
1088  * Later the caller must call sptlrpc_cli_finish_early_reply() on the returned
1089  * \a *req_ret to release it.
1090  * \retval -ev error number, and \a req_ret will not be set.
1091  */
1092 int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
1093                                    struct ptlrpc_request **req_ret)
1094 {
1095         struct ptlrpc_request  *early_req;
1096         char               *early_buf;
1097         int                  early_bufsz, early_size;
1098         int                  rc;
1099         ENTRY;
1100
1101         OBD_ALLOC_PTR(early_req);
1102         if (early_req == NULL)
1103                 RETURN(-ENOMEM);
1104
1105         early_size = req->rq_nob_received;
1106         early_bufsz = size_roundup_power2(early_size);
1107         OBD_ALLOC_LARGE(early_buf, early_bufsz);
1108         if (early_buf == NULL)
1109                 GOTO(err_req, rc = -ENOMEM);
1110
1111         /* sanity checkings and copy data out, do it inside spinlock */
1112         spin_lock(&req->rq_lock);
1113
1114         if (req->rq_replied) {
1115                 spin_unlock(&req->rq_lock);
1116                 GOTO(err_buf, rc = -EALREADY);
1117         }
1118
1119         LASSERT(req->rq_repbuf);
1120         LASSERT(req->rq_repdata == NULL);
1121         LASSERT(req->rq_repmsg == NULL);
1122
1123         if (req->rq_reply_off != 0) {
1124                 CERROR("early reply with offset %u\n", req->rq_reply_off);
1125                 spin_unlock(&req->rq_lock);
1126                 GOTO(err_buf, rc = -EPROTO);
1127         }
1128
1129         if (req->rq_nob_received != early_size) {
1130                 /* even another early arrived the size should be the same */
1131                 CERROR("data size has changed from %u to %u\n",
1132                        early_size, req->rq_nob_received);
1133                 spin_unlock(&req->rq_lock);
1134                 GOTO(err_buf, rc = -EINVAL);
1135         }
1136
1137         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
1138                 CERROR("early reply length %d too small\n",
1139                        req->rq_nob_received);
1140                 spin_unlock(&req->rq_lock);
1141                 GOTO(err_buf, rc = -EALREADY);
1142         }
1143
1144         memcpy(early_buf, req->rq_repbuf, early_size);
1145         spin_unlock(&req->rq_lock);
1146
1147         spin_lock_init(&early_req->rq_lock);
1148         early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
1149         early_req->rq_flvr = req->rq_flvr;
1150         early_req->rq_repbuf = early_buf;
1151         early_req->rq_repbuf_len = early_bufsz;
1152         early_req->rq_repdata = (struct lustre_msg *) early_buf;
1153         early_req->rq_repdata_len = early_size;
1154         early_req->rq_early = 1;
1155         early_req->rq_reqmsg = req->rq_reqmsg;
1156
1157         rc = do_cli_unwrap_reply(early_req);
1158         if (rc) {
1159                 DEBUG_REQ(D_ADAPTTO, early_req,
1160                           "error %d unwrap early reply", rc);
1161                 GOTO(err_ctx, rc);
1162         }
1163
1164         LASSERT(early_req->rq_repmsg);
1165         *req_ret = early_req;
1166         RETURN(0);
1167
1168 err_ctx:
1169         sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1170 err_buf:
1171         OBD_FREE_LARGE(early_buf, early_bufsz);
1172 err_req:
1173         OBD_FREE_PTR(early_req);
1174         RETURN(rc);
1175 }
1176
1177 /**
1178  * Used by ptlrpc client, to release a processed early reply \a early_req.
1179  *
1180  * \pre \a early_req was obtained from calling sptlrpc_cli_unwrap_early_reply().
1181  */
1182 void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
1183 {
1184         LASSERT(early_req->rq_repbuf);
1185         LASSERT(early_req->rq_repdata);
1186         LASSERT(early_req->rq_repmsg);
1187
1188         sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1189         OBD_FREE_LARGE(early_req->rq_repbuf, early_req->rq_repbuf_len);
1190         OBD_FREE_PTR(early_req);
1191 }
1192
1193 /**************************************************
1194  * sec ID                                        *
1195  **************************************************/
1196
1197 /*
1198  * "fixed" sec (e.g. null) use sec_id < 0
1199  */
1200 static atomic_t sptlrpc_sec_id = ATOMIC_INIT(1);
1201
1202 int sptlrpc_get_next_secid(void)
1203 {
1204         return atomic_inc_return(&sptlrpc_sec_id);
1205 }
1206 EXPORT_SYMBOL(sptlrpc_get_next_secid);
1207
1208 /**************************************************
1209  * client side high-level security APIs    *
1210  **************************************************/
1211
1212 static int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
1213                                    int grace, int force)
1214 {
1215         struct ptlrpc_sec_policy *policy = sec->ps_policy;
1216
1217         LASSERT(policy->sp_cops);
1218         LASSERT(policy->sp_cops->flush_ctx_cache);
1219
1220         return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
1221 }
1222
1223 static void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
1224 {
1225         struct ptlrpc_sec_policy *policy = sec->ps_policy;
1226
1227         LASSERT_ATOMIC_ZERO(&sec->ps_refcount);
1228         LASSERT_ATOMIC_ZERO(&sec->ps_nctx);
1229         LASSERT(policy->sp_cops->destroy_sec);
1230
1231         CDEBUG(D_SEC, "%s@%p: being destroied\n", sec->ps_policy->sp_name, sec);
1232
1233         policy->sp_cops->destroy_sec(sec);
1234         sptlrpc_policy_put(policy);
1235 }
1236
1237 void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
1238 {
1239         sec_cop_destroy_sec(sec);
1240 }
1241 EXPORT_SYMBOL(sptlrpc_sec_destroy);
1242
1243 static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
1244 {
1245         LASSERT_ATOMIC_POS(&sec->ps_refcount);
1246
1247         if (sec->ps_policy->sp_cops->kill_sec) {
1248                 sec->ps_policy->sp_cops->kill_sec(sec);
1249
1250                 sec_cop_flush_ctx_cache(sec, -1, 1, 1);
1251         }
1252 }
1253
1254 struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
1255 {
1256         if (sec)
1257                 atomic_inc(&sec->ps_refcount);
1258
1259         return sec;
1260 }
1261 EXPORT_SYMBOL(sptlrpc_sec_get);
1262
1263 void sptlrpc_sec_put(struct ptlrpc_sec *sec)
1264 {
1265         if (sec) {
1266                 LASSERT_ATOMIC_POS(&sec->ps_refcount);
1267
1268                 if (atomic_dec_and_test(&sec->ps_refcount)) {
1269                         sptlrpc_gc_del_sec(sec);
1270                         sec_cop_destroy_sec(sec);
1271                 }
1272         }
1273 }
1274 EXPORT_SYMBOL(sptlrpc_sec_put);
1275
1276 /*
1277  * policy module is responsible for taking refrence of import
1278  */
1279 static
1280 struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
1281                                        struct ptlrpc_svc_ctx *svc_ctx,
1282                                        struct sptlrpc_flavor *sf,
1283                                        enum lustre_sec_part sp)
1284 {
1285         struct ptlrpc_sec_policy *policy;
1286         struct ptlrpc_sec       *sec;
1287         char                  str[32];
1288         ENTRY;
1289
1290         if (svc_ctx) {
1291                 LASSERT(imp->imp_dlm_fake == 1);
1292
1293                 CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
1294                        imp->imp_obd->obd_type->typ_name,
1295                        imp->imp_obd->obd_name,
1296                        sptlrpc_flavor2name(sf, str, sizeof(str)));
1297
1298                 policy = sptlrpc_policy_get(svc_ctx->sc_policy);
1299                 sf->sf_flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
1300         } else {
1301                 LASSERT(imp->imp_dlm_fake == 0);
1302
1303                 CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
1304                        imp->imp_obd->obd_type->typ_name,
1305                        imp->imp_obd->obd_name,
1306                        sptlrpc_flavor2name(sf, str, sizeof(str)));
1307
1308                 policy = sptlrpc_wireflavor2policy(sf->sf_rpc);
1309                 if (!policy) {
1310                         CERROR("invalid flavor 0x%x\n", sf->sf_rpc);
1311                         RETURN(NULL);
1312                 }
1313         }
1314
1315         sec = policy->sp_cops->create_sec(imp, svc_ctx, sf);
1316         if (sec) {
1317                 atomic_inc(&sec->ps_refcount);
1318
1319                 sec->ps_part = sp;
1320
1321                 if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
1322                         sptlrpc_gc_add_sec(sec);
1323         } else {
1324                 sptlrpc_policy_put(policy);
1325         }
1326
1327         RETURN(sec);
1328 }
1329
1330 struct ptlrpc_sec *sptlrpc_import_sec_ref(struct obd_import *imp)
1331 {
1332         struct ptlrpc_sec *sec;
1333
1334         spin_lock(&imp->imp_lock);
1335         sec = sptlrpc_sec_get(imp->imp_sec);
1336         spin_unlock(&imp->imp_lock);
1337
1338         return sec;
1339 }
1340 EXPORT_SYMBOL(sptlrpc_import_sec_ref);
1341
1342 static void sptlrpc_import_sec_install(struct obd_import *imp,
1343                                        struct ptlrpc_sec *sec)
1344 {
1345         struct ptlrpc_sec *old_sec;
1346
1347         LASSERT_ATOMIC_POS(&sec->ps_refcount);
1348
1349         spin_lock(&imp->imp_lock);
1350         old_sec = imp->imp_sec;
1351         imp->imp_sec = sec;
1352         spin_unlock(&imp->imp_lock);
1353
1354         if (old_sec) {
1355                 sptlrpc_sec_kill(old_sec);
1356
1357                 /* balance the ref taken by this import */
1358                 sptlrpc_sec_put(old_sec);
1359         }
1360 }
1361
1362 static inline
1363 int flavor_equal(struct sptlrpc_flavor *sf1, struct sptlrpc_flavor *sf2)
1364 {
1365         return (memcmp(sf1, sf2, sizeof(*sf1)) == 0);
1366 }
1367
1368 static inline
1369 void flavor_copy(struct sptlrpc_flavor *dst, struct sptlrpc_flavor *src)
1370 {
1371         *dst = *src;
1372 }
1373
1374 static void sptlrpc_import_sec_adapt_inplace(struct obd_import *imp,
1375                                              struct ptlrpc_sec *sec,
1376                                              struct sptlrpc_flavor *sf)
1377 {
1378         char    str1[32], str2[32];
1379
1380         if (sec->ps_flvr.sf_flags != sf->sf_flags)
1381                 CDEBUG(D_SEC, "changing sec flags: %s -> %s\n",
1382                        sptlrpc_secflags2str(sec->ps_flvr.sf_flags,
1383                                             str1, sizeof(str1)),
1384                        sptlrpc_secflags2str(sf->sf_flags,
1385                                             str2, sizeof(str2)));
1386
1387         spin_lock(&sec->ps_lock);
1388         flavor_copy(&sec->ps_flvr, sf);
1389         spin_unlock(&sec->ps_lock);
1390 }
1391
1392 /**
1393  * To get an appropriate ptlrpc_sec for the \a imp, according to the current
1394  * configuration. Upon called, imp->imp_sec may or may not be NULL.
1395  *
1396  *  - regular import: \a svc_ctx should be NULL and \a flvr is ignored;
1397  *  - reverse import: \a svc_ctx and \a flvr are obtained from incoming request.
1398  */
1399 int sptlrpc_import_sec_adapt(struct obd_import *imp,
1400                              struct ptlrpc_svc_ctx *svc_ctx,
1401                              struct sptlrpc_flavor *flvr)
1402 {
1403         struct ptlrpc_connection   *conn;
1404         struct sptlrpc_flavor       sf;
1405         struct ptlrpc_sec         *sec, *newsec;
1406         enum lustre_sec_part    sp;
1407         char                    str[24];
1408         int                      rc = 0;
1409         ENTRY;
1410
1411         might_sleep();
1412
1413         if (imp == NULL)
1414                 RETURN(0);
1415
1416         conn = imp->imp_connection;
1417
1418         if (svc_ctx == NULL) {
1419                 struct client_obd *cliobd = &imp->imp_obd->u.cli;
1420                 /*
1421                  * normal import, determine flavor from rule set, except
1422                  * for mgc the flavor is predetermined.
1423                  */
1424                 if (cliobd->cl_sp_me == LUSTRE_SP_MGC)
1425                         sf = cliobd->cl_flvr_mgc;
1426                 else
1427                         sptlrpc_conf_choose_flavor(cliobd->cl_sp_me,
1428                                                    cliobd->cl_sp_to,
1429                                                    &cliobd->cl_target_uuid,
1430                                                    conn->c_self, &sf);
1431
1432                 sp = imp->imp_obd->u.cli.cl_sp_me;
1433         } else {
1434                 /* reverse import, determine flavor from incoming reqeust */
1435                 sf = *flvr;
1436
1437                 if (sf.sf_rpc != SPTLRPC_FLVR_NULL)
1438                         sf.sf_flags = PTLRPC_SEC_FL_REVERSE |
1439                                       PTLRPC_SEC_FL_ROOTONLY;
1440
1441                 sp = sptlrpc_target_sec_part(imp->imp_obd);
1442         }
1443
1444         sec = sptlrpc_import_sec_ref(imp);
1445         if (sec) {
1446                 char    str2[24];
1447
1448                 if (flavor_equal(&sf, &sec->ps_flvr))
1449                         GOTO(out, rc);
1450
1451                 CDEBUG(D_SEC, "import %s->%s: changing flavor %s -> %s\n",
1452                        imp->imp_obd->obd_name,
1453                        obd_uuid2str(&conn->c_remote_uuid),
1454                        sptlrpc_flavor2name(&sec->ps_flvr, str, sizeof(str)),
1455                        sptlrpc_flavor2name(&sf, str2, sizeof(str2)));
1456
1457                 if (SPTLRPC_FLVR_POLICY(sf.sf_rpc) ==
1458                     SPTLRPC_FLVR_POLICY(sec->ps_flvr.sf_rpc) &&
1459                     SPTLRPC_FLVR_MECH(sf.sf_rpc) ==
1460                     SPTLRPC_FLVR_MECH(sec->ps_flvr.sf_rpc)) {
1461                         sptlrpc_import_sec_adapt_inplace(imp, sec, &sf);
1462                         GOTO(out, rc);
1463                 }
1464         } else if (SPTLRPC_FLVR_BASE(sf.sf_rpc) !=
1465                    SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL)) {
1466                 CDEBUG(D_SEC, "import %s->%s netid %x: select flavor %s\n",
1467                        imp->imp_obd->obd_name,
1468                        obd_uuid2str(&conn->c_remote_uuid),
1469                        LNET_NIDNET(conn->c_self),
1470                        sptlrpc_flavor2name(&sf, str, sizeof(str)));
1471         }
1472
1473         mutex_lock(&imp->imp_sec_mutex);
1474
1475         newsec = sptlrpc_sec_create(imp, svc_ctx, &sf, sp);
1476         if (newsec) {
1477                 sptlrpc_import_sec_install(imp, newsec);
1478         } else {
1479                 CERROR("import %s->%s: failed to create new sec\n",
1480                        imp->imp_obd->obd_name,
1481                        obd_uuid2str(&conn->c_remote_uuid));
1482                 rc = -EPERM;
1483         }
1484
1485         mutex_unlock(&imp->imp_sec_mutex);
1486 out:
1487         sptlrpc_sec_put(sec);
1488         RETURN(rc);
1489 }
1490
1491 void sptlrpc_import_sec_put(struct obd_import *imp)
1492 {
1493         if (imp->imp_sec) {
1494                 sptlrpc_sec_kill(imp->imp_sec);
1495
1496                 sptlrpc_sec_put(imp->imp_sec);
1497                 imp->imp_sec = NULL;
1498         }
1499 }
1500
1501 static void import_flush_ctx_common(struct obd_import *imp,
1502                                     uid_t uid, int grace, int force)
1503 {
1504         struct ptlrpc_sec *sec;
1505
1506         if (imp == NULL)
1507                 return;
1508
1509         sec = sptlrpc_import_sec_ref(imp);
1510         if (sec == NULL)
1511                 return;
1512
1513         sec_cop_flush_ctx_cache(sec, uid, grace, force);
1514         sptlrpc_sec_put(sec);
1515 }
1516
1517 void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
1518 {
1519         /* it's important to use grace mode, see explain in
1520          * sptlrpc_req_refresh_ctx() */
1521         import_flush_ctx_common(imp, 0, 1, 1);
1522 }
1523
1524 void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
1525 {
1526         import_flush_ctx_common(imp, current_uid(), 1, 1);
1527 }
1528 EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
1529
1530 void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
1531 {
1532         import_flush_ctx_common(imp, -1, 1, 1);
1533 }
1534 EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
1535
1536 /**
1537  * Used by ptlrpc client to allocate request buffer of \a req. Upon return
1538  * successfully, req->rq_reqmsg points to a buffer with size \a msgsize.
1539  */
1540 int sptlrpc_cli_alloc_reqbuf(struct ptlrpc_request *req, int msgsize)
1541 {
1542         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1543         struct ptlrpc_sec_policy *policy;
1544         int rc;
1545
1546         LASSERT(ctx);
1547         LASSERT(ctx->cc_sec);
1548         LASSERT(ctx->cc_sec->ps_policy);
1549         LASSERT(req->rq_reqmsg == NULL);
1550         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1551
1552         policy = ctx->cc_sec->ps_policy;
1553         rc = policy->sp_cops->alloc_reqbuf(ctx->cc_sec, req, msgsize);
1554         if (!rc) {
1555                 LASSERT(req->rq_reqmsg);
1556                 LASSERT(req->rq_reqbuf || req->rq_clrbuf);
1557
1558                 /* zeroing preallocated buffer */
1559                 if (req->rq_pool)
1560                         memset(req->rq_reqmsg, 0, msgsize);
1561         }
1562
1563         return rc;
1564 }
1565
1566 /**
1567  * Used by ptlrpc client to free request buffer of \a req. After this
1568  * req->rq_reqmsg is set to NULL and should not be accessed anymore.
1569  */
1570 void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
1571 {
1572         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1573         struct ptlrpc_sec_policy *policy;
1574
1575         LASSERT(ctx);
1576         LASSERT(ctx->cc_sec);
1577         LASSERT(ctx->cc_sec->ps_policy);
1578         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1579
1580         if (req->rq_reqbuf == NULL && req->rq_clrbuf == NULL)
1581                 return;
1582
1583         policy = ctx->cc_sec->ps_policy;
1584         policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
1585         req->rq_reqmsg = NULL;
1586 }
1587
1588 /*
1589  * NOTE caller must guarantee the buffer size is enough for the enlargement
1590  */
1591 void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
1592                                   int segment, int newsize)
1593 {
1594         void   *src, *dst;
1595         int     oldsize, oldmsg_size, movesize;
1596
1597         LASSERT(segment < msg->lm_bufcount);
1598         LASSERT(msg->lm_buflens[segment] <= newsize);
1599
1600         if (msg->lm_buflens[segment] == newsize)
1601                 return;
1602
1603         /* nothing to do if we are enlarging the last segment */
1604         if (segment == msg->lm_bufcount - 1) {
1605                 msg->lm_buflens[segment] = newsize;
1606                 return;
1607         }
1608
1609         oldsize = msg->lm_buflens[segment];
1610
1611         src = lustre_msg_buf(msg, segment + 1, 0);
1612         msg->lm_buflens[segment] = newsize;
1613         dst = lustre_msg_buf(msg, segment + 1, 0);
1614         msg->lm_buflens[segment] = oldsize;
1615
1616         /* move from segment + 1 to end segment */
1617         LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
1618         oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
1619         movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
1620         LASSERT(movesize >= 0);
1621
1622         if (movesize)
1623                 memmove(dst, src, movesize);
1624
1625         /* note we don't clear the ares where old data live, not secret */
1626
1627         /* finally set new segment size */
1628         msg->lm_buflens[segment] = newsize;
1629 }
1630 EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
1631
1632 /**
1633  * Used by ptlrpc client to enlarge the \a segment of request message pointed
1634  * by req->rq_reqmsg to size \a newsize, all previously filled-in data will be
1635  * preserved after the enlargement. this must be called after original request
1636  * buffer being allocated.
1637  *
1638  * \note after this be called, rq_reqmsg and rq_reqlen might have been changed,
1639  * so caller should refresh its local pointers if needed.
1640  */
1641 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
1642                                int segment, int newsize)
1643 {
1644         struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
1645         struct ptlrpc_sec_cops   *cops;
1646         struct lustre_msg       *msg = req->rq_reqmsg;
1647
1648         LASSERT(ctx);
1649         LASSERT(msg);
1650         LASSERT(msg->lm_bufcount > segment);
1651         LASSERT(msg->lm_buflens[segment] <= newsize);
1652
1653         if (msg->lm_buflens[segment] == newsize)
1654                 return 0;
1655
1656         cops = ctx->cc_sec->ps_policy->sp_cops;
1657         LASSERT(cops->enlarge_reqbuf);
1658         return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
1659 }
1660 EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
1661
1662 /**
1663  * Used by ptlrpc client to allocate reply buffer of \a req.
1664  *
1665  * \note After this, req->rq_repmsg is still not accessible.
1666  */
1667 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
1668 {
1669         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1670         struct ptlrpc_sec_policy *policy;
1671         ENTRY;
1672
1673         LASSERT(ctx);
1674         LASSERT(ctx->cc_sec);
1675         LASSERT(ctx->cc_sec->ps_policy);
1676
1677         if (req->rq_repbuf)
1678                 RETURN(0);
1679
1680         policy = ctx->cc_sec->ps_policy;
1681         RETURN(policy->sp_cops->alloc_repbuf(ctx->cc_sec, req, msgsize));
1682 }
1683
1684 /**
1685  * Used by ptlrpc client to free reply buffer of \a req. After this
1686  * req->rq_repmsg is set to NULL and should not be accessed anymore.
1687  */
1688 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
1689 {
1690         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1691         struct ptlrpc_sec_policy *policy;
1692         ENTRY;
1693
1694         LASSERT(ctx);
1695         LASSERT(ctx->cc_sec);
1696         LASSERT(ctx->cc_sec->ps_policy);
1697         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1698
1699         if (req->rq_repbuf == NULL)
1700                 return;
1701         LASSERT(req->rq_repbuf_len);
1702
1703         policy = ctx->cc_sec->ps_policy;
1704         policy->sp_cops->free_repbuf(ctx->cc_sec, req);
1705         req->rq_repmsg = NULL;
1706         EXIT;
1707 }
1708
1709 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
1710                                 struct ptlrpc_cli_ctx *ctx)
1711 {
1712         struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
1713
1714         if (!policy->sp_cops->install_rctx)
1715                 return 0;
1716         return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
1717 }
1718
1719 int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
1720                                 struct ptlrpc_svc_ctx *ctx)
1721 {
1722         struct ptlrpc_sec_policy *policy = ctx->sc_policy;
1723
1724         if (!policy->sp_sops->install_rctx)
1725                 return 0;
1726         return policy->sp_sops->install_rctx(imp, ctx);
1727 }
1728
1729 /****************************************
1730  * server side security          *
1731  ****************************************/
1732
1733 static int flavor_allowed(struct sptlrpc_flavor *exp,
1734                           struct ptlrpc_request *req)
1735 {
1736         struct sptlrpc_flavor *flvr = &req->rq_flvr;
1737
1738         if (exp->sf_rpc == SPTLRPC_FLVR_ANY || exp->sf_rpc == flvr->sf_rpc)
1739                 return 1;
1740
1741         if ((req->rq_ctx_init || req->rq_ctx_fini) &&
1742             SPTLRPC_FLVR_POLICY(exp->sf_rpc) ==
1743             SPTLRPC_FLVR_POLICY(flvr->sf_rpc) &&
1744             SPTLRPC_FLVR_MECH(exp->sf_rpc) == SPTLRPC_FLVR_MECH(flvr->sf_rpc))
1745                 return 1;
1746
1747         return 0;
1748 }
1749
1750 #define EXP_FLVR_UPDATE_EXPIRE      (OBD_TIMEOUT_DEFAULT + 10)
1751
1752 /**
1753  * Given an export \a exp, check whether the flavor of incoming \a req
1754  * is allowed by the export \a exp. Main logic is about taking care of
1755  * changing configurations. Return 0 means success.
1756  */
1757 int sptlrpc_target_export_check(struct obd_export *exp,
1758                                 struct ptlrpc_request *req)
1759 {
1760         struct sptlrpc_flavor   flavor;
1761
1762         if (exp == NULL)
1763                 return 0;
1764
1765         /* client side export has no imp_reverse, skip
1766          * FIXME maybe we should check flavor this as well??? */
1767         if (exp->exp_imp_reverse == NULL)
1768                 return 0;
1769
1770         /* don't care about ctx fini rpc */
1771         if (req->rq_ctx_fini)
1772                 return 0;
1773
1774         spin_lock(&exp->exp_lock);
1775
1776         /* if flavor just changed (exp->exp_flvr_changed != 0), we wait for
1777          * the first req with the new flavor, then treat it as current flavor,
1778          * adapt reverse sec according to it.
1779          * note the first rpc with new flavor might not be with root ctx, in
1780          * which case delay the sec_adapt by leaving exp_flvr_adapt == 1. */
1781         if (unlikely(exp->exp_flvr_changed) &&
1782             flavor_allowed(&exp->exp_flvr_old[1], req)) {
1783                 /* make the new flavor as "current", and old ones as
1784                  * about-to-expire */
1785                 CDEBUG(D_SEC, "exp %p: just changed: %x->%x\n", exp,
1786                        exp->exp_flvr.sf_rpc, exp->exp_flvr_old[1].sf_rpc);
1787                 flavor = exp->exp_flvr_old[1];
1788                 exp->exp_flvr_old[1] = exp->exp_flvr_old[0];
1789                 exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0];
1790                 exp->exp_flvr_old[0] = exp->exp_flvr;
1791                 exp->exp_flvr_expire[0] = cfs_time_current_sec() +
1792                                           EXP_FLVR_UPDATE_EXPIRE;
1793                 exp->exp_flvr = flavor;
1794
1795                 /* flavor change finished */
1796                 exp->exp_flvr_changed = 0;
1797                 LASSERT(exp->exp_flvr_adapt == 1);
1798
1799                 /* if it's gss, we only interested in root ctx init */
1800                 if (req->rq_auth_gss &&
1801                     !(req->rq_ctx_init &&
1802                       (req->rq_auth_usr_root || req->rq_auth_usr_mdt ||
1803                        req->rq_auth_usr_ost))) {
1804                         spin_unlock(&exp->exp_lock);
1805                         CDEBUG(D_SEC, "is good but not root(%d:%d:%d:%d:%d)\n",
1806                                req->rq_auth_gss, req->rq_ctx_init,
1807                                req->rq_auth_usr_root, req->rq_auth_usr_mdt,
1808                                req->rq_auth_usr_ost);
1809                         return 0;
1810                 }
1811
1812                 exp->exp_flvr_adapt = 0;
1813                 spin_unlock(&exp->exp_lock);
1814
1815                 return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1816                                                 req->rq_svc_ctx, &flavor);
1817         }
1818
1819         /* if it equals to the current flavor, we accept it, but need to
1820          * dealing with reverse sec/ctx */
1821         if (likely(flavor_allowed(&exp->exp_flvr, req))) {
1822                 /* most cases should return here, we only interested in
1823                  * gss root ctx init */
1824                 if (!req->rq_auth_gss || !req->rq_ctx_init ||
1825                     (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
1826                      !req->rq_auth_usr_ost)) {
1827                         spin_unlock(&exp->exp_lock);
1828                         return 0;
1829                 }
1830
1831                 /* if flavor just changed, we should not proceed, just leave
1832                  * it and current flavor will be discovered and replaced
1833                  * shortly, and let _this_ rpc pass through */
1834                 if (exp->exp_flvr_changed) {
1835                         LASSERT(exp->exp_flvr_adapt);
1836                         spin_unlock(&exp->exp_lock);
1837                         return 0;
1838                 }
1839
1840                 if (exp->exp_flvr_adapt) {
1841                         exp->exp_flvr_adapt = 0;
1842                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): do delayed adapt\n",
1843                                exp, exp->exp_flvr.sf_rpc,
1844                                exp->exp_flvr_old[0].sf_rpc,
1845                                exp->exp_flvr_old[1].sf_rpc);
1846                         flavor = exp->exp_flvr;
1847                         spin_unlock(&exp->exp_lock);
1848
1849                         return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1850                                                         req->rq_svc_ctx,
1851                                                         &flavor);
1852                 } else {
1853                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): is current flavor, "
1854                                "install rvs ctx\n", exp, exp->exp_flvr.sf_rpc,
1855                                exp->exp_flvr_old[0].sf_rpc,
1856                                exp->exp_flvr_old[1].sf_rpc);
1857                         spin_unlock(&exp->exp_lock);
1858
1859                         return sptlrpc_svc_install_rvs_ctx(exp->exp_imp_reverse,
1860                                                            req->rq_svc_ctx);
1861                 }
1862         }
1863
1864         if (exp->exp_flvr_expire[0]) {
1865                 if (exp->exp_flvr_expire[0] >= cfs_time_current_sec()) {
1866                         if (flavor_allowed(&exp->exp_flvr_old[0], req)) {
1867                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1868                                        "middle one ("CFS_DURATION_T")\n", exp,
1869                                        exp->exp_flvr.sf_rpc,
1870                                        exp->exp_flvr_old[0].sf_rpc,
1871                                        exp->exp_flvr_old[1].sf_rpc,
1872                                        exp->exp_flvr_expire[0] -
1873                                                 cfs_time_current_sec());
1874                                 spin_unlock(&exp->exp_lock);
1875                                 return 0;
1876                         }
1877                 } else {
1878                         CDEBUG(D_SEC, "mark middle expired\n");
1879                         exp->exp_flvr_expire[0] = 0;
1880                 }
1881                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match middle\n", exp,
1882                        exp->exp_flvr.sf_rpc,
1883                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1884                        req->rq_flvr.sf_rpc);
1885         }
1886
1887         /* now it doesn't match the current flavor, the only chance we can
1888          * accept it is match the old flavors which is not expired. */
1889         if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) {
1890                 if (exp->exp_flvr_expire[1] >= cfs_time_current_sec()) {
1891                         if (flavor_allowed(&exp->exp_flvr_old[1], req)) {
1892                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1893                                        "oldest one ("CFS_DURATION_T")\n", exp,
1894                                        exp->exp_flvr.sf_rpc,
1895                                        exp->exp_flvr_old[0].sf_rpc,
1896                                        exp->exp_flvr_old[1].sf_rpc,
1897                                        exp->exp_flvr_expire[1] -
1898                                                 cfs_time_current_sec());
1899                                 spin_unlock(&exp->exp_lock);
1900                                 return 0;
1901                         }
1902                 } else {
1903                         CDEBUG(D_SEC, "mark oldest expired\n");
1904                         exp->exp_flvr_expire[1] = 0;
1905                 }
1906                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match found\n",
1907                        exp, exp->exp_flvr.sf_rpc,
1908                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1909                        req->rq_flvr.sf_rpc);
1910         } else {
1911                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): skip the last one\n",
1912                        exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc,
1913                        exp->exp_flvr_old[1].sf_rpc);
1914         }
1915
1916         spin_unlock(&exp->exp_lock);
1917
1918         CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with "
1919               "unauthorized flavor %x, expect %x|%x(%+ld)|%x(%+ld)\n",
1920               exp, exp->exp_obd->obd_name,
1921               req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini,
1922               req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_auth_usr_ost,
1923               req->rq_flvr.sf_rpc,
1924               exp->exp_flvr.sf_rpc,
1925               exp->exp_flvr_old[0].sf_rpc,
1926               exp->exp_flvr_expire[0] ?
1927               (unsigned long) (exp->exp_flvr_expire[0] -
1928                                cfs_time_current_sec()) : 0,
1929               exp->exp_flvr_old[1].sf_rpc,
1930               exp->exp_flvr_expire[1] ?
1931               (unsigned long) (exp->exp_flvr_expire[1] -
1932                                cfs_time_current_sec()) : 0);
1933         return -EACCES;
1934 }
1935 EXPORT_SYMBOL(sptlrpc_target_export_check);
1936
1937 void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
1938                                       struct sptlrpc_rule_set *rset)
1939 {
1940         struct obd_export       *exp;
1941         struct sptlrpc_flavor    new_flvr;
1942
1943         LASSERT(obd);
1944
1945         spin_lock(&obd->obd_dev_lock);
1946
1947         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1948                 if (exp->exp_connection == NULL)
1949                         continue;
1950
1951                 /* note if this export had just been updated flavor
1952                  * (exp_flvr_changed == 1), this will override the
1953                  * previous one. */
1954                 spin_lock(&exp->exp_lock);
1955                 sptlrpc_target_choose_flavor(rset, exp->exp_sp_peer,
1956                                              exp->exp_connection->c_peer.nid,
1957                                              &new_flvr);
1958                 if (exp->exp_flvr_changed ||
1959                     !flavor_equal(&new_flvr, &exp->exp_flvr)) {
1960                         exp->exp_flvr_old[1] = new_flvr;
1961                         exp->exp_flvr_expire[1] = 0;
1962                         exp->exp_flvr_changed = 1;
1963                         exp->exp_flvr_adapt = 1;
1964
1965                         CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n",
1966                                exp, sptlrpc_part2name(exp->exp_sp_peer),
1967                                exp->exp_flvr.sf_rpc,
1968                                exp->exp_flvr_old[1].sf_rpc);
1969                 }
1970                 spin_unlock(&exp->exp_lock);
1971         }
1972
1973         spin_unlock(&obd->obd_dev_lock);
1974 }
1975 EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor);
1976
1977 static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
1978 {
1979         /* peer's claim is unreliable unless gss is being used */
1980         if (!req->rq_auth_gss || svc_rc == SECSVC_DROP)
1981                 return svc_rc;
1982
1983         switch (req->rq_sp_from) {
1984         case LUSTRE_SP_CLI:
1985                 if (req->rq_auth_usr_mdt || req->rq_auth_usr_ost) {
1986                         DEBUG_REQ(D_ERROR, req, "faked source CLI");
1987                         svc_rc = SECSVC_DROP;
1988                 }
1989                 break;
1990         case LUSTRE_SP_MDT:
1991                 if (!req->rq_auth_usr_mdt) {
1992                         DEBUG_REQ(D_ERROR, req, "faked source MDT");
1993                         svc_rc = SECSVC_DROP;
1994                 }
1995                 break;
1996         case LUSTRE_SP_OST:
1997                 if (!req->rq_auth_usr_ost) {
1998                         DEBUG_REQ(D_ERROR, req, "faked source OST");
1999                         svc_rc = SECSVC_DROP;
2000                 }
2001                 break;
2002         case LUSTRE_SP_MGS:
2003         case LUSTRE_SP_MGC:
2004                 if (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
2005                     !req->rq_auth_usr_ost) {
2006                         DEBUG_REQ(D_ERROR, req, "faked source MGC/MGS");
2007                         svc_rc = SECSVC_DROP;
2008                 }
2009                 break;
2010         case LUSTRE_SP_ANY:
2011         default:
2012                 DEBUG_REQ(D_ERROR, req, "invalid source %u", req->rq_sp_from);
2013                 svc_rc = SECSVC_DROP;
2014         }
2015
2016         return svc_rc;
2017 }
2018
2019 /**
2020  * Used by ptlrpc server, to perform transformation upon request message of
2021  * incoming \a req. This must be the first thing to do with a incoming
2022  * request in ptlrpc layer.
2023  *
2024  * \retval SECSVC_OK success, and req->rq_reqmsg point to request message in
2025  * clear text, size is req->rq_reqlen; also req->rq_svc_ctx is set.
2026  * \retval SECSVC_COMPLETE success, the request has been fully processed, and
2027  * reply message has been prepared.
2028  * \retval SECSVC_DROP failed, this request should be dropped.
2029  */
2030 int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
2031 {
2032         struct ptlrpc_sec_policy *policy;
2033         struct lustre_msg       *msg = req->rq_reqbuf;
2034         int                    rc;
2035         ENTRY;
2036
2037         LASSERT(msg);
2038         LASSERT(req->rq_reqmsg == NULL);
2039         LASSERT(req->rq_repmsg == NULL);
2040         LASSERT(req->rq_svc_ctx == NULL);
2041
2042         req->rq_req_swab_mask = 0;
2043
2044         rc = __lustre_unpack_msg(msg, req->rq_reqdata_len);
2045         switch (rc) {
2046         case 1:
2047                 lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
2048         case 0:
2049                 break;
2050         default:
2051                 CERROR("error unpacking request from %s x"LPU64"\n",
2052                        libcfs_id2str(req->rq_peer), req->rq_xid);
2053                 RETURN(SECSVC_DROP);
2054         }
2055
2056         req->rq_flvr.sf_rpc = WIRE_FLVR(msg->lm_secflvr);
2057         req->rq_sp_from = LUSTRE_SP_ANY;
2058         req->rq_auth_uid = INVALID_UID;
2059         req->rq_auth_mapped_uid = INVALID_UID;
2060
2061         policy = sptlrpc_wireflavor2policy(req->rq_flvr.sf_rpc);
2062         if (!policy) {
2063                 CERROR("unsupported rpc flavor %x\n", req->rq_flvr.sf_rpc);
2064                 RETURN(SECSVC_DROP);
2065         }
2066
2067         LASSERT(policy->sp_sops->accept);
2068         rc = policy->sp_sops->accept(req);
2069         sptlrpc_policy_put(policy);
2070         LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
2071         LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
2072
2073         /*
2074          * if it's not null flavor (which means embedded packing msg),
2075          * reset the swab mask for the comming inner msg unpacking.
2076          */
2077         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL)
2078                 req->rq_req_swab_mask = 0;
2079
2080         /* sanity check for the request source */
2081         rc = sptlrpc_svc_check_from(req, rc);
2082         RETURN(rc);
2083 }
2084
2085 /**
2086  * Used by ptlrpc server, to allocate reply buffer for \a req. If succeed,
2087  * req->rq_reply_state is set, and req->rq_reply_state->rs_msg point to
2088  * a buffer of \a msglen size.
2089  */
2090 int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
2091 {
2092         struct ptlrpc_sec_policy *policy;
2093         struct ptlrpc_reply_state *rs;
2094         int rc;
2095         ENTRY;
2096
2097         LASSERT(req->rq_svc_ctx);
2098         LASSERT(req->rq_svc_ctx->sc_policy);
2099
2100         policy = req->rq_svc_ctx->sc_policy;
2101         LASSERT(policy->sp_sops->alloc_rs);
2102
2103         rc = policy->sp_sops->alloc_rs(req, msglen);
2104         if (unlikely(rc == -ENOMEM)) {
2105                 /* failed alloc, try emergency pool */
2106                 rs = lustre_get_emerg_rs(req->rq_rqbd->rqbd_svcpt);
2107                 if (rs == NULL)
2108                         RETURN(-ENOMEM);
2109
2110                 req->rq_reply_state = rs;
2111                 rc = policy->sp_sops->alloc_rs(req, msglen);
2112                 if (rc) {
2113                         lustre_put_emerg_rs(rs);
2114                         req->rq_reply_state = NULL;
2115                 }
2116         }
2117
2118         LASSERT(rc != 0 ||
2119                 (req->rq_reply_state && req->rq_reply_state->rs_msg));
2120
2121         RETURN(rc);
2122 }
2123
2124 /**
2125  * Used by ptlrpc server, to perform transformation upon reply message.
2126  *
2127  * \post req->rq_reply_off is set to approriate server-controlled reply offset.
2128  * \post req->rq_repmsg and req->rq_reply_state->rs_msg becomes inaccessible.
2129  */
2130 int sptlrpc_svc_wrap_reply(struct ptlrpc_request *req)
2131 {
2132         struct ptlrpc_sec_policy *policy;
2133         int rc;
2134         ENTRY;
2135
2136         LASSERT(req->rq_svc_ctx);
2137         LASSERT(req->rq_svc_ctx->sc_policy);
2138
2139         policy = req->rq_svc_ctx->sc_policy;
2140         LASSERT(policy->sp_sops->authorize);
2141
2142         rc = policy->sp_sops->authorize(req);
2143         LASSERT(rc || req->rq_reply_state->rs_repdata_len);
2144
2145         RETURN(rc);
2146 }
2147
2148 /**
2149  * Used by ptlrpc server, to free reply_state.
2150  */
2151 void sptlrpc_svc_free_rs(struct ptlrpc_reply_state *rs)
2152 {
2153         struct ptlrpc_sec_policy *policy;
2154         unsigned int prealloc;
2155         ENTRY;
2156
2157         LASSERT(rs->rs_svc_ctx);
2158         LASSERT(rs->rs_svc_ctx->sc_policy);
2159
2160         policy = rs->rs_svc_ctx->sc_policy;
2161         LASSERT(policy->sp_sops->free_rs);
2162
2163         prealloc = rs->rs_prealloc;
2164         policy->sp_sops->free_rs(rs);
2165
2166         if (prealloc)
2167                 lustre_put_emerg_rs(rs);
2168         EXIT;
2169 }
2170
2171 void sptlrpc_svc_ctx_addref(struct ptlrpc_request *req)
2172 {
2173         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2174
2175         if (ctx != NULL)
2176                 atomic_inc(&ctx->sc_refcount);
2177 }
2178
2179 void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
2180 {
2181         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2182
2183         if (ctx == NULL)
2184                 return;
2185
2186         LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2187         if (atomic_dec_and_test(&ctx->sc_refcount)) {
2188                 if (ctx->sc_policy->sp_sops->free_ctx)
2189                         ctx->sc_policy->sp_sops->free_ctx(ctx);
2190         }
2191         req->rq_svc_ctx = NULL;
2192 }
2193
2194 void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
2195 {
2196         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2197
2198         if (ctx == NULL)
2199                 return;
2200
2201         LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2202         if (ctx->sc_policy->sp_sops->invalidate_ctx)
2203                 ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
2204 }
2205 EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
2206
2207 /****************************************
2208  * bulk security                        *
2209  ****************************************/
2210
2211 /**
2212  * Perform transformation upon bulk data pointed by \a desc. This is called
2213  * before transforming the request message.
2214  */
2215 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
2216                           struct ptlrpc_bulk_desc *desc)
2217 {
2218         struct ptlrpc_cli_ctx *ctx;
2219
2220         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
2221
2222         if (!req->rq_pack_bulk)
2223                 return 0;
2224
2225         ctx = req->rq_cli_ctx;
2226         if (ctx->cc_ops->wrap_bulk)
2227                 return ctx->cc_ops->wrap_bulk(ctx, req, desc);
2228         return 0;
2229 }
2230 EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
2231
2232 /**
2233  * This is called after unwrap the reply message.
2234  * return nob of actual plain text size received, or error code.
2235  */
2236 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
2237                                  struct ptlrpc_bulk_desc *desc,
2238                                  int nob)
2239 {
2240         struct ptlrpc_cli_ctx  *ctx;
2241         int                  rc;
2242
2243         LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
2244
2245         if (!req->rq_pack_bulk)
2246                 return desc->bd_nob_transferred;
2247
2248         ctx = req->rq_cli_ctx;
2249         if (ctx->cc_ops->unwrap_bulk) {
2250                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2251                 if (rc < 0)
2252                         return rc;
2253         }
2254         return desc->bd_nob_transferred;
2255 }
2256 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
2257
2258 /**
2259  * This is called after unwrap the reply message.
2260  * return 0 for success or error code.
2261  */
2262 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
2263                                   struct ptlrpc_bulk_desc *desc)
2264 {
2265         struct ptlrpc_cli_ctx  *ctx;
2266         int                  rc;
2267
2268         LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
2269
2270         if (!req->rq_pack_bulk)
2271                 return 0;
2272
2273         ctx = req->rq_cli_ctx;
2274         if (ctx->cc_ops->unwrap_bulk) {
2275                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2276                 if (rc < 0)
2277                         return rc;
2278         }
2279
2280         /*
2281          * if everything is going right, nob should equals to nob_transferred.
2282          * in case of privacy mode, nob_transferred needs to be adjusted.
2283          */
2284         if (desc->bd_nob != desc->bd_nob_transferred) {
2285                 CERROR("nob %d doesn't match transferred nob %d",
2286                        desc->bd_nob, desc->bd_nob_transferred);
2287                 return -EPROTO;
2288         }
2289
2290         return 0;
2291 }
2292 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
2293
2294
2295 /****************************************
2296  * user descriptor helpers            *
2297  ****************************************/
2298
2299 int sptlrpc_current_user_desc_size(void)
2300 {
2301         int ngroups;
2302
2303         ngroups = current_ngroups;
2304
2305         if (ngroups > LUSTRE_MAX_GROUPS)
2306                 ngroups = LUSTRE_MAX_GROUPS;
2307         return sptlrpc_user_desc_size(ngroups);
2308 }
2309 EXPORT_SYMBOL(sptlrpc_current_user_desc_size);
2310
2311 int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
2312 {
2313         struct ptlrpc_user_desc *pud;
2314
2315         pud = lustre_msg_buf(msg, offset, 0);
2316
2317         pud->pud_uid = current_uid();
2318         pud->pud_gid = current_gid();
2319         pud->pud_fsuid = current_fsuid();
2320         pud->pud_fsgid = current_fsgid();
2321         pud->pud_cap = cfs_curproc_cap_pack();
2322         pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
2323
2324         task_lock(current);
2325         if (pud->pud_ngroups > current_ngroups)
2326                 pud->pud_ngroups = current_ngroups;
2327         memcpy(pud->pud_groups, current_cred()->group_info->blocks[0],
2328                pud->pud_ngroups * sizeof(__u32));
2329         task_unlock(current);
2330
2331         return 0;
2332 }
2333 EXPORT_SYMBOL(sptlrpc_pack_user_desc);
2334
2335 int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset, int swabbed)
2336 {
2337         struct ptlrpc_user_desc *pud;
2338         int                   i;
2339
2340         pud = lustre_msg_buf(msg, offset, sizeof(*pud));
2341         if (!pud)
2342                 return -EINVAL;
2343
2344         if (swabbed) {
2345                 __swab32s(&pud->pud_uid);
2346                 __swab32s(&pud->pud_gid);
2347                 __swab32s(&pud->pud_fsuid);
2348                 __swab32s(&pud->pud_fsgid);
2349                 __swab32s(&pud->pud_cap);
2350                 __swab32s(&pud->pud_ngroups);
2351         }
2352
2353         if (pud->pud_ngroups > LUSTRE_MAX_GROUPS) {
2354                 CERROR("%u groups is too large\n", pud->pud_ngroups);
2355                 return -EINVAL;
2356         }
2357
2358         if (sizeof(*pud) + pud->pud_ngroups * sizeof(__u32) >
2359             msg->lm_buflens[offset]) {
2360                 CERROR("%u groups are claimed but bufsize only %u\n",
2361                        pud->pud_ngroups, msg->lm_buflens[offset]);
2362                 return -EINVAL;
2363         }
2364
2365         if (swabbed) {
2366                 for (i = 0; i < pud->pud_ngroups; i++)
2367                         __swab32s(&pud->pud_groups[i]);
2368         }
2369
2370         return 0;
2371 }
2372 EXPORT_SYMBOL(sptlrpc_unpack_user_desc);
2373
2374 /****************************************
2375  * misc helpers                  *
2376  ****************************************/
2377
2378 const char * sec2target_str(struct ptlrpc_sec *sec)
2379 {
2380         if (!sec || !sec->ps_import || !sec->ps_import->imp_obd)
2381                 return "*";
2382         if (sec_is_reverse(sec))
2383                 return "c";
2384         return obd_uuid2str(&sec->ps_import->imp_obd->u.cli.cl_target_uuid);
2385 }
2386 EXPORT_SYMBOL(sec2target_str);
2387
2388 /*
2389  * return true if the bulk data is protected
2390  */
2391 int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
2392 {
2393         switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
2394         case SPTLRPC_BULK_SVC_INTG:
2395         case SPTLRPC_BULK_SVC_PRIV:
2396                 return 1;
2397         default:
2398                 return 0;
2399         }
2400 }
2401 EXPORT_SYMBOL(sptlrpc_flavor_has_bulk);
2402
2403 /****************************************
2404  * crypto API helper/alloc blkciper     *
2405  ****************************************/
2406
2407 /****************************************
2408  * initialize/finalize            *
2409  ****************************************/
2410
2411 int sptlrpc_init(void)
2412 {
2413         int rc;
2414
2415         rwlock_init(&policy_lock);
2416
2417         rc = sptlrpc_gc_init();
2418         if (rc)
2419                 goto out;
2420
2421         rc = sptlrpc_conf_init();
2422         if (rc)
2423                 goto out_gc;
2424
2425         rc = sptlrpc_enc_pool_init();
2426         if (rc)
2427                 goto out_conf;
2428
2429         rc = sptlrpc_null_init();
2430         if (rc)
2431                 goto out_pool;
2432
2433         rc = sptlrpc_plain_init();
2434         if (rc)
2435                 goto out_null;
2436
2437         rc = sptlrpc_lproc_init();
2438         if (rc)
2439                 goto out_plain;
2440
2441         return 0;
2442
2443 out_plain:
2444         sptlrpc_plain_fini();
2445 out_null:
2446         sptlrpc_null_fini();
2447 out_pool:
2448         sptlrpc_enc_pool_fini();
2449 out_conf:
2450         sptlrpc_conf_fini();
2451 out_gc:
2452         sptlrpc_gc_fini();
2453 out:
2454         return rc;
2455 }
2456
2457 void sptlrpc_fini(void)
2458 {
2459         sptlrpc_lproc_fini();
2460         sptlrpc_plain_fini();
2461         sptlrpc_null_fini();
2462         sptlrpc_enc_pool_fini();
2463         sptlrpc_conf_fini();
2464         sptlrpc_gc_fini();
2465 }