ceph: mount non-default filesystem by name
authorYan, Zheng <zyan@redhat.com>
Fri, 8 Jul 2016 03:25:38 +0000 (11:25 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 28 Jul 2016 01:00:40 +0000 (03:00 +0200)
To mount non-default filesytem, user currently needs to provide mds
namespace ID. This is inconvenience.

This patch makes user be able to mount filesystem by name. If user
wants to mount non-default filesystem. Client first subscribes to
fsmap.user. Subscribe to mdsmap.<ID> after getting ID of filesystem.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h

index 78a3495..e555745 100644 (file)
@@ -2166,6 +2166,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
        mds = __choose_mds(mdsc, req);
        if (mds < 0 ||
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+               if (mdsc->mdsmap_err) {
+                       err = mdsc->mdsmap_err;
+                       dout("do_request mdsmap err %d\n", err);
+                       goto finish;
+               }
                dout("do_request no mds or not active, waiting for map\n");
                list_add(&req->r_wait, &mdsc->waiting_for_map);
                goto out;
@@ -3683,11 +3688,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
        dout("mdsc_destroy %p done\n", mdsc);
 }
 
+void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+       struct ceph_fs_client *fsc = mdsc->fsc;
+       const char *mds_namespace = fsc->mount_options->mds_namespace;
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front.iov_len;
+       u32 epoch;
+       u32 map_len;
+       u32 num_fs;
+       u32 mount_fscid = (u32)-1;
+       u8 struct_v, struct_cv;
+       int err = -EINVAL;
+
+       ceph_decode_need(&p, end, sizeof(u32), bad);
+       epoch = ceph_decode_32(&p);
+
+       dout("handle_fsmap epoch %u\n", epoch);
+
+       ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+       struct_v = ceph_decode_8(&p);
+       struct_cv = ceph_decode_8(&p);
+       map_len = ceph_decode_32(&p);
+
+       ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+       p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+
+       num_fs = ceph_decode_32(&p);
+       while (num_fs-- > 0) {
+               void *info_p, *info_end;
+               u32 info_len;
+               u8 info_v, info_cv;
+               u32 fscid, namelen;
+
+               ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+               info_v = ceph_decode_8(&p);
+               info_cv = ceph_decode_8(&p);
+               info_len = ceph_decode_32(&p);
+               ceph_decode_need(&p, end, info_len, bad);
+               info_p = p;
+               info_end = p + info_len;
+               p = info_end;
+
+               ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+               fscid = ceph_decode_32(&info_p);
+               namelen = ceph_decode_32(&info_p);
+               ceph_decode_need(&info_p, info_end, namelen, bad);
+
+               if (mds_namespace &&
+                   strlen(mds_namespace) == namelen &&
+                   !strncmp(mds_namespace, (char *)info_p, namelen)) {
+                       mount_fscid = fscid;
+                       break;
+               }
+       }
+
+       ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+       if (mount_fscid != (u32)-1) {
+               fsc->client->monc.fs_cluster_id = mount_fscid;
+               ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+                                  0, true);
+               ceph_monc_renew_subs(&fsc->client->monc);
+       } else {
+               err = -ENOENT;
+               goto err_out;
+       }
+       return;
+bad:
+       pr_err("error decoding fsmap\n");
+err_out:
+       mutex_lock(&mdsc->mutex);
+       mdsc->mdsmap_err = -ENOENT;
+       __wake_requests(mdsc, &mdsc->waiting_for_map);
+       mutex_unlock(&mdsc->mutex);
+       return;
+}
 
 /*
  * handle mds map update.
  */
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        u32 epoch;
        u32 maplen;
@@ -3794,7 +3874,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 
        switch (type) {
        case CEPH_MSG_MDS_MAP:
-               ceph_mdsc_handle_map(mdsc, msg);
+               ceph_mdsc_handle_mdsmap(mdsc, msg);
+               break;
+       case CEPH_MSG_FS_MAP_USER:
+               ceph_mdsc_handle_fsmap(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);
index 9dd2c82..3c154b8 100644 (file)
@@ -293,6 +293,7 @@ struct ceph_mds_client {
        struct completion       safe_umount_waiters;
        wait_queue_head_t       session_close_wq;
        struct list_head        waiting_for_map;
+       int                     mdsmap_err;
 
        struct ceph_mds_session **sessions;    /* NULL for mds if no session */
        atomic_t                num_sessions;
@@ -419,8 +420,10 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
                                     struct dentry *dentry, char action,
                                     u32 seq);
 
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
-                                struct ceph_msg *msg);
+extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc,
+                                   struct ceph_msg *msg);
+extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc,
+                                  struct ceph_msg *msg);
 
 extern struct ceph_mds_session *
 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
index a5b2275..7736a93 100644 (file)
@@ -108,7 +108,6 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
  * mount options
  */
 enum {
-       Opt_mds_namespace,
        Opt_wsize,
        Opt_rsize,
        Opt_rasize,
@@ -121,6 +120,7 @@ enum {
        Opt_last_int,
        /* int args above */
        Opt_snapdirname,
+       Opt_mds_namespace,
        Opt_last_string,
        /* string args above */
        Opt_dirstat,
@@ -144,7 +144,6 @@ enum {
 };
 
 static match_table_t fsopt_tokens = {
-       {Opt_mds_namespace, "mds_namespace=%d"},
        {Opt_wsize, "wsize=%d"},
        {Opt_rsize, "rsize=%d"},
        {Opt_rasize, "rasize=%d"},
@@ -156,6 +155,7 @@ static match_table_t fsopt_tokens = {
        {Opt_congestion_kb, "write_congestion_kb=%d"},
        /* int args above */
        {Opt_snapdirname, "snapdirname=%s"},
+       {Opt_mds_namespace, "mds_namespace=%s"},
        /* string args above */
        {Opt_dirstat, "dirstat"},
        {Opt_nodirstat, "nodirstat"},
@@ -212,11 +212,14 @@ static int parse_fsopt_token(char *c, void *private)
                if (!fsopt->snapdir_name)
                        return -ENOMEM;
                break;
-
-               /* misc */
        case Opt_mds_namespace:
-               fsopt->mds_namespace = intval;
+               fsopt->mds_namespace = kstrndup(argstr[0].from,
+                                               argstr[0].to-argstr[0].from,
+                                               GFP_KERNEL);
+               if (!fsopt->mds_namespace)
+                       return -ENOMEM;
                break;
+               /* misc */
        case Opt_wsize:
                fsopt->wsize = intval;
                break;
@@ -302,6 +305,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
 {
        dout("destroy_mount_options %p\n", args);
        kfree(args->snapdir_name);
+       kfree(args->mds_namespace);
        kfree(args->server_path);
        kfree(args);
 }
@@ -331,6 +335,9 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
                return ret;
 
        ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
+       if (ret)
+               return ret;
+       ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
        if (ret)
                return ret;
 
@@ -376,7 +383,6 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
        fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
        fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
        fsopt->congestion_kb = default_congestion_kb();
-       fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
 
        /*
         * Distinguish the server list from the path in "dev_name".
@@ -469,8 +475,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
                seq_puts(m, ",noacl");
 #endif
 
-       if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE)
-               seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);
+       if (fsopt->mds_namespace)
+               seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
        if (fsopt->wsize)
                seq_printf(m, ",wsize=%d", fsopt->wsize);
        if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
@@ -509,9 +515,11 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
 
        switch (type) {
        case CEPH_MSG_MDS_MAP:
-               ceph_mdsc_handle_map(fsc->mdsc, msg);
+               ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
+               return 0;
+       case CEPH_MSG_FS_MAP_USER:
+               ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
                return 0;
-
        default:
                return -1;
        }
@@ -543,8 +551,14 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
                goto fail;
        }
        fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-       fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
-       ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
+
+       if (fsopt->mds_namespace == NULL) {
+               ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+                                  0, true);
+       } else {
+               ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
+                                  0, false);
+       }
 
        fsc->mount_options = fsopt;
 
index 5aa3158..9e82e29 100644 (file)
@@ -62,7 +62,6 @@ struct ceph_mount_options {
        int cap_release_safety;
        int max_readdir;       /* max readdir result (entires) */
        int max_readdir_bytes; /* max readdir result (bytes) */
-       int mds_namespace;
 
        /*
         * everything above this point can be memcmp'd; everything below
@@ -70,6 +69,7 @@ struct ceph_mount_options {
         */
 
        char *snapdir_name;   /* default ".snap" */
+       char *mds_namespace;  /* default NULL */
        char *server_path;    /* default  "/" */
 };