4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
39 #include "../include/obd_class.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/lustre_kernelcomm.h"
43 spinlock_t obd_types_lock;
45 static struct kmem_cache *obd_device_cachep;
46 struct kmem_cache *obdo_cachep;
47 EXPORT_SYMBOL(obdo_cachep);
48 static struct kmem_cache *import_cachep;
50 static struct list_head obd_zombie_imports;
51 static struct list_head obd_zombie_exports;
52 static spinlock_t obd_zombie_impexp_lock;
53 static void obd_zombie_impexp_notify(void);
54 static void obd_zombie_export_add(struct obd_export *exp);
55 static void obd_zombie_import_add(struct obd_import *imp);
57 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
58 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
61 * support functions: we could use inter-module communication, but this
62 * is more portable to other OS's
64 static struct obd_device *obd_device_alloc(void)
66 struct obd_device *obd;
68 obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
70 obd->obd_magic = OBD_DEVICE_MAGIC;
74 static void obd_device_free(struct obd_device *obd)
76 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
77 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
78 if (obd->obd_namespace) {
79 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
80 obd, obd->obd_namespace, obd->obd_force);
83 lu_ref_fini(&obd->obd_reference);
84 kmem_cache_free(obd_device_cachep, obd);
87 static struct obd_type *class_search_type(const char *name)
89 struct list_head *tmp;
90 struct obd_type *type;
92 spin_lock(&obd_types_lock);
93 list_for_each(tmp, &obd_types) {
94 type = list_entry(tmp, struct obd_type, typ_chain);
95 if (strcmp(type->typ_name, name) == 0) {
96 spin_unlock(&obd_types_lock);
100 spin_unlock(&obd_types_lock);
104 static struct obd_type *class_get_type(const char *name)
106 struct obd_type *type = class_search_type(name);
109 const char *modname = name;
111 if (!request_module("%s", modname)) {
112 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
113 type = class_search_type(name);
115 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
120 spin_lock(&type->obd_type_lock);
122 try_module_get(type->typ_dt_ops->owner);
123 spin_unlock(&type->obd_type_lock);
128 void class_put_type(struct obd_type *type)
131 spin_lock(&type->obd_type_lock);
133 module_put(type->typ_dt_ops->owner);
134 spin_unlock(&type->obd_type_lock);
136 EXPORT_SYMBOL(class_put_type);
138 #define CLASS_MAX_NAME 1024
140 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
142 struct lu_device_type *ldt)
144 struct obd_type *type;
148 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
150 if (class_search_type(name)) {
151 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
156 type = kzalloc(sizeof(*type), GFP_NOFS);
160 type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
161 type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
162 type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
164 if (!type->typ_dt_ops ||
169 *(type->typ_dt_ops) = *dt_ops;
170 /* md_ops is optional */
172 *(type->typ_md_ops) = *md_ops;
173 strcpy(type->typ_name, name);
174 spin_lock_init(&type->obd_type_lock);
176 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
179 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
180 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
182 type->typ_debugfs_entry = NULL;
186 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
187 if (!type->typ_kobj) {
194 rc = lu_device_type_init(ldt);
199 spin_lock(&obd_types_lock);
200 list_add(&type->typ_chain, &obd_types);
201 spin_unlock(&obd_types_lock);
207 kobject_put(type->typ_kobj);
208 kfree(type->typ_name);
209 kfree(type->typ_md_ops);
210 kfree(type->typ_dt_ops);
214 EXPORT_SYMBOL(class_register_type);
216 int class_unregister_type(const char *name)
218 struct obd_type *type = class_search_type(name);
221 CERROR("unknown obd type\n");
225 if (type->typ_refcnt) {
226 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
227 /* This is a bad situation, let's make the best of it */
228 /* Remove ops, but leave the name for debugging */
229 kfree(type->typ_dt_ops);
230 kfree(type->typ_md_ops);
235 kobject_put(type->typ_kobj);
237 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
238 ldebugfs_remove(&type->typ_debugfs_entry);
241 lu_device_type_fini(type->typ_lu);
243 spin_lock(&obd_types_lock);
244 list_del(&type->typ_chain);
245 spin_unlock(&obd_types_lock);
246 kfree(type->typ_name);
247 kfree(type->typ_dt_ops);
248 kfree(type->typ_md_ops);
251 } /* class_unregister_type */
252 EXPORT_SYMBOL(class_unregister_type);
255 * Create a new obd device.
257 * Find an empty slot in ::obd_devs[], create a new obd device in it.
259 * \param[in] type_name obd device type string.
260 * \param[in] name obd device name.
262 * \retval NULL if create fails, otherwise return the obd device
265 struct obd_device *class_newdev(const char *type_name, const char *name)
267 struct obd_device *result = NULL;
268 struct obd_device *newdev;
269 struct obd_type *type = NULL;
271 int new_obd_minor = 0;
273 if (strlen(name) >= MAX_OBD_NAME) {
274 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
275 return ERR_PTR(-EINVAL);
278 type = class_get_type(type_name);
280 CERROR("OBD: unknown type: %s\n", type_name);
281 return ERR_PTR(-ENODEV);
284 newdev = obd_device_alloc();
286 result = ERR_PTR(-ENOMEM);
290 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
292 write_lock(&obd_dev_lock);
293 for (i = 0; i < class_devno_max(); i++) {
294 struct obd_device *obd = class_num2obd(i);
296 if (obd && (strcmp(name, obd->obd_name) == 0)) {
297 CERROR("Device %s already exists at %d, won't add\n",
300 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
301 "%p obd_magic %08x != %08x\n", result,
302 result->obd_magic, OBD_DEVICE_MAGIC);
303 LASSERTF(result->obd_minor == new_obd_minor,
304 "%p obd_minor %d != %d\n", result,
305 result->obd_minor, new_obd_minor);
307 obd_devs[result->obd_minor] = NULL;
308 result->obd_name[0] = '\0';
310 result = ERR_PTR(-EEXIST);
313 if (!result && !obd) {
315 result->obd_minor = i;
317 result->obd_type = type;
318 strncpy(result->obd_name, name,
319 sizeof(result->obd_name) - 1);
320 obd_devs[i] = result;
323 write_unlock(&obd_dev_lock);
325 if (!result && i >= class_devno_max()) {
326 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
328 result = ERR_PTR(-EOVERFLOW);
335 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
336 result->obd_name, result);
340 obd_device_free(newdev);
342 class_put_type(type);
346 void class_release_dev(struct obd_device *obd)
348 struct obd_type *obd_type = obd->obd_type;
350 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
351 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
352 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
353 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
356 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
357 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
359 write_lock(&obd_dev_lock);
360 obd_devs[obd->obd_minor] = NULL;
361 write_unlock(&obd_dev_lock);
362 obd_device_free(obd);
364 class_put_type(obd_type);
367 int class_name2dev(const char *name)
374 read_lock(&obd_dev_lock);
375 for (i = 0; i < class_devno_max(); i++) {
376 struct obd_device *obd = class_num2obd(i);
378 if (obd && strcmp(name, obd->obd_name) == 0) {
379 /* Make sure we finished attaching before we give
382 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
383 if (obd->obd_attached) {
384 read_unlock(&obd_dev_lock);
390 read_unlock(&obd_dev_lock);
394 EXPORT_SYMBOL(class_name2dev);
396 struct obd_device *class_name2obd(const char *name)
398 int dev = class_name2dev(name);
400 if (dev < 0 || dev > class_devno_max())
402 return class_num2obd(dev);
404 EXPORT_SYMBOL(class_name2obd);
406 int class_uuid2dev(struct obd_uuid *uuid)
410 read_lock(&obd_dev_lock);
411 for (i = 0; i < class_devno_max(); i++) {
412 struct obd_device *obd = class_num2obd(i);
414 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
415 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
416 read_unlock(&obd_dev_lock);
420 read_unlock(&obd_dev_lock);
424 EXPORT_SYMBOL(class_uuid2dev);
427 * Get obd device from ::obd_devs[]
429 * \param num [in] array index
431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432 * otherwise return the obd device there.
434 struct obd_device *class_num2obd(int num)
436 struct obd_device *obd = NULL;
438 if (num < class_devno_max()) {
443 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444 "%p obd_magic %08x != %08x\n",
445 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446 LASSERTF(obd->obd_minor == num,
447 "%p obd_minor %0d != %0d\n",
448 obd, obd->obd_minor, num);
453 EXPORT_SYMBOL(class_num2obd);
455 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
456 * specified, then only the client with that uuid is returned,
457 * otherwise any client connected to the tgt is returned.
459 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
460 const char *typ_name,
461 struct obd_uuid *grp_uuid)
465 read_lock(&obd_dev_lock);
466 for (i = 0; i < class_devno_max(); i++) {
467 struct obd_device *obd = class_num2obd(i);
471 if ((strncmp(obd->obd_type->typ_name, typ_name,
472 strlen(typ_name)) == 0)) {
473 if (obd_uuid_equals(tgt_uuid,
474 &obd->u.cli.cl_target_uuid) &&
475 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
476 &obd->obd_uuid) : 1)) {
477 read_unlock(&obd_dev_lock);
482 read_unlock(&obd_dev_lock);
486 EXPORT_SYMBOL(class_find_client_obd);
488 /* Iterate the obd_device list looking devices have grp_uuid. Start
489 * searching at *next, and if a device is found, the next index to look
490 * at is saved in *next. If next is NULL, then the first matching device
491 * will always be returned.
493 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
499 else if (*next >= 0 && *next < class_devno_max())
504 read_lock(&obd_dev_lock);
505 for (; i < class_devno_max(); i++) {
506 struct obd_device *obd = class_num2obd(i);
510 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
513 read_unlock(&obd_dev_lock);
517 read_unlock(&obd_dev_lock);
521 EXPORT_SYMBOL(class_devices_in_group);
524 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
525 * adjust sptlrpc settings accordingly.
527 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
529 struct obd_device *obd;
533 LASSERT(namelen > 0);
535 read_lock(&obd_dev_lock);
536 for (i = 0; i < class_devno_max(); i++) {
537 obd = class_num2obd(i);
539 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
542 /* only notify mdc, osc, mdt, ost */
543 type = obd->obd_type->typ_name;
544 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
545 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
546 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
547 strcmp(type, LUSTRE_OST_NAME) != 0)
550 if (strncmp(obd->obd_name, fsname, namelen))
553 class_incref(obd, __func__, obd);
554 read_unlock(&obd_dev_lock);
555 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
556 sizeof(KEY_SPTLRPC_CONF),
557 KEY_SPTLRPC_CONF, 0, NULL, NULL);
559 class_decref(obd, __func__, obd);
560 read_lock(&obd_dev_lock);
562 read_unlock(&obd_dev_lock);
565 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
567 void obd_cleanup_caches(void)
569 kmem_cache_destroy(obd_device_cachep);
570 obd_device_cachep = NULL;
571 kmem_cache_destroy(obdo_cachep);
573 kmem_cache_destroy(import_cachep);
574 import_cachep = NULL;
577 int obd_init_caches(void)
579 LASSERT(!obd_device_cachep);
580 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
581 sizeof(struct obd_device),
583 if (!obd_device_cachep)
586 LASSERT(!obdo_cachep);
587 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
592 LASSERT(!import_cachep);
593 import_cachep = kmem_cache_create("ll_import_cache",
594 sizeof(struct obd_import),
601 obd_cleanup_caches();
605 /* map connection to client */
606 struct obd_export *class_conn2export(struct lustre_handle *conn)
608 struct obd_export *export;
611 CDEBUG(D_CACHE, "looking for null handle\n");
615 if (conn->cookie == -1) { /* this means assign a new connection */
616 CDEBUG(D_CACHE, "want a new connection\n");
620 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
621 export = class_handle2object(conn->cookie);
624 EXPORT_SYMBOL(class_conn2export);
626 struct obd_device *class_exp2obd(struct obd_export *exp)
632 EXPORT_SYMBOL(class_exp2obd);
634 struct obd_import *class_exp2cliimp(struct obd_export *exp)
636 struct obd_device *obd = exp->exp_obd;
640 return obd->u.cli.cl_import;
642 EXPORT_SYMBOL(class_exp2cliimp);
644 /* Export management functions */
645 static void class_export_destroy(struct obd_export *exp)
647 struct obd_device *obd = exp->exp_obd;
649 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
652 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
653 exp->exp_client_uuid.uuid, obd->obd_name);
655 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
656 if (exp->exp_connection)
657 ptlrpc_put_connection_superhack(exp->exp_connection);
659 LASSERT(list_empty(&exp->exp_outstanding_replies));
660 LASSERT(list_empty(&exp->exp_uncommitted_replies));
661 LASSERT(list_empty(&exp->exp_req_replay_queue));
662 LASSERT(list_empty(&exp->exp_hp_rpcs));
663 obd_destroy_export(exp);
664 class_decref(obd, "export", exp);
666 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
669 static void export_handle_addref(void *export)
671 class_export_get(export);
674 static struct portals_handle_ops export_handle_ops = {
675 .hop_addref = export_handle_addref,
679 struct obd_export *class_export_get(struct obd_export *exp)
681 atomic_inc(&exp->exp_refcount);
682 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
683 atomic_read(&exp->exp_refcount));
686 EXPORT_SYMBOL(class_export_get);
688 void class_export_put(struct obd_export *exp)
690 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
691 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
692 atomic_read(&exp->exp_refcount) - 1);
694 if (atomic_dec_and_test(&exp->exp_refcount)) {
695 LASSERT(!list_empty(&exp->exp_obd_chain));
696 CDEBUG(D_IOCTL, "final put %p/%s\n",
697 exp, exp->exp_client_uuid.uuid);
699 /* release nid stat refererence */
700 lprocfs_exp_cleanup(exp);
702 obd_zombie_export_add(exp);
705 EXPORT_SYMBOL(class_export_put);
707 /* Creates a new export, adds it to the hash table, and returns a
708 * pointer to it. The refcount is 2: one for the hash reference, and
709 * one for the pointer returned by this function.
711 struct obd_export *class_new_export(struct obd_device *obd,
712 struct obd_uuid *cluuid)
714 struct obd_export *export;
715 struct cfs_hash *hash = NULL;
718 export = kzalloc(sizeof(*export), GFP_NOFS);
720 return ERR_PTR(-ENOMEM);
722 export->exp_conn_cnt = 0;
723 export->exp_lock_hash = NULL;
724 export->exp_flock_hash = NULL;
725 atomic_set(&export->exp_refcount, 2);
726 atomic_set(&export->exp_rpc_count, 0);
727 atomic_set(&export->exp_cb_count, 0);
728 atomic_set(&export->exp_locks_count, 0);
729 #if LUSTRE_TRACKS_LOCK_EXP_REFS
730 INIT_LIST_HEAD(&export->exp_locks_list);
731 spin_lock_init(&export->exp_locks_list_guard);
733 atomic_set(&export->exp_replay_count, 0);
734 export->exp_obd = obd;
735 INIT_LIST_HEAD(&export->exp_outstanding_replies);
736 spin_lock_init(&export->exp_uncommitted_replies_lock);
737 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
738 INIT_LIST_HEAD(&export->exp_req_replay_queue);
739 INIT_LIST_HEAD(&export->exp_handle.h_link);
740 INIT_LIST_HEAD(&export->exp_hp_rpcs);
741 class_handle_hash(&export->exp_handle, &export_handle_ops);
742 spin_lock_init(&export->exp_lock);
743 spin_lock_init(&export->exp_rpc_lock);
744 INIT_HLIST_NODE(&export->exp_uuid_hash);
745 spin_lock_init(&export->exp_bl_list_lock);
746 INIT_LIST_HEAD(&export->exp_bl_list);
748 export->exp_sp_peer = LUSTRE_SP_ANY;
749 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
750 export->exp_client_uuid = *cluuid;
751 obd_init_export(export);
753 spin_lock(&obd->obd_dev_lock);
754 /* shouldn't happen, but might race */
755 if (obd->obd_stopping) {
760 hash = cfs_hash_getref(obd->obd_uuid_hash);
765 spin_unlock(&obd->obd_dev_lock);
767 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
768 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
770 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
771 obd->obd_name, cluuid->uuid, rc);
777 spin_lock(&obd->obd_dev_lock);
778 if (obd->obd_stopping) {
779 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
784 class_incref(obd, "export", export);
785 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
786 export->exp_obd->obd_num_exports++;
787 spin_unlock(&obd->obd_dev_lock);
788 cfs_hash_putref(hash);
792 spin_unlock(&obd->obd_dev_lock);
795 cfs_hash_putref(hash);
796 class_handle_unhash(&export->exp_handle);
797 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
798 obd_destroy_export(export);
802 EXPORT_SYMBOL(class_new_export);
804 void class_unlink_export(struct obd_export *exp)
806 class_handle_unhash(&exp->exp_handle);
808 spin_lock(&exp->exp_obd->obd_dev_lock);
809 /* delete an uuid-export hashitem from hashtables */
810 if (!hlist_unhashed(&exp->exp_uuid_hash))
811 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
812 &exp->exp_client_uuid,
813 &exp->exp_uuid_hash);
815 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
816 exp->exp_obd->obd_num_exports--;
817 spin_unlock(&exp->exp_obd->obd_dev_lock);
818 class_export_put(exp);
820 EXPORT_SYMBOL(class_unlink_export);
822 /* Import management functions */
823 static void class_import_destroy(struct obd_import *imp)
825 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
826 imp->imp_obd->obd_name);
828 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
830 ptlrpc_put_connection_superhack(imp->imp_connection);
832 while (!list_empty(&imp->imp_conn_list)) {
833 struct obd_import_conn *imp_conn;
835 imp_conn = list_entry(imp->imp_conn_list.next,
836 struct obd_import_conn, oic_item);
837 list_del_init(&imp_conn->oic_item);
838 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
842 LASSERT(!imp->imp_sec);
843 class_decref(imp->imp_obd, "import", imp);
844 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
847 static void import_handle_addref(void *import)
849 class_import_get(import);
852 static struct portals_handle_ops import_handle_ops = {
853 .hop_addref = import_handle_addref,
857 struct obd_import *class_import_get(struct obd_import *import)
859 atomic_inc(&import->imp_refcount);
860 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
861 atomic_read(&import->imp_refcount),
862 import->imp_obd->obd_name);
865 EXPORT_SYMBOL(class_import_get);
867 void class_import_put(struct obd_import *imp)
869 LASSERT(list_empty(&imp->imp_zombie_chain));
870 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
872 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
873 atomic_read(&imp->imp_refcount) - 1,
874 imp->imp_obd->obd_name);
876 if (atomic_dec_and_test(&imp->imp_refcount)) {
877 CDEBUG(D_INFO, "final put import %p\n", imp);
878 obd_zombie_import_add(imp);
881 /* catch possible import put race */
882 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
884 EXPORT_SYMBOL(class_import_put);
886 static void init_imp_at(struct imp_at *at)
890 at_init(&at->iat_net_latency, 0, 0);
891 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892 /* max service estimates are tracked on the server side, so
893 * don't use the AT history here, just use the last reported
894 * val. (But keep hist for proc histogram, worst_ever)
896 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
901 struct obd_import *class_new_import(struct obd_device *obd)
903 struct obd_import *imp;
905 imp = kzalloc(sizeof(*imp), GFP_NOFS);
909 INIT_LIST_HEAD(&imp->imp_pinger_chain);
910 INIT_LIST_HEAD(&imp->imp_zombie_chain);
911 INIT_LIST_HEAD(&imp->imp_replay_list);
912 INIT_LIST_HEAD(&imp->imp_sending_list);
913 INIT_LIST_HEAD(&imp->imp_delayed_list);
914 INIT_LIST_HEAD(&imp->imp_committed_list);
915 imp->imp_replay_cursor = &imp->imp_committed_list;
916 spin_lock_init(&imp->imp_lock);
917 imp->imp_last_success_conn = 0;
918 imp->imp_state = LUSTRE_IMP_NEW;
919 imp->imp_obd = class_incref(obd, "import", imp);
920 mutex_init(&imp->imp_sec_mutex);
921 init_waitqueue_head(&imp->imp_recovery_waitq);
923 atomic_set(&imp->imp_refcount, 2);
924 atomic_set(&imp->imp_unregistering, 0);
925 atomic_set(&imp->imp_inflight, 0);
926 atomic_set(&imp->imp_replay_inflight, 0);
927 atomic_set(&imp->imp_inval_count, 0);
928 INIT_LIST_HEAD(&imp->imp_conn_list);
929 INIT_LIST_HEAD(&imp->imp_handle.h_link);
930 class_handle_hash(&imp->imp_handle, &import_handle_ops);
931 init_imp_at(&imp->imp_at);
933 /* the default magic is V2, will be used in connect RPC, and
934 * then adjusted according to the flags in request/reply.
936 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
940 EXPORT_SYMBOL(class_new_import);
942 void class_destroy_import(struct obd_import *import)
945 LASSERT(import != LP_POISON);
947 class_handle_unhash(&import->imp_handle);
949 spin_lock(&import->imp_lock);
950 import->imp_generation++;
951 spin_unlock(&import->imp_lock);
952 class_import_put(import);
954 EXPORT_SYMBOL(class_destroy_import);
956 #if LUSTRE_TRACKS_LOCK_EXP_REFS
958 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
960 spin_lock(&exp->exp_locks_list_guard);
962 LASSERT(lock->l_exp_refs_nr >= 0);
964 if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
965 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
966 exp, lock, lock->l_exp_refs_target);
968 if ((lock->l_exp_refs_nr++) == 0) {
969 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
970 lock->l_exp_refs_target = exp;
972 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
973 lock, exp, lock->l_exp_refs_nr);
974 spin_unlock(&exp->exp_locks_list_guard);
976 EXPORT_SYMBOL(__class_export_add_lock_ref);
978 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
980 spin_lock(&exp->exp_locks_list_guard);
981 LASSERT(lock->l_exp_refs_nr > 0);
982 if (lock->l_exp_refs_target != exp) {
983 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
984 lock, lock->l_exp_refs_target, exp);
986 if (-- lock->l_exp_refs_nr == 0) {
987 list_del_init(&lock->l_exp_refs_link);
988 lock->l_exp_refs_target = NULL;
990 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
991 lock, exp, lock->l_exp_refs_nr);
992 spin_unlock(&exp->exp_locks_list_guard);
994 EXPORT_SYMBOL(__class_export_del_lock_ref);
997 /* A connection defines an export context in which preallocation can
998 * be managed. This releases the export pointer reference, and returns
999 * the export handle, so the export refcount is 1 when this function
1002 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1003 struct obd_uuid *cluuid)
1005 struct obd_export *export;
1011 export = class_new_export(obd, cluuid);
1013 return PTR_ERR(export);
1015 conn->cookie = export->exp_handle.h_cookie;
1016 class_export_put(export);
1018 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1019 cluuid->uuid, conn->cookie);
1022 EXPORT_SYMBOL(class_connect);
1024 /* This function removes 1-3 references from the export:
1025 * 1 - for export pointer passed
1026 * and if disconnect really need
1027 * 2 - removing from hash
1028 * 3 - in client_unlink_export
1029 * The export pointer passed to this function can destroyed
1031 int class_disconnect(struct obd_export *export)
1033 int already_disconnected;
1036 CWARN("attempting to free NULL export %p\n", export);
1040 spin_lock(&export->exp_lock);
1041 already_disconnected = export->exp_disconnected;
1042 export->exp_disconnected = 1;
1043 spin_unlock(&export->exp_lock);
1045 /* class_cleanup(), abort_recovery(), and class_fail_export()
1046 * all end up in here, and if any of them race we shouldn't
1047 * call extra class_export_puts().
1049 if (already_disconnected)
1052 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1053 export->exp_handle.h_cookie);
1055 class_unlink_export(export);
1057 class_export_put(export);
1060 EXPORT_SYMBOL(class_disconnect);
1062 void class_fail_export(struct obd_export *exp)
1064 int rc, already_failed;
1066 spin_lock(&exp->exp_lock);
1067 already_failed = exp->exp_failed;
1068 exp->exp_failed = 1;
1069 spin_unlock(&exp->exp_lock);
1071 if (already_failed) {
1072 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1073 exp, exp->exp_client_uuid.uuid);
1077 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1078 exp, exp->exp_client_uuid.uuid);
1080 if (obd_dump_on_timeout)
1081 libcfs_debug_dumplog();
1083 /* need for safe call CDEBUG after obd_disconnect */
1084 class_export_get(exp);
1086 /* Most callers into obd_disconnect are removing their own reference
1087 * (request, for example) in addition to the one from the hash table.
1088 * We don't have such a reference here, so make one.
1090 class_export_get(exp);
1091 rc = obd_disconnect(exp);
1093 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1095 CDEBUG(D_HA, "disconnected export %p/%s\n",
1096 exp, exp->exp_client_uuid.uuid);
1097 class_export_put(exp);
1099 EXPORT_SYMBOL(class_fail_export);
1101 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1102 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1103 EXPORT_SYMBOL(class_export_dump_hook);
1106 /* Total amount of zombies to be destroyed */
1107 static int zombies_count;
1110 * kill zombie imports and exports
1112 static void obd_zombie_impexp_cull(void)
1114 struct obd_import *import;
1115 struct obd_export *export;
1118 spin_lock(&obd_zombie_impexp_lock);
1121 if (!list_empty(&obd_zombie_imports)) {
1122 import = list_entry(obd_zombie_imports.next,
1125 list_del_init(&import->imp_zombie_chain);
1129 if (!list_empty(&obd_zombie_exports)) {
1130 export = list_entry(obd_zombie_exports.next,
1133 list_del_init(&export->exp_obd_chain);
1136 spin_unlock(&obd_zombie_impexp_lock);
1139 class_import_destroy(import);
1140 spin_lock(&obd_zombie_impexp_lock);
1142 spin_unlock(&obd_zombie_impexp_lock);
1146 class_export_destroy(export);
1147 spin_lock(&obd_zombie_impexp_lock);
1149 spin_unlock(&obd_zombie_impexp_lock);
1153 } while (import || export);
1156 static struct completion obd_zombie_start;
1157 static struct completion obd_zombie_stop;
1158 static unsigned long obd_zombie_flags;
1159 static wait_queue_head_t obd_zombie_waitq;
1160 static pid_t obd_zombie_pid;
1163 OBD_ZOMBIE_STOP = 0x0001,
1167 * check for work for kill zombie import/export thread.
1169 static int obd_zombie_impexp_check(void *arg)
1173 spin_lock(&obd_zombie_impexp_lock);
1174 rc = (zombies_count == 0) &&
1175 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1176 spin_unlock(&obd_zombie_impexp_lock);
1182 * Add export to the obd_zombie thread and notify it.
1184 static void obd_zombie_export_add(struct obd_export *exp)
1186 spin_lock(&exp->exp_obd->obd_dev_lock);
1187 LASSERT(!list_empty(&exp->exp_obd_chain));
1188 list_del_init(&exp->exp_obd_chain);
1189 spin_unlock(&exp->exp_obd->obd_dev_lock);
1190 spin_lock(&obd_zombie_impexp_lock);
1192 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1193 spin_unlock(&obd_zombie_impexp_lock);
1195 obd_zombie_impexp_notify();
1199 * Add import to the obd_zombie thread and notify it.
1201 static void obd_zombie_import_add(struct obd_import *imp)
1203 LASSERT(!imp->imp_sec);
1204 spin_lock(&obd_zombie_impexp_lock);
1205 LASSERT(list_empty(&imp->imp_zombie_chain));
1207 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1208 spin_unlock(&obd_zombie_impexp_lock);
1210 obd_zombie_impexp_notify();
1214 * notify import/export destroy thread about new zombie.
1216 static void obd_zombie_impexp_notify(void)
1219 * Make sure obd_zombie_impexp_thread get this notification.
1220 * It is possible this signal only get by obd_zombie_barrier, and
1221 * barrier gulps this notification and sleeps away and hangs ensues
1223 wake_up_all(&obd_zombie_waitq);
1227 * check whether obd_zombie is idle
1229 static int obd_zombie_is_idle(void)
1233 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1234 spin_lock(&obd_zombie_impexp_lock);
1235 rc = (zombies_count == 0);
1236 spin_unlock(&obd_zombie_impexp_lock);
1241 * wait when obd_zombie import/export queues become empty
1243 void obd_zombie_barrier(void)
1245 struct l_wait_info lwi = { 0 };
1247 if (obd_zombie_pid == current_pid())
1248 /* don't wait for myself */
1250 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1252 EXPORT_SYMBOL(obd_zombie_barrier);
1255 * destroy zombie export/import thread.
1257 static int obd_zombie_impexp_thread(void *unused)
1259 unshare_fs_struct();
1260 complete(&obd_zombie_start);
1262 obd_zombie_pid = current_pid();
1264 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1265 struct l_wait_info lwi = { 0 };
1267 l_wait_event(obd_zombie_waitq,
1268 !obd_zombie_impexp_check(NULL), &lwi);
1269 obd_zombie_impexp_cull();
1272 * Notify obd_zombie_barrier callers that queues
1275 wake_up(&obd_zombie_waitq);
1278 complete(&obd_zombie_stop);
1284 * start destroy zombie import/export thread
1286 int obd_zombie_impexp_init(void)
1288 struct task_struct *task;
1290 INIT_LIST_HEAD(&obd_zombie_imports);
1291 INIT_LIST_HEAD(&obd_zombie_exports);
1292 spin_lock_init(&obd_zombie_impexp_lock);
1293 init_completion(&obd_zombie_start);
1294 init_completion(&obd_zombie_stop);
1295 init_waitqueue_head(&obd_zombie_waitq);
1298 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1300 return PTR_ERR(task);
1302 wait_for_completion(&obd_zombie_start);
1307 * stop destroy zombie import/export thread
1309 void obd_zombie_impexp_stop(void)
1311 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1312 obd_zombie_impexp_notify();
1313 wait_for_completion(&obd_zombie_stop);