ceph: rados pool namespace support
author     Yan, Zheng <zyan@redhat.com>
           Mon, 7 Mar 2016 01:35:06 +0000 (09:35 +0800)
committer  Ilya Dryomov <idryomov@gmail.com>
           Thu, 28 Jul 2016 00:55:38 +0000 (02:55 +0200)
This patch adds code that decodes pool namespace information in cap
messages and request replies. The pool namespace is saved in i_layout
and is passed to libceph when doing reads and writes.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
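
For reference, a minimal sketch of the intended data flow, assuming the
ceph_string helpers and the r_base_oloc fields used in the diff below;
the wrapper function and its name are illustrative only, not part of
this patch:

    /*
     * Sketch: propagate the namespace cached in i_layout into an OSD
     * request's object locator before the request is submitted.
     */
    static void example_fill_request_oloc(struct ceph_inode_info *ci,
                                          struct ceph_osd_request *req)
    {
            /* take a counted reference to the RCU-managed namespace string */
            struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);

            req->r_base_oloc.pool = ci->i_layout.pool_id;
            /* the reference is dropped when the object locator is destroyed */
            req->r_base_oloc.pool_ns = ns;
    }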
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/ioctl.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h
fs/ceph/xattr.c

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 3f8efd8..d5b6f95 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1730,7 +1730,8 @@ enum {
        POOL_WRITE      = 2,
 };
 
-static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
+                               s64 pool, struct ceph_string *pool_ns)
 {
        struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1738,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
        struct rb_node **p, *parent;
        struct ceph_pool_perm *perm;
        struct page **pages;
+       size_t pool_ns_len;
        int err = 0, err2 = 0, have = 0;
 
        down_read(&mdsc->pool_perm_rwsem);
@@ -1749,17 +1751,31 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
                else if (pool > perm->pool)
                        p = &(*p)->rb_right;
                else {
-                       have = perm->perm;
-                       break;
+                       int ret = ceph_compare_string(pool_ns,
+                                               perm->pool_ns,
+                                               perm->pool_ns_len);
+                       if (ret < 0)
+                               p = &(*p)->rb_left;
+                       else if (ret > 0)
+                               p = &(*p)->rb_right;
+                       else {
+                               have = perm->perm;
+                               break;
+                       }
                }
        }
        up_read(&mdsc->pool_perm_rwsem);
        if (*p)
                goto out;
 
-       dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
+       if (pool_ns)
+               dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
+                    pool, (int)pool_ns->len, pool_ns->str);
+       else
+               dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
 
        down_write(&mdsc->pool_perm_rwsem);
+       p = &mdsc->pool_perm_tree.rb_node;
        parent = NULL;
        while (*p) {
                parent = *p;
@@ -1769,8 +1785,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
                else if (pool > perm->pool)
                        p = &(*p)->rb_right;
                else {
-                       have = perm->perm;
-                       break;
+                       int ret = ceph_compare_string(pool_ns,
+                                               perm->pool_ns,
+                                               perm->pool_ns_len);
+                       if (ret < 0)
+                               p = &(*p)->rb_left;
+                       else if (ret > 0)
+                               p = &(*p)->rb_right;
+                       else {
+                               have = perm->perm;
+                               break;
+                       }
                }
        }
        if (*p) {
@@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
        rd_req->r_flags = CEPH_OSD_FLAG_READ;
        osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
        rd_req->r_base_oloc.pool = pool;
+       if (pool_ns)
+               rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
        ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
 
        err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
@@ -1841,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
                goto out_unlock;
        }
 
-       perm = kmalloc(sizeof(*perm), GFP_NOFS);
+       pool_ns_len = pool_ns ? pool_ns->len : 0;
+       perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
        if (!perm) {
                err = -ENOMEM;
                goto out_unlock;
@@ -1849,6 +1877,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
 
        perm->pool = pool;
        perm->perm = have;
+       perm->pool_ns_len = pool_ns_len;
+       if (pool_ns_len > 0)
+               memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
+       perm->pool_ns[pool_ns_len] = 0;
+
        rb_link_node(&perm->node, parent, p);
        rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
        err = 0;
@@ -1860,19 +1893,20 @@ out_unlock:
 out:
        if (!err)
                err = have;
-       dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
+       if (pool_ns)
+               dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
+                    pool, (int)pool_ns->len, pool_ns->str, err);
+       else
+               dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
        return err;
 }
 
 int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
 {
        s64 pool;
+       struct ceph_string *pool_ns;
        int ret, flags;
 
-       /* does not support pool namespace yet */
-       if (ci->i_pool_ns_len)
-               return -EIO;
-
        if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
                                NOPOOLPERM))
                return 0;
@@ -1896,7 +1930,9 @@ check:
                return 0;
        }
 
-       ret = __ceph_pool_perm_get(ci, pool);
+       pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
+       ret = __ceph_pool_perm_get(ci, pool, pool_ns);
+       ceph_put_string(pool_ns);
        if (ret < 0)
                return ret;
 
@@ -1907,8 +1943,9 @@ check:
                flags |= CEPH_I_POOL_WR;
 
        spin_lock(&ci->i_ceph_lock);
-       if (pool == ci->i_layout.pool_id) {
-               ci->i_ceph_flags = flags;
+       if (pool == ci->i_layout.pool_id &&
+           pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
+               ci->i_ceph_flags |= flags;
         } else {
                pool = ci->i_layout.pool_id;
                flags = ci->i_ceph_flags;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f24722d..0a9406a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2779,12 +2779,11 @@ static void invalidate_aliases(struct inode *inode)
  */
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
                             struct inode *inode, struct ceph_mds_caps *grant,
-                            u64 inline_version,
-                            void *inline_data, int inline_len,
+                            struct ceph_string **pns, u64 inline_version,
+                            void *inline_data, u32 inline_len,
                             struct ceph_buffer *xattr_buf,
                             struct ceph_mds_session *session,
-                            struct ceph_cap *cap, int issued,
-                            u32 pool_ns_len)
+                            struct ceph_cap *cap, int issued)
        __releases(ci->i_ceph_lock)
        __releases(mdsc->snap_rwsem)
 {
@@ -2896,11 +2895,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
        if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
                /* file layout may have changed */
                s64 old_pool = ci->i_layout.pool_id;
+               struct ceph_string *old_ns;
+
                ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
-               ci->i_pool_ns_len = pool_ns_len;
-               if (ci->i_layout.pool_id != old_pool)
+               old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+                                       lockdep_is_held(&ci->i_ceph_lock));
+               rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
+
+               if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
                        ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 
+               *pns = old_ns;
+
                /* size/truncate_seq? */
                queue_trunc = ceph_fill_file_size(inode, issued,
                                        le32_to_cpu(grant->truncate_seq),
@@ -3423,20 +3429,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct ceph_cap *cap;
        struct ceph_mds_caps *h;
        struct ceph_mds_cap_peer *peer = NULL;
-       struct ceph_snap_realm *realm;
+       struct ceph_snap_realm *realm = NULL;
+       struct ceph_string *pool_ns = NULL;
        int mds = session->s_mds;
        int op, issued;
        u32 seq, mseq;
        struct ceph_vino vino;
-       u64 cap_id;
-       u64 size, max_size;
        u64 tid;
        u64 inline_version = 0;
        void *inline_data = NULL;
        u32  inline_len = 0;
        void *snaptrace;
        size_t snaptrace_len;
-       u32 pool_ns_len = 0;
        void *p, *end;
 
        dout("handle_caps from mds%d\n", mds);
@@ -3450,11 +3454,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        op = le32_to_cpu(h->op);
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
-       cap_id = le64_to_cpu(h->cap_id);
        seq = le32_to_cpu(h->seq);
        mseq = le32_to_cpu(h->migrate_seq);
-       size = le64_to_cpu(h->size);
-       max_size = le64_to_cpu(h->max_size);
 
        snaptrace = h + 1;
        snaptrace_len = le32_to_cpu(h->snap_trace_len);
@@ -3493,6 +3494,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                u64 flush_tid;
                u32 caller_uid, caller_gid;
                u32 osd_epoch_barrier;
+               u32 pool_ns_len;
                /* version >= 5 */
                ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
                /* version >= 6 */
@@ -3502,6 +3504,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                ceph_decode_32_safe(&p, end, caller_gid, bad);
                /* version >= 8 */
                ceph_decode_32_safe(&p, end, pool_ns_len, bad);
+               if (pool_ns_len > 0) {
+                       ceph_decode_need(&p, end, pool_ns_len, bad);
+                       pool_ns = ceph_find_or_create_string(p, pool_ns_len);
+                       p += pool_ns_len;
+               }
        }
 
        /* lookup ino */
@@ -3522,7 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                        cap = ceph_get_cap(mdsc, NULL);
                        cap->cap_ino = vino.ino;
                        cap->queue_release = 1;
-                       cap->cap_id = cap_id;
+                       cap->cap_id = le64_to_cpu(h->cap_id);
                        cap->mseq = mseq;
                        cap->seq = seq;
                        spin_lock(&session->s_cap_lock);
@@ -3557,10 +3564,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                }
                handle_cap_import(mdsc, inode, h, peer, session,
                                  &cap, &issued);
-               handle_cap_grant(mdsc, inode, h,
+               handle_cap_grant(mdsc, inode, h, &pool_ns,
                                 inline_version, inline_data, inline_len,
-                                msg->middle, session, cap, issued,
-                                pool_ns_len);
+                                msg->middle, session, cap, issued);
                if (realm)
                        ceph_put_snap_realm(mdsc, realm);
                goto done_unlocked;
@@ -3582,10 +3588,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        case CEPH_CAP_OP_GRANT:
                __ceph_caps_issued(ci, &issued);
                issued |= __ceph_caps_dirty(ci);
-               handle_cap_grant(mdsc, inode, h,
+               handle_cap_grant(mdsc, inode, h, &pool_ns,
                                 inline_version, inline_data, inline_len,
-                                msg->middle, session, cap, issued,
-                                pool_ns_len);
+                                msg->middle, session, cap, issued);
                goto done_unlocked;
 
        case CEPH_CAP_OP_FLUSH_ACK:
@@ -3616,6 +3621,7 @@ done:
        mutex_unlock(&session->s_mutex);
 done_unlocked:
        iput(inode);
+       ceph_put_string(pool_ns);
        return;
 
 bad:
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index d035e0a..dc03256 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -447,7 +447,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 
        memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
        RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
-       ci->i_pool_ns_len = 0;
 
        ci->i_fragtree = RB_ROOT;
        mutex_init(&ci->i_fragtree_mutex);
@@ -571,7 +570,7 @@ void ceph_destroy_inode(struct inode *inode)
        if (ci->i_xattrs.prealloc_blob)
                ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
-       ceph_put_string(ci->i_layout.pool_ns);
+       ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
 
        call_rcu(&inode->i_rcu, ceph_i_callback);
 }
@@ -736,6 +735,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
        int issued = 0, implemented, new_issued;
        struct timespec mtime, atime, ctime;
        struct ceph_buffer *xattr_blob = NULL;
+       struct ceph_string *pool_ns = NULL;
        struct ceph_cap *new_cap = NULL;
        int err = 0;
        bool wake = false;
@@ -763,6 +763,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
                               iinfo->xattr_len);
        }
 
+       if (iinfo->pool_ns_len > 0)
+               pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
+                                                    iinfo->pool_ns_len);
+
        spin_lock(&ci->i_ceph_lock);
 
        /*
@@ -818,11 +822,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
        if (new_version ||
            (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
                s64 old_pool = ci->i_layout.pool_id;
+               struct ceph_string *old_ns;
+
                ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
-               ci->i_pool_ns_len = iinfo->pool_ns_len;
-               if (ci->i_layout.pool_id != old_pool)
+               old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+                                       lockdep_is_held(&ci->i_ceph_lock));
+               rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
+
+               if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
                        ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 
+               pool_ns = old_ns;
+
                queue_trunc = ceph_fill_file_size(inode, issued,
                                        le32_to_cpu(info->truncate_seq),
                                        le64_to_cpu(info->truncate_size),
@@ -989,6 +1000,7 @@ out:
                ceph_put_cap(mdsc, new_cap);
        if (xattr_blob)
                ceph_buffer_put(xattr_blob);
+       ceph_put_string(pool_ns);
        return err;
 }
 
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 843dd31..6a30101 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -213,9 +213,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
                 ceph_ino(inode), dl.object_no);
 
        oloc.pool = ci->i_layout.pool_id;
+       oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
        ceph_oid_printf(&oid, "%s", dl.object_name);
 
        r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
+
+       ceph_oloc_destroy(&oloc);
        if (r < 0) {
                up_read(&osdc->lock);
                return r;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 2103b82..46641bb 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end,
        } else
                info->inline_version = CEPH_INLINE_NONE;
 
+       info->pool_ns_len = 0;
+       info->pool_ns_data = NULL;
        if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
-               ceph_decode_need(p, end, info->pool_ns_len, bad);
-               *p += info->pool_ns_len;
-       } else {
-               info->pool_ns_len = 0;
+               if (info->pool_ns_len > 0) {
+                       ceph_decode_need(p, end, info->pool_ns_len, bad);
+                       info->pool_ns_data = *p;
+                       *p += info->pool_ns_len;
+               }
        }
 
        return 0;
@@ -2292,14 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
 
-       /* deny access to directories with pool_ns layouts */
-       if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
-           ceph_inode(req->r_inode)->i_pool_ns_len)
-               return -EIO;
-       if (req->r_locked_dir &&
-           ceph_inode(req->r_locked_dir)->i_pool_ns_len)
-               return -EIO;
-
        /* issue */
        mutex_lock(&mdsc->mutex);
        __register_request(mdsc, req, dir);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 75ecf96..2ce8e9f 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -45,6 +45,7 @@ struct ceph_mds_reply_info_in {
        u32 inline_len;
        char *inline_data;
        u32 pool_ns_len;
+       char *pool_ns_data;
 };
 
 struct ceph_mds_reply_dir_entry {
@@ -277,6 +278,8 @@ struct ceph_pool_perm {
        struct rb_node node;
        int perm;
        s64 pool;
+       size_t pool_ns_len;
+       char pool_ns[];
 };
 
 /*
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 0168b49..7ceab18 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -287,7 +287,6 @@ struct ceph_inode_info {
 
        struct ceph_dir_layout i_dir_layout;
        struct ceph_file_layout i_layout;
-       size_t i_pool_ns_len;
        char *i_symlink;
 
        /* for dirs */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 5377c9c..adc2318 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -57,56 +57,69 @@ struct ceph_vxattr {
 
 static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
 {
-       size_t s;
-       char *p = (char *)&ci->i_layout;
-
-       for (s = 0; s < sizeof(ci->i_layout); s++, p++)
-               if (*p)
-                       return true;
-       return false;
+       struct ceph_file_layout *fl = &ci->i_layout;
+       return (fl->stripe_unit > 0 || fl->stripe_count > 0 ||
+               fl->object_size > 0 || fl->pool_id >= 0 ||
+               rcu_dereference_raw(fl->pool_ns) != NULL);
 }
 
 static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
                                   size_t size)
 {
-       int ret;
        struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
        struct ceph_osd_client *osdc = &fsc->client->osdc;
+       struct ceph_string *pool_ns;
        s64 pool = ci->i_layout.pool_id;
        const char *pool_name;
+       const char *ns_field = " pool_namespace=";
        char buf[128];
+       size_t len, total_len = 0;
+       int ret;
+
+       pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
 
        dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
        down_read(&osdc->lock);
        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
        if (pool_name) {
-               size_t len = strlen(pool_name);
-               ret = snprintf(buf, sizeof(buf),
+               len = snprintf(buf, sizeof(buf),
                "stripe_unit=%u stripe_count=%u object_size=%u pool=",
                ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
                ci->i_layout.object_size);
-               if (!size) {
-                       ret += len;
-               } else if (ret + len > size) {
-                       ret = -ERANGE;
-               } else {
-                       memcpy(val, buf, ret);
-                       memcpy(val + ret, pool_name, len);
-                       ret += len;
-               }
+               total_len = len + strlen(pool_name);
        } else {
-               ret = snprintf(buf, sizeof(buf),
+               len = snprintf(buf, sizeof(buf),
                "stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
                ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
                ci->i_layout.object_size, (unsigned long long)pool);
-               if (size) {
-                       if (ret <= size)
-                               memcpy(val, buf, ret);
-                       else
-                               ret = -ERANGE;
+               total_len = len;
+       }
+
+       if (pool_ns)
+               total_len += strlen(ns_field) + pool_ns->len;
+
+       if (!size) {
+               ret = total_len;
+       } else if (total_len > size) {
+               ret = -ERANGE;
+       } else {
+               memcpy(val, buf, len);
+               ret = len;
+               if (pool_name) {
+                       len = strlen(pool_name);
+                       memcpy(val + ret, pool_name, len);
+                       ret += len;
+               }
+               if (pool_ns) {
+                       len = strlen(ns_field);
+                       memcpy(val + ret, ns_field, len);
+                       ret += len;
+                       memcpy(val + ret, pool_ns->str, pool_ns->len);
+                       ret += pool_ns->len;
                }
        }
        up_read(&osdc->lock);
+       ceph_put_string(pool_ns);
        return ret;
 }
 
@@ -147,6 +160,18 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
        return ret;
 }
 
+static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
+                                                 char *val, size_t size)
+{
+       int ret = 0;
+       struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);
+       if (ns) {
+               ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);
+               ceph_put_string(ns);
+       }
+       return ret;
+}
+
 /* directories */
 
 static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
@@ -235,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
        XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
        XATTR_LAYOUT_FIELD(dir, layout, object_size),
        XATTR_LAYOUT_FIELD(dir, layout, pool),
+       XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
        XATTR_NAME_CEPH(dir, entries),
        XATTR_NAME_CEPH(dir, files),
        XATTR_NAME_CEPH(dir, subdirs),
@@ -262,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
        XATTR_LAYOUT_FIELD(file, layout, stripe_count),
        XATTR_LAYOUT_FIELD(file, layout, object_size),
        XATTR_LAYOUT_FIELD(file, layout, pool),
+       XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
        { .name = NULL, 0 }     /* Required table terminator */
 };
 static size_t ceph_file_vxattrs_name_size;     /* total size of all names */
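
With the pool_namespace entries added to the dir and file vxattr tables
above, the namespace is exposed to userspace alongside the other layout
fields. Following the existing XATTR_LAYOUT_FIELD naming pattern, the new
names should resolve to ceph.dir.layout.pool_namespace and
ceph.file.layout.pool_namespace, e.g. readable with
getfattr -n ceph.file.layout.pool_namespace <path> on a CephFS mount.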