Merge tag 'tty-3.17-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/gregkh/tty
[cascardo/linux.git] / drivers / staging / lustre / lustre / ptlrpc / sec.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/sec.c
37  *
38  * Author: Eric Mei <ericm@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_SEC
42
43 #include "../../include/linux/libcfs/libcfs.h"
44 #include <linux/crypto.h>
45 #include <linux/key.h>
46
47 #include "../include/obd.h"
48 #include "../include/obd_class.h"
49 #include "../include/obd_support.h"
50 #include "../include/lustre_net.h"
51 #include "../include/lustre_import.h"
52 #include "../include/lustre_dlm.h"
53 #include "../include/lustre_sec.h"
54
55 #include "ptlrpc_internal.h"
56
57 /***********************************************
58  * policy registers                         *
59  ***********************************************/
60
61 static rwlock_t policy_lock;
62 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
63         NULL,
64 };
65
66 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy)
67 {
68         __u16 number = policy->sp_policy;
69
70         LASSERT(policy->sp_name);
71         LASSERT(policy->sp_cops);
72         LASSERT(policy->sp_sops);
73
74         if (number >= SPTLRPC_POLICY_MAX)
75                 return -EINVAL;
76
77         write_lock(&policy_lock);
78         if (unlikely(policies[number])) {
79                 write_unlock(&policy_lock);
80                 return -EALREADY;
81         }
82         policies[number] = policy;
83         write_unlock(&policy_lock);
84
85         CDEBUG(D_SEC, "%s: registered\n", policy->sp_name);
86         return 0;
87 }
88 EXPORT_SYMBOL(sptlrpc_register_policy);
89
90 int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
91 {
92         __u16 number = policy->sp_policy;
93
94         LASSERT(number < SPTLRPC_POLICY_MAX);
95
96         write_lock(&policy_lock);
97         if (unlikely(policies[number] == NULL)) {
98                 write_unlock(&policy_lock);
99                 CERROR("%s: already unregistered\n", policy->sp_name);
100                 return -EINVAL;
101         }
102
103         LASSERT(policies[number] == policy);
104         policies[number] = NULL;
105         write_unlock(&policy_lock);
106
107         CDEBUG(D_SEC, "%s: unregistered\n", policy->sp_name);
108         return 0;
109 }
110 EXPORT_SYMBOL(sptlrpc_unregister_policy);
111
112 static
113 struct ptlrpc_sec_policy * sptlrpc_wireflavor2policy(__u32 flavor)
114 {
115         static DEFINE_MUTEX(load_mutex);
116         static atomic_t       loaded = ATOMIC_INIT(0);
117         struct ptlrpc_sec_policy *policy;
118         __u16                number = SPTLRPC_FLVR_POLICY(flavor);
119         __u16                flag = 0;
120
121         if (number >= SPTLRPC_POLICY_MAX)
122                 return NULL;
123
124         while (1) {
125                 read_lock(&policy_lock);
126                 policy = policies[number];
127                 if (policy && !try_module_get(policy->sp_owner))
128                         policy = NULL;
129                 if (policy == NULL)
130                         flag = atomic_read(&loaded);
131                 read_unlock(&policy_lock);
132
133                 if (policy != NULL || flag != 0 ||
134                     number != SPTLRPC_POLICY_GSS)
135                         break;
136
137                 /* try to load gss module, once */
138                 mutex_lock(&load_mutex);
139                 if (atomic_read(&loaded) == 0) {
140                         if (request_module("ptlrpc_gss") == 0)
141                                 CDEBUG(D_SEC,
142                                        "module ptlrpc_gss loaded on demand\n");
143                         else
144                                 CERROR("Unable to load module ptlrpc_gss\n");
145
146                         atomic_set(&loaded, 1);
147                 }
148                 mutex_unlock(&load_mutex);
149         }
150
151         return policy;
152 }
153
154 __u32 sptlrpc_name2flavor_base(const char *name)
155 {
156         if (!strcmp(name, "null"))
157                 return SPTLRPC_FLVR_NULL;
158         if (!strcmp(name, "plain"))
159                 return SPTLRPC_FLVR_PLAIN;
160         if (!strcmp(name, "krb5n"))
161                 return SPTLRPC_FLVR_KRB5N;
162         if (!strcmp(name, "krb5a"))
163                 return SPTLRPC_FLVR_KRB5A;
164         if (!strcmp(name, "krb5i"))
165                 return SPTLRPC_FLVR_KRB5I;
166         if (!strcmp(name, "krb5p"))
167                 return SPTLRPC_FLVR_KRB5P;
168
169         return SPTLRPC_FLVR_INVALID;
170 }
171 EXPORT_SYMBOL(sptlrpc_name2flavor_base);
172
173 const char *sptlrpc_flavor2name_base(__u32 flvr)
174 {
175         __u32   base = SPTLRPC_FLVR_BASE(flvr);
176
177         if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL))
178                 return "null";
179         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_PLAIN))
180                 return "plain";
181         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5N))
182                 return "krb5n";
183         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5A))
184                 return "krb5a";
185         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5I))
186                 return "krb5i";
187         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5P))
188                 return "krb5p";
189
190         CERROR("invalid wire flavor 0x%x\n", flvr);
191         return "invalid";
192 }
193 EXPORT_SYMBOL(sptlrpc_flavor2name_base);
194
195 char *sptlrpc_flavor2name_bulk(struct sptlrpc_flavor *sf,
196                                char *buf, int bufsize)
197 {
198         if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN)
199                 snprintf(buf, bufsize, "hash:%s",
200                          sptlrpc_get_hash_name(sf->u_bulk.hash.hash_alg));
201         else
202                 snprintf(buf, bufsize, "%s",
203                          sptlrpc_flavor2name_base(sf->sf_rpc));
204
205         buf[bufsize - 1] = '\0';
206         return buf;
207 }
208 EXPORT_SYMBOL(sptlrpc_flavor2name_bulk);
209
210 char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
211 {
212         snprintf(buf, bufsize, "%s", sptlrpc_flavor2name_base(sf->sf_rpc));
213
214         /*
215          * currently we don't support customized bulk specification for
216          * flavors other than plain
217          */
218         if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN) {
219                 char bspec[16];
220
221                 bspec[0] = '-';
222                 sptlrpc_flavor2name_bulk(sf, &bspec[1], sizeof(bspec) - 1);
223                 strncat(buf, bspec, bufsize);
224         }
225
226         buf[bufsize - 1] = '\0';
227         return buf;
228 }
229 EXPORT_SYMBOL(sptlrpc_flavor2name);
230
231 char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
232 {
233         buf[0] = '\0';
234
235         if (flags & PTLRPC_SEC_FL_REVERSE)
236                 strlcat(buf, "reverse,", bufsize);
237         if (flags & PTLRPC_SEC_FL_ROOTONLY)
238                 strlcat(buf, "rootonly,", bufsize);
239         if (flags & PTLRPC_SEC_FL_UDESC)
240                 strlcat(buf, "udesc,", bufsize);
241         if (flags & PTLRPC_SEC_FL_BULK)
242                 strlcat(buf, "bulk,", bufsize);
243         if (buf[0] == '\0')
244                 strlcat(buf, "-,", bufsize);
245
246         return buf;
247 }
248 EXPORT_SYMBOL(sptlrpc_secflags2str);
249
250 /**************************************************
251  * client context APIs                      *
252  **************************************************/
253
254 static
255 struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
256 {
257         struct vfs_cred vcred;
258         int create = 1, remove_dead = 1;
259
260         LASSERT(sec);
261         LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
262
263         if (sec->ps_flvr.sf_flags & (PTLRPC_SEC_FL_REVERSE |
264                                      PTLRPC_SEC_FL_ROOTONLY)) {
265                 vcred.vc_uid = 0;
266                 vcred.vc_gid = 0;
267                 if (sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_REVERSE) {
268                         create = 0;
269                         remove_dead = 0;
270                 }
271         } else {
272                 vcred.vc_uid = from_kuid(&init_user_ns, current_uid());
273                 vcred.vc_gid = from_kgid(&init_user_ns, current_gid());
274         }
275
276         return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred,
277                                                    create, remove_dead);
278 }
279
280 struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
281 {
282         atomic_inc(&ctx->cc_refcount);
283         return ctx;
284 }
285 EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
286
287 void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
288 {
289         struct ptlrpc_sec *sec = ctx->cc_sec;
290
291         LASSERT(sec);
292         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
293
294         if (!atomic_dec_and_test(&ctx->cc_refcount))
295                 return;
296
297         sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
298 }
299 EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
300
301 /**
302  * Expire the client context immediately.
303  *
304  * \pre Caller must hold at least 1 reference on the \a ctx.
305  */
306 void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
307 {
308         LASSERT(ctx->cc_ops->force_die);
309         ctx->cc_ops->force_die(ctx, 0);
310 }
311 EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
312
313 /**
314  * To wake up the threads who are waiting for this client context. Called
315  * after some status change happened on \a ctx.
316  */
317 void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
318 {
319         struct ptlrpc_request *req, *next;
320
321         spin_lock(&ctx->cc_lock);
322         list_for_each_entry_safe(req, next, &ctx->cc_req_list,
323                                      rq_ctx_chain) {
324                 list_del_init(&req->rq_ctx_chain);
325                 ptlrpc_client_wake_req(req);
326         }
327         spin_unlock(&ctx->cc_lock);
328 }
329 EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
330
331 int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
332 {
333         LASSERT(ctx->cc_ops);
334
335         if (ctx->cc_ops->display == NULL)
336                 return 0;
337
338         return ctx->cc_ops->display(ctx, buf, bufsize);
339 }
340
341 static int import_sec_check_expire(struct obd_import *imp)
342 {
343         int     adapt = 0;
344
345         spin_lock(&imp->imp_lock);
346         if (imp->imp_sec_expire &&
347             imp->imp_sec_expire < get_seconds()) {
348                 adapt = 1;
349                 imp->imp_sec_expire = 0;
350         }
351         spin_unlock(&imp->imp_lock);
352
353         if (!adapt)
354                 return 0;
355
356         CDEBUG(D_SEC, "found delayed sec adapt expired, do it now\n");
357         return sptlrpc_import_sec_adapt(imp, NULL, NULL);
358 }
359
360 static int import_sec_validate_get(struct obd_import *imp,
361                                    struct ptlrpc_sec **sec)
362 {
363         int     rc;
364
365         if (unlikely(imp->imp_sec_expire)) {
366                 rc = import_sec_check_expire(imp);
367                 if (rc)
368                         return rc;
369         }
370
371         *sec = sptlrpc_import_sec_ref(imp);
372         if (*sec == NULL) {
373                 CERROR("import %p (%s) with no sec\n",
374                        imp, ptlrpc_import_state_name(imp->imp_state));
375                 return -EACCES;
376         }
377
378         if (unlikely((*sec)->ps_dying)) {
379                 CERROR("attempt to use dying sec %p\n", sec);
380                 sptlrpc_sec_put(*sec);
381                 return -EACCES;
382         }
383
384         return 0;
385 }
386
387 /**
388  * Given a \a req, find or allocate a appropriate context for it.
389  * \pre req->rq_cli_ctx == NULL.
390  *
391  * \retval 0 succeed, and req->rq_cli_ctx is set.
392  * \retval -ev error number, and req->rq_cli_ctx == NULL.
393  */
394 int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
395 {
396         struct obd_import *imp = req->rq_import;
397         struct ptlrpc_sec *sec;
398         int             rc;
399
400         LASSERT(!req->rq_cli_ctx);
401         LASSERT(imp);
402
403         rc = import_sec_validate_get(imp, &sec);
404         if (rc)
405                 return rc;
406
407         req->rq_cli_ctx = get_my_ctx(sec);
408
409         sptlrpc_sec_put(sec);
410
411         if (!req->rq_cli_ctx) {
412                 CERROR("req %p: fail to get context\n", req);
413                 return -ENOMEM;
414         }
415
416         return 0;
417 }
418
419 /**
420  * Drop the context for \a req.
421  * \pre req->rq_cli_ctx != NULL.
422  * \post req->rq_cli_ctx == NULL.
423  *
424  * If \a sync == 0, this function should return quickly without sleep;
425  * otherwise it might trigger and wait for the whole process of sending
426  * an context-destroying rpc to server.
427  */
428 void sptlrpc_req_put_ctx(struct ptlrpc_request *req, int sync)
429 {
430         LASSERT(req);
431         LASSERT(req->rq_cli_ctx);
432
433         /* request might be asked to release earlier while still
434          * in the context waiting list.
435          */
436         if (!list_empty(&req->rq_ctx_chain)) {
437                 spin_lock(&req->rq_cli_ctx->cc_lock);
438                 list_del_init(&req->rq_ctx_chain);
439                 spin_unlock(&req->rq_cli_ctx->cc_lock);
440         }
441
442         sptlrpc_cli_ctx_put(req->rq_cli_ctx, sync);
443         req->rq_cli_ctx = NULL;
444 }
445
446 static
447 int sptlrpc_req_ctx_switch(struct ptlrpc_request *req,
448                            struct ptlrpc_cli_ctx *oldctx,
449                            struct ptlrpc_cli_ctx *newctx)
450 {
451         struct sptlrpc_flavor   old_flvr;
452         char               *reqmsg = NULL; /* to workaround old gcc */
453         int                  reqmsg_size;
454         int                  rc = 0;
455
456         LASSERT(req->rq_reqmsg);
457         LASSERT(req->rq_reqlen);
458         LASSERT(req->rq_replen);
459
460         CDEBUG(D_SEC, "req %p: switch ctx %p(%u->%s) -> %p(%u->%s), "
461                "switch sec %p(%s) -> %p(%s)\n", req,
462                oldctx, oldctx->cc_vcred.vc_uid, sec2target_str(oldctx->cc_sec),
463                newctx, newctx->cc_vcred.vc_uid, sec2target_str(newctx->cc_sec),
464                oldctx->cc_sec, oldctx->cc_sec->ps_policy->sp_name,
465                newctx->cc_sec, newctx->cc_sec->ps_policy->sp_name);
466
467         /* save flavor */
468         old_flvr = req->rq_flvr;
469
470         /* save request message */
471         reqmsg_size = req->rq_reqlen;
472         if (reqmsg_size != 0) {
473                 OBD_ALLOC_LARGE(reqmsg, reqmsg_size);
474                 if (reqmsg == NULL)
475                         return -ENOMEM;
476                 memcpy(reqmsg, req->rq_reqmsg, reqmsg_size);
477         }
478
479         /* release old req/rep buf */
480         req->rq_cli_ctx = oldctx;
481         sptlrpc_cli_free_reqbuf(req);
482         sptlrpc_cli_free_repbuf(req);
483         req->rq_cli_ctx = newctx;
484
485         /* recalculate the flavor */
486         sptlrpc_req_set_flavor(req, 0);
487
488         /* alloc new request buffer
489          * we don't need to alloc reply buffer here, leave it to the
490          * rest procedure of ptlrpc */
491         if (reqmsg_size != 0) {
492                 rc = sptlrpc_cli_alloc_reqbuf(req, reqmsg_size);
493                 if (!rc) {
494                         LASSERT(req->rq_reqmsg);
495                         memcpy(req->rq_reqmsg, reqmsg, reqmsg_size);
496                 } else {
497                         CWARN("failed to alloc reqbuf: %d\n", rc);
498                         req->rq_flvr = old_flvr;
499                 }
500
501                 OBD_FREE_LARGE(reqmsg, reqmsg_size);
502         }
503         return rc;
504 }
505
506 /**
507  * If current context of \a req is dead somehow, e.g. we just switched flavor
508  * thus marked original contexts dead, we'll find a new context for it. if
509  * no switch is needed, \a req will end up with the same context.
510  *
511  * \note a request must have a context, to keep other parts of code happy.
512  * In any case of failure during the switching, we must restore the old one.
513  */
514 int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
515 {
516         struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
517         struct ptlrpc_cli_ctx *newctx;
518         int                 rc;
519
520         LASSERT(oldctx);
521
522         sptlrpc_cli_ctx_get(oldctx);
523         sptlrpc_req_put_ctx(req, 0);
524
525         rc = sptlrpc_req_get_ctx(req);
526         if (unlikely(rc)) {
527                 LASSERT(!req->rq_cli_ctx);
528
529                 /* restore old ctx */
530                 req->rq_cli_ctx = oldctx;
531                 return rc;
532         }
533
534         newctx = req->rq_cli_ctx;
535         LASSERT(newctx);
536
537         if (unlikely(newctx == oldctx &&
538                      test_bit(PTLRPC_CTX_DEAD_BIT, &oldctx->cc_flags))) {
539                 /*
540                  * still get the old dead ctx, usually means system too busy
541                  */
542                 CDEBUG(D_SEC,
543                        "ctx (%p, fl %lx) doesn't switch, relax a little bit\n",
544                        newctx, newctx->cc_flags);
545
546                 set_current_state(TASK_INTERRUPTIBLE);
547                 schedule_timeout(HZ);
548         } else {
549                 /*
550                  * it's possible newctx == oldctx if we're switching
551                  * subflavor with the same sec.
552                  */
553                 rc = sptlrpc_req_ctx_switch(req, oldctx, newctx);
554                 if (rc) {
555                         /* restore old ctx */
556                         sptlrpc_req_put_ctx(req, 0);
557                         req->rq_cli_ctx = oldctx;
558                         return rc;
559                 }
560
561                 LASSERT(req->rq_cli_ctx == newctx);
562         }
563
564         sptlrpc_cli_ctx_put(oldctx, 1);
565         return 0;
566 }
567 EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
568
/*
 * Wait condition for sptlrpc_req_refresh_ctx(): 1 once \a ctx counts as
 * refreshed according to cli_ctx_is_refreshed(), 0 otherwise.
 */
static
int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
{
	return cli_ctx_is_refreshed(ctx) ? 1 : 0;
}
576
577 static
578 int ctx_refresh_timeout(void *data)
579 {
580         struct ptlrpc_request *req = data;
581         int rc;
582
583         /* conn_cnt is needed in expire_one_request */
584         lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
585
586         rc = ptlrpc_expire_one_request(req, 1);
587         /* if we started recovery, we should mark this ctx dead; otherwise
588          * in case of lgssd died nobody would retire this ctx, following
589          * connecting will still find the same ctx thus cause deadlock.
590          * there's an assumption that expire time of the request should be
591          * later than the context refresh expire time.
592          */
593         if (rc == 0)
594                 req->rq_cli_ctx->cc_ops->force_die(req->rq_cli_ctx, 0);
595         return rc;
596 }
597
598 static
599 void ctx_refresh_interrupt(void *data)
600 {
601         struct ptlrpc_request *req = data;
602
603         spin_lock(&req->rq_lock);
604         req->rq_intr = 1;
605         spin_unlock(&req->rq_lock);
606 }
607
608 static
609 void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
610 {
611         spin_lock(&ctx->cc_lock);
612         if (!list_empty(&req->rq_ctx_chain))
613                 list_del_init(&req->rq_ctx_chain);
614         spin_unlock(&ctx->cc_lock);
615 }
616
617 /**
618  * To refresh the context of \req, if it's not up-to-date.
619  * \param timeout
620  * - < 0: don't wait
621  * - = 0: wait until success or fatal error occur
622  * - > 0: timeout value (in seconds)
623  *
624  * The status of the context could be subject to be changed by other threads
625  * at any time. We allow this race, but once we return with 0, the caller will
626  * suppose it's uptodated and keep using it until the owning rpc is done.
627  *
628  * \retval 0 only if the context is uptodated.
629  * \retval -ev error number.
630  */
631 int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
632 {
633         struct ptlrpc_cli_ctx  *ctx = req->rq_cli_ctx;
634         struct ptlrpc_sec      *sec;
635         struct l_wait_info      lwi;
636         int                  rc;
637
638         LASSERT(ctx);
639
640         if (req->rq_ctx_init || req->rq_ctx_fini)
641                 return 0;
642
643         /*
644          * during the process a request's context might change type even
645          * (e.g. from gss ctx to null ctx), so each loop we need to re-check
646          * everything
647          */
648 again:
649         rc = import_sec_validate_get(req->rq_import, &sec);
650         if (rc)
651                 return rc;
652
653         if (sec->ps_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
654                 CDEBUG(D_SEC, "req %p: flavor has changed %x -> %x\n",
655                       req, req->rq_flvr.sf_rpc, sec->ps_flvr.sf_rpc);
656                 req_off_ctx_list(req, ctx);
657                 sptlrpc_req_replace_dead_ctx(req);
658                 ctx = req->rq_cli_ctx;
659         }
660         sptlrpc_sec_put(sec);
661
662         if (cli_ctx_is_eternal(ctx))
663                 return 0;
664
665         if (unlikely(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags))) {
666                 LASSERT(ctx->cc_ops->refresh);
667                 ctx->cc_ops->refresh(ctx);
668         }
669         LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
670
671         LASSERT(ctx->cc_ops->validate);
672         if (ctx->cc_ops->validate(ctx) == 0) {
673                 req_off_ctx_list(req, ctx);
674                 return 0;
675         }
676
677         if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
678                 spin_lock(&req->rq_lock);
679                 req->rq_err = 1;
680                 spin_unlock(&req->rq_lock);
681                 req_off_ctx_list(req, ctx);
682                 return -EPERM;
683         }
684
685         /*
686          * There's a subtle issue for resending RPCs, suppose following
687          * situation:
688          *  1. the request was sent to server.
689          *  2. recovery was kicked start, after finished the request was
690          *     marked as resent.
691          *  3. resend the request.
692          *  4. old reply from server received, we accept and verify the reply.
693          *     this has to be success, otherwise the error will be aware
694          *     by application.
695          *  5. new reply from server received, dropped by LNet.
696          *
697          * Note the xid of old & new request is the same. We can't simply
698          * change xid for the resent request because the server replies on
699          * it for reply reconstruction.
700          *
701          * Commonly the original context should be uptodate because we
702          * have a expiry nice time; server will keep its context because
703          * we at least hold a ref of old context which prevent context
704          * destroying RPC being sent. So server still can accept the request
705          * and finish the RPC. But if that's not the case:
706          *  1. If server side context has been trimmed, a NO_CONTEXT will
707          *     be returned, gss_cli_ctx_verify/unseal will switch to new
708          *     context by force.
709          *  2. Current context never be refreshed, then we are fine: we
710          *     never really send request with old context before.
711          */
712         if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
713             unlikely(req->rq_reqmsg) &&
714             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
715                 req_off_ctx_list(req, ctx);
716                 return 0;
717         }
718
719         if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
720                 req_off_ctx_list(req, ctx);
721                 /*
722                  * don't switch ctx if import was deactivated
723                  */
724                 if (req->rq_import->imp_deactive) {
725                         spin_lock(&req->rq_lock);
726                         req->rq_err = 1;
727                         spin_unlock(&req->rq_lock);
728                         return -EINTR;
729                 }
730
731                 rc = sptlrpc_req_replace_dead_ctx(req);
732                 if (rc) {
733                         LASSERT(ctx == req->rq_cli_ctx);
734                         CERROR("req %p: failed to replace dead ctx %p: %d\n",
735                                req, ctx, rc);
736                         spin_lock(&req->rq_lock);
737                         req->rq_err = 1;
738                         spin_unlock(&req->rq_lock);
739                         return rc;
740                 }
741
742                 ctx = req->rq_cli_ctx;
743                 goto again;
744         }
745
746         /*
747          * Now we're sure this context is during upcall, add myself into
748          * waiting list
749          */
750         spin_lock(&ctx->cc_lock);
751         if (list_empty(&req->rq_ctx_chain))
752                 list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
753         spin_unlock(&ctx->cc_lock);
754
755         if (timeout < 0)
756                 return -EWOULDBLOCK;
757
758         /* Clear any flags that may be present from previous sends */
759         LASSERT(req->rq_receiving_reply == 0);
760         spin_lock(&req->rq_lock);
761         req->rq_err = 0;
762         req->rq_timedout = 0;
763         req->rq_resend = 0;
764         req->rq_restart = 0;
765         spin_unlock(&req->rq_lock);
766
767         lwi = LWI_TIMEOUT_INTR(timeout * HZ, ctx_refresh_timeout,
768                                ctx_refresh_interrupt, req);
769         rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
770
771         /*
772          * following cases could lead us here:
773          * - successfully refreshed;
774          * - interrupted;
775          * - timedout, and we don't want recover from the failure;
776          * - timedout, and waked up upon recovery finished;
777          * - someone else mark this ctx dead by force;
778          * - someone invalidate the req and call ptlrpc_client_wake_req(),
779          *   e.g. ptlrpc_abort_inflight();
780          */
781         if (!cli_ctx_is_refreshed(ctx)) {
782                 /* timed out or interrupted */
783                 req_off_ctx_list(req, ctx);
784
785                 LASSERT(rc != 0);
786                 return rc;
787         }
788
789         goto again;
790 }
791
792 /**
793  * Initialize flavor settings for \a req, according to \a opcode.
794  *
795  * \note this could be called in two situations:
796  * - new request from ptlrpc_pre_req(), with proper @opcode
797  * - old request which changed ctx in the middle, with @opcode == 0
798  */
799 void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
800 {
801         struct ptlrpc_sec *sec;
802
803         LASSERT(req->rq_import);
804         LASSERT(req->rq_cli_ctx);
805         LASSERT(req->rq_cli_ctx->cc_sec);
806         LASSERT(req->rq_bulk_read == 0 || req->rq_bulk_write == 0);
807
808         /* special security flags according to opcode */
809         switch (opcode) {
810         case OST_READ:
811         case MDS_READPAGE:
812         case MGS_CONFIG_READ:
813         case OBD_IDX_READ:
814                 req->rq_bulk_read = 1;
815                 break;
816         case OST_WRITE:
817         case MDS_WRITEPAGE:
818                 req->rq_bulk_write = 1;
819                 break;
820         case SEC_CTX_INIT:
821                 req->rq_ctx_init = 1;
822                 break;
823         case SEC_CTX_FINI:
824                 req->rq_ctx_fini = 1;
825                 break;
826         case 0:
827                 /* init/fini rpc won't be resend, so can't be here */
828                 LASSERT(req->rq_ctx_init == 0);
829                 LASSERT(req->rq_ctx_fini == 0);
830
831                 /* cleanup flags, which should be recalculated */
832                 req->rq_pack_udesc = 0;
833                 req->rq_pack_bulk = 0;
834                 break;
835         }
836
837         sec = req->rq_cli_ctx->cc_sec;
838
839         spin_lock(&sec->ps_lock);
840         req->rq_flvr = sec->ps_flvr;
841         spin_unlock(&sec->ps_lock);
842
843         /* force SVC_NULL for context initiation rpc, SVC_INTG for context
844          * destruction rpc */
845         if (unlikely(req->rq_ctx_init))
846                 flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
847         else if (unlikely(req->rq_ctx_fini))
848                 flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
849
850         /* user descriptor flag, null security can't do it anyway */
851         if ((sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC) &&
852             (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_NULL))
853                 req->rq_pack_udesc = 1;
854
855         /* bulk security flag */
856         if ((req->rq_bulk_read || req->rq_bulk_write) &&
857             sptlrpc_flavor_has_bulk(&req->rq_flvr))
858                 req->rq_pack_bulk = 1;
859 }
860
861 void sptlrpc_request_out_callback(struct ptlrpc_request *req)
862 {
863         if (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
864                 return;
865
866         LASSERT(req->rq_clrbuf);
867         if (req->rq_pool || !req->rq_reqbuf)
868                 return;
869
870         OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
871         req->rq_reqbuf = NULL;
872         req->rq_reqbuf_len = 0;
873 }
874
875 /**
876  * Given an import \a imp, check whether current user has a valid context
877  * or not. We may create a new context and try to refresh it, and try
878  * repeatedly try in case of non-fatal errors. Return 0 means success.
879  */
880 int sptlrpc_import_check_ctx(struct obd_import *imp)
881 {
882         struct ptlrpc_sec     *sec;
883         struct ptlrpc_cli_ctx *ctx;
884         struct ptlrpc_request *req = NULL;
885         int rc;
886
887         might_sleep();
888
889         sec = sptlrpc_import_sec_ref(imp);
890         ctx = get_my_ctx(sec);
891         sptlrpc_sec_put(sec);
892
893         if (!ctx)
894                 return -ENOMEM;
895
896         if (cli_ctx_is_eternal(ctx) ||
897             ctx->cc_ops->validate(ctx) == 0) {
898                 sptlrpc_cli_ctx_put(ctx, 1);
899                 return 0;
900         }
901
902         if (cli_ctx_is_error(ctx)) {
903                 sptlrpc_cli_ctx_put(ctx, 1);
904                 return -EACCES;
905         }
906
907         req = ptlrpc_request_cache_alloc(GFP_NOFS);
908         if (!req)
909                 return -ENOMEM;
910
911         spin_lock_init(&req->rq_lock);
912         atomic_set(&req->rq_refcount, 10000);
913         INIT_LIST_HEAD(&req->rq_ctx_chain);
914         init_waitqueue_head(&req->rq_reply_waitq);
915         init_waitqueue_head(&req->rq_set_waitq);
916         req->rq_import = imp;
917         req->rq_flvr = sec->ps_flvr;
918         req->rq_cli_ctx = ctx;
919
920         rc = sptlrpc_req_refresh_ctx(req, 0);
921         LASSERT(list_empty(&req->rq_ctx_chain));
922         sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
923         ptlrpc_request_cache_free(req);
924
925         return rc;
926 }
927
928 /**
929  * Used by ptlrpc client, to perform the pre-defined security transformation
930  * upon the request message of \a req. After this function called,
931  * req->rq_reqmsg is still accessible as clear text.
932  */
933 int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
934 {
935         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
936         int rc = 0;
937
938         LASSERT(ctx);
939         LASSERT(ctx->cc_sec);
940         LASSERT(req->rq_reqbuf || req->rq_clrbuf);
941
942         /* we wrap bulk request here because now we can be sure
943          * the context is uptodate.
944          */
945         if (req->rq_bulk) {
946                 rc = sptlrpc_cli_wrap_bulk(req, req->rq_bulk);
947                 if (rc)
948                         return rc;
949         }
950
951         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
952         case SPTLRPC_SVC_NULL:
953         case SPTLRPC_SVC_AUTH:
954         case SPTLRPC_SVC_INTG:
955                 LASSERT(ctx->cc_ops->sign);
956                 rc = ctx->cc_ops->sign(ctx, req);
957                 break;
958         case SPTLRPC_SVC_PRIV:
959                 LASSERT(ctx->cc_ops->seal);
960                 rc = ctx->cc_ops->seal(ctx, req);
961                 break;
962         default:
963                 LBUG();
964         }
965
966         if (rc == 0) {
967                 LASSERT(req->rq_reqdata_len);
968                 LASSERT(req->rq_reqdata_len % 8 == 0);
969                 LASSERT(req->rq_reqdata_len <= req->rq_reqbuf_len);
970         }
971
972         return rc;
973 }
974
975 static int do_cli_unwrap_reply(struct ptlrpc_request *req)
976 {
977         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
978         int                 rc;
979
980         LASSERT(ctx);
981         LASSERT(ctx->cc_sec);
982         LASSERT(req->rq_repbuf);
983         LASSERT(req->rq_repdata);
984         LASSERT(req->rq_repmsg == NULL);
985
986         req->rq_rep_swab_mask = 0;
987
988         rc = __lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len);
989         switch (rc) {
990         case 1:
991                 lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
992         case 0:
993                 break;
994         default:
995                 CERROR("failed unpack reply: x%llu\n", req->rq_xid);
996                 return -EPROTO;
997         }
998
999         if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
1000                 CERROR("replied data length %d too small\n",
1001                        req->rq_repdata_len);
1002                 return -EPROTO;
1003         }
1004
1005         if (SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr) !=
1006             SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
1007                 CERROR("reply policy %u doesn't match request policy %u\n",
1008                        SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr),
1009                        SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc));
1010                 return -EPROTO;
1011         }
1012
1013         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
1014         case SPTLRPC_SVC_NULL:
1015         case SPTLRPC_SVC_AUTH:
1016         case SPTLRPC_SVC_INTG:
1017                 LASSERT(ctx->cc_ops->verify);
1018                 rc = ctx->cc_ops->verify(ctx, req);
1019                 break;
1020         case SPTLRPC_SVC_PRIV:
1021                 LASSERT(ctx->cc_ops->unseal);
1022                 rc = ctx->cc_ops->unseal(ctx, req);
1023                 break;
1024         default:
1025                 LBUG();
1026         }
1027         LASSERT(rc || req->rq_repmsg || req->rq_resend);
1028
1029         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL &&
1030             !req->rq_ctx_init)
1031                 req->rq_rep_swab_mask = 0;
1032         return rc;
1033 }
1034
1035 /**
1036  * Used by ptlrpc client, to perform security transformation upon the reply
1037  * message of \a req. After return successfully, req->rq_repmsg points to
1038  * the reply message in clear text.
1039  *
1040  * \pre the reply buffer should have been un-posted from LNet, so nothing is
1041  * going to change.
1042  */
1043 int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
1044 {
1045         LASSERT(req->rq_repbuf);
1046         LASSERT(req->rq_repdata == NULL);
1047         LASSERT(req->rq_repmsg == NULL);
1048         LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
1049
1050         if (req->rq_reply_off == 0 &&
1051             (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
1052                 CERROR("real reply with offset 0\n");
1053                 return -EPROTO;
1054         }
1055
1056         if (req->rq_reply_off % 8 != 0) {
1057                 CERROR("reply at odd offset %u\n", req->rq_reply_off);
1058                 return -EPROTO;
1059         }
1060
1061         req->rq_repdata = (struct lustre_msg *)
1062                                 (req->rq_repbuf + req->rq_reply_off);
1063         req->rq_repdata_len = req->rq_nob_received;
1064
1065         return do_cli_unwrap_reply(req);
1066 }
1067
1068 /**
1069  * Used by ptlrpc client, to perform security transformation upon the early
1070  * reply message of \a req. We expect the rq_reply_off is 0, and
1071  * rq_nob_received is the early reply size.
1072  *
1073  * Because the receive buffer might be still posted, the reply data might be
1074  * changed at any time, no matter we're holding rq_lock or not. For this reason
1075  * we allocate a separate ptlrpc_request and reply buffer for early reply
1076  * processing.
1077  *
1078  * \retval 0 success, \a req_ret is filled with a duplicated ptlrpc_request.
1079  * Later the caller must call sptlrpc_cli_finish_early_reply() on the returned
1080  * \a *req_ret to release it.
1081  * \retval -ev error number, and \a req_ret will not be set.
1082  */
1083 int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
1084                                    struct ptlrpc_request **req_ret)
1085 {
1086         struct ptlrpc_request  *early_req;
1087         char               *early_buf;
1088         int                  early_bufsz, early_size;
1089         int                  rc;
1090
1091         early_req = ptlrpc_request_cache_alloc(GFP_NOFS);
1092         if (early_req == NULL)
1093                 return -ENOMEM;
1094
1095         early_size = req->rq_nob_received;
1096         early_bufsz = size_roundup_power2(early_size);
1097         OBD_ALLOC_LARGE(early_buf, early_bufsz);
1098         if (early_buf == NULL)
1099                 GOTO(err_req, rc = -ENOMEM);
1100
1101         /* sanity checkings and copy data out, do it inside spinlock */
1102         spin_lock(&req->rq_lock);
1103
1104         if (req->rq_replied) {
1105                 spin_unlock(&req->rq_lock);
1106                 GOTO(err_buf, rc = -EALREADY);
1107         }
1108
1109         LASSERT(req->rq_repbuf);
1110         LASSERT(req->rq_repdata == NULL);
1111         LASSERT(req->rq_repmsg == NULL);
1112
1113         if (req->rq_reply_off != 0) {
1114                 CERROR("early reply with offset %u\n", req->rq_reply_off);
1115                 spin_unlock(&req->rq_lock);
1116                 GOTO(err_buf, rc = -EPROTO);
1117         }
1118
1119         if (req->rq_nob_received != early_size) {
1120                 /* even another early arrived the size should be the same */
1121                 CERROR("data size has changed from %u to %u\n",
1122                        early_size, req->rq_nob_received);
1123                 spin_unlock(&req->rq_lock);
1124                 GOTO(err_buf, rc = -EINVAL);
1125         }
1126
1127         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
1128                 CERROR("early reply length %d too small\n",
1129                        req->rq_nob_received);
1130                 spin_unlock(&req->rq_lock);
1131                 GOTO(err_buf, rc = -EALREADY);
1132         }
1133
1134         memcpy(early_buf, req->rq_repbuf, early_size);
1135         spin_unlock(&req->rq_lock);
1136
1137         spin_lock_init(&early_req->rq_lock);
1138         early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
1139         early_req->rq_flvr = req->rq_flvr;
1140         early_req->rq_repbuf = early_buf;
1141         early_req->rq_repbuf_len = early_bufsz;
1142         early_req->rq_repdata = (struct lustre_msg *) early_buf;
1143         early_req->rq_repdata_len = early_size;
1144         early_req->rq_early = 1;
1145         early_req->rq_reqmsg = req->rq_reqmsg;
1146
1147         rc = do_cli_unwrap_reply(early_req);
1148         if (rc) {
1149                 DEBUG_REQ(D_ADAPTTO, early_req,
1150                           "error %d unwrap early reply", rc);
1151                 GOTO(err_ctx, rc);
1152         }
1153
1154         LASSERT(early_req->rq_repmsg);
1155         *req_ret = early_req;
1156         return 0;
1157
1158 err_ctx:
1159         sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1160 err_buf:
1161         OBD_FREE_LARGE(early_buf, early_bufsz);
1162 err_req:
1163         ptlrpc_request_cache_free(early_req);
1164         return rc;
1165 }
1166
1167 /**
1168  * Used by ptlrpc client, to release a processed early reply \a early_req.
1169  *
1170  * \pre \a early_req was obtained from calling sptlrpc_cli_unwrap_early_reply().
1171  */
1172 void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
1173 {
1174         LASSERT(early_req->rq_repbuf);
1175         LASSERT(early_req->rq_repdata);
1176         LASSERT(early_req->rq_repmsg);
1177
1178         sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1179         OBD_FREE_LARGE(early_req->rq_repbuf, early_req->rq_repbuf_len);
1180         ptlrpc_request_cache_free(early_req);
1181 }
1182
1183 /**************************************************
1184  * sec ID                                        *
1185  **************************************************/
1186
1187 /*
1188  * "fixed" sec (e.g. null) use sec_id < 0
1189  */
1190 static atomic_t sptlrpc_sec_id = ATOMIC_INIT(1);
1191
1192 int sptlrpc_get_next_secid(void)
1193 {
1194         return atomic_inc_return(&sptlrpc_sec_id);
1195 }
1196 EXPORT_SYMBOL(sptlrpc_get_next_secid);
1197
1198 /**************************************************
1199  * client side high-level security APIs    *
1200  **************************************************/
1201
1202 static int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
1203                                    int grace, int force)
1204 {
1205         struct ptlrpc_sec_policy *policy = sec->ps_policy;
1206
1207         LASSERT(policy->sp_cops);
1208         LASSERT(policy->sp_cops->flush_ctx_cache);
1209
1210         return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
1211 }
1212
1213 static void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
1214 {
1215         struct ptlrpc_sec_policy *policy = sec->ps_policy;
1216
1217         LASSERT_ATOMIC_ZERO(&sec->ps_refcount);
1218         LASSERT_ATOMIC_ZERO(&sec->ps_nctx);
1219         LASSERT(policy->sp_cops->destroy_sec);
1220
1221         CDEBUG(D_SEC, "%s@%p: being destroyed\n", sec->ps_policy->sp_name, sec);
1222
1223         policy->sp_cops->destroy_sec(sec);
1224         sptlrpc_policy_put(policy);
1225 }
1226
/*
 * Exported entry point for destroying \a sec; simply calls
 * sec_cop_destroy_sec(), which asserts that \a sec has no references and
 * no contexts left.
 */
void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
{
        sec_cop_destroy_sec(sec);
}
EXPORT_SYMBOL(sptlrpc_sec_destroy);
1232
1233 static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
1234 {
1235         LASSERT_ATOMIC_POS(&sec->ps_refcount);
1236
1237         if (sec->ps_policy->sp_cops->kill_sec) {
1238                 sec->ps_policy->sp_cops->kill_sec(sec);
1239
1240                 sec_cop_flush_ctx_cache(sec, -1, 1, 1);
1241         }
1242 }
1243
1244 struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
1245 {
1246         if (sec)
1247                 atomic_inc(&sec->ps_refcount);
1248
1249         return sec;
1250 }
1251 EXPORT_SYMBOL(sptlrpc_sec_get);
1252
1253 void sptlrpc_sec_put(struct ptlrpc_sec *sec)
1254 {
1255         if (sec) {
1256                 LASSERT_ATOMIC_POS(&sec->ps_refcount);
1257
1258                 if (atomic_dec_and_test(&sec->ps_refcount)) {
1259                         sptlrpc_gc_del_sec(sec);
1260                         sec_cop_destroy_sec(sec);
1261                 }
1262         }
1263 }
1264 EXPORT_SYMBOL(sptlrpc_sec_put);
1265
1266 /*
1267  * policy module is responsible for taking reference of import
1268  */
1269 static
1270 struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
1271                                        struct ptlrpc_svc_ctx *svc_ctx,
1272                                        struct sptlrpc_flavor *sf,
1273                                        enum lustre_sec_part sp)
1274 {
1275         struct ptlrpc_sec_policy *policy;
1276         struct ptlrpc_sec       *sec;
1277         char                  str[32];
1278
1279         if (svc_ctx) {
1280                 LASSERT(imp->imp_dlm_fake == 1);
1281
1282                 CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
1283                        imp->imp_obd->obd_type->typ_name,
1284                        imp->imp_obd->obd_name,
1285                        sptlrpc_flavor2name(sf, str, sizeof(str)));
1286
1287                 policy = sptlrpc_policy_get(svc_ctx->sc_policy);
1288                 sf->sf_flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
1289         } else {
1290                 LASSERT(imp->imp_dlm_fake == 0);
1291
1292                 CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
1293                        imp->imp_obd->obd_type->typ_name,
1294                        imp->imp_obd->obd_name,
1295                        sptlrpc_flavor2name(sf, str, sizeof(str)));
1296
1297                 policy = sptlrpc_wireflavor2policy(sf->sf_rpc);
1298                 if (!policy) {
1299                         CERROR("invalid flavor 0x%x\n", sf->sf_rpc);
1300                         return NULL;
1301                 }
1302         }
1303
1304         sec = policy->sp_cops->create_sec(imp, svc_ctx, sf);
1305         if (sec) {
1306                 atomic_inc(&sec->ps_refcount);
1307
1308                 sec->ps_part = sp;
1309
1310                 if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
1311                         sptlrpc_gc_add_sec(sec);
1312         } else {
1313                 sptlrpc_policy_put(policy);
1314         }
1315
1316         return sec;
1317 }
1318
1319 struct ptlrpc_sec *sptlrpc_import_sec_ref(struct obd_import *imp)
1320 {
1321         struct ptlrpc_sec *sec;
1322
1323         spin_lock(&imp->imp_lock);
1324         sec = sptlrpc_sec_get(imp->imp_sec);
1325         spin_unlock(&imp->imp_lock);
1326
1327         return sec;
1328 }
1329 EXPORT_SYMBOL(sptlrpc_import_sec_ref);
1330
1331 static void sptlrpc_import_sec_install(struct obd_import *imp,
1332                                        struct ptlrpc_sec *sec)
1333 {
1334         struct ptlrpc_sec *old_sec;
1335
1336         LASSERT_ATOMIC_POS(&sec->ps_refcount);
1337
1338         spin_lock(&imp->imp_lock);
1339         old_sec = imp->imp_sec;
1340         imp->imp_sec = sec;
1341         spin_unlock(&imp->imp_lock);
1342
1343         if (old_sec) {
1344                 sptlrpc_sec_kill(old_sec);
1345
1346                 /* balance the ref taken by this import */
1347                 sptlrpc_sec_put(old_sec);
1348         }
1349 }
1350
1351 static inline
1352 int flavor_equal(struct sptlrpc_flavor *sf1, struct sptlrpc_flavor *sf2)
1353 {
1354         return (memcmp(sf1, sf2, sizeof(*sf1)) == 0);
1355 }
1356
/* Copy the flavor \a src into \a dst by structure assignment. */
static inline
void flavor_copy(struct sptlrpc_flavor *dst, struct sptlrpc_flavor *src)
{
        *dst = *src;
}
1362
1363 static void sptlrpc_import_sec_adapt_inplace(struct obd_import *imp,
1364                                              struct ptlrpc_sec *sec,
1365                                              struct sptlrpc_flavor *sf)
1366 {
1367         char    str1[32], str2[32];
1368
1369         if (sec->ps_flvr.sf_flags != sf->sf_flags)
1370                 CDEBUG(D_SEC, "changing sec flags: %s -> %s\n",
1371                        sptlrpc_secflags2str(sec->ps_flvr.sf_flags,
1372                                             str1, sizeof(str1)),
1373                        sptlrpc_secflags2str(sf->sf_flags,
1374                                             str2, sizeof(str2)));
1375
1376         spin_lock(&sec->ps_lock);
1377         flavor_copy(&sec->ps_flvr, sf);
1378         spin_unlock(&sec->ps_lock);
1379 }
1380
1381 /**
1382  * To get an appropriate ptlrpc_sec for the \a imp, according to the current
1383  * configuration. Upon called, imp->imp_sec may or may not be NULL.
1384  *
1385  *  - regular import: \a svc_ctx should be NULL and \a flvr is ignored;
1386  *  - reverse import: \a svc_ctx and \a flvr are obtained from incoming request.
1387  */
1388 int sptlrpc_import_sec_adapt(struct obd_import *imp,
1389                              struct ptlrpc_svc_ctx *svc_ctx,
1390                              struct sptlrpc_flavor *flvr)
1391 {
1392         struct ptlrpc_connection   *conn;
1393         struct sptlrpc_flavor       sf;
1394         struct ptlrpc_sec         *sec, *newsec;
1395         enum lustre_sec_part    sp;
1396         char                    str[24];
1397         int                      rc = 0;
1398
1399         might_sleep();
1400
1401         if (imp == NULL)
1402                 return 0;
1403
1404         conn = imp->imp_connection;
1405
1406         if (svc_ctx == NULL) {
1407                 struct client_obd *cliobd = &imp->imp_obd->u.cli;
1408                 /*
1409                  * normal import, determine flavor from rule set, except
1410                  * for mgc the flavor is predetermined.
1411                  */
1412                 if (cliobd->cl_sp_me == LUSTRE_SP_MGC)
1413                         sf = cliobd->cl_flvr_mgc;
1414                 else
1415                         sptlrpc_conf_choose_flavor(cliobd->cl_sp_me,
1416                                                    cliobd->cl_sp_to,
1417                                                    &cliobd->cl_target_uuid,
1418                                                    conn->c_self, &sf);
1419
1420                 sp = imp->imp_obd->u.cli.cl_sp_me;
1421         } else {
1422                 /* reverse import, determine flavor from incoming request */
1423                 sf = *flvr;
1424
1425                 if (sf.sf_rpc != SPTLRPC_FLVR_NULL)
1426                         sf.sf_flags = PTLRPC_SEC_FL_REVERSE |
1427                                       PTLRPC_SEC_FL_ROOTONLY;
1428
1429                 sp = sptlrpc_target_sec_part(imp->imp_obd);
1430         }
1431
1432         sec = sptlrpc_import_sec_ref(imp);
1433         if (sec) {
1434                 char    str2[24];
1435
1436                 if (flavor_equal(&sf, &sec->ps_flvr))
1437                         GOTO(out, rc);
1438
1439                 CDEBUG(D_SEC, "import %s->%s: changing flavor %s -> %s\n",
1440                        imp->imp_obd->obd_name,
1441                        obd_uuid2str(&conn->c_remote_uuid),
1442                        sptlrpc_flavor2name(&sec->ps_flvr, str, sizeof(str)),
1443                        sptlrpc_flavor2name(&sf, str2, sizeof(str2)));
1444
1445                 if (SPTLRPC_FLVR_POLICY(sf.sf_rpc) ==
1446                     SPTLRPC_FLVR_POLICY(sec->ps_flvr.sf_rpc) &&
1447                     SPTLRPC_FLVR_MECH(sf.sf_rpc) ==
1448                     SPTLRPC_FLVR_MECH(sec->ps_flvr.sf_rpc)) {
1449                         sptlrpc_import_sec_adapt_inplace(imp, sec, &sf);
1450                         GOTO(out, rc);
1451                 }
1452         } else if (SPTLRPC_FLVR_BASE(sf.sf_rpc) !=
1453                    SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL)) {
1454                 CDEBUG(D_SEC, "import %s->%s netid %x: select flavor %s\n",
1455                        imp->imp_obd->obd_name,
1456                        obd_uuid2str(&conn->c_remote_uuid),
1457                        LNET_NIDNET(conn->c_self),
1458                        sptlrpc_flavor2name(&sf, str, sizeof(str)));
1459         }
1460
1461         mutex_lock(&imp->imp_sec_mutex);
1462
1463         newsec = sptlrpc_sec_create(imp, svc_ctx, &sf, sp);
1464         if (newsec) {
1465                 sptlrpc_import_sec_install(imp, newsec);
1466         } else {
1467                 CERROR("import %s->%s: failed to create new sec\n",
1468                        imp->imp_obd->obd_name,
1469                        obd_uuid2str(&conn->c_remote_uuid));
1470                 rc = -EPERM;
1471         }
1472
1473         mutex_unlock(&imp->imp_sec_mutex);
1474 out:
1475         sptlrpc_sec_put(sec);
1476         return rc;
1477 }
1478
1479 void sptlrpc_import_sec_put(struct obd_import *imp)
1480 {
1481         if (imp->imp_sec) {
1482                 sptlrpc_sec_kill(imp->imp_sec);
1483
1484                 sptlrpc_sec_put(imp->imp_sec);
1485                 imp->imp_sec = NULL;
1486         }
1487 }
1488
1489 static void import_flush_ctx_common(struct obd_import *imp,
1490                                     uid_t uid, int grace, int force)
1491 {
1492         struct ptlrpc_sec *sec;
1493
1494         if (imp == NULL)
1495                 return;
1496
1497         sec = sptlrpc_import_sec_ref(imp);
1498         if (sec == NULL)
1499                 return;
1500
1501         sec_cop_flush_ctx_cache(sec, uid, grace, force);
1502         sptlrpc_sec_put(sec);
1503 }
1504
/*
 * Flush the context of uid 0 on the sec of \a imp, with grace and force
 * both set.
 */
void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
{
        /* it's important to use grace mode, see explain in
         * sptlrpc_req_refresh_ctx() */
        import_flush_ctx_common(imp, 0, 1, 1);
}
1511
1512 void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
1513 {
1514         import_flush_ctx_common(imp, from_kuid(&init_user_ns, current_uid()),
1515                                 1, 1);
1516 }
1517 EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
1518
/*
 * Flush the contexts on the sec of \a imp using uid -1, with grace and
 * force both set. (uid -1 presumably selects every user - confirm against
 * the policy's flush_ctx_cache implementation.)
 */
void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
{
        import_flush_ctx_common(imp, -1, 1, 1);
}
EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
1524
1525 /**
1526  * Used by ptlrpc client to allocate request buffer of \a req. Upon return
1527  * successfully, req->rq_reqmsg points to a buffer with size \a msgsize.
1528  */
1529 int sptlrpc_cli_alloc_reqbuf(struct ptlrpc_request *req, int msgsize)
1530 {
1531         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1532         struct ptlrpc_sec_policy *policy;
1533         int rc;
1534
1535         LASSERT(ctx);
1536         LASSERT(ctx->cc_sec);
1537         LASSERT(ctx->cc_sec->ps_policy);
1538         LASSERT(req->rq_reqmsg == NULL);
1539         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1540
1541         policy = ctx->cc_sec->ps_policy;
1542         rc = policy->sp_cops->alloc_reqbuf(ctx->cc_sec, req, msgsize);
1543         if (!rc) {
1544                 LASSERT(req->rq_reqmsg);
1545                 LASSERT(req->rq_reqbuf || req->rq_clrbuf);
1546
1547                 /* zeroing preallocated buffer */
1548                 if (req->rq_pool)
1549                         memset(req->rq_reqmsg, 0, msgsize);
1550         }
1551
1552         return rc;
1553 }
1554
1555 /**
1556  * Used by ptlrpc client to free request buffer of \a req. After this
1557  * req->rq_reqmsg is set to NULL and should not be accessed anymore.
1558  */
1559 void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
1560 {
1561         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1562         struct ptlrpc_sec_policy *policy;
1563
1564         LASSERT(ctx);
1565         LASSERT(ctx->cc_sec);
1566         LASSERT(ctx->cc_sec->ps_policy);
1567         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1568
1569         if (req->rq_reqbuf == NULL && req->rq_clrbuf == NULL)
1570                 return;
1571
1572         policy = ctx->cc_sec->ps_policy;
1573         policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
1574         req->rq_reqmsg = NULL;
1575 }
1576
1577 /*
1578  * NOTE caller must guarantee the buffer size is enough for the enlargement
1579  */
1580 void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
1581                                   int segment, int newsize)
1582 {
1583         void   *src, *dst;
1584         int     oldsize, oldmsg_size, movesize;
1585
1586         LASSERT(segment < msg->lm_bufcount);
1587         LASSERT(msg->lm_buflens[segment] <= newsize);
1588
1589         if (msg->lm_buflens[segment] == newsize)
1590                 return;
1591
1592         /* nothing to do if we are enlarging the last segment */
1593         if (segment == msg->lm_bufcount - 1) {
1594                 msg->lm_buflens[segment] = newsize;
1595                 return;
1596         }
1597
1598         oldsize = msg->lm_buflens[segment];
1599
1600         src = lustre_msg_buf(msg, segment + 1, 0);
1601         msg->lm_buflens[segment] = newsize;
1602         dst = lustre_msg_buf(msg, segment + 1, 0);
1603         msg->lm_buflens[segment] = oldsize;
1604
1605         /* move from segment + 1 to end segment */
1606         LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
1607         oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
1608         movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
1609         LASSERT(movesize >= 0);
1610
1611         if (movesize)
1612                 memmove(dst, src, movesize);
1613
1614         /* note we don't clear the ares where old data live, not secret */
1615
1616         /* finally set new segment size */
1617         msg->lm_buflens[segment] = newsize;
1618 }
1619 EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
1620
1621 /**
1622  * Used by ptlrpc client to enlarge the \a segment of request message pointed
1623  * by req->rq_reqmsg to size \a newsize, all previously filled-in data will be
1624  * preserved after the enlargement. this must be called after original request
1625  * buffer being allocated.
1626  *
1627  * \note after this be called, rq_reqmsg and rq_reqlen might have been changed,
1628  * so caller should refresh its local pointers if needed.
1629  */
1630 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
1631                                int segment, int newsize)
1632 {
1633         struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
1634         struct ptlrpc_sec_cops   *cops;
1635         struct lustre_msg       *msg = req->rq_reqmsg;
1636
1637         LASSERT(ctx);
1638         LASSERT(msg);
1639         LASSERT(msg->lm_bufcount > segment);
1640         LASSERT(msg->lm_buflens[segment] <= newsize);
1641
1642         if (msg->lm_buflens[segment] == newsize)
1643                 return 0;
1644
1645         cops = ctx->cc_sec->ps_policy->sp_cops;
1646         LASSERT(cops->enlarge_reqbuf);
1647         return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
1648 }
1649 EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
1650
1651 /**
1652  * Used by ptlrpc client to allocate reply buffer of \a req.
1653  *
1654  * \note After this, req->rq_repmsg is still not accessible.
1655  */
1656 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
1657 {
1658         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1659         struct ptlrpc_sec_policy *policy;
1660
1661         LASSERT(ctx);
1662         LASSERT(ctx->cc_sec);
1663         LASSERT(ctx->cc_sec->ps_policy);
1664
1665         if (req->rq_repbuf)
1666                 return 0;
1667
1668         policy = ctx->cc_sec->ps_policy;
1669         return policy->sp_cops->alloc_repbuf(ctx->cc_sec, req, msgsize);
1670 }
1671
1672 /**
1673  * Used by ptlrpc client to free reply buffer of \a req. After this
1674  * req->rq_repmsg is set to NULL and should not be accessed anymore.
1675  */
1676 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
1677 {
1678         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1679         struct ptlrpc_sec_policy *policy;
1680
1681         LASSERT(ctx);
1682         LASSERT(ctx->cc_sec);
1683         LASSERT(ctx->cc_sec->ps_policy);
1684         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1685
1686         if (req->rq_repbuf == NULL)
1687                 return;
1688         LASSERT(req->rq_repbuf_len);
1689
1690         policy = ctx->cc_sec->ps_policy;
1691         policy->sp_cops->free_repbuf(ctx->cc_sec, req);
1692         req->rq_repmsg = NULL;
1693 }
1694
1695 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
1696                                 struct ptlrpc_cli_ctx *ctx)
1697 {
1698         struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
1699
1700         if (!policy->sp_cops->install_rctx)
1701                 return 0;
1702         return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
1703 }
1704
1705 int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
1706                                 struct ptlrpc_svc_ctx *ctx)
1707 {
1708         struct ptlrpc_sec_policy *policy = ctx->sc_policy;
1709
1710         if (!policy->sp_sops->install_rctx)
1711                 return 0;
1712         return policy->sp_sops->install_rctx(imp, ctx);
1713 }
1714
1715 /****************************************
1716  * server side security          *
1717  ****************************************/
1718
1719 static int flavor_allowed(struct sptlrpc_flavor *exp,
1720                           struct ptlrpc_request *req)
1721 {
1722         struct sptlrpc_flavor *flvr = &req->rq_flvr;
1723
1724         if (exp->sf_rpc == SPTLRPC_FLVR_ANY || exp->sf_rpc == flvr->sf_rpc)
1725                 return 1;
1726
1727         if ((req->rq_ctx_init || req->rq_ctx_fini) &&
1728             SPTLRPC_FLVR_POLICY(exp->sf_rpc) ==
1729             SPTLRPC_FLVR_POLICY(flvr->sf_rpc) &&
1730             SPTLRPC_FLVR_MECH(exp->sf_rpc) == SPTLRPC_FLVR_MECH(flvr->sf_rpc))
1731                 return 1;
1732
1733         return 0;
1734 }
1735
/*
 * Added to get_seconds() to compute when an old export flavor expires;
 * presumably a number of seconds - confirm the unit of OBD_TIMEOUT_DEFAULT.
 */
#define EXP_FLVR_UPDATE_EXPIRE      (OBD_TIMEOUT_DEFAULT + 10)
1737
1738 /**
1739  * Given an export \a exp, check whether the flavor of incoming \a req
1740  * is allowed by the export \a exp. Main logic is about taking care of
1741  * changing configurations. Return 0 means success.
1742  */
1743 int sptlrpc_target_export_check(struct obd_export *exp,
1744                                 struct ptlrpc_request *req)
1745 {
1746         struct sptlrpc_flavor   flavor;
1747
1748         if (exp == NULL)
1749                 return 0;
1750
1751         /* client side export has no imp_reverse, skip
1752          * FIXME maybe we should check flavor this as well??? */
1753         if (exp->exp_imp_reverse == NULL)
1754                 return 0;
1755
1756         /* don't care about ctx fini rpc */
1757         if (req->rq_ctx_fini)
1758                 return 0;
1759
1760         spin_lock(&exp->exp_lock);
1761
1762         /* if flavor just changed (exp->exp_flvr_changed != 0), we wait for
1763          * the first req with the new flavor, then treat it as current flavor,
1764          * adapt reverse sec according to it.
1765          * note the first rpc with new flavor might not be with root ctx, in
1766          * which case delay the sec_adapt by leaving exp_flvr_adapt == 1. */
1767         if (unlikely(exp->exp_flvr_changed) &&
1768             flavor_allowed(&exp->exp_flvr_old[1], req)) {
1769                 /* make the new flavor as "current", and old ones as
1770                  * about-to-expire */
1771                 CDEBUG(D_SEC, "exp %p: just changed: %x->%x\n", exp,
1772                        exp->exp_flvr.sf_rpc, exp->exp_flvr_old[1].sf_rpc);
1773                 flavor = exp->exp_flvr_old[1];
1774                 exp->exp_flvr_old[1] = exp->exp_flvr_old[0];
1775                 exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0];
1776                 exp->exp_flvr_old[0] = exp->exp_flvr;
1777                 exp->exp_flvr_expire[0] = get_seconds() +
1778                                           EXP_FLVR_UPDATE_EXPIRE;
1779                 exp->exp_flvr = flavor;
1780
1781                 /* flavor change finished */
1782                 exp->exp_flvr_changed = 0;
1783                 LASSERT(exp->exp_flvr_adapt == 1);
1784
1785                 /* if it's gss, we only interested in root ctx init */
1786                 if (req->rq_auth_gss &&
1787                     !(req->rq_ctx_init &&
1788                       (req->rq_auth_usr_root || req->rq_auth_usr_mdt ||
1789                        req->rq_auth_usr_ost))) {
1790                         spin_unlock(&exp->exp_lock);
1791                         CDEBUG(D_SEC, "is good but not root(%d:%d:%d:%d:%d)\n",
1792                                req->rq_auth_gss, req->rq_ctx_init,
1793                                req->rq_auth_usr_root, req->rq_auth_usr_mdt,
1794                                req->rq_auth_usr_ost);
1795                         return 0;
1796                 }
1797
1798                 exp->exp_flvr_adapt = 0;
1799                 spin_unlock(&exp->exp_lock);
1800
1801                 return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1802                                                 req->rq_svc_ctx, &flavor);
1803         }
1804
1805         /* if it equals to the current flavor, we accept it, but need to
1806          * dealing with reverse sec/ctx */
1807         if (likely(flavor_allowed(&exp->exp_flvr, req))) {
1808                 /* most cases should return here, we only interested in
1809                  * gss root ctx init */
1810                 if (!req->rq_auth_gss || !req->rq_ctx_init ||
1811                     (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
1812                      !req->rq_auth_usr_ost)) {
1813                         spin_unlock(&exp->exp_lock);
1814                         return 0;
1815                 }
1816
1817                 /* if flavor just changed, we should not proceed, just leave
1818                  * it and current flavor will be discovered and replaced
1819                  * shortly, and let _this_ rpc pass through */
1820                 if (exp->exp_flvr_changed) {
1821                         LASSERT(exp->exp_flvr_adapt);
1822                         spin_unlock(&exp->exp_lock);
1823                         return 0;
1824                 }
1825
1826                 if (exp->exp_flvr_adapt) {
1827                         exp->exp_flvr_adapt = 0;
1828                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): do delayed adapt\n",
1829                                exp, exp->exp_flvr.sf_rpc,
1830                                exp->exp_flvr_old[0].sf_rpc,
1831                                exp->exp_flvr_old[1].sf_rpc);
1832                         flavor = exp->exp_flvr;
1833                         spin_unlock(&exp->exp_lock);
1834
1835                         return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1836                                                         req->rq_svc_ctx,
1837                                                         &flavor);
1838                 } else {
1839                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): is current flavor, "
1840                                "install rvs ctx\n", exp, exp->exp_flvr.sf_rpc,
1841                                exp->exp_flvr_old[0].sf_rpc,
1842                                exp->exp_flvr_old[1].sf_rpc);
1843                         spin_unlock(&exp->exp_lock);
1844
1845                         return sptlrpc_svc_install_rvs_ctx(exp->exp_imp_reverse,
1846                                                            req->rq_svc_ctx);
1847                 }
1848         }
1849
1850         if (exp->exp_flvr_expire[0]) {
1851                 if (exp->exp_flvr_expire[0] >= get_seconds()) {
1852                         if (flavor_allowed(&exp->exp_flvr_old[0], req)) {
1853                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1854                                        "middle one ("CFS_DURATION_T")\n", exp,
1855                                        exp->exp_flvr.sf_rpc,
1856                                        exp->exp_flvr_old[0].sf_rpc,
1857                                        exp->exp_flvr_old[1].sf_rpc,
1858                                        exp->exp_flvr_expire[0] -
1859                                                 get_seconds());
1860                                 spin_unlock(&exp->exp_lock);
1861                                 return 0;
1862                         }
1863                 } else {
1864                         CDEBUG(D_SEC, "mark middle expired\n");
1865                         exp->exp_flvr_expire[0] = 0;
1866                 }
1867                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match middle\n", exp,
1868                        exp->exp_flvr.sf_rpc,
1869                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1870                        req->rq_flvr.sf_rpc);
1871         }
1872
1873         /* now it doesn't match the current flavor, the only chance we can
1874          * accept it is match the old flavors which is not expired. */
1875         if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) {
1876                 if (exp->exp_flvr_expire[1] >= get_seconds()) {
1877                         if (flavor_allowed(&exp->exp_flvr_old[1], req)) {
1878                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1879                                        "oldest one ("CFS_DURATION_T")\n", exp,
1880                                        exp->exp_flvr.sf_rpc,
1881                                        exp->exp_flvr_old[0].sf_rpc,
1882                                        exp->exp_flvr_old[1].sf_rpc,
1883                                        exp->exp_flvr_expire[1] -
1884                                                 get_seconds());
1885                                 spin_unlock(&exp->exp_lock);
1886                                 return 0;
1887                         }
1888                 } else {
1889                         CDEBUG(D_SEC, "mark oldest expired\n");
1890                         exp->exp_flvr_expire[1] = 0;
1891                 }
1892                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match found\n",
1893                        exp, exp->exp_flvr.sf_rpc,
1894                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1895                        req->rq_flvr.sf_rpc);
1896         } else {
1897                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): skip the last one\n",
1898                        exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc,
1899                        exp->exp_flvr_old[1].sf_rpc);
1900         }
1901
1902         spin_unlock(&exp->exp_lock);
1903
1904         CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with "
1905               "unauthorized flavor %x, expect %x|%x(%+ld)|%x(%+ld)\n",
1906               exp, exp->exp_obd->obd_name,
1907               req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini,
1908               req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_auth_usr_ost,
1909               req->rq_flvr.sf_rpc,
1910               exp->exp_flvr.sf_rpc,
1911               exp->exp_flvr_old[0].sf_rpc,
1912               exp->exp_flvr_expire[0] ?
1913               (unsigned long) (exp->exp_flvr_expire[0] -
1914                                get_seconds()) : 0,
1915               exp->exp_flvr_old[1].sf_rpc,
1916               exp->exp_flvr_expire[1] ?
1917               (unsigned long) (exp->exp_flvr_expire[1] -
1918                                get_seconds()) : 0);
1919         return -EACCES;
1920 }
1921 EXPORT_SYMBOL(sptlrpc_target_export_check);
1922
1923 void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
1924                                       struct sptlrpc_rule_set *rset)
1925 {
1926         struct obd_export       *exp;
1927         struct sptlrpc_flavor    new_flvr;
1928
1929         LASSERT(obd);
1930
1931         spin_lock(&obd->obd_dev_lock);
1932
1933         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1934                 if (exp->exp_connection == NULL)
1935                         continue;
1936
1937                 /* note if this export had just been updated flavor
1938                  * (exp_flvr_changed == 1), this will override the
1939                  * previous one. */
1940                 spin_lock(&exp->exp_lock);
1941                 sptlrpc_target_choose_flavor(rset, exp->exp_sp_peer,
1942                                              exp->exp_connection->c_peer.nid,
1943                                              &new_flvr);
1944                 if (exp->exp_flvr_changed ||
1945                     !flavor_equal(&new_flvr, &exp->exp_flvr)) {
1946                         exp->exp_flvr_old[1] = new_flvr;
1947                         exp->exp_flvr_expire[1] = 0;
1948                         exp->exp_flvr_changed = 1;
1949                         exp->exp_flvr_adapt = 1;
1950
1951                         CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n",
1952                                exp, sptlrpc_part2name(exp->exp_sp_peer),
1953                                exp->exp_flvr.sf_rpc,
1954                                exp->exp_flvr_old[1].sf_rpc);
1955                 }
1956                 spin_unlock(&exp->exp_lock);
1957         }
1958
1959         spin_unlock(&obd->obd_dev_lock);
1960 }
1961 EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor);
1962
1963 static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
1964 {
1965         /* peer's claim is unreliable unless gss is being used */
1966         if (!req->rq_auth_gss || svc_rc == SECSVC_DROP)
1967                 return svc_rc;
1968
1969         switch (req->rq_sp_from) {
1970         case LUSTRE_SP_CLI:
1971                 if (req->rq_auth_usr_mdt || req->rq_auth_usr_ost) {
1972                         DEBUG_REQ(D_ERROR, req, "faked source CLI");
1973                         svc_rc = SECSVC_DROP;
1974                 }
1975                 break;
1976         case LUSTRE_SP_MDT:
1977                 if (!req->rq_auth_usr_mdt) {
1978                         DEBUG_REQ(D_ERROR, req, "faked source MDT");
1979                         svc_rc = SECSVC_DROP;
1980                 }
1981                 break;
1982         case LUSTRE_SP_OST:
1983                 if (!req->rq_auth_usr_ost) {
1984                         DEBUG_REQ(D_ERROR, req, "faked source OST");
1985                         svc_rc = SECSVC_DROP;
1986                 }
1987                 break;
1988         case LUSTRE_SP_MGS:
1989         case LUSTRE_SP_MGC:
1990                 if (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
1991                     !req->rq_auth_usr_ost) {
1992                         DEBUG_REQ(D_ERROR, req, "faked source MGC/MGS");
1993                         svc_rc = SECSVC_DROP;
1994                 }
1995                 break;
1996         case LUSTRE_SP_ANY:
1997         default:
1998                 DEBUG_REQ(D_ERROR, req, "invalid source %u", req->rq_sp_from);
1999                 svc_rc = SECSVC_DROP;
2000         }
2001
2002         return svc_rc;
2003 }
2004
2005 /**
2006  * Used by ptlrpc server, to perform transformation upon request message of
2007  * incoming \a req. This must be the first thing to do with a incoming
2008  * request in ptlrpc layer.
2009  *
2010  * \retval SECSVC_OK success, and req->rq_reqmsg point to request message in
2011  * clear text, size is req->rq_reqlen; also req->rq_svc_ctx is set.
2012  * \retval SECSVC_COMPLETE success, the request has been fully processed, and
2013  * reply message has been prepared.
2014  * \retval SECSVC_DROP failed, this request should be dropped.
2015  */
2016 int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
2017 {
2018         struct ptlrpc_sec_policy *policy;
2019         struct lustre_msg       *msg = req->rq_reqbuf;
2020         int                    rc;
2021
2022         LASSERT(msg);
2023         LASSERT(req->rq_reqmsg == NULL);
2024         LASSERT(req->rq_repmsg == NULL);
2025         LASSERT(req->rq_svc_ctx == NULL);
2026
2027         req->rq_req_swab_mask = 0;
2028
2029         rc = __lustre_unpack_msg(msg, req->rq_reqdata_len);
2030         switch (rc) {
2031         case 1:
2032                 lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
2033         case 0:
2034                 break;
2035         default:
2036                 CERROR("error unpacking request from %s x%llu\n",
2037                        libcfs_id2str(req->rq_peer), req->rq_xid);
2038                 return SECSVC_DROP;
2039         }
2040
2041         req->rq_flvr.sf_rpc = WIRE_FLVR(msg->lm_secflvr);
2042         req->rq_sp_from = LUSTRE_SP_ANY;
2043         req->rq_auth_uid = -1;
2044         req->rq_auth_mapped_uid = -1;
2045
2046         policy = sptlrpc_wireflavor2policy(req->rq_flvr.sf_rpc);
2047         if (!policy) {
2048                 CERROR("unsupported rpc flavor %x\n", req->rq_flvr.sf_rpc);
2049                 return SECSVC_DROP;
2050         }
2051
2052         LASSERT(policy->sp_sops->accept);
2053         rc = policy->sp_sops->accept(req);
2054         sptlrpc_policy_put(policy);
2055         LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
2056         LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
2057
2058         /*
2059          * if it's not null flavor (which means embedded packing msg),
2060          * reset the swab mask for the coming inner msg unpacking.
2061          */
2062         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL)
2063                 req->rq_req_swab_mask = 0;
2064
2065         /* sanity check for the request source */
2066         rc = sptlrpc_svc_check_from(req, rc);
2067         return rc;
2068 }
2069
2070 /**
2071  * Used by ptlrpc server, to allocate reply buffer for \a req. If succeed,
2072  * req->rq_reply_state is set, and req->rq_reply_state->rs_msg point to
2073  * a buffer of \a msglen size.
2074  */
2075 int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
2076 {
2077         struct ptlrpc_sec_policy *policy;
2078         struct ptlrpc_reply_state *rs;
2079         int rc;
2080
2081         LASSERT(req->rq_svc_ctx);
2082         LASSERT(req->rq_svc_ctx->sc_policy);
2083
2084         policy = req->rq_svc_ctx->sc_policy;
2085         LASSERT(policy->sp_sops->alloc_rs);
2086
2087         rc = policy->sp_sops->alloc_rs(req, msglen);
2088         if (unlikely(rc == -ENOMEM)) {
2089                 struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
2090                 if (svcpt->scp_service->srv_max_reply_size <
2091                    msglen + sizeof(struct ptlrpc_reply_state)) {
2092                         /* Just return failure if the size is too big */
2093                         CERROR("size of message is too big (%zd), %d allowed",
2094                                 msglen + sizeof(struct ptlrpc_reply_state),
2095                                 svcpt->scp_service->srv_max_reply_size);
2096                         return -ENOMEM;
2097                 }
2098
2099                 /* failed alloc, try emergency pool */
2100                 rs = lustre_get_emerg_rs(svcpt);
2101                 if (rs == NULL)
2102                         return -ENOMEM;
2103
2104                 req->rq_reply_state = rs;
2105                 rc = policy->sp_sops->alloc_rs(req, msglen);
2106                 if (rc) {
2107                         lustre_put_emerg_rs(rs);
2108                         req->rq_reply_state = NULL;
2109                 }
2110         }
2111
2112         LASSERT(rc != 0 ||
2113                 (req->rq_reply_state && req->rq_reply_state->rs_msg));
2114
2115         return rc;
2116 }
2117
2118 /**
2119  * Used by ptlrpc server, to perform transformation upon reply message.
2120  *
2121  * \post req->rq_reply_off is set to appropriate server-controlled reply offset.
2122  * \post req->rq_repmsg and req->rq_reply_state->rs_msg becomes inaccessible.
2123  */
2124 int sptlrpc_svc_wrap_reply(struct ptlrpc_request *req)
2125 {
2126         struct ptlrpc_sec_policy *policy;
2127         int rc;
2128
2129         LASSERT(req->rq_svc_ctx);
2130         LASSERT(req->rq_svc_ctx->sc_policy);
2131
2132         policy = req->rq_svc_ctx->sc_policy;
2133         LASSERT(policy->sp_sops->authorize);
2134
2135         rc = policy->sp_sops->authorize(req);
2136         LASSERT(rc || req->rq_reply_state->rs_repdata_len);
2137
2138         return rc;
2139 }
2140
2141 /**
2142  * Used by ptlrpc server, to free reply_state.
2143  */
2144 void sptlrpc_svc_free_rs(struct ptlrpc_reply_state *rs)
2145 {
2146         struct ptlrpc_sec_policy *policy;
2147         unsigned int prealloc;
2148
2149         LASSERT(rs->rs_svc_ctx);
2150         LASSERT(rs->rs_svc_ctx->sc_policy);
2151
2152         policy = rs->rs_svc_ctx->sc_policy;
2153         LASSERT(policy->sp_sops->free_rs);
2154
2155         prealloc = rs->rs_prealloc;
2156         policy->sp_sops->free_rs(rs);
2157
2158         if (prealloc)
2159                 lustre_put_emerg_rs(rs);
2160 }
2161
2162 void sptlrpc_svc_ctx_addref(struct ptlrpc_request *req)
2163 {
2164         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2165
2166         if (ctx != NULL)
2167                 atomic_inc(&ctx->sc_refcount);
2168 }
2169
2170 void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
2171 {
2172         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2173
2174         if (ctx == NULL)
2175                 return;
2176
2177         LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2178         if (atomic_dec_and_test(&ctx->sc_refcount)) {
2179                 if (ctx->sc_policy->sp_sops->free_ctx)
2180                         ctx->sc_policy->sp_sops->free_ctx(ctx);
2181         }
2182         req->rq_svc_ctx = NULL;
2183 }
2184
2185 void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
2186 {
2187         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2188
2189         if (ctx == NULL)
2190                 return;
2191
2192         LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2193         if (ctx->sc_policy->sp_sops->invalidate_ctx)
2194                 ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
2195 }
2196 EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
2197
2198 /****************************************
2199  * bulk security                        *
2200  ****************************************/
2201
2202 /**
2203  * Perform transformation upon bulk data pointed by \a desc. This is called
2204  * before transforming the request message.
2205  */
2206 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
2207                           struct ptlrpc_bulk_desc *desc)
2208 {
2209         struct ptlrpc_cli_ctx *ctx;
2210
2211         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
2212
2213         if (!req->rq_pack_bulk)
2214                 return 0;
2215
2216         ctx = req->rq_cli_ctx;
2217         if (ctx->cc_ops->wrap_bulk)
2218                 return ctx->cc_ops->wrap_bulk(ctx, req, desc);
2219         return 0;
2220 }
2221 EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
2222
2223 /**
2224  * This is called after unwrap the reply message.
2225  * return nob of actual plain text size received, or error code.
2226  */
2227 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
2228                                  struct ptlrpc_bulk_desc *desc,
2229                                  int nob)
2230 {
2231         struct ptlrpc_cli_ctx  *ctx;
2232         int                  rc;
2233
2234         LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
2235
2236         if (!req->rq_pack_bulk)
2237                 return desc->bd_nob_transferred;
2238
2239         ctx = req->rq_cli_ctx;
2240         if (ctx->cc_ops->unwrap_bulk) {
2241                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2242                 if (rc < 0)
2243                         return rc;
2244         }
2245         return desc->bd_nob_transferred;
2246 }
2247 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
2248
2249 /**
2250  * This is called after unwrap the reply message.
2251  * return 0 for success or error code.
2252  */
2253 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
2254                                   struct ptlrpc_bulk_desc *desc)
2255 {
2256         struct ptlrpc_cli_ctx  *ctx;
2257         int                  rc;
2258
2259         LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
2260
2261         if (!req->rq_pack_bulk)
2262                 return 0;
2263
2264         ctx = req->rq_cli_ctx;
2265         if (ctx->cc_ops->unwrap_bulk) {
2266                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2267                 if (rc < 0)
2268                         return rc;
2269         }
2270
2271         /*
2272          * if everything is going right, nob should equals to nob_transferred.
2273          * in case of privacy mode, nob_transferred needs to be adjusted.
2274          */
2275         if (desc->bd_nob != desc->bd_nob_transferred) {
2276                 CERROR("nob %d doesn't match transferred nob %d",
2277                        desc->bd_nob, desc->bd_nob_transferred);
2278                 return -EPROTO;
2279         }
2280
2281         return 0;
2282 }
2283 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
2284
2285
2286 /****************************************
2287  * user descriptor helpers            *
2288  ****************************************/
2289
2290 int sptlrpc_current_user_desc_size(void)
2291 {
2292         int ngroups;
2293
2294         ngroups = current_ngroups;
2295
2296         if (ngroups > LUSTRE_MAX_GROUPS)
2297                 ngroups = LUSTRE_MAX_GROUPS;
2298         return sptlrpc_user_desc_size(ngroups);
2299 }
2300 EXPORT_SYMBOL(sptlrpc_current_user_desc_size);
2301
2302 int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
2303 {
2304         struct ptlrpc_user_desc *pud;
2305
2306         pud = lustre_msg_buf(msg, offset, 0);
2307
2308         pud->pud_uid = from_kuid(&init_user_ns, current_uid());
2309         pud->pud_gid = from_kgid(&init_user_ns, current_gid());
2310         pud->pud_fsuid = from_kuid(&init_user_ns, current_fsuid());
2311         pud->pud_fsgid = from_kgid(&init_user_ns, current_fsgid());
2312         pud->pud_cap = cfs_curproc_cap_pack();
2313         pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
2314
2315         task_lock(current);
2316         if (pud->pud_ngroups > current_ngroups)
2317                 pud->pud_ngroups = current_ngroups;
2318         memcpy(pud->pud_groups, current_cred()->group_info->blocks[0],
2319                pud->pud_ngroups * sizeof(__u32));
2320         task_unlock(current);
2321
2322         return 0;
2323 }
2324 EXPORT_SYMBOL(sptlrpc_pack_user_desc);
2325
2326 int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset, int swabbed)
2327 {
2328         struct ptlrpc_user_desc *pud;
2329         int                   i;
2330
2331         pud = lustre_msg_buf(msg, offset, sizeof(*pud));
2332         if (!pud)
2333                 return -EINVAL;
2334
2335         if (swabbed) {
2336                 __swab32s(&pud->pud_uid);
2337                 __swab32s(&pud->pud_gid);
2338                 __swab32s(&pud->pud_fsuid);
2339                 __swab32s(&pud->pud_fsgid);
2340                 __swab32s(&pud->pud_cap);
2341                 __swab32s(&pud->pud_ngroups);
2342         }
2343
2344         if (pud->pud_ngroups > LUSTRE_MAX_GROUPS) {
2345                 CERROR("%u groups is too large\n", pud->pud_ngroups);
2346                 return -EINVAL;
2347         }
2348
2349         if (sizeof(*pud) + pud->pud_ngroups * sizeof(__u32) >
2350             msg->lm_buflens[offset]) {
2351                 CERROR("%u groups are claimed but bufsize only %u\n",
2352                        pud->pud_ngroups, msg->lm_buflens[offset]);
2353                 return -EINVAL;
2354         }
2355
2356         if (swabbed) {
2357                 for (i = 0; i < pud->pud_ngroups; i++)
2358                         __swab32s(&pud->pud_groups[i]);
2359         }
2360
2361         return 0;
2362 }
2363 EXPORT_SYMBOL(sptlrpc_unpack_user_desc);
2364
2365 /****************************************
2366  * misc helpers                  *
2367  ****************************************/
2368
2369 const char * sec2target_str(struct ptlrpc_sec *sec)
2370 {
2371         if (!sec || !sec->ps_import || !sec->ps_import->imp_obd)
2372                 return "*";
2373         if (sec_is_reverse(sec))
2374                 return "c";
2375         return obd_uuid2str(&sec->ps_import->imp_obd->u.cli.cl_target_uuid);
2376 }
2377 EXPORT_SYMBOL(sec2target_str);
2378
2379 /*
2380  * return true if the bulk data is protected
2381  */
2382 int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
2383 {
2384         switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
2385         case SPTLRPC_BULK_SVC_INTG:
2386         case SPTLRPC_BULK_SVC_PRIV:
2387                 return 1;
2388         default:
2389                 return 0;
2390         }
2391 }
2392 EXPORT_SYMBOL(sptlrpc_flavor_has_bulk);
2393
/****************************************
 * crypto API helper/alloc blkcipher    *
 ****************************************/
2397
2398 /****************************************
2399  * initialize/finalize            *
2400  ****************************************/
2401
2402 int sptlrpc_init(void)
2403 {
2404         int rc;
2405
2406         rwlock_init(&policy_lock);
2407
2408         rc = sptlrpc_gc_init();
2409         if (rc)
2410                 goto out;
2411
2412         rc = sptlrpc_conf_init();
2413         if (rc)
2414                 goto out_gc;
2415
2416         rc = sptlrpc_enc_pool_init();
2417         if (rc)
2418                 goto out_conf;
2419
2420         rc = sptlrpc_null_init();
2421         if (rc)
2422                 goto out_pool;
2423
2424         rc = sptlrpc_plain_init();
2425         if (rc)
2426                 goto out_null;
2427
2428         rc = sptlrpc_lproc_init();
2429         if (rc)
2430                 goto out_plain;
2431
2432         return 0;
2433
2434 out_plain:
2435         sptlrpc_plain_fini();
2436 out_null:
2437         sptlrpc_null_fini();
2438 out_pool:
2439         sptlrpc_enc_pool_fini();
2440 out_conf:
2441         sptlrpc_conf_fini();
2442 out_gc:
2443         sptlrpc_gc_fini();
2444 out:
2445         return rc;
2446 }
2447
/*
 * Tear down the sptlrpc layer by finalising its parts in the reverse of
 * the order in which sptlrpc_init() brings them up.
 */
void sptlrpc_fini(void)
{
	/* last initialised, first finalised */
	sptlrpc_lproc_fini();

	/* security policies */
	sptlrpc_plain_fini();
	sptlrpc_null_fini();

	/* supporting infrastructure */
	sptlrpc_enc_pool_fini();
	sptlrpc_conf_fini();
	sptlrpc_gc_fini();
}