ceph: add acl for cephfs
[cascardo/linux.git] / fs / ceph / inode.c
index 8549a48..a808bfb 100644 (file)
@@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = {
        .getxattr = ceph_getxattr,
        .listxattr = ceph_listxattr,
        .removexattr = ceph_removexattr,
+       .get_acl = ceph_get_acl,
 };
 
 
@@ -436,6 +437,16 @@ void ceph_destroy_inode(struct inode *inode)
        call_rcu(&inode->i_rcu, ceph_i_callback);
 }
 
+int ceph_drop_inode(struct inode *inode)
+{
+       /*
+        * A positive dentry is always accompanied by its inode in the
+        * MDS reply, so there is no need to keep the inode in the cache
+        * after all of its aliases have been dropped.
+        */
+       return 1;
+}
+
 /*
  * Helpers to fill in size, ctime, mtime, and atime.  We have to be
  * careful because either the client or MDS may have more up to date
@@ -577,6 +588,8 @@ static int fill_inode(struct inode *inode,
        int issued = 0, implemented;
        struct timespec mtime, atime, ctime;
        u32 nsplits;
+       struct ceph_inode_frag *frag;
+       struct rb_node *rb_node;
        struct ceph_buffer *xattr_blob = NULL;
        int err = 0;
        int queue_trunc = 0;
@@ -668,6 +681,7 @@ static int fill_inode(struct inode *inode,
                        memcpy(ci->i_xattrs.blob->vec.iov_base,
                               iinfo->xattr_data, iinfo->xattr_len);
                ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
+               ceph_forget_all_cached_acls(inode);
                xattr_blob = NULL;
        }
 
@@ -751,15 +765,38 @@ no_change:
        /* FIXME: move me up, if/when version reflects fragtree changes */
        nsplits = le32_to_cpu(info->fragtree.nsplits);
        mutex_lock(&ci->i_fragtree_mutex);
+       rb_node = rb_first(&ci->i_fragtree);
        for (i = 0; i < nsplits; i++) {
                u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-               struct ceph_inode_frag *frag = __get_or_create_frag(ci, id);
-
-               if (IS_ERR(frag))
-                       continue;
+               frag = NULL;
+               while (rb_node) {
+                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+                       if (ceph_frag_compare(frag->frag, id) >= 0) {
+                               if (frag->frag != id)
+                                       frag = NULL;
+                               else
+                                       rb_node = rb_next(rb_node);
+                               break;
+                       }
+                       rb_node = rb_next(rb_node);
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       kfree(frag);
+                       frag = NULL;
+               }
+               if (!frag) {
+                       frag = __get_or_create_frag(ci, id);
+                       if (IS_ERR(frag))
+                               continue;
+               }
                frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
                dout(" frag %x split by %d\n", frag->frag, frag->split_by);
        }
+       while (rb_node) {
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               rb_node = rb_next(rb_node);
+               rb_erase(&frag->node, &ci->i_fragtree);
+               kfree(frag);
+       }
        mutex_unlock(&ci->i_fragtree_mutex);
 
        /* were we issued a capability? */
@@ -953,7 +990,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        struct ceph_mds_reply_inode *ininfo;
        struct ceph_vino vino;
        struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
-       int i = 0;
        int err = 0;
 
        dout("fill_trace %p is_dentry %d is_target %d\n", req,
@@ -1014,6 +1050,29 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                }
        }
 
+       if (rinfo->head->is_target) {
+               vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
+               vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
+
+               in = ceph_get_inode(sb, vino);
+               if (IS_ERR(in)) {
+                       err = PTR_ERR(in);
+                       goto done;
+               }
+               req->r_target_inode = in;
+
+               err = fill_inode(in, &rinfo->targeti, NULL,
+                               session, req->r_request_started,
+                               (le32_to_cpu(rinfo->head->result) == 0) ?
+                               req->r_fmode : -1,
+                               &req->r_caps_reservation);
+               if (err < 0) {
+                       pr_err("fill_inode badness %p %llx.%llx\n",
+                               in, ceph_vinop(in));
+                       goto done;
+               }
+       }
+
        /*
         * ignore null lease/binding on snapdir ENOENT, or else we
         * will have trouble splicing in the virtual snapdir later
@@ -1083,7 +1142,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                             ceph_dentry(req->r_old_dentry)->offset);
 
                        dn = req->r_old_dentry;  /* use old_dentry */
-                       in = dn->d_inode;
                }
 
                /* null dentry? */
@@ -1105,44 +1163,28 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                }
 
                /* attach proper inode */
-               ininfo = rinfo->targeti.in;
-               vino.ino = le64_to_cpu(ininfo->ino);
-               vino.snap = le64_to_cpu(ininfo->snapid);
-               in = dn->d_inode;
-               if (!in) {
-                       in = ceph_get_inode(sb, vino);
-                       if (IS_ERR(in)) {
-                               pr_err("fill_trace bad get_inode "
-                                      "%llx.%llx\n", vino.ino, vino.snap);
-                               err = PTR_ERR(in);
-                               d_drop(dn);
-                               goto done;
-                       }
+               if (!dn->d_inode) {
+                       ihold(in);
                        dn = splice_dentry(dn, in, &have_lease, true);
                        if (IS_ERR(dn)) {
                                err = PTR_ERR(dn);
                                goto done;
                        }
                        req->r_dentry = dn;  /* may have spliced */
-                       ihold(in);
-               } else if (ceph_ino(in) == vino.ino &&
-                          ceph_snap(in) == vino.snap) {
-                       ihold(in);
-               } else {
+               } else if (dn->d_inode && dn->d_inode != in) {
                        dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
-                            dn, in, ceph_ino(in), ceph_snap(in),
-                            vino.ino, vino.snap);
+                            dn, dn->d_inode, ceph_vinop(dn->d_inode),
+                            ceph_vinop(in));
                        have_lease = false;
-                       in = NULL;
                }
 
                if (have_lease)
                        update_dentry_lease(dn, rinfo->dlease, session,
                                            req->r_request_started);
                dout(" final dn %p\n", dn);
-               i++;
-       } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
-                  req->r_op == CEPH_MDS_OP_MKSNAP) && !req->r_aborted) {
+       } else if (!req->r_aborted &&
+                  (req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
+                   req->r_op == CEPH_MDS_OP_MKSNAP)) {
                struct dentry *dn = req->r_dentry;
 
                /* fill out a snapdir LOOKUPSNAP dentry */
@@ -1152,52 +1194,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                ininfo = rinfo->targeti.in;
                vino.ino = le64_to_cpu(ininfo->ino);
                vino.snap = le64_to_cpu(ininfo->snapid);
-               in = ceph_get_inode(sb, vino);
-               if (IS_ERR(in)) {
-                       pr_err("fill_inode get_inode badness %llx.%llx\n",
-                              vino.ino, vino.snap);
-                       err = PTR_ERR(in);
-                       d_delete(dn);
-                       goto done;
-               }
                dout(" linking snapped dir %p to dn %p\n", in, dn);
+               ihold(in);
                dn = splice_dentry(dn, in, NULL, true);
                if (IS_ERR(dn)) {
                        err = PTR_ERR(dn);
                        goto done;
                }
                req->r_dentry = dn;  /* may have spliced */
-               ihold(in);
-               rinfo->head->is_dentry = 1;  /* fool notrace handlers */
-       }
-
-       if (rinfo->head->is_target) {
-               vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
-               vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
-
-               if (in == NULL || ceph_ino(in) != vino.ino ||
-                   ceph_snap(in) != vino.snap) {
-                       in = ceph_get_inode(sb, vino);
-                       if (IS_ERR(in)) {
-                               err = PTR_ERR(in);
-                               goto done;
-                       }
-               }
-               req->r_target_inode = in;
-
-               err = fill_inode(in,
-                                &rinfo->targeti, NULL,
-                                session, req->r_request_started,
-                                (le32_to_cpu(rinfo->head->result) == 0) ?
-                                req->r_fmode : -1,
-                                &req->r_caps_reservation);
-               if (err < 0) {
-                       pr_err("fill_inode badness %p %llx.%llx\n",
-                              in, ceph_vinop(in));
-                       goto done;
-               }
        }
-
 done:
        dout("fill_trace done err=%d\n", err);
        return err;
@@ -1247,11 +1252,23 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        struct qstr dname;
        struct dentry *dn;
        struct inode *in;
-       int err = 0, i;
+       int err = 0, ret, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-       u64 frag = le32_to_cpu(rhead->args.readdir.frag);
        struct ceph_dentry_info *di;
+       u64 r_readdir_offset = req->r_readdir_offset;
+       u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+
+       if (rinfo->dir_dir &&
+           le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+               dout("readdir_prepopulate got new frag %x -> %x\n",
+                    frag, le32_to_cpu(rinfo->dir_dir->frag));
+               frag = le32_to_cpu(rinfo->dir_dir->frag);
+               if (ceph_frag_is_leftmost(frag))
+                       r_readdir_offset = 2;
+               else
+                       r_readdir_offset = 0;
+       }
 
        if (req->r_aborted)
                return readdir_prepopulate_inodes_only(req, session);
@@ -1268,6 +1285,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
                        ceph_fill_dirfrag(parent->d_inode, rinfo->dir_dir);
        }
 
+       /* FIXME: release caps/leases if error occurs */
        for (i = 0; i < rinfo->dir_nr; i++) {
                struct ceph_vino vino;
 
@@ -1292,9 +1310,10 @@ retry_lookup:
                                err = -ENOMEM;
                                goto out;
                        }
-                       err = ceph_init_dentry(dn);
-                       if (err < 0) {
+                       ret = ceph_init_dentry(dn);
+                       if (ret < 0) {
                                dput(dn);
+                               err = ret;
                                goto out;
                        }
                } else if (dn->d_inode &&
@@ -1314,9 +1333,6 @@ retry_lookup:
                        spin_unlock(&parent->d_lock);
                }
 
-               di = dn->d_fsdata;
-               di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
-
                /* inode */
                if (dn->d_inode) {
                        in = dn->d_inode;
@@ -1329,26 +1345,39 @@ retry_lookup:
                                err = PTR_ERR(in);
                                goto out;
                        }
-                       dn = splice_dentry(dn, in, NULL, false);
-                       if (IS_ERR(dn))
-                               dn = NULL;
                }
 
                if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
                               req->r_request_started, -1,
                               &req->r_caps_reservation) < 0) {
                        pr_err("fill_inode badness on %p\n", in);
+                       if (!dn->d_inode)
+                               iput(in);
+                       d_drop(dn);
                        goto next_item;
                }
-               if (dn)
-                       update_dentry_lease(dn, rinfo->dir_dlease[i],
-                                           req->r_session,
-                                           req->r_request_started);
+
+               if (!dn->d_inode) {
+                       dn = splice_dentry(dn, in, NULL, false);
+                       if (IS_ERR(dn)) {
+                               err = PTR_ERR(dn);
+                               dn = NULL;
+                               goto next_item;
+                       }
+               }
+
+               di = dn->d_fsdata;
+               di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
+
+               update_dentry_lease(dn, rinfo->dir_dlease[i],
+                                   req->r_session,
+                                   req->r_request_started);
 next_item:
                if (dn)
                        dput(dn);
        }
-       req->r_did_prepopulate = true;
+       if (err == 0)
+               req->r_did_prepopulate = true;
 
 out:
        if (snapdir) {
@@ -1585,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = {
        .getxattr = ceph_getxattr,
        .listxattr = ceph_listxattr,
        .removexattr = ceph_removexattr,
+       .get_acl = ceph_get_acl,
 };
 
 /*
@@ -1658,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
                        dirtied |= CEPH_CAP_AUTH_EXCL;
                } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
                           attr->ia_mode != inode->i_mode) {
+                       inode->i_mode = attr->ia_mode;
                        req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
                        mask |= CEPH_SETATTR_MODE;
                        release |= CEPH_CAP_AUTH_SHARED;
@@ -1773,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (inode_dirty_flags)
                __mark_inode_dirty(inode, inode_dirty_flags);
 
+       if (ia_valid & ATTR_MODE) {
+               err = ceph_acl_chmod(dentry, inode);
+               if (err)
+                       goto out_put;
+       }
+
        if (mask) {
                req->r_inode = inode;
                ihold(inode);
@@ -1792,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        return err;
 out:
        spin_unlock(&ci->i_ceph_lock);
+out_put:
        ceph_mdsc_put_request(req);
        return err;
 }