Merge tag 'for-linus-20160801' of git://git.infradead.org/linux-mtd
[cascardo/linux.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39 #include "../include/obd_class.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/lustre_kernelcomm.h"
42
43 spinlock_t obd_types_lock;
44
45 static struct kmem_cache *obd_device_cachep;
46 struct kmem_cache *obdo_cachep;
47 EXPORT_SYMBOL(obdo_cachep);
48 static struct kmem_cache *import_cachep;
49
50 static struct list_head      obd_zombie_imports;
51 static struct list_head      obd_zombie_exports;
52 static spinlock_t  obd_zombie_impexp_lock;
53 static void obd_zombie_impexp_notify(void);
54 static void obd_zombie_export_add(struct obd_export *exp);
55 static void obd_zombie_import_add(struct obd_import *imp);
56
57 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
58 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
59
60 /*
61  * support functions: we could use inter-module communication, but this
62  * is more portable to other OS's
63  */
64 static struct obd_device *obd_device_alloc(void)
65 {
66         struct obd_device *obd;
67
68         obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
69         if (obd)
70                 obd->obd_magic = OBD_DEVICE_MAGIC;
71         return obd;
72 }
73
74 static void obd_device_free(struct obd_device *obd)
75 {
76         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
77                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
78         if (obd->obd_namespace) {
79                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
80                        obd, obd->obd_namespace, obd->obd_force);
81                 LBUG();
82         }
83         lu_ref_fini(&obd->obd_reference);
84         kmem_cache_free(obd_device_cachep, obd);
85 }
86
87 static struct obd_type *class_search_type(const char *name)
88 {
89         struct list_head *tmp;
90         struct obd_type *type;
91
92         spin_lock(&obd_types_lock);
93         list_for_each(tmp, &obd_types) {
94                 type = list_entry(tmp, struct obd_type, typ_chain);
95                 if (strcmp(type->typ_name, name) == 0) {
96                         spin_unlock(&obd_types_lock);
97                         return type;
98                 }
99         }
100         spin_unlock(&obd_types_lock);
101         return NULL;
102 }
103
104 static struct obd_type *class_get_type(const char *name)
105 {
106         struct obd_type *type = class_search_type(name);
107
108         if (!type) {
109                 const char *modname = name;
110
111                 if (!request_module("%s", modname)) {
112                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
113                         type = class_search_type(name);
114                 } else {
115                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
116                                            modname);
117                 }
118         }
119         if (type) {
120                 spin_lock(&type->obd_type_lock);
121                 type->typ_refcnt++;
122                 try_module_get(type->typ_dt_ops->owner);
123                 spin_unlock(&type->obd_type_lock);
124         }
125         return type;
126 }
127
128 void class_put_type(struct obd_type *type)
129 {
130         LASSERT(type);
131         spin_lock(&type->obd_type_lock);
132         type->typ_refcnt--;
133         module_put(type->typ_dt_ops->owner);
134         spin_unlock(&type->obd_type_lock);
135 }
136 EXPORT_SYMBOL(class_put_type);
137
138 #define CLASS_MAX_NAME 1024
139
140 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
141                         const char *name,
142                         struct lu_device_type *ldt)
143 {
144         struct obd_type *type;
145         int rc;
146
147         /* sanity check */
148         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
149
150         if (class_search_type(name)) {
151                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
152                 return -EEXIST;
153         }
154
155         rc = -ENOMEM;
156         type = kzalloc(sizeof(*type), GFP_NOFS);
157         if (!type)
158                 return rc;
159
160         type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
161         type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
162         type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
163
164         if (!type->typ_dt_ops ||
165             !type->typ_md_ops ||
166             !type->typ_name)
167                 goto failed;
168
169         *(type->typ_dt_ops) = *dt_ops;
170         /* md_ops is optional */
171         if (md_ops)
172                 *(type->typ_md_ops) = *md_ops;
173         strcpy(type->typ_name, name);
174         spin_lock_init(&type->obd_type_lock);
175
176         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
177                                                     debugfs_lustre_root,
178                                                     NULL, type);
179         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
180                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
181                                              : -ENOMEM;
182                 type->typ_debugfs_entry = NULL;
183                 goto failed;
184         }
185
186         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
187         if (!type->typ_kobj) {
188                 rc = -ENOMEM;
189                 goto failed;
190         }
191
192         if (ldt) {
193                 type->typ_lu = ldt;
194                 rc = lu_device_type_init(ldt);
195                 if (rc != 0)
196                         goto failed;
197         }
198
199         spin_lock(&obd_types_lock);
200         list_add(&type->typ_chain, &obd_types);
201         spin_unlock(&obd_types_lock);
202
203         return 0;
204
205  failed:
206         if (type->typ_kobj)
207                 kobject_put(type->typ_kobj);
208         kfree(type->typ_name);
209         kfree(type->typ_md_ops);
210         kfree(type->typ_dt_ops);
211         kfree(type);
212         return rc;
213 }
214 EXPORT_SYMBOL(class_register_type);
215
216 int class_unregister_type(const char *name)
217 {
218         struct obd_type *type = class_search_type(name);
219
220         if (!type) {
221                 CERROR("unknown obd type\n");
222                 return -EINVAL;
223         }
224
225         if (type->typ_refcnt) {
226                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
227                 /* This is a bad situation, let's make the best of it */
228                 /* Remove ops, but leave the name for debugging */
229                 kfree(type->typ_dt_ops);
230                 kfree(type->typ_md_ops);
231                 return -EBUSY;
232         }
233
234         if (type->typ_kobj)
235                 kobject_put(type->typ_kobj);
236
237         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
238                 ldebugfs_remove(&type->typ_debugfs_entry);
239
240         if (type->typ_lu)
241                 lu_device_type_fini(type->typ_lu);
242
243         spin_lock(&obd_types_lock);
244         list_del(&type->typ_chain);
245         spin_unlock(&obd_types_lock);
246         kfree(type->typ_name);
247         kfree(type->typ_dt_ops);
248         kfree(type->typ_md_ops);
249         kfree(type);
250         return 0;
251 } /* class_unregister_type */
252 EXPORT_SYMBOL(class_unregister_type);
253
254 /**
255  * Create a new obd device.
256  *
257  * Find an empty slot in ::obd_devs[], create a new obd device in it.
258  *
259  * \param[in] type_name obd device type string.
260  * \param[in] name      obd device name.
261  *
262  * \retval NULL if create fails, otherwise return the obd device
263  *       pointer created.
264  */
265 struct obd_device *class_newdev(const char *type_name, const char *name)
266 {
267         struct obd_device *result = NULL;
268         struct obd_device *newdev;
269         struct obd_type *type = NULL;
270         int i;
271         int new_obd_minor = 0;
272
273         if (strlen(name) >= MAX_OBD_NAME) {
274                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
275                 return ERR_PTR(-EINVAL);
276         }
277
278         type = class_get_type(type_name);
279         if (!type) {
280                 CERROR("OBD: unknown type: %s\n", type_name);
281                 return ERR_PTR(-ENODEV);
282         }
283
284         newdev = obd_device_alloc();
285         if (!newdev) {
286                 result = ERR_PTR(-ENOMEM);
287                 goto out_type;
288         }
289
290         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
291
292         write_lock(&obd_dev_lock);
293         for (i = 0; i < class_devno_max(); i++) {
294                 struct obd_device *obd = class_num2obd(i);
295
296                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
297                         CERROR("Device %s already exists at %d, won't add\n",
298                                name, i);
299                         if (result) {
300                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
301                                          "%p obd_magic %08x != %08x\n", result,
302                                          result->obd_magic, OBD_DEVICE_MAGIC);
303                                 LASSERTF(result->obd_minor == new_obd_minor,
304                                          "%p obd_minor %d != %d\n", result,
305                                          result->obd_minor, new_obd_minor);
306
307                                 obd_devs[result->obd_minor] = NULL;
308                                 result->obd_name[0] = '\0';
309                          }
310                         result = ERR_PTR(-EEXIST);
311                         break;
312                 }
313                 if (!result && !obd) {
314                         result = newdev;
315                         result->obd_minor = i;
316                         new_obd_minor = i;
317                         result->obd_type = type;
318                         strncpy(result->obd_name, name,
319                                 sizeof(result->obd_name) - 1);
320                         obd_devs[i] = result;
321                 }
322         }
323         write_unlock(&obd_dev_lock);
324
325         if (!result && i >= class_devno_max()) {
326                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
327                        class_devno_max());
328                 result = ERR_PTR(-EOVERFLOW);
329                 goto out;
330         }
331
332         if (IS_ERR(result))
333                 goto out;
334
335         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
336                result->obd_name, result);
337
338         return result;
339 out:
340         obd_device_free(newdev);
341 out_type:
342         class_put_type(type);
343         return result;
344 }
345
346 void class_release_dev(struct obd_device *obd)
347 {
348         struct obd_type *obd_type = obd->obd_type;
349
350         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
351                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
352         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
353                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
354         LASSERT(obd_type);
355
356         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
357                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
358
359         write_lock(&obd_dev_lock);
360         obd_devs[obd->obd_minor] = NULL;
361         write_unlock(&obd_dev_lock);
362         obd_device_free(obd);
363
364         class_put_type(obd_type);
365 }
366
367 int class_name2dev(const char *name)
368 {
369         int i;
370
371         if (!name)
372                 return -1;
373
374         read_lock(&obd_dev_lock);
375         for (i = 0; i < class_devno_max(); i++) {
376                 struct obd_device *obd = class_num2obd(i);
377
378                 if (obd && strcmp(name, obd->obd_name) == 0) {
379                         /* Make sure we finished attaching before we give
380                          * out any references
381                          */
382                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
383                         if (obd->obd_attached) {
384                                 read_unlock(&obd_dev_lock);
385                                 return i;
386                         }
387                         break;
388                 }
389         }
390         read_unlock(&obd_dev_lock);
391
392         return -1;
393 }
394 EXPORT_SYMBOL(class_name2dev);
395
396 struct obd_device *class_name2obd(const char *name)
397 {
398         int dev = class_name2dev(name);
399
400         if (dev < 0 || dev > class_devno_max())
401                 return NULL;
402         return class_num2obd(dev);
403 }
404 EXPORT_SYMBOL(class_name2obd);
405
406 int class_uuid2dev(struct obd_uuid *uuid)
407 {
408         int i;
409
410         read_lock(&obd_dev_lock);
411         for (i = 0; i < class_devno_max(); i++) {
412                 struct obd_device *obd = class_num2obd(i);
413
414                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
415                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
416                         read_unlock(&obd_dev_lock);
417                         return i;
418                 }
419         }
420         read_unlock(&obd_dev_lock);
421
422         return -1;
423 }
424 EXPORT_SYMBOL(class_uuid2dev);
425
426 /**
427  * Get obd device from ::obd_devs[]
428  *
429  * \param num [in] array index
430  *
431  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432  *       otherwise return the obd device there.
433  */
434 struct obd_device *class_num2obd(int num)
435 {
436         struct obd_device *obd = NULL;
437
438         if (num < class_devno_max()) {
439                 obd = obd_devs[num];
440                 if (!obd)
441                         return NULL;
442
443                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444                          "%p obd_magic %08x != %08x\n",
445                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446                 LASSERTF(obd->obd_minor == num,
447                          "%p obd_minor %0d != %0d\n",
448                          obd, obd->obd_minor, num);
449         }
450
451         return obd;
452 }
453 EXPORT_SYMBOL(class_num2obd);
454
455 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
456  * specified, then only the client with that uuid is returned,
457  * otherwise any client connected to the tgt is returned.
458  */
459 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
460                                          const char *typ_name,
461                                          struct obd_uuid *grp_uuid)
462 {
463         int i;
464
465         read_lock(&obd_dev_lock);
466         for (i = 0; i < class_devno_max(); i++) {
467                 struct obd_device *obd = class_num2obd(i);
468
469                 if (!obd)
470                         continue;
471                 if ((strncmp(obd->obd_type->typ_name, typ_name,
472                              strlen(typ_name)) == 0)) {
473                         if (obd_uuid_equals(tgt_uuid,
474                                             &obd->u.cli.cl_target_uuid) &&
475                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
476                                                          &obd->obd_uuid) : 1)) {
477                                 read_unlock(&obd_dev_lock);
478                                 return obd;
479                         }
480                 }
481         }
482         read_unlock(&obd_dev_lock);
483
484         return NULL;
485 }
486 EXPORT_SYMBOL(class_find_client_obd);
487
488 /* Iterate the obd_device list looking devices have grp_uuid. Start
489  * searching at *next, and if a device is found, the next index to look
490  * at is saved in *next. If next is NULL, then the first matching device
491  * will always be returned.
492  */
493 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
494 {
495         int i;
496
497         if (!next)
498                 i = 0;
499         else if (*next >= 0 && *next < class_devno_max())
500                 i = *next;
501         else
502                 return NULL;
503
504         read_lock(&obd_dev_lock);
505         for (; i < class_devno_max(); i++) {
506                 struct obd_device *obd = class_num2obd(i);
507
508                 if (!obd)
509                         continue;
510                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
511                         if (next)
512                                 *next = i+1;
513                         read_unlock(&obd_dev_lock);
514                         return obd;
515                 }
516         }
517         read_unlock(&obd_dev_lock);
518
519         return NULL;
520 }
521 EXPORT_SYMBOL(class_devices_in_group);
522
523 /**
524  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
525  * adjust sptlrpc settings accordingly.
526  */
527 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
528 {
529         struct obd_device  *obd;
530         const char       *type;
531         int              i, rc = 0, rc2;
532
533         LASSERT(namelen > 0);
534
535         read_lock(&obd_dev_lock);
536         for (i = 0; i < class_devno_max(); i++) {
537                 obd = class_num2obd(i);
538
539                 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
540                         continue;
541
542                 /* only notify mdc, osc, mdt, ost */
543                 type = obd->obd_type->typ_name;
544                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
545                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
546                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
547                     strcmp(type, LUSTRE_OST_NAME) != 0)
548                         continue;
549
550                 if (strncmp(obd->obd_name, fsname, namelen))
551                         continue;
552
553                 class_incref(obd, __func__, obd);
554                 read_unlock(&obd_dev_lock);
555                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
556                                          sizeof(KEY_SPTLRPC_CONF),
557                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
558                 rc = rc ? rc : rc2;
559                 class_decref(obd, __func__, obd);
560                 read_lock(&obd_dev_lock);
561         }
562         read_unlock(&obd_dev_lock);
563         return rc;
564 }
565 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
566
567 void obd_cleanup_caches(void)
568 {
569         kmem_cache_destroy(obd_device_cachep);
570         obd_device_cachep = NULL;
571         kmem_cache_destroy(obdo_cachep);
572         obdo_cachep = NULL;
573         kmem_cache_destroy(import_cachep);
574         import_cachep = NULL;
575 }
576
577 int obd_init_caches(void)
578 {
579         LASSERT(!obd_device_cachep);
580         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
581                                               sizeof(struct obd_device),
582                                               0, 0, NULL);
583         if (!obd_device_cachep)
584                 goto out;
585
586         LASSERT(!obdo_cachep);
587         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
588                                         0, 0, NULL);
589         if (!obdo_cachep)
590                 goto out;
591
592         LASSERT(!import_cachep);
593         import_cachep = kmem_cache_create("ll_import_cache",
594                                           sizeof(struct obd_import),
595                                           0, 0, NULL);
596         if (!import_cachep)
597                 goto out;
598
599         return 0;
600  out:
601         obd_cleanup_caches();
602         return -ENOMEM;
603 }
604
605 /* map connection to client */
606 struct obd_export *class_conn2export(struct lustre_handle *conn)
607 {
608         struct obd_export *export;
609
610         if (!conn) {
611                 CDEBUG(D_CACHE, "looking for null handle\n");
612                 return NULL;
613         }
614
615         if (conn->cookie == -1) {  /* this means assign a new connection */
616                 CDEBUG(D_CACHE, "want a new connection\n");
617                 return NULL;
618         }
619
620         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
621         export = class_handle2object(conn->cookie);
622         return export;
623 }
624 EXPORT_SYMBOL(class_conn2export);
625
626 struct obd_device *class_exp2obd(struct obd_export *exp)
627 {
628         if (exp)
629                 return exp->exp_obd;
630         return NULL;
631 }
632 EXPORT_SYMBOL(class_exp2obd);
633
634 struct obd_import *class_exp2cliimp(struct obd_export *exp)
635 {
636         struct obd_device *obd = exp->exp_obd;
637
638         if (!obd)
639                 return NULL;
640         return obd->u.cli.cl_import;
641 }
642 EXPORT_SYMBOL(class_exp2cliimp);
643
644 /* Export management functions */
645 static void class_export_destroy(struct obd_export *exp)
646 {
647         struct obd_device *obd = exp->exp_obd;
648
649         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
650         LASSERT(obd);
651
652         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
653                exp->exp_client_uuid.uuid, obd->obd_name);
654
655         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
656         if (exp->exp_connection)
657                 ptlrpc_put_connection_superhack(exp->exp_connection);
658
659         LASSERT(list_empty(&exp->exp_outstanding_replies));
660         LASSERT(list_empty(&exp->exp_uncommitted_replies));
661         LASSERT(list_empty(&exp->exp_req_replay_queue));
662         LASSERT(list_empty(&exp->exp_hp_rpcs));
663         obd_destroy_export(exp);
664         class_decref(obd, "export", exp);
665
666         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
667 }
668
/* portals_handle_ops.hop_addref hook: take a reference on the export. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
673
674 static struct portals_handle_ops export_handle_ops = {
675         .hop_addref = export_handle_addref,
676         .hop_free   = NULL,
677 };
678
679 struct obd_export *class_export_get(struct obd_export *exp)
680 {
681         atomic_inc(&exp->exp_refcount);
682         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
683                atomic_read(&exp->exp_refcount));
684         return exp;
685 }
686 EXPORT_SYMBOL(class_export_get);
687
688 void class_export_put(struct obd_export *exp)
689 {
690         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
691         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
692                atomic_read(&exp->exp_refcount) - 1);
693
694         if (atomic_dec_and_test(&exp->exp_refcount)) {
695                 LASSERT(!list_empty(&exp->exp_obd_chain));
696                 CDEBUG(D_IOCTL, "final put %p/%s\n",
697                        exp, exp->exp_client_uuid.uuid);
698
699                 /* release nid stat refererence */
700                 lprocfs_exp_cleanup(exp);
701
702                 obd_zombie_export_add(exp);
703         }
704 }
705 EXPORT_SYMBOL(class_export_put);
706
707 /* Creates a new export, adds it to the hash table, and returns a
708  * pointer to it. The refcount is 2: one for the hash reference, and
709  * one for the pointer returned by this function.
710  */
711 struct obd_export *class_new_export(struct obd_device *obd,
712                                     struct obd_uuid *cluuid)
713 {
714         struct obd_export *export;
715         struct cfs_hash *hash = NULL;
716         int rc = 0;
717
718         export = kzalloc(sizeof(*export), GFP_NOFS);
719         if (!export)
720                 return ERR_PTR(-ENOMEM);
721
722         export->exp_conn_cnt = 0;
723         export->exp_lock_hash = NULL;
724         export->exp_flock_hash = NULL;
725         atomic_set(&export->exp_refcount, 2);
726         atomic_set(&export->exp_rpc_count, 0);
727         atomic_set(&export->exp_cb_count, 0);
728         atomic_set(&export->exp_locks_count, 0);
729 #if LUSTRE_TRACKS_LOCK_EXP_REFS
730         INIT_LIST_HEAD(&export->exp_locks_list);
731         spin_lock_init(&export->exp_locks_list_guard);
732 #endif
733         atomic_set(&export->exp_replay_count, 0);
734         export->exp_obd = obd;
735         INIT_LIST_HEAD(&export->exp_outstanding_replies);
736         spin_lock_init(&export->exp_uncommitted_replies_lock);
737         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
738         INIT_LIST_HEAD(&export->exp_req_replay_queue);
739         INIT_LIST_HEAD(&export->exp_handle.h_link);
740         INIT_LIST_HEAD(&export->exp_hp_rpcs);
741         class_handle_hash(&export->exp_handle, &export_handle_ops);
742         spin_lock_init(&export->exp_lock);
743         spin_lock_init(&export->exp_rpc_lock);
744         INIT_HLIST_NODE(&export->exp_uuid_hash);
745         spin_lock_init(&export->exp_bl_list_lock);
746         INIT_LIST_HEAD(&export->exp_bl_list);
747
748         export->exp_sp_peer = LUSTRE_SP_ANY;
749         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
750         export->exp_client_uuid = *cluuid;
751         obd_init_export(export);
752
753         spin_lock(&obd->obd_dev_lock);
754         /* shouldn't happen, but might race */
755         if (obd->obd_stopping) {
756                 rc = -ENODEV;
757                 goto exit_unlock;
758         }
759
760         hash = cfs_hash_getref(obd->obd_uuid_hash);
761         if (!hash) {
762                 rc = -ENODEV;
763                 goto exit_unlock;
764         }
765         spin_unlock(&obd->obd_dev_lock);
766
767         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
768                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
769                 if (rc != 0) {
770                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
771                                       obd->obd_name, cluuid->uuid, rc);
772                         rc = -EALREADY;
773                         goto exit_err;
774                 }
775         }
776
777         spin_lock(&obd->obd_dev_lock);
778         if (obd->obd_stopping) {
779                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
780                 rc = -ENODEV;
781                 goto exit_unlock;
782         }
783
784         class_incref(obd, "export", export);
785         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
786         export->exp_obd->obd_num_exports++;
787         spin_unlock(&obd->obd_dev_lock);
788         cfs_hash_putref(hash);
789         return export;
790
791 exit_unlock:
792         spin_unlock(&obd->obd_dev_lock);
793 exit_err:
794         if (hash)
795                 cfs_hash_putref(hash);
796         class_handle_unhash(&export->exp_handle);
797         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
798         obd_destroy_export(export);
799         kfree(export);
800         return ERR_PTR(rc);
801 }
802 EXPORT_SYMBOL(class_new_export);
803
804 void class_unlink_export(struct obd_export *exp)
805 {
806         class_handle_unhash(&exp->exp_handle);
807
808         spin_lock(&exp->exp_obd->obd_dev_lock);
809         /* delete an uuid-export hashitem from hashtables */
810         if (!hlist_unhashed(&exp->exp_uuid_hash))
811                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
812                              &exp->exp_client_uuid,
813                              &exp->exp_uuid_hash);
814
815         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
816         exp->exp_obd->obd_num_exports--;
817         spin_unlock(&exp->exp_obd->obd_dev_lock);
818         class_export_put(exp);
819 }
820 EXPORT_SYMBOL(class_unlink_export);
821
822 /* Import management functions */
823 static void class_import_destroy(struct obd_import *imp)
824 {
825         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
826                imp->imp_obd->obd_name);
827
828         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
829
830         ptlrpc_put_connection_superhack(imp->imp_connection);
831
832         while (!list_empty(&imp->imp_conn_list)) {
833                 struct obd_import_conn *imp_conn;
834
835                 imp_conn = list_entry(imp->imp_conn_list.next,
836                                       struct obd_import_conn, oic_item);
837                 list_del_init(&imp_conn->oic_item);
838                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
839                 kfree(imp_conn);
840         }
841
842         LASSERT(!imp->imp_sec);
843         class_decref(imp->imp_obd, "import", imp);
844         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
845 }
846
/* portals_handle_ops.hop_addref hook: take a reference on the import. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
851
852 static struct portals_handle_ops import_handle_ops = {
853         .hop_addref = import_handle_addref,
854         .hop_free   = NULL,
855 };
856
857 struct obd_import *class_import_get(struct obd_import *import)
858 {
859         atomic_inc(&import->imp_refcount);
860         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
861                atomic_read(&import->imp_refcount),
862                import->imp_obd->obd_name);
863         return import;
864 }
865 EXPORT_SYMBOL(class_import_get);
866
867 void class_import_put(struct obd_import *imp)
868 {
869         LASSERT(list_empty(&imp->imp_zombie_chain));
870         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
871
872         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
873                atomic_read(&imp->imp_refcount) - 1,
874                imp->imp_obd->obd_name);
875
876         if (atomic_dec_and_test(&imp->imp_refcount)) {
877                 CDEBUG(D_INFO, "final put import %p\n", imp);
878                 obd_zombie_import_add(imp);
879         }
880
881         /* catch possible import put race */
882         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
883 }
884 EXPORT_SYMBOL(class_import_put);
885
886 static void init_imp_at(struct imp_at *at)
887 {
888         int i;
889
890         at_init(&at->iat_net_latency, 0, 0);
891         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892                 /* max service estimates are tracked on the server side, so
893                  * don't use the AT history here, just use the last reported
894                  * val. (But keep hist for proc histogram, worst_ever)
895                  */
896                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
897                         AT_FLG_NOHIST);
898         }
899 }
900
901 struct obd_import *class_new_import(struct obd_device *obd)
902 {
903         struct obd_import *imp;
904
905         imp = kzalloc(sizeof(*imp), GFP_NOFS);
906         if (!imp)
907                 return NULL;
908
909         INIT_LIST_HEAD(&imp->imp_pinger_chain);
910         INIT_LIST_HEAD(&imp->imp_zombie_chain);
911         INIT_LIST_HEAD(&imp->imp_replay_list);
912         INIT_LIST_HEAD(&imp->imp_sending_list);
913         INIT_LIST_HEAD(&imp->imp_delayed_list);
914         INIT_LIST_HEAD(&imp->imp_committed_list);
915         imp->imp_replay_cursor = &imp->imp_committed_list;
916         spin_lock_init(&imp->imp_lock);
917         imp->imp_last_success_conn = 0;
918         imp->imp_state = LUSTRE_IMP_NEW;
919         imp->imp_obd = class_incref(obd, "import", imp);
920         mutex_init(&imp->imp_sec_mutex);
921         init_waitqueue_head(&imp->imp_recovery_waitq);
922
923         atomic_set(&imp->imp_refcount, 2);
924         atomic_set(&imp->imp_unregistering, 0);
925         atomic_set(&imp->imp_inflight, 0);
926         atomic_set(&imp->imp_replay_inflight, 0);
927         atomic_set(&imp->imp_inval_count, 0);
928         INIT_LIST_HEAD(&imp->imp_conn_list);
929         INIT_LIST_HEAD(&imp->imp_handle.h_link);
930         class_handle_hash(&imp->imp_handle, &import_handle_ops);
931         init_imp_at(&imp->imp_at);
932
933         /* the default magic is V2, will be used in connect RPC, and
934          * then adjusted according to the flags in request/reply.
935          */
936         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
937
938         return imp;
939 }
940 EXPORT_SYMBOL(class_new_import);
941
942 void class_destroy_import(struct obd_import *import)
943 {
944         LASSERT(import);
945         LASSERT(import != LP_POISON);
946
947         class_handle_unhash(&import->imp_handle);
948
949         spin_lock(&import->imp_lock);
950         import->imp_generation++;
951         spin_unlock(&import->imp_lock);
952         class_import_put(import);
953 }
954 EXPORT_SYMBOL(class_destroy_import);
955
956 #if LUSTRE_TRACKS_LOCK_EXP_REFS
957
958 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
959 {
960         spin_lock(&exp->exp_locks_list_guard);
961
962         LASSERT(lock->l_exp_refs_nr >= 0);
963
964         if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
965                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
966                               exp, lock, lock->l_exp_refs_target);
967         }
968         if ((lock->l_exp_refs_nr++) == 0) {
969                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
970                 lock->l_exp_refs_target = exp;
971         }
972         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
973                lock, exp, lock->l_exp_refs_nr);
974         spin_unlock(&exp->exp_locks_list_guard);
975 }
976 EXPORT_SYMBOL(__class_export_add_lock_ref);
977
978 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
979 {
980         spin_lock(&exp->exp_locks_list_guard);
981         LASSERT(lock->l_exp_refs_nr > 0);
982         if (lock->l_exp_refs_target != exp) {
983                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
984                               lock, lock->l_exp_refs_target, exp);
985         }
986         if (-- lock->l_exp_refs_nr == 0) {
987                 list_del_init(&lock->l_exp_refs_link);
988                 lock->l_exp_refs_target = NULL;
989         }
990         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
991                lock, exp, lock->l_exp_refs_nr);
992         spin_unlock(&exp->exp_locks_list_guard);
993 }
994 EXPORT_SYMBOL(__class_export_del_lock_ref);
995 #endif
996
997 /* A connection defines an export context in which preallocation can
998  * be managed. This releases the export pointer reference, and returns
999  * the export handle, so the export refcount is 1 when this function
1000  * returns.
1001  */
1002 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1003                   struct obd_uuid *cluuid)
1004 {
1005         struct obd_export *export;
1006
1007         LASSERT(conn);
1008         LASSERT(obd);
1009         LASSERT(cluuid);
1010
1011         export = class_new_export(obd, cluuid);
1012         if (IS_ERR(export))
1013                 return PTR_ERR(export);
1014
1015         conn->cookie = export->exp_handle.h_cookie;
1016         class_export_put(export);
1017
1018         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1019                cluuid->uuid, conn->cookie);
1020         return 0;
1021 }
1022 EXPORT_SYMBOL(class_connect);
1023
1024 /* This function removes 1-3 references from the export:
1025  * 1 - for export pointer passed
1026  * and if disconnect really need
1027  * 2 - removing from hash
1028  * 3 - in client_unlink_export
1029  * The export pointer passed to this function can destroyed
1030  */
1031 int class_disconnect(struct obd_export *export)
1032 {
1033         int already_disconnected;
1034
1035         if (!export) {
1036                 CWARN("attempting to free NULL export %p\n", export);
1037                 return -EINVAL;
1038         }
1039
1040         spin_lock(&export->exp_lock);
1041         already_disconnected = export->exp_disconnected;
1042         export->exp_disconnected = 1;
1043         spin_unlock(&export->exp_lock);
1044
1045         /* class_cleanup(), abort_recovery(), and class_fail_export()
1046          * all end up in here, and if any of them race we shouldn't
1047          * call extra class_export_puts().
1048          */
1049         if (already_disconnected)
1050                 goto no_disconn;
1051
1052         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1053                export->exp_handle.h_cookie);
1054
1055         class_unlink_export(export);
1056 no_disconn:
1057         class_export_put(export);
1058         return 0;
1059 }
1060 EXPORT_SYMBOL(class_disconnect);
1061
1062 void class_fail_export(struct obd_export *exp)
1063 {
1064         int rc, already_failed;
1065
1066         spin_lock(&exp->exp_lock);
1067         already_failed = exp->exp_failed;
1068         exp->exp_failed = 1;
1069         spin_unlock(&exp->exp_lock);
1070
1071         if (already_failed) {
1072                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1073                        exp, exp->exp_client_uuid.uuid);
1074                 return;
1075         }
1076
1077         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1078                exp, exp->exp_client_uuid.uuid);
1079
1080         if (obd_dump_on_timeout)
1081                 libcfs_debug_dumplog();
1082
1083         /* need for safe call CDEBUG after obd_disconnect */
1084         class_export_get(exp);
1085
1086         /* Most callers into obd_disconnect are removing their own reference
1087          * (request, for example) in addition to the one from the hash table.
1088          * We don't have such a reference here, so make one.
1089          */
1090         class_export_get(exp);
1091         rc = obd_disconnect(exp);
1092         if (rc)
1093                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1094         else
1095                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1096                        exp, exp->exp_client_uuid.uuid);
1097         class_export_put(exp);
1098 }
1099 EXPORT_SYMBOL(class_fail_export);
1100
1101 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook taking an export; unset (NULL) by default.
 * NOTE(review): presumably called to dump an export's lock references -
 * confirm against the callers.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1104 #endif
1105
/* Number of zombie imports and exports queued but not yet destroyed;
 * updated under obd_zombie_impexp_lock.
 */
static int zombies_count;
1108
1109 /**
1110  * kill zombie imports and exports
1111  */
1112 static void obd_zombie_impexp_cull(void)
1113 {
1114         struct obd_import *import;
1115         struct obd_export *export;
1116
1117         do {
1118                 spin_lock(&obd_zombie_impexp_lock);
1119
1120                 import = NULL;
1121                 if (!list_empty(&obd_zombie_imports)) {
1122                         import = list_entry(obd_zombie_imports.next,
1123                                             struct obd_import,
1124                                             imp_zombie_chain);
1125                         list_del_init(&import->imp_zombie_chain);
1126                 }
1127
1128                 export = NULL;
1129                 if (!list_empty(&obd_zombie_exports)) {
1130                         export = list_entry(obd_zombie_exports.next,
1131                                             struct obd_export,
1132                                             exp_obd_chain);
1133                         list_del_init(&export->exp_obd_chain);
1134                 }
1135
1136                 spin_unlock(&obd_zombie_impexp_lock);
1137
1138                 if (import) {
1139                         class_import_destroy(import);
1140                         spin_lock(&obd_zombie_impexp_lock);
1141                         zombies_count--;
1142                         spin_unlock(&obd_zombie_impexp_lock);
1143                 }
1144
1145                 if (export) {
1146                         class_export_destroy(export);
1147                         spin_lock(&obd_zombie_impexp_lock);
1148                         zombies_count--;
1149                         spin_unlock(&obd_zombie_impexp_lock);
1150                 }
1151
1152                 cond_resched();
1153         } while (import || export);
1154 }
1155
1156 static struct completion        obd_zombie_start;
1157 static struct completion        obd_zombie_stop;
1158 static unsigned long            obd_zombie_flags;
1159 static wait_queue_head_t                obd_zombie_waitq;
1160 static pid_t                    obd_zombie_pid;
1161
1162 enum {
1163         OBD_ZOMBIE_STOP         = 0x0001,
1164 };
1165
1166 /**
1167  * check for work for kill zombie import/export thread.
1168  */
1169 static int obd_zombie_impexp_check(void *arg)
1170 {
1171         int rc;
1172
1173         spin_lock(&obd_zombie_impexp_lock);
1174         rc = (zombies_count == 0) &&
1175              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1176         spin_unlock(&obd_zombie_impexp_lock);
1177
1178         return rc;
1179 }
1180
1181 /**
1182  * Add export to the obd_zombie thread and notify it.
1183  */
1184 static void obd_zombie_export_add(struct obd_export *exp)
1185 {
1186         spin_lock(&exp->exp_obd->obd_dev_lock);
1187         LASSERT(!list_empty(&exp->exp_obd_chain));
1188         list_del_init(&exp->exp_obd_chain);
1189         spin_unlock(&exp->exp_obd->obd_dev_lock);
1190         spin_lock(&obd_zombie_impexp_lock);
1191         zombies_count++;
1192         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1193         spin_unlock(&obd_zombie_impexp_lock);
1194
1195         obd_zombie_impexp_notify();
1196 }
1197
1198 /**
1199  * Add import to the obd_zombie thread and notify it.
1200  */
1201 static void obd_zombie_import_add(struct obd_import *imp)
1202 {
1203         LASSERT(!imp->imp_sec);
1204         spin_lock(&obd_zombie_impexp_lock);
1205         LASSERT(list_empty(&imp->imp_zombie_chain));
1206         zombies_count++;
1207         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1208         spin_unlock(&obd_zombie_impexp_lock);
1209
1210         obd_zombie_impexp_notify();
1211 }
1212
1213 /**
1214  * notify import/export destroy thread about new zombie.
1215  */
1216 static void obd_zombie_impexp_notify(void)
1217 {
1218         /*
1219          * Make sure obd_zombie_impexp_thread get this notification.
1220          * It is possible this signal only get by obd_zombie_barrier, and
1221          * barrier gulps this notification and sleeps away and hangs ensues
1222          */
1223         wake_up_all(&obd_zombie_waitq);
1224 }
1225
1226 /**
1227  * check whether obd_zombie is idle
1228  */
1229 static int obd_zombie_is_idle(void)
1230 {
1231         int rc;
1232
1233         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1234         spin_lock(&obd_zombie_impexp_lock);
1235         rc = (zombies_count == 0);
1236         spin_unlock(&obd_zombie_impexp_lock);
1237         return rc;
1238 }
1239
1240 /**
1241  * wait when obd_zombie import/export queues become empty
1242  */
1243 void obd_zombie_barrier(void)
1244 {
1245         struct l_wait_info lwi = { 0 };
1246
1247         if (obd_zombie_pid == current_pid())
1248                 /* don't wait for myself */
1249                 return;
1250         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1251 }
1252 EXPORT_SYMBOL(obd_zombie_barrier);
1253
1254 /**
1255  * destroy zombie export/import thread.
1256  */
1257 static int obd_zombie_impexp_thread(void *unused)
1258 {
1259         unshare_fs_struct();
1260         complete(&obd_zombie_start);
1261
1262         obd_zombie_pid = current_pid();
1263
1264         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1265                 struct l_wait_info lwi = { 0 };
1266
1267                 l_wait_event(obd_zombie_waitq,
1268                              !obd_zombie_impexp_check(NULL), &lwi);
1269                 obd_zombie_impexp_cull();
1270
1271                 /*
1272                  * Notify obd_zombie_barrier callers that queues
1273                  * may be empty.
1274                  */
1275                 wake_up(&obd_zombie_waitq);
1276         }
1277
1278         complete(&obd_zombie_stop);
1279
1280         return 0;
1281 }
1282
1283 /**
1284  * start destroy zombie import/export thread
1285  */
1286 int obd_zombie_impexp_init(void)
1287 {
1288         struct task_struct *task;
1289
1290         INIT_LIST_HEAD(&obd_zombie_imports);
1291         INIT_LIST_HEAD(&obd_zombie_exports);
1292         spin_lock_init(&obd_zombie_impexp_lock);
1293         init_completion(&obd_zombie_start);
1294         init_completion(&obd_zombie_stop);
1295         init_waitqueue_head(&obd_zombie_waitq);
1296         obd_zombie_pid = 0;
1297
1298         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1299         if (IS_ERR(task))
1300                 return PTR_ERR(task);
1301
1302         wait_for_completion(&obd_zombie_start);
1303         return 0;
1304 }
1305
1306 /**
1307  * stop destroy zombie import/export thread
1308  */
1309 void obd_zombie_impexp_stop(void)
1310 {
1311         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1312         obd_zombie_impexp_notify();
1313         wait_for_completion(&obd_zombie_stop);
1314 }