Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e669cfa..f059b59 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -11,6 +11,7 @@
 #include <linux/xattr.h>
 #include <linux/posix_acl.h>
 #include <linux/random.h>
+#include <linux/sort.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -254,6 +255,9 @@ static int ceph_fill_dirfrag(struct inode *inode,
                diri_auth = ci->i_auth_cap->mds;
        spin_unlock(&ci->i_ceph_lock);
 
+       if (mds == -1) /* CDIR_AUTH_PARENT */
+               mds = diri_auth;
+
        mutex_lock(&ci->i_fragtree_mutex);
        if (ndist == 0 && mds == diri_auth) {
                /* no delegation info needed. */
@@ -300,20 +304,38 @@ out:
        return err;
 }
 
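+/*
+ * sort() comparator: order the split records from the MDS by fragment
+ * value, so the update loop can walk them in the same order as the
+ * i_fragtree rbtree.
+ */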
+static int frag_tree_split_cmp(const void *l, const void *r)
+{
+       const struct ceph_frag_tree_split *ls = l;
+       const struct ceph_frag_tree_split *rs = r;
+       return ceph_frag_compare(le32_to_cpu(ls->frag),
+                                le32_to_cpu(rs->frag));
+}
+
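+/*
+ * Return true if @f is one of the fragments produced directly by
+ * splitting @frag; with a NULL @frag only the root fragment qualifies.
+ */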
+static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
+{
+       if (!frag)
+               return f == ceph_frag_make(0, 0);
+       if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
+               return false;
+       return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
+}
+
 static int ceph_fill_fragtree(struct inode *inode,
                              struct ceph_frag_tree_head *fragtree,
                              struct ceph_mds_reply_dirfrag *dirinfo)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_inode_frag *frag;
+       struct ceph_inode_frag *frag, *prev_frag = NULL;
        struct rb_node *rb_node;
-       int i;
-       u32 id, nsplits;
+       unsigned i, split_by, nsplits;
+       u32 id;
        bool update = false;
 
        mutex_lock(&ci->i_fragtree_mutex);
        nsplits = le32_to_cpu(fragtree->nsplits);
-       if (nsplits) {
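+       /* a change in the number of splits means the cached tree is stale */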
+       if (nsplits != ci->i_fragtree_nsplits) {
+               update = true;
+       } else if (nsplits) {
                i = prandom_u32() % nsplits;
                id = le32_to_cpu(fragtree->splits[i].frag);
                if (!__ceph_find_frag(ci, id))
@@ -332,10 +354,22 @@ static int ceph_fill_fragtree(struct inode *inode,
        if (!update)
                goto out_unlock;
 
+       if (nsplits > 1) {
+               sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
+                    frag_tree_split_cmp, NULL);
+       }
+
        dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
        rb_node = rb_first(&ci->i_fragtree);
        for (i = 0; i < nsplits; i++) {
                id = le32_to_cpu(fragtree->splits[i].frag);
+               split_by = le32_to_cpu(fragtree->splits[i].by);
+               if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
+                       pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
+                              "frag %x split by %d\n", ceph_vinop(inode),
+                              i, nsplits, id, split_by);
+                       continue;
+               }
                frag = NULL;
                while (rb_node) {
                        frag = rb_entry(rb_node, struct ceph_inode_frag, node);
@@ -347,8 +381,14 @@ static int ceph_fill_fragtree(struct inode *inode,
                                break;
                        }
                        rb_node = rb_next(rb_node);
-                       rb_erase(&frag->node, &ci->i_fragtree);
-                       kfree(frag);
+                       /* delete stale split/leaf node */
+                       if (frag->split_by > 0 ||
+                           !is_frag_child(frag->frag, prev_frag)) {
+                               rb_erase(&frag->node, &ci->i_fragtree);
+                               if (frag->split_by > 0)
+                                       ci->i_fragtree_nsplits--;
+                               kfree(frag);
+                       }
                        frag = NULL;
                }
                if (!frag) {
@@ -356,14 +396,23 @@ static int ceph_fill_fragtree(struct inode *inode,
                        if (IS_ERR(frag))
                                continue;
                }
-               frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+               if (frag->split_by == 0)
+                       ci->i_fragtree_nsplits++;
+               frag->split_by = split_by;
                dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+               prev_frag = frag;
        }
        while (rb_node) {
                frag = rb_entry(rb_node, struct ceph_inode_frag, node);
                rb_node = rb_next(rb_node);
-               rb_erase(&frag->node, &ci->i_fragtree);
-               kfree(frag);
+               /* delete stale split/leaf node */
+               if (frag->split_by > 0 ||
+                   !is_frag_child(frag->frag, prev_frag)) {
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       if (frag->split_by > 0)
+                               ci->i_fragtree_nsplits--;
+                       kfree(frag);
+               }
        }
 out_unlock:
        mutex_unlock(&ci->i_fragtree_mutex);
@@ -513,6 +562,7 @@ void ceph_destroy_inode(struct inode *inode)
                rb_erase(n, &ci->i_fragtree);
                kfree(frag);
        }
+       ci->i_fragtree_nsplits = 0;
 
        __ceph_destroy_xattrs(ci);
        if (ci->i_xattrs.blob)
@@ -533,6 +583,11 @@ int ceph_drop_inode(struct inode *inode)
        return 1;
 }
 
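+/* convert a size in bytes to the 512-byte units used by i_blocks */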
+static inline blkcnt_t calc_inode_blocks(u64 size)
+{
+       return (size + (1<<9) - 1) >> 9;
+}
+
 /*
  * Helpers to fill in size, ctime, mtime, and atime.  We have to be
  * careful because either the client or MDS may have more up to date
@@ -555,7 +610,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                        size = 0;
                }
                i_size_write(inode, size);
-               inode->i_blocks = (size + (1<<9) - 1) >> 9;
+               inode->i_blocks = calc_inode_blocks(size);
                ci->i_reported_size = size;
                if (truncate_seq != ci->i_truncate_seq) {
                        dout("truncate_seq %u -> %u\n",
@@ -814,9 +869,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 
                        spin_unlock(&ci->i_ceph_lock);
 
-                       err = -EINVAL;
-                       if (WARN_ON(symlen != i_size_read(inode)))
-                               goto out;
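+                       /* the MDS reported a bad size; trust the link length instead */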
+                       if (symlen != i_size_read(inode)) {
+                               pr_err("fill_inode %llx.%llx BAD symlink "
+                                       "size %lld\n", ceph_vinop(inode),
+                                       i_size_read(inode));
+                               i_size_write(inode, symlen);
+                               inode->i_blocks = calc_inode_blocks(symlen);
+                       }
 
                        err = -ENOMEM;
                        sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
@@ -1309,12 +1368,13 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
        int i, err = 0;
 
        for (i = 0; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
                struct inode *in;
                int rc;
 
-               vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
-               vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
+               vino.ino = le64_to_cpu(rde->inode.in->ino);
+               vino.snap = le64_to_cpu(rde->inode.in->snapid);
 
                in = ceph_get_inode(req->r_dentry->d_sb, vino);
                if (IS_ERR(in)) {
@@ -1322,14 +1382,14 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
                        dout("new_inode badness got %d\n", err);
                        continue;
                }
-               rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+               rc = fill_inode(in, NULL, &rde->inode, NULL, session,
                                req->r_request_started, -1,
                                &req->r_caps_reservation);
                if (rc < 0) {
                        pr_err("fill_inode badness on %p got %d\n", in, rc);
                        err = rc;
-                       continue;
                }
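+               /* drop the ref from ceph_get_inode() even if fill_inode() failed */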
+               iput(in);
        }
 
        return err;
@@ -1387,6 +1447,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
                             struct ceph_mds_session *session)
 {
        struct dentry *parent = req->r_dentry;
+       struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
        struct qstr dname;
        struct dentry *dn;
@@ -1394,22 +1455,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        int err = 0, skipped = 0, ret, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-       struct ceph_dentry_info *di;
        u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+       u32 last_hash = 0;
+       u32 fpos_offset;
        struct ceph_readdir_cache_control cache_ctl = {};
 
        if (req->r_aborted)
                return readdir_prepopulate_inodes_only(req, session);
 
+       if (rinfo->hash_order && req->r_path2) {
+               last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                         req->r_path2, strlen(req->r_path2));
+               last_hash = ceph_frag_value(last_hash);
+       }
+
        if (rinfo->dir_dir &&
            le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                dout("readdir_prepopulate got new frag %x -> %x\n",
                     frag, le32_to_cpu(rinfo->dir_dir->frag));
                frag = le32_to_cpu(rinfo->dir_dir->frag);
-               if (ceph_frag_is_leftmost(frag))
+               if (!rinfo->hash_order)
                        req->r_readdir_offset = 2;
-               else
-                       req->r_readdir_offset = 0;
        }
 
        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
@@ -1427,24 +1493,37 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
                /* note dir version at start of readdir so we can tell
                 * if any dentries get dropped */
-               struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
                req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
                req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
                req->r_readdir_cache_idx = 0;
        }
 
        cache_ctl.index = req->r_readdir_cache_idx;
+       fpos_offset = req->r_readdir_offset;
 
        /* FIXME: release caps/leases if error occurs */
        for (i = 0; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
 
-               dname.name = rinfo->dir_dname[i];
-               dname.len = rinfo->dir_dname_len[i];
+               dname.name = rde->name;
+               dname.len = rde->name_len;
                dname.hash = full_name_hash(dname.name, dname.len);
 
-               vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino);
-               vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid);
+               vino.ino = le64_to_cpu(rde->inode.in->ino);
+               vino.snap = le64_to_cpu(rde->inode.in->snapid);
+
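+               /*
+                * In hash order the offset is (hash, per-hash index);
+                * the index restarts at 2 whenever the hash changes.
+                */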
+               if (rinfo->hash_order) {
+                       u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                                rde->name, rde->name_len);
+                       hash = ceph_frag_value(hash);
+                       if (hash != last_hash)
+                               fpos_offset = 2;
+                       last_hash = hash;
+                       rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
+               } else {
+                       rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
+               }
 
 retry_lookup:
                dn = d_lookup(parent, &dname);
@@ -1490,7 +1569,7 @@ retry_lookup:
                        }
                }
 
-               ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+               ret = fill_inode(in, NULL, &rde->inode, NULL, session,
                                 req->r_request_started, -1,
                                 &req->r_caps_reservation);
                if (ret < 0) {
@@ -1523,11 +1602,9 @@ retry_lookup:
                        dn = realdn;
                }
 
-               di = dn->d_fsdata;
-               di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+               ceph_dentry(dn)->offset = rde->offset;
 
-               update_dentry_lease(dn, rinfo->dir_dlease[i],
-                                   req->r_session,
+               update_dentry_lease(dn, rde->lease, req->r_session,
                                    req->r_request_started);
 
                if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
@@ -1562,7 +1639,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
        spin_lock(&ci->i_ceph_lock);
        dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
        i_size_write(inode, size);
-       inode->i_blocks = (size + (1 << 9) - 1) >> 9;
+       inode->i_blocks = calc_inode_blocks(size);
 
        /* tell the MDS if we are approaching max_size */
        if ((size << 1) >= ci->i_max_size &&
@@ -1624,10 +1701,21 @@ static void ceph_invalidate_work(struct work_struct *work)
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
                                                  i_pg_inv_work);
        struct inode *inode = &ci->vfs_inode;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        u32 orig_gen;
        int check = 0;
 
        mutex_lock(&ci->i_truncate_mutex);
+
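+       /* on forced umount, discard the page cache and flag the mapping with -EIO */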
+       if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+               pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
+                                   inode, ceph_ino(inode));
+               mapping_set_error(inode->i_mapping, -EIO);
+               truncate_pagecache(inode, 0);
+               mutex_unlock(&ci->i_truncate_mutex);
+               goto out;
+       }
+
        spin_lock(&ci->i_ceph_lock);
        dout("invalidate_pages %p gen %d revoking %d\n", inode,
             ci->i_rdcache_gen, ci->i_rdcache_revoking);
@@ -1641,7 +1729,9 @@ static void ceph_invalidate_work(struct work_struct *work)
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&ci->i_ceph_lock);
 
-       truncate_pagecache(inode, 0);
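+       /* invalidate instead of truncating, so dirty pages are not silently dropped */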
+       if (invalidate_inode_pages2(inode->i_mapping) < 0) {
+               pr_err("invalidate_pages %p fails\n", inode);
+       }
 
        spin_lock(&ci->i_ceph_lock);
        if (orig_gen == ci->i_rdcache_gen &&
@@ -1920,8 +2010,7 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
                        i_size_write(inode, attr->ia_size);
-                       inode->i_blocks =
-                               (attr->ia_size + (1 << 9) - 1) >> 9;
+                       inode->i_blocks = calc_inode_blocks(attr->ia_size);
                        inode->i_ctime = attr->ia_ctime;
                        ci->i_reported_size = attr->ia_size;
                        dirtied |= CEPH_CAP_FILE_EXCL;