Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index edfade0..f059b59 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -8,8 +8,10 @@
 #include <linux/kernel.h>
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
+#include <linux/xattr.h>
 #include <linux/posix_acl.h>
 #include <linux/random.h>
+#include <linux/sort.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -92,10 +94,10 @@ const struct inode_operations ceph_file_iops = {
        .permission = ceph_permission,
        .setattr = ceph_setattr,
        .getattr = ceph_getattr,
-       .setxattr = ceph_setxattr,
-       .getxattr = ceph_getxattr,
+       .setxattr = generic_setxattr,
+       .getxattr = generic_getxattr,
        .listxattr = ceph_listxattr,
-       .removexattr = ceph_removexattr,
+       .removexattr = generic_removexattr,
        .get_acl = ceph_get_acl,
        .set_acl = ceph_set_acl,
 };
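Note on the hunk above: the ceph-specific setxattr/getxattr/removexattr entry points are replaced with the VFS generic_* helpers, which dispatch through the xattr handler table on sb->s_xattr; the change only makes sense if ceph registers prefix-matched handlers there, presumably done elsewhere in this series. A minimal sketch of that dispatch, with simplified illustrative types rather than the kernel's struct xattr_handler:

/* Simplified model (userspace C, illustrative types only) of how the VFS
 * generic_*xattr() helpers dispatch: walk the superblock's handler table,
 * match the attribute name by prefix, and delegate to that handler. */
#include <stddef.h>
#include <string.h>

struct demo_xattr_handler {
	const char *prefix;                       /* e.g. "user.", "security." */
	int (*get)(const char *name, void *buf, size_t size);
};

static const struct demo_xattr_handler *
demo_find_handler(const struct demo_xattr_handler *const *tbl, const char *name)
{
	for (; *tbl; tbl++)
		if (!strncmp(name, (*tbl)->prefix, strlen((*tbl)->prefix)))
			return *tbl;
	return NULL;                              /* the kernel returns -EOPNOTSUPP here */
}

static int demo_get_user(const char *name, void *buf, size_t size)
{
	(void)name; (void)buf; (void)size;
	return 0;
}

static const struct demo_xattr_handler demo_user = { "user.", demo_get_user };
static const struct demo_xattr_handler *const demo_table[] = { &demo_user, NULL };

int main(void)
{
	return demo_find_handler(demo_table, "user.foo") ? 0 : 1;
}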
@@ -253,6 +255,9 @@ static int ceph_fill_dirfrag(struct inode *inode,
                diri_auth = ci->i_auth_cap->mds;
        spin_unlock(&ci->i_ceph_lock);
 
+       if (mds == -1) /* CDIR_AUTH_PARENT */
+               mds = diri_auth;
+
        mutex_lock(&ci->i_fragtree_mutex);
        if (ndist == 0 && mds == diri_auth) {
                /* no delegation info needed. */
@@ -299,20 +304,38 @@ out:
        return err;
 }
 
+static int frag_tree_split_cmp(const void *l, const void *r)
+{
+       struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
+       struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
+       return ceph_frag_compare(ls->frag, rs->frag);
+}
+
+static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
+{
+       if (!frag)
+               return f == ceph_frag_make(0, 0);
+       if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
+               return false;
+       return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
+}
+
 static int ceph_fill_fragtree(struct inode *inode,
                              struct ceph_frag_tree_head *fragtree,
                              struct ceph_mds_reply_dirfrag *dirinfo)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_inode_frag *frag;
+       struct ceph_inode_frag *frag, *prev_frag = NULL;
        struct rb_node *rb_node;
-       int i;
-       u32 id, nsplits;
+       unsigned i, split_by, nsplits;
+       u32 id;
        bool update = false;
 
        mutex_lock(&ci->i_fragtree_mutex);
        nsplits = le32_to_cpu(fragtree->nsplits);
-       if (nsplits) {
+       if (nsplits != ci->i_fragtree_nsplits) {
+               update = true;
+       } else if (nsplits) {
                i = prandom_u32() % nsplits;
                id = le32_to_cpu(fragtree->splits[i].frag);
                if (!__ceph_find_frag(ci, id))
@@ -331,10 +354,22 @@ static int ceph_fill_fragtree(struct inode *inode,
        if (!update)
                goto out_unlock;
 
+       if (nsplits > 1) {
+               sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
+                    frag_tree_split_cmp, NULL);
+       }
+
        dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
        rb_node = rb_first(&ci->i_fragtree);
        for (i = 0; i < nsplits; i++) {
                id = le32_to_cpu(fragtree->splits[i].frag);
+               split_by = le32_to_cpu(fragtree->splits[i].by);
+               if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
+                       pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
+                              "frag %x split by %d\n", ceph_vinop(inode),
+                              i, nsplits, id, split_by);
+                       continue;
+               }
                frag = NULL;
                while (rb_node) {
                        frag = rb_entry(rb_node, struct ceph_inode_frag, node);
@@ -346,8 +381,14 @@ static int ceph_fill_fragtree(struct inode *inode,
                                break;
                        }
                        rb_node = rb_next(rb_node);
-                       rb_erase(&frag->node, &ci->i_fragtree);
-                       kfree(frag);
+                       /* delete stale split/leaf node */
+                       if (frag->split_by > 0 ||
+                           !is_frag_child(frag->frag, prev_frag)) {
+                               rb_erase(&frag->node, &ci->i_fragtree);
+                               if (frag->split_by > 0)
+                                       ci->i_fragtree_nsplits--;
+                               kfree(frag);
+                       }
                        frag = NULL;
                }
                if (!frag) {
@@ -355,14 +396,23 @@ static int ceph_fill_fragtree(struct inode *inode,
                        if (IS_ERR(frag))
                                continue;
                }
-               frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+               if (frag->split_by == 0)
+                       ci->i_fragtree_nsplits++;
+               frag->split_by = split_by;
                dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+               prev_frag = frag;
        }
        while (rb_node) {
                frag = rb_entry(rb_node, struct ceph_inode_frag, node);
                rb_node = rb_next(rb_node);
-               rb_erase(&frag->node, &ci->i_fragtree);
-               kfree(frag);
+               /* delete stale split/leaf node */
+               if (frag->split_by > 0 ||
+                   !is_frag_child(frag->frag, prev_frag)) {
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       if (frag->split_by > 0)
+                               ci->i_fragtree_nsplits--;
+                       kfree(frag);
+               }
        }
 out_unlock:
        mutex_unlock(&ci->i_fragtree_mutex);
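Note on the fragtree hunks above: ceph_fill_fragtree() now sorts the incoming split records, tracks the previous split (prev_frag), and only erases rbtree nodes that are stale splits or leaves not covered by that previous split; the split/leaf distinction in is_frag_child() hinges on the dirfrag encoding. A standalone sketch of that check, re-deriving the frag helpers it uses; the 8-bit "bits" / 24-bit "value" layout is assumed from include/linux/ceph/ceph_frag.h, which is not part of this diff:

/* Standalone model of the frag helpers behind is_frag_child(). */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static uint32_t frag_make(uint32_t b, uint32_t v)
{
	return (b << 24) | (v & (0xffffffu << (24 - b)) & 0xffffffu);
}
static uint32_t frag_bits(uint32_t f)  { return f >> 24; }
static uint32_t frag_value(uint32_t f) { return f & 0xffffffu; }
static uint32_t frag_mask(uint32_t f)
{
	return (0xffffffu << (24 - frag_bits(f))) & 0xffffffu;
}
static bool frag_contains_value(uint32_t f, uint32_t v)
{
	return (v & frag_mask(f)) == frag_value(f);
}

/* Same shape as the new is_frag_child(): f is a child of "parent split by
 * split_by" iff it sits exactly split_by levels deeper and its value falls
 * inside the parent's range. */
static bool is_child(uint32_t f, uint32_t parent, uint32_t split_by)
{
	if (frag_bits(f) != frag_bits(parent) + split_by)
		return false;
	return frag_contains_value(parent, frag_value(f));
}

int main(void)
{
	uint32_t root  = frag_make(0, 0);
	uint32_t left  = frag_make(1, 0x000000);   /* first half of the space */
	uint32_t right = frag_make(1, 0x800000);   /* second half */

	printf("%d %d\n", is_child(left, root, 1), is_child(right, root, 1)); /* 1 1 */
	printf("%d\n", is_child(frag_make(2, 0x400000), root, 1));            /* 0: too deep */
	return 0;
}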
@@ -512,6 +562,7 @@ void ceph_destroy_inode(struct inode *inode)
                rb_erase(n, &ci->i_fragtree);
                kfree(frag);
        }
+       ci->i_fragtree_nsplits = 0;
 
        __ceph_destroy_xattrs(ci);
        if (ci->i_xattrs.blob)
@@ -532,6 +583,11 @@ int ceph_drop_inode(struct inode *inode)
        return 1;
 }
 
+static inline blkcnt_t calc_inode_blocks(u64 size)
+{
+       return (size + (1<<9) - 1) >> 9;
+}
+
 /*
  * Helpers to fill in size, ctime, mtime, and atime.  We have to be
  * careful because either the client or MDS may have more up to date
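Note on the hunk above: calc_inode_blocks() centralizes the round-up from a byte size to 512-byte blocks that this file previously open-coded in three places (file size fill, ceph_inode_set_size, setattr). The arithmetic in isolation, as a quick sanity check:

/* (size + 511) / 512, written with shifts as in the kernel helper. */
#include <stdint.h>
#include <stdio.h>

static uint64_t calc_blocks(uint64_t size)
{
	return (size + (1 << 9) - 1) >> 9;
}

int main(void)
{
	printf("%llu %llu %llu %llu\n",
	       (unsigned long long)calc_blocks(0),     /* 0 */
	       (unsigned long long)calc_blocks(1),     /* 1 */
	       (unsigned long long)calc_blocks(512),   /* 1 */
	       (unsigned long long)calc_blocks(513));  /* 2 */
	return 0;
}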
@@ -554,7 +610,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                        size = 0;
                }
                i_size_write(inode, size);
-               inode->i_blocks = (size + (1<<9) - 1) >> 9;
+               inode->i_blocks = calc_inode_blocks(size);
                ci->i_reported_size = size;
                if (truncate_seq != ci->i_truncate_seq) {
                        dout("truncate_seq %u -> %u\n",
@@ -813,9 +869,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 
                        spin_unlock(&ci->i_ceph_lock);
 
-                       err = -EINVAL;
-                       if (WARN_ON(symlen != i_size_read(inode)))
-                               goto out;
+                       if (symlen != i_size_read(inode)) {
+                               pr_err("fill_inode %llx.%llx BAD symlink "
+                                       "size %lld\n", ceph_vinop(inode),
+                                       i_size_read(inode));
+                               i_size_write(inode, symlen);
+                               inode->i_blocks = calc_inode_blocks(symlen);
+                       }
 
                        err = -ENOMEM;
                        sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
@@ -1308,12 +1368,13 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
        int i, err = 0;
 
        for (i = 0; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
                struct inode *in;
                int rc;
 
-               vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
-               vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
+               vino.ino = le64_to_cpu(rde->inode.in->ino);
+               vino.snap = le64_to_cpu(rde->inode.in->snapid);
 
                in = ceph_get_inode(req->r_dentry->d_sb, vino);
                if (IS_ERR(in)) {
@@ -1321,14 +1382,14 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
                        dout("new_inode badness got %d\n", err);
                        continue;
                }
-               rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+               rc = fill_inode(in, NULL, &rde->inode, NULL, session,
                                req->r_request_started, -1,
                                &req->r_caps_reservation);
                if (rc < 0) {
                        pr_err("fill_inode badness on %p got %d\n", in, rc);
                        err = rc;
-                       continue;
                }
+               iput(in);
        }
 
        return err;
@@ -1386,6 +1447,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
                             struct ceph_mds_session *session)
 {
        struct dentry *parent = req->r_dentry;
+       struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
        struct qstr dname;
        struct dentry *dn;
@@ -1393,22 +1455,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        int err = 0, skipped = 0, ret, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-       struct ceph_dentry_info *di;
        u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+       u32 last_hash = 0;
+       u32 fpos_offset;
        struct ceph_readdir_cache_control cache_ctl = {};
 
        if (req->r_aborted)
                return readdir_prepopulate_inodes_only(req, session);
 
+       if (rinfo->hash_order && req->r_path2) {
+               last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                         req->r_path2, strlen(req->r_path2));
+               last_hash = ceph_frag_value(last_hash);
+       }
+
        if (rinfo->dir_dir &&
            le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                dout("readdir_prepopulate got new frag %x -> %x\n",
                     frag, le32_to_cpu(rinfo->dir_dir->frag));
                frag = le32_to_cpu(rinfo->dir_dir->frag);
-               if (ceph_frag_is_leftmost(frag))
+               if (!rinfo->hash_order)
                        req->r_readdir_offset = 2;
-               else
-                       req->r_readdir_offset = 0;
        }
 
        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
@@ -1426,24 +1493,37 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
                /* note dir version at start of readdir so we can tell
                 * if any dentries get dropped */
-               struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
                req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
                req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
                req->r_readdir_cache_idx = 0;
        }
 
        cache_ctl.index = req->r_readdir_cache_idx;
+       fpos_offset = req->r_readdir_offset;
 
        /* FIXME: release caps/leases if error occurs */
        for (i = 0; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
 
-               dname.name = rinfo->dir_dname[i];
-               dname.len = rinfo->dir_dname_len[i];
+               dname.name = rde->name;
+               dname.len = rde->name_len;
                dname.hash = full_name_hash(dname.name, dname.len);
 
-               vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
-               vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
+               vino.ino = le64_to_cpu(rde->inode.in->ino);
+               vino.snap = le64_to_cpu(rde->inode.in->snapid);
+
+               if (rinfo->hash_order) {
+                       u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                                rde->name, rde->name_len);
+                       hash = ceph_frag_value(hash);
+                       if (hash != last_hash)
+                               fpos_offset = 2;
+                       last_hash = hash;
+                       rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
+               } else {
+                       rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
+               }
 
 retry_lookup:
                dn = d_lookup(parent, &dname);
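Note on the hunk above: when the MDS returns readdir entries in hash order, each dentry's readdir offset is derived from its name hash, with the per-hash counter restarting at 2 (offsets 0 and 1 stay reserved for "." and ".."); otherwise the frag value is used with a single running counter. The actual bit packing lives in ceph_make_fpos(), which is not part of this diff; the sketch below uses an assumed, purely illustrative layout (24-bit counter, high field, top flag bit) just to show the idea.

/* Illustrative only: pack (hash-or-frag, per-entry counter, hash_order flag)
 * into one 64-bit readdir position.  Field widths here are assumptions, not
 * the layout used by the real ceph_make_fpos(). */
#include <stdbool.h>
#include <stdint.h>

static uint64_t demo_make_fpos(uint32_t high, uint32_t off, bool hash_order)
{
	uint64_t fpos = ((uint64_t)high << 24) | (off & 0xffffffu);

	if (hash_order)
		fpos |= 1ULL << 63;   /* mark hash-ordered positions */
	return fpos;
}

int main(void)
{
	/* Entries sharing a hash get consecutive counters starting at 2,
	 * mirroring the fpos_offset logic in the hunk above. */
	uint64_t a = demo_make_fpos(0x1234, 2, true);
	uint64_t b = demo_make_fpos(0x1234, 3, true);

	return a < b ? 0 : 1;
}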
@@ -1489,7 +1569,7 @@ retry_lookup:
                        }
                }
 
-               ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+               ret = fill_inode(in, NULL, &rde->inode, NULL, session,
                                 req->r_request_started, -1,
                                 &req->r_caps_reservation);
                if (ret < 0) {
@@ -1522,11 +1602,9 @@ retry_lookup:
                        dn = realdn;
                }
 
-               di = dn->d_fsdata;
-               di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+               ceph_dentry(dn)->offset = rde->offset;
 
-               update_dentry_lease(dn, rinfo->dir_dlease[i],
-                                   req->r_session,
+               update_dentry_lease(dn, rde->lease, req->r_session,
                                    req->r_request_started);
 
                if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
@@ -1561,7 +1639,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
        spin_lock(&ci->i_ceph_lock);
        dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
        i_size_write(inode, size);
-       inode->i_blocks = (size + (1 << 9) - 1) >> 9;
+       inode->i_blocks = calc_inode_blocks(size);
 
        /* tell the MDS if we are approaching max_size */
        if ((size << 1) >= ci->i_max_size &&
@@ -1623,10 +1701,21 @@ static void ceph_invalidate_work(struct work_struct *work)
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_pg_inv_work);
        struct inode *inode = &ci->vfs_inode;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        u32 orig_gen;
        int check = 0;
 
        mutex_lock(&ci->i_truncate_mutex);
+
+       if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+               pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
+                                   inode, ceph_ino(inode));
+               mapping_set_error(inode->i_mapping, -EIO);
+               truncate_pagecache(inode, 0);
+               mutex_unlock(&ci->i_truncate_mutex);
+               goto out;
+       }
+
        spin_lock(&ci->i_ceph_lock);
        dout("invalidate_pages %p gen %d revoking %d\n", inode,
             ci->i_rdcache_gen, ci->i_rdcache_revoking);
@@ -1640,7 +1729,9 @@ static void ceph_invalidate_work(struct work_struct *work)
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&ci->i_ceph_lock);
 
-       truncate_pagecache(inode, 0);
+       if (invalidate_inode_pages2(inode->i_mapping) < 0) {
+               pr_err("invalidate_pages %p fails\n", inode);
+       }
 
        spin_lock(&ci->i_ceph_lock);
        if (orig_gen == ci->i_rdcache_gen &&
@@ -1770,22 +1861,18 @@ static const struct inode_operations ceph_symlink_iops = {
        .get_link = simple_get_link,
        .setattr = ceph_setattr,
        .getattr = ceph_getattr,
-       .setxattr = ceph_setxattr,
-       .getxattr = ceph_getxattr,
+       .setxattr = generic_setxattr,
+       .getxattr = generic_getxattr,
        .listxattr = ceph_listxattr,
-       .removexattr = ceph_removexattr,
+       .removexattr = generic_removexattr,
 };
 
-/*
- * setattr
- */
-int ceph_setattr(struct dentry *dentry, struct iattr *attr)
+int __ceph_setattr(struct inode *inode, struct iattr *attr)
 {
-       struct inode *inode = d_inode(dentry);
        struct ceph_inode_info *ci = ceph_inode(inode);
        const unsigned int ia_valid = attr->ia_valid;
        struct ceph_mds_request *req;
-       struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_cap_flush *prealloc_cf;
        int issued;
        int release = 0, dirtied = 0;
@@ -1923,8 +2010,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
                        i_size_write(inode, attr->ia_size);
-                       inode->i_blocks =
-                               (attr->ia_size + (1 << 9) - 1) >> 9;
+                       inode->i_blocks = calc_inode_blocks(attr->ia_size);
                        inode->i_ctime = attr->ia_ctime;
                        ci->i_reported_size = attr->ia_size;
                        dirtied |= CEPH_CAP_FILE_EXCL;
@@ -2009,6 +2095,14 @@ out_put:
        return err;
 }
 
+/*
+ * setattr
+ */
+int ceph_setattr(struct dentry *dentry, struct iattr *attr)
+{
+       return __ceph_setattr(d_inode(dentry), attr);
+}
+
 /*
  * Verify that we have a lease on the given mask.  If not,
  * do a getattr against an mds.
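Note on the last two hunks: the old ceph_setattr() is split into an inode-based __ceph_setattr() plus a thin dentry wrapper, so internal paths that hold only an inode can reuse the attribute-setting logic. A hypothetical internal caller, purely illustrative and not part of this diff, relying only on the __ceph_setattr(struct inode *, struct iattr *) signature introduced above:

/* Hypothetical example: change an inode's mode without having a dentry. */
#include <linux/fs.h>

static int demo_chmod_inode(struct inode *inode, umode_t mode)
{
	struct iattr attr = {
		.ia_valid = ATTR_MODE,
		.ia_mode  = mode,
	};

	return __ceph_setattr(inode, &attr);
}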