Merge 3.18-rc3 into staging-next
[cascardo/linux.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 extern struct list_head obd_types;
47 spinlock_t obd_types_lock;
48
49 struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 struct kmem_cache *import_cachep;
53
54 struct list_head      obd_zombie_imports;
55 struct list_head      obd_zombie_exports;
56 spinlock_t  obd_zombie_impexp_lock;
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks);
62
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80
81 static void obd_device_free(struct obd_device *obd)
82 {
83         LASSERT(obd != NULL);
84         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86         if (obd->obd_namespace != NULL) {
87                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88                        obd, obd->obd_namespace, obd->obd_force);
89                 LBUG();
90         }
91         lu_ref_fini(&obd->obd_reference);
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
95 struct obd_type *class_search_type(const char *name)
96 {
97         struct list_head *tmp;
98         struct obd_type *type;
99
100         spin_lock(&obd_types_lock);
101         list_for_each(tmp, &obd_types) {
102                 type = list_entry(tmp, struct obd_type, typ_chain);
103                 if (strcmp(type->typ_name, name) == 0) {
104                         spin_unlock(&obd_types_lock);
105                         return type;
106                 }
107         }
108         spin_unlock(&obd_types_lock);
109         return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137         if (type) {
138                 spin_lock(&type->obd_type_lock);
139                 type->typ_refcnt++;
140                 try_module_get(type->typ_dt_ops->o_owner);
141                 spin_unlock(&type->obd_type_lock);
142         }
143         return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
147 void class_put_type(struct obd_type *type)
148 {
149         LASSERT(type);
150         spin_lock(&type->obd_type_lock);
151         type->typ_refcnt--;
152         module_put(type->typ_dt_ops->o_owner);
153         spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         struct lprocfs_vars *vars, const char *name,
161                         struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165
166         /* sanity check */
167         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
168
169         if (class_search_type(name)) {
170                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171                 return -EEXIST;
172         }
173
174         rc = -ENOMEM;
175         OBD_ALLOC(type, sizeof(*type));
176         if (type == NULL)
177                 return rc;
178
179         OBD_ALLOC_PTR(type->typ_dt_ops);
180         OBD_ALLOC_PTR(type->typ_md_ops);
181         OBD_ALLOC(type->typ_name, strlen(name) + 1);
182
183         if (type->typ_dt_ops == NULL ||
184             type->typ_md_ops == NULL ||
185             type->typ_name == NULL)
186                 goto failed;
187
188         *(type->typ_dt_ops) = *dt_ops;
189         /* md_ops is optional */
190         if (md_ops)
191                 *(type->typ_md_ops) = *md_ops;
192         strcpy(type->typ_name, name);
193         spin_lock_init(&type->obd_type_lock);
194
195         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196                                               vars, type);
197         if (IS_ERR(type->typ_procroot)) {
198                 rc = PTR_ERR(type->typ_procroot);
199                 type->typ_procroot = NULL;
200                 goto failed;
201         }
202
203         if (ldt != NULL) {
204                 type->typ_lu = ldt;
205                 rc = lu_device_type_init(ldt);
206                 if (rc != 0)
207                         goto failed;
208         }
209
210         spin_lock(&obd_types_lock);
211         list_add(&type->typ_chain, &obd_types);
212         spin_unlock(&obd_types_lock);
213
214         return 0;
215
216  failed:
217         if (type->typ_name != NULL)
218                 OBD_FREE(type->typ_name, strlen(name) + 1);
219         if (type->typ_md_ops != NULL)
220                 OBD_FREE_PTR(type->typ_md_ops);
221         if (type->typ_dt_ops != NULL)
222                 OBD_FREE_PTR(type->typ_dt_ops);
223         OBD_FREE(type, sizeof(*type));
224         return rc;
225 }
226 EXPORT_SYMBOL(class_register_type);
227
228 int class_unregister_type(const char *name)
229 {
230         struct obd_type *type = class_search_type(name);
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 return -EINVAL;
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 return -EBUSY;
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         spin_lock(&obd_types_lock);
254         list_del(&type->typ_chain);
255         spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         return 0;
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *       pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 return ERR_PTR(-EINVAL);
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 return ERR_PTR(-ENODEV);
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 result = ERR_PTR(-ENOMEM);
299                 goto out_type;
300         }
301
302         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304         write_lock(&obd_dev_lock);
305         for (i = 0; i < class_devno_max(); i++) {
306                 struct obd_device *obd = class_num2obd(i);
307
308                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0] = '\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 result = ERR_PTR(-EOVERFLOW);
341                 goto out;
342         }
343
344         if (IS_ERR(result))
345                 goto out;
346
347         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                result->obd_name, result);
349
350         return result;
351 out:
352         obd_device_free(newdev);
353 out_type:
354         class_put_type(type);
355         return result;
356 }
357
358 void class_release_dev(struct obd_device *obd)
359 {
360         struct obd_type *obd_type = obd->obd_type;
361
362         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366         LASSERT(obd_type != NULL);
367
368         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
370
371         write_lock(&obd_dev_lock);
372         obd_devs[obd->obd_minor] = NULL;
373         write_unlock(&obd_dev_lock);
374         obd_device_free(obd);
375
376         class_put_type(obd_type);
377 }
378
379 int class_name2dev(const char *name)
380 {
381         int i;
382
383         if (!name)
384                 return -1;
385
386         read_lock(&obd_dev_lock);
387         for (i = 0; i < class_devno_max(); i++) {
388                 struct obd_device *obd = class_num2obd(i);
389
390                 if (obd && strcmp(name, obd->obd_name) == 0) {
391                         /* Make sure we finished attaching before we give
392                            out any references */
393                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394                         if (obd->obd_attached) {
395                                 read_unlock(&obd_dev_lock);
396                                 return i;
397                         }
398                         break;
399                 }
400         }
401         read_unlock(&obd_dev_lock);
402
403         return -1;
404 }
405 EXPORT_SYMBOL(class_name2dev);
406
407 struct obd_device *class_name2obd(const char *name)
408 {
409         int dev = class_name2dev(name);
410
411         if (dev < 0 || dev > class_devno_max())
412                 return NULL;
413         return class_num2obd(dev);
414 }
415 EXPORT_SYMBOL(class_name2obd);
416
417 int class_uuid2dev(struct obd_uuid *uuid)
418 {
419         int i;
420
421         read_lock(&obd_dev_lock);
422         for (i = 0; i < class_devno_max(); i++) {
423                 struct obd_device *obd = class_num2obd(i);
424
425                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427                         read_unlock(&obd_dev_lock);
428                         return i;
429                 }
430         }
431         read_unlock(&obd_dev_lock);
432
433         return -1;
434 }
435 EXPORT_SYMBOL(class_uuid2dev);
436
437 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
438 {
439         int dev = class_uuid2dev(uuid);
440         if (dev < 0)
441                 return NULL;
442         return class_num2obd(dev);
443 }
444 EXPORT_SYMBOL(class_uuid2obd);
445
446 /**
447  * Get obd device from ::obd_devs[]
448  *
449  * \param num [in] array index
450  *
451  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452  *       otherwise return the obd device there.
453  */
454 struct obd_device *class_num2obd(int num)
455 {
456         struct obd_device *obd = NULL;
457
458         if (num < class_devno_max()) {
459                 obd = obd_devs[num];
460                 if (obd == NULL)
461                         return NULL;
462
463                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464                          "%p obd_magic %08x != %08x\n",
465                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466                 LASSERTF(obd->obd_minor == num,
467                          "%p obd_minor %0d != %0d\n",
468                          obd, obd->obd_minor, num);
469         }
470
471         return obd;
472 }
473 EXPORT_SYMBOL(class_num2obd);
474
475 /**
476  * Get obd devices count. Device in any
477  *    state are counted
478  * \retval obd device count
479  */
480 int get_devices_count(void)
481 {
482         int index, max_index = class_devno_max(), dev_count = 0;
483
484         read_lock(&obd_dev_lock);
485         for (index = 0; index <= max_index; index++) {
486                 struct obd_device *obd = class_num2obd(index);
487                 if (obd != NULL)
488                         dev_count++;
489         }
490         read_unlock(&obd_dev_lock);
491
492         return dev_count;
493 }
494 EXPORT_SYMBOL(get_devices_count);
495
496 void class_obd_list(void)
497 {
498         char *status;
499         int i;
500
501         read_lock(&obd_dev_lock);
502         for (i = 0; i < class_devno_max(); i++) {
503                 struct obd_device *obd = class_num2obd(i);
504
505                 if (obd == NULL)
506                         continue;
507                 if (obd->obd_stopping)
508                         status = "ST";
509                 else if (obd->obd_set_up)
510                         status = "UP";
511                 else if (obd->obd_attached)
512                         status = "AT";
513                 else
514                         status = "--";
515                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516                          i, status, obd->obd_type->typ_name,
517                          obd->obd_name, obd->obd_uuid.uuid,
518                          atomic_read(&obd->obd_refcount));
519         }
520         read_unlock(&obd_dev_lock);
521         return;
522 }
523
524 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
525    specified, then only the client with that uuid is returned,
526    otherwise any client connected to the tgt is returned. */
527 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
528                                           const char *typ_name,
529                                           struct obd_uuid *grp_uuid)
530 {
531         int i;
532
533         read_lock(&obd_dev_lock);
534         for (i = 0; i < class_devno_max(); i++) {
535                 struct obd_device *obd = class_num2obd(i);
536
537                 if (obd == NULL)
538                         continue;
539                 if ((strncmp(obd->obd_type->typ_name, typ_name,
540                              strlen(typ_name)) == 0)) {
541                         if (obd_uuid_equals(tgt_uuid,
542                                             &obd->u.cli.cl_target_uuid) &&
543                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
544                                                          &obd->obd_uuid) : 1)) {
545                                 read_unlock(&obd_dev_lock);
546                                 return obd;
547                         }
548                 }
549         }
550         read_unlock(&obd_dev_lock);
551
552         return NULL;
553 }
554 EXPORT_SYMBOL(class_find_client_obd);
555
556 /* Iterate the obd_device list looking devices have grp_uuid. Start
557    searching at *next, and if a device is found, the next index to look
558    at is saved in *next. If next is NULL, then the first matching device
559    will always be returned. */
560 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
561 {
562         int i;
563
564         if (next == NULL)
565                 i = 0;
566         else if (*next >= 0 && *next < class_devno_max())
567                 i = *next;
568         else
569                 return NULL;
570
571         read_lock(&obd_dev_lock);
572         for (; i < class_devno_max(); i++) {
573                 struct obd_device *obd = class_num2obd(i);
574
575                 if (obd == NULL)
576                         continue;
577                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
578                         if (next != NULL)
579                                 *next = i+1;
580                         read_unlock(&obd_dev_lock);
581                         return obd;
582                 }
583         }
584         read_unlock(&obd_dev_lock);
585
586         return NULL;
587 }
588 EXPORT_SYMBOL(class_devices_in_group);
589
590 /**
591  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592  * adjust sptlrpc settings accordingly.
593  */
594 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
595 {
596         struct obd_device  *obd;
597         const char       *type;
598         int              i, rc = 0, rc2;
599
600         LASSERT(namelen > 0);
601
602         read_lock(&obd_dev_lock);
603         for (i = 0; i < class_devno_max(); i++) {
604                 obd = class_num2obd(i);
605
606                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
607                         continue;
608
609                 /* only notify mdc, osc, mdt, ost */
610                 type = obd->obd_type->typ_name;
611                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614                     strcmp(type, LUSTRE_OST_NAME) != 0)
615                         continue;
616
617                 if (strncmp(obd->obd_name, fsname, namelen))
618                         continue;
619
620                 class_incref(obd, __func__, obd);
621                 read_unlock(&obd_dev_lock);
622                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623                                          sizeof(KEY_SPTLRPC_CONF),
624                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
625                 rc = rc ? rc : rc2;
626                 class_decref(obd, __func__, obd);
627                 read_lock(&obd_dev_lock);
628         }
629         read_unlock(&obd_dev_lock);
630         return rc;
631 }
632 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
633
634 void obd_cleanup_caches(void)
635 {
636         if (obd_device_cachep) {
637                 kmem_cache_destroy(obd_device_cachep);
638                 obd_device_cachep = NULL;
639         }
640         if (obdo_cachep) {
641                 kmem_cache_destroy(obdo_cachep);
642                 obdo_cachep = NULL;
643         }
644         if (import_cachep) {
645                 kmem_cache_destroy(import_cachep);
646                 import_cachep = NULL;
647         }
648         if (capa_cachep) {
649                 kmem_cache_destroy(capa_cachep);
650                 capa_cachep = NULL;
651         }
652 }
653
654 int obd_init_caches(void)
655 {
656         LASSERT(obd_device_cachep == NULL);
657         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658                                                  sizeof(struct obd_device),
659                                                  0, 0, NULL);
660         if (!obd_device_cachep)
661                 goto out;
662
663         LASSERT(obdo_cachep == NULL);
664         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
665                                            0, 0, NULL);
666         if (!obdo_cachep)
667                 goto out;
668
669         LASSERT(import_cachep == NULL);
670         import_cachep = kmem_cache_create("ll_import_cache",
671                                              sizeof(struct obd_import),
672                                              0, 0, NULL);
673         if (!import_cachep)
674                 goto out;
675
676         LASSERT(capa_cachep == NULL);
677         capa_cachep = kmem_cache_create("capa_cache",
678                                            sizeof(struct obd_capa), 0, 0, NULL);
679         if (!capa_cachep)
680                 goto out;
681
682         return 0;
683  out:
684         obd_cleanup_caches();
685         return -ENOMEM;
686
687 }
688
689 /* map connection to client */
690 struct obd_export *class_conn2export(struct lustre_handle *conn)
691 {
692         struct obd_export *export;
693
694         if (!conn) {
695                 CDEBUG(D_CACHE, "looking for null handle\n");
696                 return NULL;
697         }
698
699         if (conn->cookie == -1) {  /* this means assign a new connection */
700                 CDEBUG(D_CACHE, "want a new connection\n");
701                 return NULL;
702         }
703
704         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705         export = class_handle2object(conn->cookie);
706         return export;
707 }
708 EXPORT_SYMBOL(class_conn2export);
709
710 struct obd_device *class_exp2obd(struct obd_export *exp)
711 {
712         if (exp)
713                 return exp->exp_obd;
714         return NULL;
715 }
716 EXPORT_SYMBOL(class_exp2obd);
717
718 struct obd_device *class_conn2obd(struct lustre_handle *conn)
719 {
720         struct obd_export *export;
721         export = class_conn2export(conn);
722         if (export) {
723                 struct obd_device *obd = export->exp_obd;
724                 class_export_put(export);
725                 return obd;
726         }
727         return NULL;
728 }
729 EXPORT_SYMBOL(class_conn2obd);
730
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
732 {
733         struct obd_device *obd = exp->exp_obd;
734         if (obd == NULL)
735                 return NULL;
736         return obd->u.cli.cl_import;
737 }
738 EXPORT_SYMBOL(class_exp2cliimp);
739
740 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
741 {
742         struct obd_device *obd = class_conn2obd(conn);
743         if (obd == NULL)
744                 return NULL;
745         return obd->u.cli.cl_import;
746 }
747 EXPORT_SYMBOL(class_conn2cliimp);
748
749 /* Export management functions */
750 static void class_export_destroy(struct obd_export *exp)
751 {
752         struct obd_device *obd = exp->exp_obd;
753
754         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755         LASSERT(obd != NULL);
756
757         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758                exp->exp_client_uuid.uuid, obd->obd_name);
759
760         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761         if (exp->exp_connection)
762                 ptlrpc_put_connection_superhack(exp->exp_connection);
763
764         LASSERT(list_empty(&exp->exp_outstanding_replies));
765         LASSERT(list_empty(&exp->exp_uncommitted_replies));
766         LASSERT(list_empty(&exp->exp_req_replay_queue));
767         LASSERT(list_empty(&exp->exp_hp_rpcs));
768         obd_destroy_export(exp);
769         class_decref(obd, "export", exp);
770
771         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
772 }
773
774 static void export_handle_addref(void *export)
775 {
776         class_export_get(export);
777 }
778
779 static struct portals_handle_ops export_handle_ops = {
780         .hop_addref = export_handle_addref,
781         .hop_free   = NULL,
782 };
783
784 struct obd_export *class_export_get(struct obd_export *exp)
785 {
786         atomic_inc(&exp->exp_refcount);
787         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788                atomic_read(&exp->exp_refcount));
789         return exp;
790 }
791 EXPORT_SYMBOL(class_export_get);
792
793 void class_export_put(struct obd_export *exp)
794 {
795         LASSERT(exp != NULL);
796         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798                atomic_read(&exp->exp_refcount) - 1);
799
800         if (atomic_dec_and_test(&exp->exp_refcount)) {
801                 LASSERT(!list_empty(&exp->exp_obd_chain));
802                 CDEBUG(D_IOCTL, "final put %p/%s\n",
803                        exp, exp->exp_client_uuid.uuid);
804
805                 /* release nid stat refererence */
806                 lprocfs_exp_cleanup(exp);
807
808                 obd_zombie_export_add(exp);
809         }
810 }
811 EXPORT_SYMBOL(class_export_put);
812
813 /* Creates a new export, adds it to the hash table, and returns a
814  * pointer to it. The refcount is 2: one for the hash reference, and
815  * one for the pointer returned by this function. */
816 struct obd_export *class_new_export(struct obd_device *obd,
817                                     struct obd_uuid *cluuid)
818 {
819         struct obd_export *export;
820         struct cfs_hash *hash = NULL;
821         int rc = 0;
822
823         OBD_ALLOC_PTR(export);
824         if (!export)
825                 return ERR_PTR(-ENOMEM);
826
827         export->exp_conn_cnt = 0;
828         export->exp_lock_hash = NULL;
829         export->exp_flock_hash = NULL;
830         atomic_set(&export->exp_refcount, 2);
831         atomic_set(&export->exp_rpc_count, 0);
832         atomic_set(&export->exp_cb_count, 0);
833         atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835         INIT_LIST_HEAD(&export->exp_locks_list);
836         spin_lock_init(&export->exp_locks_list_guard);
837 #endif
838         atomic_set(&export->exp_replay_count, 0);
839         export->exp_obd = obd;
840         INIT_LIST_HEAD(&export->exp_outstanding_replies);
841         spin_lock_init(&export->exp_uncommitted_replies_lock);
842         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843         INIT_LIST_HEAD(&export->exp_req_replay_queue);
844         INIT_LIST_HEAD(&export->exp_handle.h_link);
845         INIT_LIST_HEAD(&export->exp_hp_rpcs);
846         class_handle_hash(&export->exp_handle, &export_handle_ops);
847         export->exp_last_request_time = get_seconds();
848         spin_lock_init(&export->exp_lock);
849         spin_lock_init(&export->exp_rpc_lock);
850         INIT_HLIST_NODE(&export->exp_uuid_hash);
851         INIT_HLIST_NODE(&export->exp_nid_hash);
852         spin_lock_init(&export->exp_bl_list_lock);
853         INIT_LIST_HEAD(&export->exp_bl_list);
854
855         export->exp_sp_peer = LUSTRE_SP_ANY;
856         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857         export->exp_client_uuid = *cluuid;
858         obd_init_export(export);
859
860         spin_lock(&obd->obd_dev_lock);
861         /* shouldn't happen, but might race */
862         if (obd->obd_stopping) {
863                 rc = -ENODEV;
864                 goto exit_unlock;
865         }
866
867         hash = cfs_hash_getref(obd->obd_uuid_hash);
868         if (hash == NULL) {
869                 rc = -ENODEV;
870                 goto exit_unlock;
871         }
872         spin_unlock(&obd->obd_dev_lock);
873
874         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
876                 if (rc != 0) {
877                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878                                       obd->obd_name, cluuid->uuid, rc);
879                         rc = -EALREADY;
880                         goto exit_err;
881                 }
882         }
883
884         spin_lock(&obd->obd_dev_lock);
885         if (obd->obd_stopping) {
886                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
887                 rc = -ENODEV;
888                 goto exit_unlock;
889         }
890
891         class_incref(obd, "export", export);
892         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893         list_add_tail(&export->exp_obd_chain_timed,
894                           &export->exp_obd->obd_exports_timed);
895         export->exp_obd->obd_num_exports++;
896         spin_unlock(&obd->obd_dev_lock);
897         cfs_hash_putref(hash);
898         return export;
899
900 exit_unlock:
901         spin_unlock(&obd->obd_dev_lock);
902 exit_err:
903         if (hash)
904                 cfs_hash_putref(hash);
905         class_handle_unhash(&export->exp_handle);
906         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907         obd_destroy_export(export);
908         OBD_FREE_PTR(export);
909         return ERR_PTR(rc);
910 }
911 EXPORT_SYMBOL(class_new_export);
912
913 void class_unlink_export(struct obd_export *exp)
914 {
915         class_handle_unhash(&exp->exp_handle);
916
917         spin_lock(&exp->exp_obd->obd_dev_lock);
918         /* delete an uuid-export hashitem from hashtables */
919         if (!hlist_unhashed(&exp->exp_uuid_hash))
920                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921                              &exp->exp_client_uuid,
922                              &exp->exp_uuid_hash);
923
924         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925         list_del_init(&exp->exp_obd_chain_timed);
926         exp->exp_obd->obd_num_exports--;
927         spin_unlock(&exp->exp_obd->obd_dev_lock);
928         class_export_put(exp);
929 }
930 EXPORT_SYMBOL(class_unlink_export);
931
932 /* Import management functions */
933 void class_import_destroy(struct obd_import *imp)
934 {
935         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936                 imp->imp_obd->obd_name);
937
938         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
939
940         ptlrpc_put_connection_superhack(imp->imp_connection);
941
942         while (!list_empty(&imp->imp_conn_list)) {
943                 struct obd_import_conn *imp_conn;
944
945                 imp_conn = list_entry(imp->imp_conn_list.next,
946                                           struct obd_import_conn, oic_item);
947                 list_del_init(&imp_conn->oic_item);
948                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949                 OBD_FREE(imp_conn, sizeof(*imp_conn));
950         }
951
952         LASSERT(imp->imp_sec == NULL);
953         class_decref(imp->imp_obd, "import", imp);
954         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
955 }
956
957 static void import_handle_addref(void *import)
958 {
959         class_import_get(import);
960 }
961
962 static struct portals_handle_ops import_handle_ops = {
963         .hop_addref = import_handle_addref,
964         .hop_free   = NULL,
965 };
966
967 struct obd_import *class_import_get(struct obd_import *import)
968 {
969         atomic_inc(&import->imp_refcount);
970         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971                atomic_read(&import->imp_refcount),
972                import->imp_obd->obd_name);
973         return import;
974 }
975 EXPORT_SYMBOL(class_import_get);
976
977 void class_import_put(struct obd_import *imp)
978 {
979         LASSERT(list_empty(&imp->imp_zombie_chain));
980         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
981
982         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983                atomic_read(&imp->imp_refcount) - 1,
984                imp->imp_obd->obd_name);
985
986         if (atomic_dec_and_test(&imp->imp_refcount)) {
987                 CDEBUG(D_INFO, "final put import %p\n", imp);
988                 obd_zombie_import_add(imp);
989         }
990
991         /* catch possible import put race */
992         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
993 }
994 EXPORT_SYMBOL(class_import_put);
995
996 static void init_imp_at(struct imp_at *at) {
997         int i;
998         at_init(&at->iat_net_latency, 0, 0);
999         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000                 /* max service estimates are tracked on the server side, so
1001                    don't use the AT history here, just use the last reported
1002                    val. (But keep hist for proc histogram, worst_ever) */
1003                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004                         AT_FLG_NOHIST);
1005         }
1006 }
1007
1008 struct obd_import *class_new_import(struct obd_device *obd)
1009 {
1010         struct obd_import *imp;
1011
1012         OBD_ALLOC(imp, sizeof(*imp));
1013         if (imp == NULL)
1014                 return NULL;
1015
1016         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018         INIT_LIST_HEAD(&imp->imp_replay_list);
1019         INIT_LIST_HEAD(&imp->imp_sending_list);
1020         INIT_LIST_HEAD(&imp->imp_delayed_list);
1021         INIT_LIST_HEAD(&imp->imp_committed_list);
1022         imp->imp_replay_cursor = &imp->imp_committed_list;
1023         spin_lock_init(&imp->imp_lock);
1024         imp->imp_last_success_conn = 0;
1025         imp->imp_state = LUSTRE_IMP_NEW;
1026         imp->imp_obd = class_incref(obd, "import", imp);
1027         mutex_init(&imp->imp_sec_mutex);
1028         init_waitqueue_head(&imp->imp_recovery_waitq);
1029
1030         atomic_set(&imp->imp_refcount, 2);
1031         atomic_set(&imp->imp_unregistering, 0);
1032         atomic_set(&imp->imp_inflight, 0);
1033         atomic_set(&imp->imp_replay_inflight, 0);
1034         atomic_set(&imp->imp_inval_count, 0);
1035         INIT_LIST_HEAD(&imp->imp_conn_list);
1036         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038         init_imp_at(&imp->imp_at);
1039
1040         /* the default magic is V2, will be used in connect RPC, and
1041          * then adjusted according to the flags in request/reply. */
1042         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1043
1044         return imp;
1045 }
1046 EXPORT_SYMBOL(class_new_import);
1047
1048 void class_destroy_import(struct obd_import *import)
1049 {
1050         LASSERT(import != NULL);
1051         LASSERT(import != LP_POISON);
1052
1053         class_handle_unhash(&import->imp_handle);
1054
1055         spin_lock(&import->imp_lock);
1056         import->imp_generation++;
1057         spin_unlock(&import->imp_lock);
1058         class_import_put(import);
1059 }
1060 EXPORT_SYMBOL(class_destroy_import);
1061
1062 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1063
1064 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1065 {
1066         spin_lock(&exp->exp_locks_list_guard);
1067
1068         LASSERT(lock->l_exp_refs_nr >= 0);
1069
1070         if (lock->l_exp_refs_target != NULL &&
1071             lock->l_exp_refs_target != exp) {
1072                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073                               exp, lock, lock->l_exp_refs_target);
1074         }
1075         if ((lock->l_exp_refs_nr ++) == 0) {
1076                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077                 lock->l_exp_refs_target = exp;
1078         }
1079         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080                lock, exp, lock->l_exp_refs_nr);
1081         spin_unlock(&exp->exp_locks_list_guard);
1082 }
1083 EXPORT_SYMBOL(__class_export_add_lock_ref);
1084
1085 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 {
1087         spin_lock(&exp->exp_locks_list_guard);
1088         LASSERT(lock->l_exp_refs_nr > 0);
1089         if (lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("lock %p, "
1091                               "mismatching export pointers: %p, %p\n",
1092                               lock, lock->l_exp_refs_target, exp);
1093         }
1094         if (-- lock->l_exp_refs_nr == 0) {
1095                 list_del_init(&lock->l_exp_refs_link);
1096                 lock->l_exp_refs_target = NULL;
1097         }
1098         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099                lock, exp, lock->l_exp_refs_nr);
1100         spin_unlock(&exp->exp_locks_list_guard);
1101 }
1102 EXPORT_SYMBOL(__class_export_del_lock_ref);
1103 #endif
1104
1105 /* A connection defines an export context in which preallocation can
1106    be managed. This releases the export pointer reference, and returns
1107    the export handle, so the export refcount is 1 when this function
1108    returns. */
1109 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1110                   struct obd_uuid *cluuid)
1111 {
1112         struct obd_export *export;
1113         LASSERT(conn != NULL);
1114         LASSERT(obd != NULL);
1115         LASSERT(cluuid != NULL);
1116
1117         export = class_new_export(obd, cluuid);
1118         if (IS_ERR(export))
1119                 return PTR_ERR(export);
1120
1121         conn->cookie = export->exp_handle.h_cookie;
1122         class_export_put(export);
1123
1124         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1125                cluuid->uuid, conn->cookie);
1126         return 0;
1127 }
1128 EXPORT_SYMBOL(class_connect);
1129
1130 /* if export is involved in recovery then clean up related things */
1131 void class_export_recovery_cleanup(struct obd_export *exp)
1132 {
1133         struct obd_device *obd = exp->exp_obd;
1134
1135         spin_lock(&obd->obd_recovery_task_lock);
1136         if (exp->exp_delayed)
1137                 obd->obd_delayed_clients--;
1138         if (obd->obd_recovering) {
1139                 if (exp->exp_in_recovery) {
1140                         spin_lock(&exp->exp_lock);
1141                         exp->exp_in_recovery = 0;
1142                         spin_unlock(&exp->exp_lock);
1143                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1144                         atomic_dec(&obd->obd_connected_clients);
1145                 }
1146
1147                 /* if called during recovery then should update
1148                  * obd_stale_clients counter,
1149                  * lightweight exports are not counted */
1150                 if (exp->exp_failed &&
1151                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1152                         exp->exp_obd->obd_stale_clients++;
1153         }
1154         spin_unlock(&obd->obd_recovery_task_lock);
1155         /** Cleanup req replay fields */
1156         if (exp->exp_req_replay_needed) {
1157                 spin_lock(&exp->exp_lock);
1158                 exp->exp_req_replay_needed = 0;
1159                 spin_unlock(&exp->exp_lock);
1160                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1161                 atomic_dec(&obd->obd_req_replay_clients);
1162         }
1163         /** Cleanup lock replay data */
1164         if (exp->exp_lock_replay_needed) {
1165                 spin_lock(&exp->exp_lock);
1166                 exp->exp_lock_replay_needed = 0;
1167                 spin_unlock(&exp->exp_lock);
1168                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1169                 atomic_dec(&obd->obd_lock_replay_clients);
1170         }
1171 }
1172
1173 /* This function removes 1-3 references from the export:
1174  * 1 - for export pointer passed
1175  * and if disconnect really need
1176  * 2 - removing from hash
1177  * 3 - in client_unlink_export
1178  * The export pointer passed to this function can destroyed */
1179 int class_disconnect(struct obd_export *export)
1180 {
1181         int already_disconnected;
1182
1183         if (export == NULL) {
1184                 CWARN("attempting to free NULL export %p\n", export);
1185                 return -EINVAL;
1186         }
1187
1188         spin_lock(&export->exp_lock);
1189         already_disconnected = export->exp_disconnected;
1190         export->exp_disconnected = 1;
1191         spin_unlock(&export->exp_lock);
1192
1193         /* class_cleanup(), abort_recovery(), and class_fail_export()
1194          * all end up in here, and if any of them race we shouldn't
1195          * call extra class_export_puts(). */
1196         if (already_disconnected) {
1197                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1198                 goto no_disconn;
1199         }
1200
1201         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202                export->exp_handle.h_cookie);
1203
1204         if (!hlist_unhashed(&export->exp_nid_hash))
1205                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1206                              &export->exp_connection->c_peer.nid,
1207                              &export->exp_nid_hash);
1208
1209         class_export_recovery_cleanup(export);
1210         class_unlink_export(export);
1211 no_disconn:
1212         class_export_put(export);
1213         return 0;
1214 }
1215 EXPORT_SYMBOL(class_disconnect);
1216
1217 /* Return non-zero for a fully connected export */
1218 int class_connected_export(struct obd_export *exp)
1219 {
1220         if (exp) {
1221                 int connected;
1222                 spin_lock(&exp->exp_lock);
1223                 connected = (exp->exp_conn_cnt > 0);
1224                 spin_unlock(&exp->exp_lock);
1225                 return connected;
1226         }
1227         return 0;
1228 }
1229 EXPORT_SYMBOL(class_connected_export);
1230
1231 static void class_disconnect_export_list(struct list_head *list,
1232                                          enum obd_option flags)
1233 {
1234         int rc;
1235         struct obd_export *exp;
1236
1237         /* It's possible that an export may disconnect itself, but
1238          * nothing else will be added to this list. */
1239         while (!list_empty(list)) {
1240                 exp = list_entry(list->next, struct obd_export,
1241                                      exp_obd_chain);
1242                 /* need for safe call CDEBUG after obd_disconnect */
1243                 class_export_get(exp);
1244
1245                 spin_lock(&exp->exp_lock);
1246                 exp->exp_flags = flags;
1247                 spin_unlock(&exp->exp_lock);
1248
1249                 if (obd_uuid_equals(&exp->exp_client_uuid,
1250                                     &exp->exp_obd->obd_uuid)) {
1251                         CDEBUG(D_HA,
1252                                "exp %p export uuid == obd uuid, don't discon\n",
1253                                exp);
1254                         /* Need to delete this now so we don't end up pointing
1255                          * to work_list later when this export is cleaned up. */
1256                         list_del_init(&exp->exp_obd_chain);
1257                         class_export_put(exp);
1258                         continue;
1259                 }
1260
1261                 class_export_get(exp);
1262                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1263                        "last request at "CFS_TIME_T"\n",
1264                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265                        exp, exp->exp_last_request_time);
1266                 /* release one export reference anyway */
1267                 rc = obd_disconnect(exp);
1268
1269                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270                        obd_export_nid2str(exp), exp, rc);
1271                 class_export_put(exp);
1272         }
1273 }
1274
1275 void class_disconnect_exports(struct obd_device *obd)
1276 {
1277         struct list_head work_list;
1278
1279         /* Move all of the exports from obd_exports to a work list, en masse. */
1280         INIT_LIST_HEAD(&work_list);
1281         spin_lock(&obd->obd_dev_lock);
1282         list_splice_init(&obd->obd_exports, &work_list);
1283         list_splice_init(&obd->obd_delayed_exports, &work_list);
1284         spin_unlock(&obd->obd_dev_lock);
1285
1286         if (!list_empty(&work_list)) {
1287                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1288                        "disconnecting them\n", obd->obd_minor, obd);
1289                 class_disconnect_export_list(&work_list,
1290                                              exp_flags_from_obd(obd));
1291         } else
1292                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1293                        obd->obd_minor, obd);
1294 }
1295 EXPORT_SYMBOL(class_disconnect_exports);
1296
1297 /* Remove exports that have not completed recovery.
1298  */
1299 void class_disconnect_stale_exports(struct obd_device *obd,
1300                                     int (*test_export)(struct obd_export *))
1301 {
1302         struct list_head work_list;
1303         struct obd_export *exp, *n;
1304         int evicted = 0;
1305
1306         INIT_LIST_HEAD(&work_list);
1307         spin_lock(&obd->obd_dev_lock);
1308         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1309                                      exp_obd_chain) {
1310                 /* don't count self-export as client */
1311                 if (obd_uuid_equals(&exp->exp_client_uuid,
1312                                     &exp->exp_obd->obd_uuid))
1313                         continue;
1314
1315                 /* don't evict clients which have no slot in last_rcvd
1316                  * (e.g. lightweight connection) */
1317                 if (exp->exp_target_data.ted_lr_idx == -1)
1318                         continue;
1319
1320                 spin_lock(&exp->exp_lock);
1321                 if (exp->exp_failed || test_export(exp)) {
1322                         spin_unlock(&exp->exp_lock);
1323                         continue;
1324                 }
1325                 exp->exp_failed = 1;
1326                 spin_unlock(&exp->exp_lock);
1327
1328                 list_move(&exp->exp_obd_chain, &work_list);
1329                 evicted++;
1330                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1331                        obd->obd_name, exp->exp_client_uuid.uuid,
1332                        exp->exp_connection == NULL ? "<unknown>" :
1333                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1334                 print_export_data(exp, "EVICTING", 0);
1335         }
1336         spin_unlock(&obd->obd_dev_lock);
1337
1338         if (evicted)
1339                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1340                               obd->obd_name, evicted);
1341
1342         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1343                                                  OBD_OPT_ABORT_RECOV);
1344 }
1345 EXPORT_SYMBOL(class_disconnect_stale_exports);
1346
1347 void class_fail_export(struct obd_export *exp)
1348 {
1349         int rc, already_failed;
1350
1351         spin_lock(&exp->exp_lock);
1352         already_failed = exp->exp_failed;
1353         exp->exp_failed = 1;
1354         spin_unlock(&exp->exp_lock);
1355
1356         if (already_failed) {
1357                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1358                        exp, exp->exp_client_uuid.uuid);
1359                 return;
1360         }
1361
1362         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1363                exp, exp->exp_client_uuid.uuid);
1364
1365         if (obd_dump_on_timeout)
1366                 libcfs_debug_dumplog();
1367
1368         /* need for safe call CDEBUG after obd_disconnect */
1369         class_export_get(exp);
1370
1371         /* Most callers into obd_disconnect are removing their own reference
1372          * (request, for example) in addition to the one from the hash table.
1373          * We don't have such a reference here, so make one. */
1374         class_export_get(exp);
1375         rc = obd_disconnect(exp);
1376         if (rc)
1377                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1378         else
1379                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1380                        exp, exp->exp_client_uuid.uuid);
1381         class_export_put(exp);
1382 }
1383 EXPORT_SYMBOL(class_fail_export);
1384
1385 char *obd_export_nid2str(struct obd_export *exp)
1386 {
1387         if (exp->exp_connection != NULL)
1388                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1389
1390         return "(no nid)";
1391 }
1392 EXPORT_SYMBOL(obd_export_nid2str);
1393
1394 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1395 {
1396         struct cfs_hash *nid_hash;
1397         struct obd_export *doomed_exp = NULL;
1398         int exports_evicted = 0;
1399
1400         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1401
1402         spin_lock(&obd->obd_dev_lock);
1403         /* umount has run already, so evict thread should leave
1404          * its task to umount thread now */
1405         if (obd->obd_stopping) {
1406                 spin_unlock(&obd->obd_dev_lock);
1407                 return exports_evicted;
1408         }
1409         nid_hash = obd->obd_nid_hash;
1410         cfs_hash_getref(nid_hash);
1411         spin_unlock(&obd->obd_dev_lock);
1412
1413         do {
1414                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1415                 if (doomed_exp == NULL)
1416                         break;
1417
1418                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1419                          "nid %s found, wanted nid %s, requested nid %s\n",
1420                          obd_export_nid2str(doomed_exp),
1421                          libcfs_nid2str(nid_key), nid);
1422                 LASSERTF(doomed_exp != obd->obd_self_export,
1423                          "self-export is hashed by NID?\n");
1424                 exports_evicted++;
1425                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1426                               "request\n", obd->obd_name,
1427                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1428                               obd_export_nid2str(doomed_exp));
1429                 class_fail_export(doomed_exp);
1430                 class_export_put(doomed_exp);
1431         } while (1);
1432
1433         cfs_hash_putref(nid_hash);
1434
1435         if (!exports_evicted)
1436                 CDEBUG(D_HA,
1437                        "%s: can't disconnect NID '%s': no exports found\n",
1438                        obd->obd_name, nid);
1439         return exports_evicted;
1440 }
1441 EXPORT_SYMBOL(obd_export_evict_by_nid);
1442
1443 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1444 {
1445         struct cfs_hash *uuid_hash;
1446         struct obd_export *doomed_exp = NULL;
1447         struct obd_uuid doomed_uuid;
1448         int exports_evicted = 0;
1449
1450         spin_lock(&obd->obd_dev_lock);
1451         if (obd->obd_stopping) {
1452                 spin_unlock(&obd->obd_dev_lock);
1453                 return exports_evicted;
1454         }
1455         uuid_hash = obd->obd_uuid_hash;
1456         cfs_hash_getref(uuid_hash);
1457         spin_unlock(&obd->obd_dev_lock);
1458
1459         obd_str2uuid(&doomed_uuid, uuid);
1460         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1461                 CERROR("%s: can't evict myself\n", obd->obd_name);
1462                 cfs_hash_putref(uuid_hash);
1463                 return exports_evicted;
1464         }
1465
1466         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1467
1468         if (doomed_exp == NULL) {
1469                 CERROR("%s: can't disconnect %s: no exports found\n",
1470                        obd->obd_name, uuid);
1471         } else {
1472                 CWARN("%s: evicting %s at administrative request\n",
1473                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1474                 class_fail_export(doomed_exp);
1475                 class_export_put(doomed_exp);
1476                 exports_evicted++;
1477         }
1478         cfs_hash_putref(uuid_hash);
1479
1480         return exports_evicted;
1481 }
1482 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1483
1484 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1485 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1486 EXPORT_SYMBOL(class_export_dump_hook);
1487 #endif
1488
1489 static void print_export_data(struct obd_export *exp, const char *status,
1490                               int locks)
1491 {
1492         struct ptlrpc_reply_state *rs;
1493         struct ptlrpc_reply_state *first_reply = NULL;
1494         int nreplies = 0;
1495
1496         spin_lock(&exp->exp_lock);
1497         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1498                                 rs_exp_list) {
1499                 if (nreplies == 0)
1500                         first_reply = rs;
1501                 nreplies++;
1502         }
1503         spin_unlock(&exp->exp_lock);
1504
1505         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1506                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1507                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1508                atomic_read(&exp->exp_rpc_count),
1509                atomic_read(&exp->exp_cb_count),
1510                atomic_read(&exp->exp_locks_count),
1511                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1512                nreplies, first_reply, nreplies > 3 ? "..." : "",
1513                exp->exp_last_committed);
1514 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1515         if (locks && class_export_dump_hook != NULL)
1516                 class_export_dump_hook(exp);
1517 #endif
1518 }
1519
1520 void dump_exports(struct obd_device *obd, int locks)
1521 {
1522         struct obd_export *exp;
1523
1524         spin_lock(&obd->obd_dev_lock);
1525         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1526                 print_export_data(exp, "ACTIVE", locks);
1527         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1528                 print_export_data(exp, "UNLINKED", locks);
1529         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1530                 print_export_data(exp, "DELAYED", locks);
1531         spin_unlock(&obd->obd_dev_lock);
1532         spin_lock(&obd_zombie_impexp_lock);
1533         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1534                 print_export_data(exp, "ZOMBIE", locks);
1535         spin_unlock(&obd_zombie_impexp_lock);
1536 }
1537 EXPORT_SYMBOL(dump_exports);
1538
1539 void obd_exports_barrier(struct obd_device *obd)
1540 {
1541         int waited = 2;
1542         LASSERT(list_empty(&obd->obd_exports));
1543         spin_lock(&obd->obd_dev_lock);
1544         while (!list_empty(&obd->obd_unlinked_exports)) {
1545                 spin_unlock(&obd->obd_dev_lock);
1546                 set_current_state(TASK_UNINTERRUPTIBLE);
1547                 schedule_timeout(cfs_time_seconds(waited));
1548                 if (waited > 5 && IS_PO2(waited)) {
1549                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1550                                       "more than %d seconds. "
1551                                       "The obd refcount = %d. Is it stuck?\n",
1552                                       obd->obd_name, waited,
1553                                       atomic_read(&obd->obd_refcount));
1554                         dump_exports(obd, 1);
1555                 }
1556                 waited *= 2;
1557                 spin_lock(&obd->obd_dev_lock);
1558         }
1559         spin_unlock(&obd->obd_dev_lock);
1560 }
1561 EXPORT_SYMBOL(obd_exports_barrier);
1562
1563 /* Total amount of zombies to be destroyed */
1564 static int zombies_count = 0;
1565
1566 /**
1567  * kill zombie imports and exports
1568  */
1569 void obd_zombie_impexp_cull(void)
1570 {
1571         struct obd_import *import;
1572         struct obd_export *export;
1573
1574         do {
1575                 spin_lock(&obd_zombie_impexp_lock);
1576
1577                 import = NULL;
1578                 if (!list_empty(&obd_zombie_imports)) {
1579                         import = list_entry(obd_zombie_imports.next,
1580                                                 struct obd_import,
1581                                                 imp_zombie_chain);
1582                         list_del_init(&import->imp_zombie_chain);
1583                 }
1584
1585                 export = NULL;
1586                 if (!list_empty(&obd_zombie_exports)) {
1587                         export = list_entry(obd_zombie_exports.next,
1588                                                 struct obd_export,
1589                                                 exp_obd_chain);
1590                         list_del_init(&export->exp_obd_chain);
1591                 }
1592
1593                 spin_unlock(&obd_zombie_impexp_lock);
1594
1595                 if (import != NULL) {
1596                         class_import_destroy(import);
1597                         spin_lock(&obd_zombie_impexp_lock);
1598                         zombies_count--;
1599                         spin_unlock(&obd_zombie_impexp_lock);
1600                 }
1601
1602                 if (export != NULL) {
1603                         class_export_destroy(export);
1604                         spin_lock(&obd_zombie_impexp_lock);
1605                         zombies_count--;
1606                         spin_unlock(&obd_zombie_impexp_lock);
1607                 }
1608
1609                 cond_resched();
1610         } while (import != NULL || export != NULL);
1611 }
1612
1613 static struct completion        obd_zombie_start;
1614 static struct completion        obd_zombie_stop;
1615 static unsigned long            obd_zombie_flags;
1616 static wait_queue_head_t                obd_zombie_waitq;
1617 static pid_t                    obd_zombie_pid;
1618
1619 enum {
1620         OBD_ZOMBIE_STOP         = 0x0001,
1621 };
1622
1623 /**
1624  * check for work for kill zombie import/export thread.
1625  */
1626 static int obd_zombie_impexp_check(void *arg)
1627 {
1628         int rc;
1629
1630         spin_lock(&obd_zombie_impexp_lock);
1631         rc = (zombies_count == 0) &&
1632              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1633         spin_unlock(&obd_zombie_impexp_lock);
1634
1635         return rc;
1636 }
1637
1638 /**
1639  * Add export to the obd_zombie thread and notify it.
1640  */
1641 static void obd_zombie_export_add(struct obd_export *exp) {
1642         spin_lock(&exp->exp_obd->obd_dev_lock);
1643         LASSERT(!list_empty(&exp->exp_obd_chain));
1644         list_del_init(&exp->exp_obd_chain);
1645         spin_unlock(&exp->exp_obd->obd_dev_lock);
1646         spin_lock(&obd_zombie_impexp_lock);
1647         zombies_count++;
1648         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1649         spin_unlock(&obd_zombie_impexp_lock);
1650
1651         obd_zombie_impexp_notify();
1652 }
1653
1654 /**
1655  * Add import to the obd_zombie thread and notify it.
1656  */
1657 static void obd_zombie_import_add(struct obd_import *imp) {
1658         LASSERT(imp->imp_sec == NULL);
1659         LASSERT(imp->imp_rq_pool == NULL);
1660         spin_lock(&obd_zombie_impexp_lock);
1661         LASSERT(list_empty(&imp->imp_zombie_chain));
1662         zombies_count++;
1663         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1664         spin_unlock(&obd_zombie_impexp_lock);
1665
1666         obd_zombie_impexp_notify();
1667 }
1668
1669 /**
1670  * notify import/export destroy thread about new zombie.
1671  */
1672 static void obd_zombie_impexp_notify(void)
1673 {
1674         /*
1675          * Make sure obd_zombie_impexp_thread get this notification.
1676          * It is possible this signal only get by obd_zombie_barrier, and
1677          * barrier gulps this notification and sleeps away and hangs ensues
1678          */
1679         wake_up_all(&obd_zombie_waitq);
1680 }
1681
1682 /**
1683  * check whether obd_zombie is idle
1684  */
1685 static int obd_zombie_is_idle(void)
1686 {
1687         int rc;
1688
1689         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1690         spin_lock(&obd_zombie_impexp_lock);
1691         rc = (zombies_count == 0);
1692         spin_unlock(&obd_zombie_impexp_lock);
1693         return rc;
1694 }
1695
1696 /**
1697  * wait when obd_zombie import/export queues become empty
1698  */
1699 void obd_zombie_barrier(void)
1700 {
1701         struct l_wait_info lwi = { 0 };
1702
1703         if (obd_zombie_pid == current_pid())
1704                 /* don't wait for myself */
1705                 return;
1706         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1707 }
1708 EXPORT_SYMBOL(obd_zombie_barrier);
1709
1710
1711 /**
1712  * destroy zombie export/import thread.
1713  */
1714 static int obd_zombie_impexp_thread(void *unused)
1715 {
1716         unshare_fs_struct();
1717         complete(&obd_zombie_start);
1718
1719         obd_zombie_pid = current_pid();
1720
1721         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1722                 struct l_wait_info lwi = { 0 };
1723
1724                 l_wait_event(obd_zombie_waitq,
1725                              !obd_zombie_impexp_check(NULL), &lwi);
1726                 obd_zombie_impexp_cull();
1727
1728                 /*
1729                  * Notify obd_zombie_barrier callers that queues
1730                  * may be empty.
1731                  */
1732                 wake_up(&obd_zombie_waitq);
1733         }
1734
1735         complete(&obd_zombie_stop);
1736
1737         return 0;
1738 }
1739
1740
1741 /**
1742  * start destroy zombie import/export thread
1743  */
1744 int obd_zombie_impexp_init(void)
1745 {
1746         struct task_struct *task;
1747
1748         INIT_LIST_HEAD(&obd_zombie_imports);
1749         INIT_LIST_HEAD(&obd_zombie_exports);
1750         spin_lock_init(&obd_zombie_impexp_lock);
1751         init_completion(&obd_zombie_start);
1752         init_completion(&obd_zombie_stop);
1753         init_waitqueue_head(&obd_zombie_waitq);
1754         obd_zombie_pid = 0;
1755
1756         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1757         if (IS_ERR(task))
1758                 return PTR_ERR(task);
1759
1760         wait_for_completion(&obd_zombie_start);
1761         return 0;
1762 }
1763 /**
1764  * stop destroy zombie import/export thread
1765  */
1766 void obd_zombie_impexp_stop(void)
1767 {
1768         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1769         obd_zombie_impexp_notify();
1770         wait_for_completion(&obd_zombie_stop);
1771 }
1772
1773 /***** Kernel-userspace comm helpers *******/
1774
1775 /* Get length of entire message, including header */
1776 int kuc_len(int payload_len)
1777 {
1778         return sizeof(struct kuc_hdr) + payload_len;
1779 }
1780 EXPORT_SYMBOL(kuc_len);
1781
1782 /* Get a pointer to kuc header, given a ptr to the payload
1783  * @param p Pointer to payload area
1784  * @returns Pointer to kuc header
1785  */
1786 struct kuc_hdr *kuc_ptr(void *p)
1787 {
1788         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1789         LASSERT(lh->kuc_magic == KUC_MAGIC);
1790         return lh;
1791 }
1792 EXPORT_SYMBOL(kuc_ptr);
1793
1794 /* Test if payload is part of kuc message
1795  * @param p Pointer to payload area
1796  * @returns boolean
1797  */
1798 int kuc_ispayload(void *p)
1799 {
1800         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1801
1802         if (kh->kuc_magic == KUC_MAGIC)
1803                 return 1;
1804         else
1805                 return 0;
1806 }
1807 EXPORT_SYMBOL(kuc_ispayload);
1808
1809 /* Alloc space for a message, and fill in header
1810  * @return Pointer to payload area
1811  */
1812 void *kuc_alloc(int payload_len, int transport, int type)
1813 {
1814         struct kuc_hdr *lh;
1815         int len = kuc_len(payload_len);
1816
1817         OBD_ALLOC(lh, len);
1818         if (lh == NULL)
1819                 return ERR_PTR(-ENOMEM);
1820
1821         lh->kuc_magic = KUC_MAGIC;
1822         lh->kuc_transport = transport;
1823         lh->kuc_msgtype = type;
1824         lh->kuc_msglen = len;
1825
1826         return (void *)(lh + 1);
1827 }
1828 EXPORT_SYMBOL(kuc_alloc);
1829
1830 /* Takes pointer to payload area */
1831 inline void kuc_free(void *p, int payload_len)
1832 {
1833         struct kuc_hdr *lh = kuc_ptr(p);
1834         OBD_FREE(lh, kuc_len(payload_len));
1835 }
1836 EXPORT_SYMBOL(kuc_free);