Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
author Linus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Jun 2014 06:06:23 +0000 (23:06 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Jun 2014 06:06:23 +0000 (23:06 -0700)
Pull Ceph updates from Sage Weil:
 "This has a mix of bug fixes and cleanups.

  Alex's patch fixes a rare race in RBD.  Ilya's patches fix a spurious
  ENOENT when a second rbd image is mapped and a couple of memory
  leaks.  Zheng fixes several issues with fragmented directories and
  multiple MDSs.  Josh fixes a case where functions that may sleep were
  called under a spin lock, and Josh and Guangliang's patches fix
  setting and unsetting RBD images read-only.

  Naturally there are several other cleanups mixed in for good measure"
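
As a side note on the read-only fixes: the BLKROSET handler added to
drivers/block/rbd.c below (rbd_ioctl() -> rbd_ioctl_set_ro()) is driven by the
standard block-device ioctls.  A minimal userspace sketch, assuming an image is
already mapped at /dev/rbd0 (device path and program are illustrative only, not
part of the patch set):

#include <fcntl.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <linux/fs.h>           /* BLKROSET, BLKROGET */

int main(void)
{
        int fd = open("/dev/rbd0", O_RDONLY);
        int ro = 1;

        if (fd < 0) {
                perror("open /dev/rbd0");
                return 1;
        }
        /* handled in-kernel by rbd_ioctl_set_ro() */
        if (ioctl(fd, BLKROSET, &ro))
                perror("BLKROSET");
        /* read the state back */
        if (ioctl(fd, BLKROGET, &ro) == 0)
                printf("read-only: %d\n", ro);
        return 0;
}

Equivalently, "blockdev --setro /dev/rbd0" issues the same BLKROSET ioctl.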

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  rbd: only set disk to read-only once
  rbd: move calls that may sleep out of spin lock range
  rbd: add ioctl for rbd
  ceph: use truncate_pagecache() instead of truncate_inode_pages()
  ceph: include time stamp in every MDS request
  rbd: fix ida/idr memory leak
  rbd: use reference counts for image requests
  rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
  rbd: make sure we have latest osdmap on 'rbd map'
  libceph: add ceph_monc_wait_osdmap()
  libceph: mon_get_version request infrastructure
  libceph: recognize poolop requests in debugfs
  ceph: refactor readpage_nounlock() to make the logic clearer
  mds: check cap ID when handling cap export message
  ceph: remember subtree root dirfrag's auth MDS
  ceph: introduce ceph_fill_fragtree()
  ceph: handle cap import atomically
  ceph: pre-allocate ceph_cap struct for ceph_add_cap()
  ceph: update inode fields according to issued caps
  rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
  ...

14 files changed:
drivers/block/rbd.c
fs/ceph/acl.c
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/export.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h
include/linux/ceph/ceph_fs.h
include/linux/ceph/mon_client.h
net/ceph/ceph_common.c
net/ceph/debugfs.c
net/ceph/mon_client.c

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4c95b50..bbeb404 100644
@@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
                return -ENOENT;
 
        (void) get_device(&rbd_dev->dev);
-       set_device_ro(bdev, rbd_dev->mapping.read_only);
 
        return 0;
 }
@@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
        put_device(&rbd_dev->dev);
 }
 
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+       int ret = 0;
+       int val;
+       bool ro;
+       bool ro_changed = false;
+
+       /* get_user() may sleep, so call it before taking rbd_dev->lock */
+       if (get_user(val, (int __user *)(arg)))
+               return -EFAULT;
+
+       ro = val ? true : false;
+       /* Snapshots don't allow writes */
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+               return -EROFS;
+
+       spin_lock_irq(&rbd_dev->lock);
+       /* refuse if anyone else has the device open */
+       if (rbd_dev->open_count > 1) {
+               ret = -EBUSY;
+               goto out;
+       }
+
+       if (rbd_dev->mapping.read_only != ro) {
+               rbd_dev->mapping.read_only = ro;
+               ro_changed = true;
+       }
+
+out:
+       spin_unlock_irq(&rbd_dev->lock);
+       /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+       if (ret == 0 && ro_changed)
+               set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+       return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+                       unsigned int cmd, unsigned long arg)
+{
+       struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+       int ret = 0;
+
+       switch (cmd) {
+       case BLKROSET:
+               ret = rbd_ioctl_set_ro(rbd_dev, arg);
+               break;
+       default:
+               ret = -ENOTTY;
+       }
+
+       return ret;
+}
+
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+                               unsigned int cmd, unsigned long arg)
+{
+       return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
+
 static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
+       .ioctl                  = rbd_ioctl,
+#ifdef CONFIG_COMPAT
+       .compat_ioctl           = rbd_compat_ioctl,
+#endif
 };
 
 /*
@@ -1382,6 +1447,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+            atomic_read(&img_request->kref.refcount));
+       kref_get(&img_request->kref);
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request);
 static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
@@ -2142,6 +2214,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
        img_request->next_completion = which;
 out:
        spin_unlock_irq(&img_request->completion_lock);
+       rbd_img_request_put(img_request);
 
        if (!more)
                rbd_img_request_complete(img_request);
@@ -2242,6 +2315,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                        goto out_unwind;
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;
+               rbd_img_request_get(img_request);
 
                if (write_request) {
                        osd_req_op_alloc_hint_init(osd_req, which,
@@ -2872,56 +2946,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 }
 
 /*
- * Request sync osd watch/unwatch.  The value of "start" determines
- * whether a watch request is being initiated or torn down.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;
 
-       rbd_assert(start ^ !!rbd_dev->watch_event);
-       rbd_assert(start ^ !!rbd_dev->watch_request);
+       rbd_assert(!rbd_dev->watch_event);
+       rbd_assert(!rbd_dev->watch_request);
 
-       if (start) {
-               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-                                               &rbd_dev->watch_event);
-               if (ret < 0)
-                       return ret;
-               rbd_assert(rbd_dev->watch_event != NULL);
-       }
+       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+                                    &rbd_dev->watch_event);
+       if (ret < 0)
+               return ret;
+
+       rbd_assert(rbd_dev->watch_event);
 
-       ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-                                                       OBJ_REQUEST_NODATA);
-       if (!obj_request)
+                                            OBJ_REQUEST_NODATA);
+       if (!obj_request) {
+               ret = -ENOMEM;
                goto out_cancel;
+       }
 
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
                                                  obj_request);
-       if (!obj_request->osd_req)
-               goto out_cancel;
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out_put;
+       }
 
-       if (start)
-               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-       else
-               ceph_osdc_unregister_linger_request(osdc,
-                                       rbd_dev->watch_request->osd_req);
+       ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
+                             rbd_dev->watch_event->cookie, 0, 1);
        rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
-               goto out_cancel;
+               goto out_linger;
+
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
-               goto out_cancel;
+               goto out_linger;
+
        ret = obj_request->result;
        if (ret)
-               goto out_cancel;
+               goto out_linger;
 
        /*
         * A watch request is set to linger, so the underlying osd
@@ -2931,36 +3004,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
         * it.  We'll drop that reference (below) after we've
         * unregistered it.
         */
-       if (start) {
-               rbd_dev->watch_request = obj_request;
+       rbd_dev->watch_request = obj_request;
 
-               return 0;
+       return 0;
+
+out_linger:
+       ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
+out_put:
+       rbd_obj_request_put(obj_request);
+out_cancel:
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
+
+       return ret;
+}
+
+/*
+ * Tear down a watch request, synchronously.
+ */
+static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
+       int ret;
+
+       rbd_assert(rbd_dev->watch_event);
+       rbd_assert(rbd_dev->watch_request);
+
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                            OBJ_REQUEST_NODATA);
+       if (!obj_request) {
+               ret = -ENOMEM;
+               goto out_cancel;
+       }
+
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+                                                 obj_request);
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out_put;
        }
 
+       osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+                             rbd_dev->watch_event->cookie, 0, 0);
+       rbd_osd_req_format_write(obj_request);
+
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out_put;
+
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out_put;
+
+       ret = obj_request->result;
+       if (ret)
+               goto out_put;
+
        /* We have successfully torn down the watch request */
 
+       ceph_osdc_unregister_linger_request(osdc,
+                                           rbd_dev->watch_request->osd_req);
        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
+
+out_put:
+       rbd_obj_request_put(obj_request);
 out_cancel:
-       /* Cancel the event if we're tearing down, or on error */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
-       if (obj_request)
-               rbd_obj_request_put(obj_request);
 
        return ret;
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
-{
-       return __rbd_dev_header_watch_sync(rbd_dev, true);
-}
-
 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       ret = __rbd_dev_header_watch_sync(rbd_dev, false);
+       ret = __rbd_dev_header_unwatch_sync(rbd_dev);
        if (ret) {
                rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
                         ret);
@@ -3058,7 +3179,6 @@ static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;
 
@@ -3094,7 +3214,7 @@ static void rbd_request_fn(struct request_queue *q)
 
                if (write_request) {
                        result = -EROFS;
-                       if (read_only)
+                       if (rbd_dev->mapping.read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }
@@ -4682,6 +4802,38 @@ out_err:
        return ret;
 }
 
+/*
+ * Return pool id (>= 0) or a negative error code.
+ */
+static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
+{
+       u64 newest_epoch;
+       unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
+       int tries = 0;
+       int ret;
+
+again:
+       ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
+       if (ret == -ENOENT && tries++ < 1) {
+               ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
+                                              &newest_epoch);
+               if (ret < 0)
+                       return ret;
+
+               if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
+                       ceph_monc_request_next_osdmap(&rbdc->client->monc);
+                       (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
+                                                    newest_epoch, timeout);
+                       goto again;
+               } else {
+                       /* the osdmap we have is new enough */
+                       return -ENOENT;
+               }
+       }
+
+       return ret;
+}
+
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
@@ -4752,7 +4904,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 
                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
-               ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+               ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
@@ -4907,6 +5059,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
@@ -5053,7 +5206,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
-       struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;
 
@@ -5075,8 +5227,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        }
 
        /* pick the pool */
-       osdc = &rbdc->client->osdc;
-       rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+       rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;
@@ -5387,6 +5538,7 @@ err_out_slab:
 
 static void __exit rbd_exit(void)
 {
+       ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 21887d6..469f2e8 100644
@@ -104,12 +104,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
        umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
        struct dentry *dentry;
 
-       if (acl) {
-               ret = posix_acl_valid(acl);
-               if (ret < 0)
-                       goto out;
-       }
-
        switch (type) {
        case ACL_TYPE_ACCESS:
                name = POSIX_ACL_XATTR_ACCESS;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 4f3f690..90b3954 100644
@@ -211,18 +211,15 @@ static int readpage_nounlock(struct file *filp, struct page *page)
                SetPageError(page);
                ceph_fscache_readpage_cancel(inode, page);
                goto out;
-       } else {
-               if (err < PAGE_CACHE_SIZE) {
-               /* zero fill remainder of page */
-                       zero_user_segment(page, err, PAGE_CACHE_SIZE);
-               } else {
-                       flush_dcache_page(page);
-               }
        }
-       SetPageUptodate(page);
+       if (err < PAGE_CACHE_SIZE)
+               /* zero fill remainder of page */
+               zero_user_segment(page, err, PAGE_CACHE_SIZE);
+       else
+               flush_dcache_page(page);
 
-       if (err >= 0)
-               ceph_readpage_to_fscache(inode, page);
+       SetPageUptodate(page);
+       ceph_readpage_to_fscache(inode, page);
 
 out:
        return err < 0 ? err : 0;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index c561b62..1fde164 100644
@@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
        return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
-                               struct ceph_cap_reservation *ctx)
+struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+                             struct ceph_cap_reservation *ctx)
 {
        struct ceph_cap *cap = NULL;
 
@@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
  * it is < 0.  (This is so we can atomically add the cap and add an
  * open file reference to it.)
  */
-int ceph_add_cap(struct inode *inode,
-                struct ceph_mds_session *session, u64 cap_id,
-                int fmode, unsigned issued, unsigned wanted,
-                unsigned seq, unsigned mseq, u64 realmino, int flags,
-                struct ceph_cap_reservation *caps_reservation)
+void ceph_add_cap(struct inode *inode,
+                 struct ceph_mds_session *session, u64 cap_id,
+                 int fmode, unsigned issued, unsigned wanted,
+                 unsigned seq, unsigned mseq, u64 realmino, int flags,
+                 struct ceph_cap **new_cap)
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *new_cap = NULL;
        struct ceph_cap *cap;
        int mds = session->s_mds;
        int actual_wanted;
@@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,
        if (fmode >= 0)
                wanted |= ceph_caps_for_mode(fmode);
 
-retry:
-       spin_lock(&ci->i_ceph_lock);
        cap = __get_cap_for_mds(ci, mds);
        if (!cap) {
-               if (new_cap) {
-                       cap = new_cap;
-                       new_cap = NULL;
-               } else {
-                       spin_unlock(&ci->i_ceph_lock);
-                       new_cap = get_cap(mdsc, caps_reservation);
-                       if (new_cap == NULL)
-                               return -ENOMEM;
-                       goto retry;
-               }
+               cap = *new_cap;
+               *new_cap = NULL;
 
                cap->issued = 0;
                cap->implemented = 0;
@@ -562,9 +551,6 @@ retry:
                session->s_nr_caps++;
                spin_unlock(&session->s_cap_lock);
        } else {
-               if (new_cap)
-                       ceph_put_cap(mdsc, new_cap);
-
                /*
                 * auth mds of the inode changed. we received the cap export
                 * message, but still haven't received the cap import message.
@@ -626,7 +612,6 @@ retry:
                        ci->i_auth_cap = cap;
                        cap->mds_wanted = wanted;
                }
-               ci->i_cap_exporting_issued = 0;
        } else {
                WARN_ON(ci->i_auth_cap == cap);
        }
@@ -648,9 +633,6 @@ retry:
 
        if (fmode >= 0)
                __ceph_get_fmode(ci, fmode);
-       spin_unlock(&ci->i_ceph_lock);
-       wake_up_all(&ci->i_cap_wq);
-       return 0;
 }
 
 /*
@@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-       int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
+       int have = ci->i_snap_caps;
        struct ceph_cap *cap;
        struct rb_node *p;
 
@@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
  */
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
-       return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
+       return !RB_EMPTY_ROOT(&ci->i_caps);
 }
 
 int ceph_is_any_caps(struct inode *inode)
@@ -2397,32 +2379,30 @@ static void invalidate_aliases(struct inode *inode)
  * actually be a revocation if it specifies a smaller cap set.)
  *
  * caller holds s_mutex and i_ceph_lock, we drop both.
- *
- * return value:
- *  0 - ok
- *  1 - check_caps on auth cap only (writeback)
- *  2 - check_caps (ack revoke)
  */
-static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+static void handle_cap_grant(struct ceph_mds_client *mdsc,
+                            struct inode *inode, struct ceph_mds_caps *grant,
+                            void *snaptrace, int snaptrace_len,
+                            struct ceph_buffer *xattr_buf,
                             struct ceph_mds_session *session,
-                            struct ceph_cap *cap,
-                            struct ceph_buffer *xattr_buf)
-               __releases(ci->i_ceph_lock)
+                            struct ceph_cap *cap, int issued)
+       __releases(ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
        int seq = le32_to_cpu(grant->seq);
        int newcaps = le32_to_cpu(grant->caps);
-       int issued, implemented, used, wanted, dirty;
+       int used, wanted, dirty;
        u64 size = le64_to_cpu(grant->size);
        u64 max_size = le64_to_cpu(grant->max_size);
        struct timespec mtime, atime, ctime;
        int check_caps = 0;
-       int wake = 0;
-       int writeback = 0;
-       int queue_invalidate = 0;
-       int deleted_inode = 0;
-       int queue_revalidate = 0;
+       bool wake = 0;
+       bool writeback = 0;
+       bool queue_trunc = 0;
+       bool queue_invalidate = 0;
+       bool queue_revalidate = 0;
+       bool deleted_inode = 0;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2466,16 +2446,13 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        }
 
        /* side effects now are allowed */
-
-       issued = __ceph_caps_issued(ci, &implemented);
-       issued |= implemented | __ceph_caps_dirty(ci);
-
        cap->cap_gen = session->s_cap_gen;
        cap->seq = seq;
 
        __check_cap_issue(ci, cap, newcaps);
 
-       if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+       if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+           (issued & CEPH_CAP_AUTH_EXCL) == 0) {
                inode->i_mode = le32_to_cpu(grant->mode);
                inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
                inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
@@ -2484,7 +2461,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                     from_kgid(&init_user_ns, inode->i_gid));
        }
 
-       if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+       if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+           (issued & CEPH_CAP_LINK_EXCL) == 0) {
                set_nlink(inode, le32_to_cpu(grant->nlink));
                if (inode->i_nlink == 0 &&
                    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
@@ -2511,30 +2489,35 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
                queue_revalidate = 1;
 
-       /* size/ctime/mtime/atime? */
-       ceph_fill_file_size(inode, issued,
-                           le32_to_cpu(grant->truncate_seq),
-                           le64_to_cpu(grant->truncate_size), size);
-       ceph_decode_timespec(&mtime, &grant->mtime);
-       ceph_decode_timespec(&atime, &grant->atime);
-       ceph_decode_timespec(&ctime, &grant->ctime);
-       ceph_fill_file_time(inode, issued,
-                           le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
-                           &atime);
-
-
-       /* file layout may have changed */
-       ci->i_layout = grant->layout;
-
-       /* max size increase? */
-       if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
-               dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
-               ci->i_max_size = max_size;
-               if (max_size >= ci->i_wanted_max_size) {
-                       ci->i_wanted_max_size = 0;  /* reset */
-                       ci->i_requested_max_size = 0;
+       if (newcaps & CEPH_CAP_ANY_RD) {
+               /* ctime/mtime/atime? */
+               ceph_decode_timespec(&mtime, &grant->mtime);
+               ceph_decode_timespec(&atime, &grant->atime);
+               ceph_decode_timespec(&ctime, &grant->ctime);
+               ceph_fill_file_time(inode, issued,
+                                   le32_to_cpu(grant->time_warp_seq),
+                                   &ctime, &mtime, &atime);
+       }
+
+       if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
+               /* file layout may have changed */
+               ci->i_layout = grant->layout;
+               /* size/truncate_seq? */
+               queue_trunc = ceph_fill_file_size(inode, issued,
+                                       le32_to_cpu(grant->truncate_seq),
+                                       le64_to_cpu(grant->truncate_size),
+                                       size);
+               /* max size increase? */
+               if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
+                       dout("max_size %lld -> %llu\n",
+                            ci->i_max_size, max_size);
+                       ci->i_max_size = max_size;
+                       if (max_size >= ci->i_wanted_max_size) {
+                               ci->i_wanted_max_size = 0;  /* reset */
+                               ci->i_requested_max_size = 0;
+                       }
+                       wake = 1;
                }
-               wake = 1;
        }
 
        /* check cap bits */
@@ -2595,6 +2578,23 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 
        spin_unlock(&ci->i_ceph_lock);
 
+       if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+               down_write(&mdsc->snap_rwsem);
+               ceph_update_snap_trace(mdsc, snaptrace,
+                                      snaptrace + snaptrace_len, false);
+               downgrade_write(&mdsc->snap_rwsem);
+               kick_flushing_inode_caps(mdsc, session, inode);
+               up_read(&mdsc->snap_rwsem);
+               if (newcaps & ~issued)
+                       wake = 1;
+       }
+
+       if (queue_trunc) {
+               ceph_queue_vmtruncate(inode);
+               ceph_queue_revalidate(inode);
+       } else if (queue_revalidate)
+               ceph_queue_revalidate(inode);
+
        if (writeback)
                /*
                 * queue inode for writeback: we can't actually call
@@ -2606,8 +2606,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                ceph_queue_invalidate(inode);
        if (deleted_inode)
                invalidate_aliases(inode);
-       if (queue_revalidate)
-               ceph_queue_revalidate(inode);
        if (wake)
                wake_up_all(&ci->i_cap_wq);
 
@@ -2784,7 +2782,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_session *tsession = NULL;
-       struct ceph_cap *cap, *tcap;
+       struct ceph_cap *cap, *tcap, *new_cap = NULL;
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 t_cap_id;
        unsigned mseq = le32_to_cpu(ex->migrate_seq);
@@ -2807,7 +2805,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
 retry:
        spin_lock(&ci->i_ceph_lock);
        cap = __get_cap_for_mds(ci, mds);
-       if (!cap)
+       if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
                goto out_unlock;
 
        if (target < 0) {
@@ -2846,15 +2844,14 @@ retry:
                }
                __ceph_remove_cap(cap, false);
                goto out_unlock;
-       }
-
-       if (tsession) {
-               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
-               spin_unlock(&ci->i_ceph_lock);
+       } else if (tsession) {
                /* add placeholder for the export tagert */
+               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
                ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
-                            t_seq - 1, t_mseq, (u64)-1, flag, NULL);
-               goto retry;
+                            t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
+
+               __ceph_remove_cap(cap, false);
+               goto out_unlock;
        }
 
        spin_unlock(&ci->i_ceph_lock);
@@ -2873,6 +2870,7 @@ retry:
                                          SINGLE_DEPTH_NESTING);
                }
                ceph_add_cap_releases(mdsc, tsession);
+               new_cap = ceph_get_cap(mdsc, NULL);
        } else {
                WARN_ON(1);
                tsession = NULL;
@@ -2887,24 +2885,27 @@ out_unlock:
                mutex_unlock(&tsession->s_mutex);
                ceph_put_mds_session(tsession);
        }
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
 }
 
 /*
- * Handle cap IMPORT.  If there are temp bits from an older EXPORT,
- * clean them up.
+ * Handle cap IMPORT.
  *
- * caller holds s_mutex.
+ * caller holds s_mutex. acquires i_ceph_lock
  */
 static void handle_cap_import(struct ceph_mds_client *mdsc,
                              struct inode *inode, struct ceph_mds_caps *im,
                              struct ceph_mds_cap_peer *ph,
                              struct ceph_mds_session *session,
-                             void *snaptrace, int snaptrace_len)
+                             struct ceph_cap **target_cap, int *old_issued)
+       __acquires(ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *cap;
+       struct ceph_cap *cap, *ocap, *new_cap = NULL;
        int mds = session->s_mds;
-       unsigned issued = le32_to_cpu(im->caps);
+       int issued;
+       unsigned caps = le32_to_cpu(im->caps);
        unsigned wanted = le32_to_cpu(im->wanted);
        unsigned seq = le32_to_cpu(im->seq);
        unsigned mseq = le32_to_cpu(im->migrate_seq);
@@ -2924,40 +2925,52 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
             inode, ci, mds, mseq, peer);
 
+retry:
        spin_lock(&ci->i_ceph_lock);
-       cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
-       if (cap && cap->cap_id == p_cap_id) {
+       cap = __get_cap_for_mds(ci, mds);
+       if (!cap) {
+               if (!new_cap) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       new_cap = ceph_get_cap(mdsc, NULL);
+                       goto retry;
+               }
+               cap = new_cap;
+       } else {
+               if (new_cap) {
+                       ceph_put_cap(mdsc, new_cap);
+                       new_cap = NULL;
+               }
+       }
+
+       __ceph_caps_issued(ci, &issued);
+       issued |= __ceph_caps_dirty(ci);
+
+       ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+                    realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
+
+       ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
+       if (ocap && ocap->cap_id == p_cap_id) {
                dout(" remove export cap %p mds%d flags %d\n",
-                    cap, peer, ph->flags);
+                    ocap, peer, ph->flags);
                if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
-                   (cap->seq != le32_to_cpu(ph->seq) ||
-                    cap->mseq != le32_to_cpu(ph->mseq))) {
+                   (ocap->seq != le32_to_cpu(ph->seq) ||
+                    ocap->mseq != le32_to_cpu(ph->mseq))) {
                        pr_err("handle_cap_import: mismatched seq/mseq: "
                               "ino (%llx.%llx) mds%d seq %d mseq %d "
                               "importer mds%d has peer seq %d mseq %d\n",
-                              ceph_vinop(inode), peer, cap->seq,
-                              cap->mseq, mds, le32_to_cpu(ph->seq),
+                              ceph_vinop(inode), peer, ocap->seq,
+                              ocap->mseq, mds, le32_to_cpu(ph->seq),
                               le32_to_cpu(ph->mseq));
                }
-               ci->i_cap_exporting_issued = cap->issued;
-               __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
+               __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
 
        /* make sure we re-request max_size, if necessary */
        ci->i_wanted_max_size = 0;
        ci->i_requested_max_size = 0;
-       spin_unlock(&ci->i_ceph_lock);
-
-       down_write(&mdsc->snap_rwsem);
-       ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
-                              false);
-       downgrade_write(&mdsc->snap_rwsem);
-       ceph_add_cap(inode, session, cap_id, -1,
-                    issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
-                    NULL /* no caps context */);
-       kick_flushing_inode_caps(mdsc, session, inode);
-       up_read(&mdsc->snap_rwsem);
 
+       *old_issued = issued;
+       *target_cap = cap;
 }
 
 /*
@@ -2977,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct ceph_mds_caps *h;
        struct ceph_mds_cap_peer *peer = NULL;
        int mds = session->s_mds;
-       int op;
+       int op, issued;
        u32 seq, mseq;
        struct ceph_vino vino;
        u64 cap_id;
@@ -3069,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, peer, session,
-                                 snaptrace, snaptrace_len);
+                                 &cap, &issued);
+               handle_cap_grant(mdsc, inode, h,  snaptrace, snaptrace_len,
+                                msg->middle, session, cap, issued);
+               goto done_unlocked;
        }
 
        /* the rest require a cap */
@@ -3086,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        switch (op) {
        case CEPH_CAP_OP_REVOKE:
        case CEPH_CAP_OP_GRANT:
-       case CEPH_CAP_OP_IMPORT:
-               handle_cap_grant(inode, h, session, cap, msg->middle);
+               __ceph_caps_issued(ci, &issued);
+               issued |= __ceph_caps_dirty(ci);
+               handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
+                                session, cap, issued);
                goto done_unlocked;
 
        case CEPH_CAP_OP_FLUSH_ACK:
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 00d6af6..8d7d782 100644
@@ -169,7 +169,7 @@ static struct dentry *__get_parent(struct super_block *sb,
        return dentry;
 }
 
-struct dentry *ceph_get_parent(struct dentry *child)
+static struct dentry *ceph_get_parent(struct dentry *child)
 {
        /* don't re-export snaps */
        if (ceph_snap(child->d_inode) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e4fff9f..04c89c2 100644
@@ -10,6 +10,7 @@
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
 #include <linux/posix_acl.h>
+#include <linux/random.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -179,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
  * specified, copy the frag delegation info to the caller if
  * it is present.
  */
-u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
-                    struct ceph_inode_frag *pfrag,
-                    int *found)
+static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                             struct ceph_inode_frag *pfrag, int *found)
 {
        u32 t = ceph_frag_make(0, 0);
        struct ceph_inode_frag *frag;
@@ -191,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
        if (found)
                *found = 0;
 
-       mutex_lock(&ci->i_fragtree_mutex);
        while (1) {
                WARN_ON(!ceph_frag_contains_value(t, v));
                frag = __ceph_find_frag(ci, t);
@@ -220,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
        }
        dout("choose_frag(%x) = %x\n", v, t);
 
-       mutex_unlock(&ci->i_fragtree_mutex);
        return t;
 }
 
+u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                    struct ceph_inode_frag *pfrag, int *found)
+{
+       u32 ret;
+       mutex_lock(&ci->i_fragtree_mutex);
+       ret = __ceph_choose_frag(ci, v, pfrag, found);
+       mutex_unlock(&ci->i_fragtree_mutex);
+       return ret;
+}
+
 /*
  * Process dirfrag (delegation) info from the mds.  Include leaf
  * fragment in tree ONLY if ndist > 0.  Otherwise, only
@@ -237,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode,
        u32 id = le32_to_cpu(dirinfo->frag);
        int mds = le32_to_cpu(dirinfo->auth);
        int ndist = le32_to_cpu(dirinfo->ndist);
+       int diri_auth = -1;
        int i;
        int err = 0;
 
+       spin_lock(&ci->i_ceph_lock);
+       if (ci->i_auth_cap)
+               diri_auth = ci->i_auth_cap->mds;
+       spin_unlock(&ci->i_ceph_lock);
+
        mutex_lock(&ci->i_fragtree_mutex);
-       if (ndist == 0) {
+       if (ndist == 0 && mds == diri_auth) {
                /* no delegation info needed. */
                frag = __ceph_find_frag(ci, id);
                if (!frag)
@@ -286,6 +300,75 @@ out:
        return err;
 }
 
+static int ceph_fill_fragtree(struct inode *inode,
+                             struct ceph_frag_tree_head *fragtree,
+                             struct ceph_mds_reply_dirfrag *dirinfo)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_frag *frag;
+       struct rb_node *rb_node;
+       int i;
+       u32 id, nsplits;
+       bool update = false;
+
+       mutex_lock(&ci->i_fragtree_mutex);
+       nsplits = le32_to_cpu(fragtree->nsplits);
+       if (nsplits) {
+               i = prandom_u32() % nsplits;
+               id = le32_to_cpu(fragtree->splits[i].frag);
+               if (!__ceph_find_frag(ci, id))
+                       update = true;
+       } else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
+               rb_node = rb_first(&ci->i_fragtree);
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
+                       update = true;
+       }
+       if (!update && dirinfo) {
+               id = le32_to_cpu(dirinfo->frag);
+               if (id != __ceph_choose_frag(ci, id, NULL, NULL))
+                       update = true;
+       }
+       if (!update)
+               goto out_unlock;
+
+       dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
+       rb_node = rb_first(&ci->i_fragtree);
+       for (i = 0; i < nsplits; i++) {
+               id = le32_to_cpu(fragtree->splits[i].frag);
+               frag = NULL;
+               while (rb_node) {
+                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+                       if (ceph_frag_compare(frag->frag, id) >= 0) {
+                               if (frag->frag != id)
+                                       frag = NULL;
+                               else
+                                       rb_node = rb_next(rb_node);
+                               break;
+                       }
+                       rb_node = rb_next(rb_node);
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       kfree(frag);
+                       frag = NULL;
+               }
+               if (!frag) {
+                       frag = __get_or_create_frag(ci, id);
+                       if (IS_ERR(frag))
+                               continue;
+               }
+               frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+               dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+       }
+       while (rb_node) {
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               rb_node = rb_next(rb_node);
+               rb_erase(&frag->node, &ci->i_fragtree);
+               kfree(frag);
+       }
+out_unlock:
+       mutex_unlock(&ci->i_fragtree_mutex);
+       return 0;
+}
 
 /*
  * initialize a newly allocated inode.
@@ -341,7 +424,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_cap_snaps);
        ci->i_head_snapc = NULL;
        ci->i_snap_caps = 0;
-       ci->i_cap_exporting_issued = 0;
 
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
@@ -407,7 +489,7 @@ void ceph_destroy_inode(struct inode *inode)
 
        /*
         * we may still have a snap_realm reference if there are stray
-        * caps in i_cap_exporting_issued or i_snap_caps.
+        * caps in i_snap_caps.
         */
        if (ci->i_snap_realm) {
                struct ceph_mds_client *mdsc =
@@ -582,22 +664,26 @@ static int fill_inode(struct inode *inode,
                      unsigned long ttl_from, int cap_fmode,
                      struct ceph_cap_reservation *caps_reservation)
 {
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct ceph_mds_reply_inode *info = iinfo->in;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int i;
-       int issued = 0, implemented;
+       int issued = 0, implemented, new_issued;
        struct timespec mtime, atime, ctime;
-       u32 nsplits;
-       struct ceph_inode_frag *frag;
-       struct rb_node *rb_node;
        struct ceph_buffer *xattr_blob = NULL;
+       struct ceph_cap *new_cap = NULL;
        int err = 0;
-       int queue_trunc = 0;
+       bool wake = false;
+       bool queue_trunc = false;
+       bool new_version = false;
 
        dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
             inode, ceph_vinop(inode), le64_to_cpu(info->version),
             ci->i_version);
 
+       /* prealloc new cap struct */
+       if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+               new_cap = ceph_get_cap(mdsc, caps_reservation);
+
        /*
         * prealloc xattr data, if it looks like we'll need it.  only
         * if len > 4 (meaning there are actually xattrs; the first 4
@@ -623,19 +709,23 @@ static int fill_inode(struct inode *inode,
         *   3    2     skip
         *   3    3     update
         */
-       if (le64_to_cpu(info->version) > 0 &&
-           (ci->i_version & ~1) >= le64_to_cpu(info->version))
-               goto no_change;
-       
+       if (ci->i_version == 0 ||
+           ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+            le64_to_cpu(info->version) > (ci->i_version & ~1)))
+               new_version = true;
+
        issued = __ceph_caps_issued(ci, &implemented);
        issued |= implemented | __ceph_caps_dirty(ci);
+       new_issued = ~issued & le32_to_cpu(info->cap.caps);
 
        /* update inode */
        ci->i_version = le64_to_cpu(info->version);
        inode->i_version++;
        inode->i_rdev = le32_to_cpu(info->rdev);
+       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
-       if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+       if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+           (issued & CEPH_CAP_AUTH_EXCL) == 0) {
                inode->i_mode = le32_to_cpu(info->mode);
                inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
                inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
@@ -644,23 +734,35 @@ static int fill_inode(struct inode *inode,
                     from_kgid(&init_user_ns, inode->i_gid));
        }
 
-       if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+       if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+           (issued & CEPH_CAP_LINK_EXCL) == 0)
                set_nlink(inode, le32_to_cpu(info->nlink));
 
-       /* be careful with mtime, atime, size */
-       ceph_decode_timespec(&atime, &info->atime);
-       ceph_decode_timespec(&mtime, &info->mtime);
-       ceph_decode_timespec(&ctime, &info->ctime);
-       queue_trunc = ceph_fill_file_size(inode, issued,
-                                         le32_to_cpu(info->truncate_seq),
-                                         le64_to_cpu(info->truncate_size),
-                                         le64_to_cpu(info->size));
-       ceph_fill_file_time(inode, issued,
-                           le32_to_cpu(info->time_warp_seq),
-                           &ctime, &mtime, &atime);
-
-       ci->i_layout = info->layout;
-       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+       if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+               /* be careful with mtime, atime, size */
+               ceph_decode_timespec(&atime, &info->atime);
+               ceph_decode_timespec(&mtime, &info->mtime);
+               ceph_decode_timespec(&ctime, &info->ctime);
+               ceph_fill_file_time(inode, issued,
+                               le32_to_cpu(info->time_warp_seq),
+                               &ctime, &mtime, &atime);
+       }
+
+       if (new_version ||
+           (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+               ci->i_layout = info->layout;
+               queue_trunc = ceph_fill_file_size(inode, issued,
+                                       le32_to_cpu(info->truncate_seq),
+                                       le64_to_cpu(info->truncate_size),
+                                       le64_to_cpu(info->size));
+               /* only update max_size on auth cap */
+               if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+                   ci->i_max_size != le64_to_cpu(info->max_size)) {
+                       dout("max_size %lld -> %llu\n", ci->i_max_size,
+                                       le64_to_cpu(info->max_size));
+                       ci->i_max_size = le64_to_cpu(info->max_size);
+               }
+       }
 
        /* xattrs */
        /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
@@ -745,58 +847,6 @@ static int fill_inode(struct inode *inode,
                dout(" marking %p complete (empty)\n", inode);
                __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
        }
-no_change:
-       /* only update max_size on auth cap */
-       if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
-           ci->i_max_size != le64_to_cpu(info->max_size)) {
-               dout("max_size %lld -> %llu\n", ci->i_max_size,
-                    le64_to_cpu(info->max_size));
-               ci->i_max_size = le64_to_cpu(info->max_size);
-       }
-
-       spin_unlock(&ci->i_ceph_lock);
-
-       /* queue truncate if we saw i_size decrease */
-       if (queue_trunc)
-               ceph_queue_vmtruncate(inode);
-
-       /* populate frag tree */
-       /* FIXME: move me up, if/when version reflects fragtree changes */
-       nsplits = le32_to_cpu(info->fragtree.nsplits);
-       mutex_lock(&ci->i_fragtree_mutex);
-       rb_node = rb_first(&ci->i_fragtree);
-       for (i = 0; i < nsplits; i++) {
-               u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-               frag = NULL;
-               while (rb_node) {
-                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-                       if (ceph_frag_compare(frag->frag, id) >= 0) {
-                               if (frag->frag != id)
-                                       frag = NULL;
-                               else
-                                       rb_node = rb_next(rb_node);
-                               break;
-                       }
-                       rb_node = rb_next(rb_node);
-                       rb_erase(&frag->node, &ci->i_fragtree);
-                       kfree(frag);
-                       frag = NULL;
-               }
-               if (!frag) {
-                       frag = __get_or_create_frag(ci, id);
-                       if (IS_ERR(frag))
-                               continue;
-               }
-               frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
-               dout(" frag %x split by %d\n", frag->frag, frag->split_by);
-       }
-       while (rb_node) {
-               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-               rb_node = rb_next(rb_node);
-               rb_erase(&frag->node, &ci->i_fragtree);
-               kfree(frag);
-       }
-       mutex_unlock(&ci->i_fragtree_mutex);
 
        /* were we issued a capability? */
        if (info->cap.caps) {
@@ -809,30 +859,41 @@ no_change:
                                     le32_to_cpu(info->cap.seq),
                                     le32_to_cpu(info->cap.mseq),
                                     le64_to_cpu(info->cap.realm),
-                                    info->cap.flags,
-                                    caps_reservation);
+                                    info->cap.flags, &new_cap);
+                       wake = true;
                } else {
-                       spin_lock(&ci->i_ceph_lock);
                        dout(" %p got snap_caps %s\n", inode,
                             ceph_cap_string(le32_to_cpu(info->cap.caps)));
                        ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
                        if (cap_fmode >= 0)
                                __ceph_get_fmode(ci, cap_fmode);
-                       spin_unlock(&ci->i_ceph_lock);
                }
        } else if (cap_fmode >= 0) {
                pr_warn("mds issued no caps on %llx.%llx\n",
                           ceph_vinop(inode));
                __ceph_get_fmode(ci, cap_fmode);
        }
+       spin_unlock(&ci->i_ceph_lock);
+
+       if (wake)
+               wake_up_all(&ci->i_cap_wq);
+
+       /* queue truncate if we saw i_size decrease */
+       if (queue_trunc)
+               ceph_queue_vmtruncate(inode);
+
+       /* populate frag tree */
+       if (S_ISDIR(inode->i_mode))
+               ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
 
        /* update delegation info? */
        if (dirinfo)
                ceph_fill_dirfrag(inode, dirinfo);
 
        err = 0;
-
 out:
+       if (new_cap)
+               ceph_put_cap(mdsc, new_cap);
        if (xattr_blob)
                ceph_buffer_put(xattr_blob);
        return err;
@@ -1485,7 +1546,7 @@ static void ceph_invalidate_work(struct work_struct *work)
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&ci->i_ceph_lock);
 
-       truncate_inode_pages(inode->i_mapping, 0);
+       truncate_pagecache(inode, 0);
 
        spin_lock(&ci->i_ceph_lock);
        if (orig_gen == ci->i_rdcache_gen &&
@@ -1588,7 +1649,7 @@ retry:
             ci->i_truncate_pending, to);
        spin_unlock(&ci->i_ceph_lock);
 
-       truncate_inode_pages(inode->i_mapping, to);
+       truncate_pagecache(inode, to);
 
        spin_lock(&ci->i_ceph_lock);
        if (to == ci->i_truncate_size) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9a33b98..92a2548 100644
@@ -1558,6 +1558,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
 
+       req->r_stamp = CURRENT_TIME;
+
        req->r_op = op;
        req->r_direct_mode = mode;
        return req;
@@ -1783,7 +1785,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        }
 
        len = sizeof(*head) +
-               pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
+               pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
+               sizeof(struct timespec);
 
        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
@@ -1800,6 +1803,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                goto out_free2;
        }
 
+       msg->hdr.version = 2;
        msg->hdr.tid = cpu_to_le64(req->r_tid);
 
        head = msg->front.iov_base;
@@ -1836,6 +1840,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
        head->num_releases = cpu_to_le16(releases);
 
+       /* time stamp */
+       ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+
        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e90cfcc..e00737c 100644
@@ -194,6 +194,7 @@ struct ceph_mds_request {
        int r_fmode;        /* file mode, if expecting cap */
        kuid_t r_uid;
        kgid_t r_gid;
+       struct timespec r_stamp;
 
        /* for choosing which mds to send this request to */
        int r_direct_mode;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ead05cc..12b2074 100644
@@ -292,7 +292,6 @@ struct ceph_inode_info {
        struct ceph_snap_context *i_head_snapc;  /* set if wr_buffer_head > 0 or
                                                    dirty|flushing caps */
        unsigned i_snap_caps;           /* cap bits for snapped files */
-       unsigned i_cap_exporting_issued;
 
        int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
@@ -775,11 +774,13 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
 extern const char *ceph_cap_string(int c);
 extern void ceph_handle_caps(struct ceph_mds_session *session,
                             struct ceph_msg *msg);
-extern int ceph_add_cap(struct inode *inode,
-                       struct ceph_mds_session *session, u64 cap_id,
-                       int fmode, unsigned issued, unsigned wanted,
-                       unsigned cap, unsigned seq, u64 realmino, int flags,
-                       struct ceph_cap_reservation *caps_reservation);
+extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+                                    struct ceph_cap_reservation *ctx);
+extern void ceph_add_cap(struct inode *inode,
+                        struct ceph_mds_session *session, u64 cap_id,
+                        int fmode, unsigned issued, unsigned wanted,
+                        unsigned cap, unsigned seq, u64 realmino, int flags,
+                        struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 5f6db18..3c97d5e 100644
@@ -625,6 +625,8 @@ int ceph_flags_to_mode(int flags);
                           CEPH_CAP_LINK_EXCL |         \
                           CEPH_CAP_XATTR_EXCL |        \
                           CEPH_CAP_FILE_EXCL)
+#define CEPH_CAP_ANY_FILE_RD (CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE | \
+                             CEPH_CAP_FILE_SHARED)
 #define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |        \
                              CEPH_CAP_FILE_EXCL)
 #define CEPH_CAP_ANY_WR   (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index a486f39..deb47e4 100644
@@ -40,9 +40,9 @@ struct ceph_mon_request {
 };
 
 /*
- * ceph_mon_generic_request is being used for the statfs and poolop requests
- * which are bening done a bit differently because we need to get data back
- * to the caller
+ * ceph_mon_generic_request is being used for the statfs, poolop and
+ * mon_get_version requests which are being done a bit differently
+ * because we need to get data back to the caller
  */
 struct ceph_mon_generic_request {
        struct kref kref;
@@ -104,10 +104,15 @@ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
 extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
+extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+                                unsigned long timeout);
 
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
                               struct ceph_statfs *buf);
 
+extern int ceph_monc_do_get_version(struct ceph_mon_client *monc,
+                                   const char *what, u64 *newest);
+
 extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
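
Together, these two additions let 'rbd map' make sure it is looking at the newest osdmap before retrying an image lookup.  A sketch of how the rbd side can combine them; the helper name, the mount_timeout-based timeout and the field paths are assumptions based on the rbd patch in this series:

static int rbd_wait_for_latest_osdmap(struct rbd_device *rbd_dev)
{
	struct ceph_client *client = rbd_dev->rbd_client->client;
	u64 newest_epoch;
	unsigned long timeout = client->options->mount_timeout * HZ;
	int ret;

	/* ask the monitor for the newest osdmap epoch */
	ret = ceph_monc_do_get_version(&client->monc, "osdmap", &newest_epoch);
	if (ret)
		return ret;

	if (client->osdc.osdmap->epoch >= newest_epoch)
		return 0;

	/* subscribe to the next map and wait until we have caught up */
	ceph_monc_request_next_osdmap(&client->monc);
	return ceph_monc_wait_osdmap(&client->monc, newest_epoch, timeout);
}
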
index 67d7721..1675021 100644 (file)
@@ -72,6 +72,8 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
        case CEPH_MSG_STATFS: return "statfs";
        case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
+       case CEPH_MSG_MON_GET_VERSION: return "mon_get_version";
+       case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply";
        case CEPH_MSG_MDS_MAP: return "mds_map";
        case CEPH_MSG_CLIENT_SESSION: return "client_session";
        case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect";
index 10421a4..d1a62c6 100644 (file)
@@ -126,9 +126,13 @@ static int monc_show(struct seq_file *s, void *p)
                req = rb_entry(rp, struct ceph_mon_generic_request, node);
                op = le16_to_cpu(req->request->hdr.type);
                if (op == CEPH_MSG_STATFS)
-                       seq_printf(s, "%lld statfs\n", req->tid);
+                       seq_printf(s, "%llu statfs\n", req->tid);
+               else if (op == CEPH_MSG_POOLOP)
+                       seq_printf(s, "%llu poolop\n", req->tid);
+               else if (op == CEPH_MSG_MON_GET_VERSION)
+                       seq_printf(s, "%llu mon_get_version\n", req->tid);
                else
-                       seq_printf(s, "%lld unknown\n", req->tid);
+                       seq_printf(s, "%llu unknown\n", req->tid);
        }
 
        mutex_unlock(&monc->mutex);
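
With the poolop and mon_get_version cases in place, each pending generic request in the per-client monc debugfs file is labelled by type; the output looks roughly like this (tids invented for illustration):

	4097 statfs
	4098 poolop
	4099 mon_get_version
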
index 2ac9ef3..067d3af 100644 (file)
@@ -296,6 +296,33 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
                __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
 }
+EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
+
+int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+                         unsigned long timeout)
+{
+       unsigned long started = jiffies;
+       int ret;
+
+       mutex_lock(&monc->mutex);
+       while (monc->have_osdmap < epoch) {
+               mutex_unlock(&monc->mutex);
+
+               if (timeout != 0 && time_after_eq(jiffies, started + timeout))
+                       return -ETIMEDOUT;
+
+               ret = wait_event_interruptible_timeout(monc->client->auth_wq,
+                                        monc->have_osdmap >= epoch, timeout);
+               if (ret < 0)
+                       return ret;
+
+               mutex_lock(&monc->mutex);
+       }
+
+       mutex_unlock(&monc->mutex);
+       return 0;
+}
+EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
  *
@@ -477,14 +504,13 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
        return m;
 }
 
-static int do_generic_request(struct ceph_mon_client *monc,
-                             struct ceph_mon_generic_request *req)
+static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
+                               struct ceph_mon_generic_request *req)
 {
        int err;
 
        /* register request */
-       mutex_lock(&monc->mutex);
-       req->tid = ++monc->last_tid;
+       req->tid = tid != 0 ? tid : ++monc->last_tid;
        req->request->hdr.tid = cpu_to_le64(req->tid);
        __insert_generic_request(monc, req);
        monc->num_generic_requests++;
@@ -496,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc,
        mutex_lock(&monc->mutex);
        rb_erase(&req->node, &monc->generic_request_tree);
        monc->num_generic_requests--;
-       mutex_unlock(&monc->mutex);
 
        if (!err)
                err = req->result;
        return err;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+                             struct ceph_mon_generic_request *req)
+{
+       int err;
+
+       mutex_lock(&monc->mutex);
+       err = __do_generic_request(monc, 0, req);
+       mutex_unlock(&monc->mutex);
+
+       return err;
+}
+
 /*
  * statfs
  */
@@ -579,6 +616,96 @@ out:
 }
 EXPORT_SYMBOL(ceph_monc_do_statfs);
 
+static void handle_get_version_reply(struct ceph_mon_client *monc,
+                                    struct ceph_msg *msg)
+{
+       struct ceph_mon_generic_request *req;
+       u64 tid = le64_to_cpu(msg->hdr.tid);
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front_alloc_len;
+       u64 handle;
+
+       dout("%s %p tid %llu\n", __func__, msg, tid);
+
+       ceph_decode_need(&p, end, 2*sizeof(u64), bad);
+       handle = ceph_decode_64(&p);
+       if (tid != 0 && tid != handle)
+               goto bad;
+
+       mutex_lock(&monc->mutex);
+       req = __lookup_generic_req(monc, handle);
+       if (req) {
+               *(u64 *)req->buf = ceph_decode_64(&p);
+               req->result = 0;
+               get_generic_request(req);
+       }
+       mutex_unlock(&monc->mutex);
+       if (req) {
+               complete_all(&req->completion);
+               put_generic_request(req);
+       }
+
+       return;
+bad:
+       pr_err("corrupt mon_get_version reply\n");
+       ceph_msg_dump(msg);
+}
+
+/*
+ * Send MMonGetVersion and wait for the reply.
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ */
+int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
+                            u64 *newest)
+{
+       struct ceph_mon_generic_request *req;
+       void *p, *end;
+       u64 tid;
+       int err;
+
+       req = kzalloc(sizeof(*req), GFP_NOFS);
+       if (!req)
+               return -ENOMEM;
+
+       kref_init(&req->kref);
+       req->buf = newest;
+       req->buf_len = sizeof(*newest);
+       init_completion(&req->completion);
+
+       req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
+                                   sizeof(u64) + sizeof(u32) + strlen(what),
+                                   GFP_NOFS, true);
+       if (!req->request) {
+               err = -ENOMEM;
+               goto out;
+       }
+
+       req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
+                                 GFP_NOFS, true);
+       if (!req->reply) {
+               err = -ENOMEM;
+               goto out;
+       }
+
+       p = req->request->front.iov_base;
+       end = p + req->request->front_alloc_len;
+
+       /* fill out request */
+       mutex_lock(&monc->mutex);
+       tid = ++monc->last_tid;
+       ceph_encode_64(&p, tid); /* handle */
+       ceph_encode_string(&p, end, what, strlen(what));
+
+       err = __do_generic_request(monc, tid, req);
+
+       mutex_unlock(&monc->mutex);
+out:
+       kref_put(&req->kref, release_generic_request);
+       return err;
+}
+EXPORT_SYMBOL(ceph_monc_do_get_version);
+
 /*
  * pool ops
  */
@@ -981,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                handle_statfs_reply(monc, msg);
                break;
 
+       case CEPH_MSG_MON_GET_VERSION_REPLY:
+               handle_get_version_reply(monc, msg);
+               break;
+
        case CEPH_MSG_POOLOP_REPLY:
                handle_poolop_reply(monc, msg);
                break;
@@ -1029,6 +1160,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
        case CEPH_MSG_AUTH_REPLY:
                m = ceph_msg_get(monc->m_auth_reply);
                break;
+       case CEPH_MSG_MON_GET_VERSION_REPLY:
+               if (le64_to_cpu(hdr->tid) != 0)
+                       return get_generic_reply(con, hdr, skip);
+
+               /*
+                * Older OSDs don't set reply tid even if the original
+                * request had a non-zero tid.  Work around this weirdness
+                * by falling through to the allocate case.
+                */
        case CEPH_MSG_MON_MAP:
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP: