Merge tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client
authorLinus Torvalds <torvalds@linux-foundation.org>
Mon, 10 Oct 2016 20:52:05 +0000 (13:52 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Mon, 10 Oct 2016 20:52:05 +0000 (13:52 -0700)
Pull Ceph updates from Ilya Dryomov:
 "The big ticket item here is support for rbd exclusive-lock feature,
  with maintenance operations offloaded to userspace (Douglas Fuller,
  Mike Christie and myself). Another block device bullet is a series
  fixing up layering error paths (myself).

  On the filesystem side, we've got patches that improve our handling of
  buffered vs dio write races (Neil Brown) and a few assorted fixes from
  Zheng. Also included a couple of random cleanups and a minor CRUSH
  update"

* tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client: (39 commits)
  crush: remove redundant local variable
  crush: don't normalize input of crush_ln iteratively
  libceph: ceph_build_auth() doesn't need ceph_auth_build_hello()
  libceph: use CEPH_AUTH_UNKNOWN in ceph_auth_build_hello()
  ceph: fix description for rsize and rasize mount options
  rbd: use kmalloc_array() in rbd_header_from_disk()
  ceph: use list_move instead of list_del/list_add
  ceph: handle CEPH_SESSION_REJECT message
  ceph: avoid accessing / when mounting a subpath
  ceph: fix mandatory flock check
  ceph: remove warning when ceph_releasepage() is called on dirty page
  ceph: ignore error from invalidate_inode_pages2_range() in direct write
  ceph: fix error handling of start_read()
  rbd: add rbd_obj_request_error() helper
  rbd: img_data requests don't own their page array
  rbd: don't call rbd_osd_req_format_read() for !img_data requests
  rbd: rework rbd_img_obj_exists_submit() error paths
  rbd: don't crash or leak on errors in rbd_img_obj_parent_read_full_callback()
  rbd: move bumping img_request refcount into rbd_obj_request_submit()
  rbd: mark the original request as done if stat request fails
  ...

1  2 
drivers/block/rbd.c

diff --combined drivers/block/rbd.c
@@@ -31,6 -31,7 +31,7 @@@
  #include <linux/ceph/libceph.h>
  #include <linux/ceph/osd_client.h>
  #include <linux/ceph/mon_client.h>
+ #include <linux/ceph/cls_lock_client.h>
  #include <linux/ceph/decode.h>
  #include <linux/parser.h>
  #include <linux/bsearch.h>
@@@ -114,12 -115,17 +115,17 @@@ static int atomic_dec_return_safe(atomi
  
  #define RBD_OBJ_PREFIX_LEN_MAX        64
  
+ #define RBD_NOTIFY_TIMEOUT    5       /* seconds */
+ #define RBD_RETRY_DELAY               msecs_to_jiffies(1000)
  /* Feature bits */
  
  #define RBD_FEATURE_LAYERING  (1<<0)
  #define RBD_FEATURE_STRIPINGV2        (1<<1)
- #define RBD_FEATURES_ALL \
-           (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
+ #define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
+ #define RBD_FEATURES_ALL      (RBD_FEATURE_LAYERING |         \
+                                RBD_FEATURE_STRIPINGV2 |       \
+                                RBD_FEATURE_EXCLUSIVE_LOCK)
  
  /* Features supported by this (client software) implementation. */
  
  /*
   * An RBD device name will be "rbd#", where the "rbd" comes from
   * RBD_DRV_NAME above, and # is a unique integer identifier.
-  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
-  * enough to hold all possible device names.
   */
  #define DEV_NAME_LEN          32
- #define MAX_INT_FORMAT_WIDTH  ((5 * sizeof (int)) / 2 + 1)
  
  /*
   * block device image metadata (in-memory version)
@@@ -322,6 -325,24 +325,24 @@@ struct rbd_img_request 
  #define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
  
+ enum rbd_watch_state {
+       RBD_WATCH_STATE_UNREGISTERED,
+       RBD_WATCH_STATE_REGISTERED,
+       RBD_WATCH_STATE_ERROR,
+ };
+ enum rbd_lock_state {
+       RBD_LOCK_STATE_UNLOCKED,
+       RBD_LOCK_STATE_LOCKED,
+       RBD_LOCK_STATE_RELEASING,
+ };
+ /* WatchNotify::ClientId */
+ struct rbd_client_id {
+       u64 gid;
+       u64 handle;
+ };
  struct rbd_mapping {
        u64                     size;
        u64                     features;
@@@ -349,13 -370,29 +370,29 @@@ struct rbd_device 
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
        struct rbd_options      *opts;
+       char                    *config_info;   /* add{,_single_major} string */
  
        struct ceph_object_id   header_oid;
        struct ceph_object_locator header_oloc;
  
-       struct ceph_file_layout layout;
+       struct ceph_file_layout layout;         /* used for all rbd requests */
  
+       struct mutex            watch_mutex;
+       enum rbd_watch_state    watch_state;
        struct ceph_osd_linger_request *watch_handle;
+       u64                     watch_cookie;
+       struct delayed_work     watch_dwork;
+       struct rw_semaphore     lock_rwsem;
+       enum rbd_lock_state     lock_state;
+       struct rbd_client_id    owner_cid;
+       struct work_struct      acquired_lock_work;
+       struct work_struct      released_lock_work;
+       struct delayed_work     lock_dwork;
+       struct work_struct      unlock_work;
+       wait_queue_head_t       lock_waitq;
+       struct workqueue_struct *task_wq;
  
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
@@@ -439,6 -476,29 +476,29 @@@ static int minor_to_rbd_dev_id(int mino
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
  }
  
+ static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
+ {
+       return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+              rbd_dev->spec->snap_id == CEPH_NOSNAP &&
+              !rbd_dev->mapping.read_only;
+ }
+ static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
+ {
+       return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
+              rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
+ }
+ static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
+ {
+       bool is_lock_owner;
+       down_read(&rbd_dev->lock_rwsem);
+       is_lock_owner = __rbd_is_lock_owner(rbd_dev);
+       up_read(&rbd_dev->lock_rwsem);
+       return is_lock_owner;
+ }
  static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
  static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
  static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
@@@ -735,6 -795,7 +795,7 @@@ enum 
        /* string args above */
        Opt_read_only,
        Opt_read_write,
+       Opt_lock_on_read,
        Opt_err
  };
  
@@@ -746,16 -807,19 +807,19 @@@ static match_table_t rbd_opts_tokens = 
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
+       {Opt_lock_on_read, "lock_on_read"},
        {Opt_err, NULL}
  };
  
  struct rbd_options {
        int     queue_depth;
        bool    read_only;
+       bool    lock_on_read;
  };
  
  #define RBD_QUEUE_DEPTH_DEFAULT       BLKDEV_MAX_RQ
  #define RBD_READ_ONLY_DEFAULT false
+ #define RBD_LOCK_ON_READ_DEFAULT false
  
  static int parse_rbd_opts_token(char *c, void *private)
  {
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
+       case Opt_lock_on_read:
+               rbd_opts->lock_on_read = true;
+               break;
        default:
                /* libceph prints "bad option" msg */
                return -EINVAL;
@@@ -919,7 -986,6 +986,6 @@@ static int rbd_header_from_disk(struct 
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
-       size_t size;
        int ret = -ENOMEM;
        u32 i;
  
                        goto out_err;
  
                /* ...as well as the array of their sizes. */
-               size = snap_count * sizeof (*header->snap_sizes);
-               snap_sizes = kmalloc(size, GFP_KERNEL);
+               snap_sizes = kmalloc_array(snap_count,
+                                          sizeof(*header->snap_sizes),
+                                          GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;
  
@@@ -1551,11 -1617,18 +1617,18 @@@ static bool obj_request_type_valid(enu
        }
  }
  
- static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
-                               struct rbd_obj_request *obj_request)
+ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
+ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
  {
-       dout("%s %p\n", __func__, obj_request);
-       return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
+       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
+       if (obj_request_img_data_test(obj_request)) {
+               WARN_ON(obj_request->callback != rbd_img_obj_callback);
+               rbd_img_request_get(obj_request->img_request);
+       }
+       ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
  }
  
  static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
@@@ -1745,6 -1818,22 +1818,22 @@@ static void rbd_obj_request_complete(st
                complete_all(&obj_request->completion);
  }
  
+ static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
+ {
+       obj_request->result = err;
+       obj_request->xferred = 0;
+       /*
+        * kludge - mirror rbd_obj_request_submit() to match a put in
+        * rbd_img_obj_callback()
+        */
+       if (obj_request_img_data_test(obj_request)) {
+               WARN_ON(obj_request->callback != rbd_img_obj_callback);
+               rbd_img_request_get(obj_request->img_request);
+       }
+       obj_request_done_set(obj_request);
+       rbd_obj_request_complete(obj_request);
+ }
  static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
  {
        struct rbd_img_request *img_request = NULL;
@@@ -1877,11 -1966,10 +1966,10 @@@ static void rbd_osd_req_callback(struc
  
  static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
  {
-       struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
  
-       if (img_request)
-               osd_req->r_snapid = img_request->snap_id;
+       rbd_assert(obj_request_img_data_test(obj_request));
+       osd_req->r_snapid = obj_request->img_request->snap_id;
  }
  
  static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
@@@ -2074,7 -2162,9 +2162,9 @@@ static void rbd_obj_request_destroy(str
                        bio_chain_put(obj_request->bio_list);
                break;
        case OBJ_REQUEST_PAGES:
-               if (obj_request->pages)
+               /* img_data requests don't own their page array */
+               if (obj_request->pages &&
+                   !obj_request_img_data_test(obj_request))
                        ceph_release_page_vector(obj_request->pages,
                                                obj_request->page_count);
                break;
@@@ -2295,13 -2385,6 +2385,6 @@@ static bool rbd_img_obj_end_request(str
                xferred = obj_request->length;
        }
  
-       /* Image object requests don't own their page array */
-       if (obj_request->type == OBJ_REQUEST_PAGES) {
-               obj_request->pages = NULL;
-               obj_request->page_count = 0;
-       }
        if (img_request_child_test(img_request)) {
                rbd_assert(img_request->obj_request != NULL);
                more = obj_request->which < img_request->obj_request_count - 1;
@@@ -2520,8 -2603,6 +2603,6 @@@ static int rbd_img_request_fill(struct 
  
                rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
  
-               rbd_img_request_get(img_request);
                img_offset += length;
                resid -= length;
        }
@@@ -2579,7 -2660,6 +2660,6 @@@ rbd_img_obj_parent_read_full_callback(s
  {
        struct rbd_obj_request *orig_request;
        struct ceph_osd_request *osd_req;
-       struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
        enum obj_operation_type op_type;
        rbd_assert(obj_request_type_valid(orig_request->type));
        img_result = img_request->result;
        parent_length = img_request->length;
-       rbd_assert(parent_length == img_request->xferred);
+       rbd_assert(img_result || parent_length == img_request->xferred);
        rbd_img_request_put(img_request);
  
        rbd_assert(orig_request->img_request);
         * and re-submit the original write request.
         */
        if (!rbd_dev->parent_overlap) {
-               struct ceph_osd_client *osdc;
                ceph_release_page_vector(pages, page_count);
-               osdc = &rbd_dev->rbd_client->client->osdc;
-               img_result = rbd_obj_request_submit(osdc, orig_request);
-               if (!img_result)
-                       return;
+               rbd_obj_request_submit(orig_request);
+               return;
        }
  
        if (img_result)
  
        /* All set, send it off. */
  
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       img_result = rbd_obj_request_submit(osdc, orig_request);
-       if (!img_result)
-               return;
- out_err:
-       /* Record the error code and complete the request */
+       rbd_obj_request_submit(orig_request);
+       return;
  
-       orig_request->result = img_result;
-       orig_request->xferred = 0;
-       obj_request_done_set(orig_request);
-       rbd_obj_request_complete(orig_request);
+ out_err:
+       ceph_release_page_vector(pages, page_count);
+       rbd_obj_request_error(orig_request, img_result);
  }
  
  /*
   * When the read completes, this page array will be transferred to
   * the original object request for the copyup operation.
   *
-  * If an error occurs, record it as the result of the original
-  * object request and mark it done so it gets completed.
+  * If an error occurs, it is recorded as the result of the original
+  * object request in rbd_img_obj_exists_callback().
   */
  static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
  {
-       struct rbd_img_request *img_request = NULL;
+       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
        struct rbd_img_request *parent_request = NULL;
-       struct rbd_device *rbd_dev;
        u64 img_offset;
        u64 length;
        struct page **pages = NULL;
        u32 page_count;
        int result;
  
-       rbd_assert(obj_request_img_data_test(obj_request));
-       rbd_assert(obj_request_type_valid(obj_request->type));
-       img_request = obj_request->img_request;
-       rbd_assert(img_request != NULL);
-       rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev->parent != NULL);
  
        /*
        result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
        if (result)
                goto out_err;
        parent_request->copyup_pages = pages;
        parent_request->copyup_page_count = page_count;
        parent_request->callback = rbd_img_obj_parent_read_full_callback;
        result = rbd_img_request_submit(parent_request);
        if (!result)
                return 0;
@@@ -2757,10 -2822,6 +2822,6 @@@ out_err
                ceph_release_page_vector(pages, page_count);
        if (parent_request)
                rbd_img_request_put(parent_request);
-       obj_request->result = result;
-       obj_request->xferred = 0;
-       obj_request_done_set(obj_request);
        return result;
  }
  
@@@ -2793,17 -2854,13 +2854,13 @@@ static void rbd_img_obj_exists_callback
  
        /*
         * If the overlap has become 0 (most likely because the
-        * image has been flattened) we need to free the pages
-        * and re-submit the original write request.
+        * image has been flattened) we need to re-submit the
+        * original request.
         */
        rbd_dev = orig_request->img_request->rbd_dev;
        if (!rbd_dev->parent_overlap) {
-               struct ceph_osd_client *osdc;
-               osdc = &rbd_dev->rbd_client->client->osdc;
-               result = rbd_obj_request_submit(osdc, orig_request);
-               if (!result)
-                       return;
+               rbd_obj_request_submit(orig_request);
+               return;
        }
  
        /*
                obj_request_existence_set(orig_request, true);
        } else if (result == -ENOENT) {
                obj_request_existence_set(orig_request, false);
-       } else if (result) {
-               orig_request->result = result;
-               goto out;
+       } else {
+               goto fail_orig_request;
        }
  
        /*
         * Resubmit the original request now that we have recorded
         * whether the target object exists.
         */
-       orig_request->result = rbd_img_obj_request_submit(orig_request);
- out:
-       if (orig_request->result)
-               rbd_obj_request_complete(orig_request);
+       result = rbd_img_obj_request_submit(orig_request);
+       if (result)
+               goto fail_orig_request;
+       return;
+ fail_orig_request:
+       rbd_obj_request_error(orig_request, result);
  }
  
  static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
  {
+       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
        struct rbd_obj_request *stat_request;
-       struct rbd_device *rbd_dev;
-       struct ceph_osd_client *osdc;
-       struct page **pages = NULL;
+       struct page **pages;
        u32 page_count;
        size_t size;
        int ret;
  
+       stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+                                             OBJ_REQUEST_PAGES);
+       if (!stat_request)
+               return -ENOMEM;
+       stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
+                                                  stat_request);
+       if (!stat_request->osd_req) {
+               ret = -ENOMEM;
+               goto fail_stat_request;
+       }
        /*
         * The response data for a STAT call consists of:
         *     le64 length;
        size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
        page_count = (u32)calc_pages_for(0, size);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
+       if (IS_ERR(pages)) {
+               ret = PTR_ERR(pages);
+               goto fail_stat_request;
+       }
  
-       ret = -ENOMEM;
-       stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
-                                                       OBJ_REQUEST_PAGES);
-       if (!stat_request)
-               goto out;
+       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
+       osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+                                    false, false);
  
        rbd_obj_request_get(obj_request);
        stat_request->obj_request = obj_request;
        stat_request->pages = pages;
        stat_request->page_count = page_count;
-       rbd_assert(obj_request->img_request);
-       rbd_dev = obj_request->img_request->rbd_dev;
-       stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-                                                  stat_request);
-       if (!stat_request->osd_req)
-               goto out;
        stat_request->callback = rbd_img_obj_exists_callback;
  
-       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
-       osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
-                                       false, false);
-       rbd_osd_req_format_read(stat_request);
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       ret = rbd_obj_request_submit(osdc, stat_request);
- out:
-       if (ret)
-               rbd_obj_request_put(obj_request);
+       rbd_obj_request_submit(stat_request);
+       return 0;
  
+ fail_stat_request:
+       rbd_obj_request_put(stat_request);
        return ret;
  }
  
  static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
  {
-       struct rbd_img_request *img_request;
-       struct rbd_device *rbd_dev;
-       rbd_assert(obj_request_img_data_test(obj_request));
-       img_request = obj_request->img_request;
-       rbd_assert(img_request);
-       rbd_dev = img_request->rbd_dev;
+       struct rbd_img_request *img_request = obj_request->img_request;
+       struct rbd_device *rbd_dev = img_request->rbd_dev;
  
        /* Reads */
        if (!img_request_write_test(img_request) &&
  
  static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
  {
-       if (img_obj_request_simple(obj_request)) {
-               struct rbd_device *rbd_dev;
-               struct ceph_osd_client *osdc;
-               rbd_dev = obj_request->img_request->rbd_dev;
-               osdc = &rbd_dev->rbd_client->client->osdc;
+       rbd_assert(obj_request_img_data_test(obj_request));
+       rbd_assert(obj_request_type_valid(obj_request->type));
+       rbd_assert(obj_request->img_request);
  
-               return rbd_obj_request_submit(osdc, obj_request);
+       if (img_obj_request_simple(obj_request)) {
+               rbd_obj_request_submit(obj_request);
+               return 0;
        }
  
        /*
@@@ -3006,12 -3057,8 +3057,8 @@@ static void rbd_img_parent_read_callbac
        rbd_assert(obj_request->img_request);
        rbd_dev = obj_request->img_request->rbd_dev;
        if (!rbd_dev->parent_overlap) {
-               struct ceph_osd_client *osdc;
-               osdc = &rbd_dev->rbd_client->client->osdc;
-               img_result = rbd_obj_request_submit(osdc, obj_request);
-               if (!img_result)
-                       return;
+               rbd_obj_request_submit(obj_request);
+               return;
        }
  
        obj_request->result = img_result;
@@@ -3084,153 -3131,897 +3131,897 @@@ out_err
        obj_request_done_set(obj_request);
  }
  
- static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
- static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
+ static const struct rbd_client_id rbd_empty_cid;
  
- static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
-                        u64 notifier_id, void *data, size_t data_len)
+ static bool rbd_cid_equal(const struct rbd_client_id *lhs,
+                         const struct rbd_client_id *rhs)
+ {
+       return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
+ }
+ static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
+ {
+       struct rbd_client_id cid;
+       mutex_lock(&rbd_dev->watch_mutex);
+       cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
+       cid.handle = rbd_dev->watch_cookie;
+       mutex_unlock(&rbd_dev->watch_mutex);
+       return cid;
+ }
+ /*
+  * lock_rwsem must be held for write
+  */
+ static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
+                             const struct rbd_client_id *cid)
+ {
+       dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
+            rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
+            cid->gid, cid->handle);
+       rbd_dev->owner_cid = *cid; /* struct */
+ }
+ static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
+ {
+       mutex_lock(&rbd_dev->watch_mutex);
+       sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
+       mutex_unlock(&rbd_dev->watch_mutex);
+ }
+ /*
+  * lock_rwsem must be held for write
+  */
+ static int rbd_lock(struct rbd_device *rbd_dev)
  {
-       struct rbd_device *rbd_dev = arg;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+       char cookie[32];
        int ret;
  
-       dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
-            cookie, notify_id);
+       WARN_ON(__rbd_is_lock_owner(rbd_dev));
  
-       /*
-        * Until adequate refresh error handling is in place, there is
-        * not much we can do here, except warn.
-        *
-        * See http://tracker.ceph.com/issues/5040
-        */
-       ret = rbd_dev_refresh(rbd_dev);
+       format_lock_cookie(rbd_dev, cookie);
+       ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+                           RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
+                           RBD_LOCK_TAG, "", 0);
        if (ret)
-               rbd_warn(rbd_dev, "refresh failed: %d", ret);
+               return ret;
  
-       ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
-                                  &rbd_dev->header_oloc, notify_id, cookie,
-                                  NULL, 0);
-       if (ret)
-               rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+       rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+       rbd_set_owner_cid(rbd_dev, &cid);
+       queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
+       return 0;
  }
  
- static void rbd_watch_errcb(void *arg, u64 cookie, int err)
+ /*
+  * lock_rwsem must be held for write
+  */
+ static int rbd_unlock(struct rbd_device *rbd_dev)
  {
-       struct rbd_device *rbd_dev = arg;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       char cookie[32];
        int ret;
  
-       rbd_warn(rbd_dev, "encountered watch error: %d", err);
+       WARN_ON(!__rbd_is_lock_owner(rbd_dev));
  
-       __rbd_dev_header_unwatch_sync(rbd_dev);
+       rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
  
-       ret = rbd_dev_header_watch_sync(rbd_dev);
-       if (ret) {
-               rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-               return;
+       format_lock_cookie(rbd_dev, cookie);
+       ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+                             RBD_LOCK_NAME, cookie);
+       if (ret && ret != -ENOENT) {
+               rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
+               return ret;
        }
  
-       ret = rbd_dev_refresh(rbd_dev);
-       if (ret)
-               rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+       rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+       queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
+       return 0;
  }
  
- /*
-  * Initiate a watch request, synchronously.
-  */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
+                               enum rbd_notify_op notify_op,
+                               struct page ***preply_pages,
                              size_t *preply_len)
  {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       struct ceph_osd_linger_request *handle;
+       struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+       int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
+       char buf[buf_size];
+       void *p = buf;
  
-       rbd_assert(!rbd_dev->watch_handle);
+       dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
  
-       handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
-                                &rbd_dev->header_oloc, rbd_watch_cb,
-                                rbd_watch_errcb, rbd_dev);
-       if (IS_ERR(handle))
-               return PTR_ERR(handle);
+       /* encode *LockPayload NotifyMessage (op + ClientId) */
+       ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
+       ceph_encode_32(&p, notify_op);
+       ceph_encode_64(&p, cid.gid);
+       ceph_encode_64(&p, cid.handle);
  
-       rbd_dev->watch_handle = handle;
-       return 0;
+       return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
+                               &rbd_dev->header_oloc, buf, buf_size,
+                               RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
  }
  
- static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+ static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
+                              enum rbd_notify_op notify_op)
  {
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       int ret;
+       struct page **reply_pages;
+       size_t reply_len;
  
-       if (!rbd_dev->watch_handle)
-               return;
+       __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
+       ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+ }
  
-       ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
-       if (ret)
-               rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
+ static void rbd_notify_acquired_lock(struct work_struct *work)
+ {
+       struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+                                                 acquired_lock_work);
  
-       rbd_dev->watch_handle = NULL;
+       rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
  }
  
- /*
-  * Tear down a watch request, synchronously.
-  */
- static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+ static void rbd_notify_released_lock(struct work_struct *work)
  {
-       __rbd_dev_header_unwatch_sync(rbd_dev);
+       struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+                                                 released_lock_work);
  
-       dout("%s flushing notifies\n", __func__);
-       ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
+       rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
  }
  
- /*
-  * Synchronous osd object method call.  Returns the number of bytes
-  * returned in the outbound buffer, or a negative error code.
-  */
- static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
-                            const char *object_name,
-                            const char *class_name,
-                            const char *method_name,
-                            const void *outbound,
-                            size_t outbound_size,
-                            void *inbound,
-                            size_t inbound_size)
+ static int rbd_request_lock(struct rbd_device *rbd_dev)
  {
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       struct rbd_obj_request *obj_request;
-       struct page **pages;
-       u32 page_count;
+       struct page **reply_pages;
+       size_t reply_len;
+       bool lock_owner_responded = false;
        int ret;
  
-       /*
-        * Method calls are ultimately read operations.  The result
-        * should placed into the inbound buffer provided.  They
-        * also supply outbound data--parameters for the object
-        * method.  Currently if this is present it will be a
-        * snapshot id.
-        */
-       page_count = (u32)calc_pages_for(0, inbound_size);
-       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
  
-       ret = -ENOMEM;
-       obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
-                                                       OBJ_REQUEST_PAGES);
-       if (!obj_request)
+       ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
+                                  &reply_pages, &reply_len);
+       if (ret && ret != -ETIMEDOUT) {
+               rbd_warn(rbd_dev, "failed to request lock: %d", ret);
                goto out;
+       }
  
-       obj_request->pages = pages;
-       obj_request->page_count = page_count;
+       if (reply_len > 0 && reply_len <= PAGE_SIZE) {
+               void *p = page_address(reply_pages[0]);
+               void *const end = p + reply_len;
+               u32 n;
  
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-                                                 obj_request);
-       if (!obj_request->osd_req)
-               goto out;
+               ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
+               while (n--) {
+                       u8 struct_v;
+                       u32 len;
  
-       osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
-                                       class_name, method_name);
-       if (outbound_size) {
-               struct ceph_pagelist *pagelist;
+                       ceph_decode_need(&p, end, 8 + 8, e_inval);
+                       p += 8 + 8; /* skip gid and cookie */
  
-               pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+                       ceph_decode_32_safe(&p, end, len, e_inval);
+                       if (!len)
+                               continue;
+                       if (lock_owner_responded) {
+                               rbd_warn(rbd_dev,
+                                        "duplicate lock owners detected");
+                               ret = -EIO;
+                               goto out;
+                       }
+                       lock_owner_responded = true;
+                       ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
+                                                 &struct_v, &len);
+                       if (ret) {
+                               rbd_warn(rbd_dev,
+                                        "failed to decode ResponseMessage: %d",
+                                        ret);
+                               goto e_inval;
+                       }
+                       ret = ceph_decode_32(&p);
+               }
+       }
+       if (!lock_owner_responded) {
+               rbd_warn(rbd_dev, "no lock owners detected");
+               ret = -ETIMEDOUT;
+       }
+ out:
+       ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+       return ret;
+ e_inval:
+       ret = -EINVAL;
+       goto out;
+ }
+ static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+ {
+       dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+       cancel_delayed_work(&rbd_dev->lock_dwork);
+       if (wake_all)
+               wake_up_all(&rbd_dev->lock_waitq);
+       else
+               wake_up(&rbd_dev->lock_waitq);
+ }
+ static int get_lock_owner_info(struct rbd_device *rbd_dev,
+                              struct ceph_locker **lockers, u32 *num_lockers)
+ {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       u8 lock_type;
+       char *lock_tag;
+       int ret;
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
+                                &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                &lock_type, &lock_tag, lockers, num_lockers);
+       if (ret)
+               return ret;
+       if (*num_lockers == 0) {
+               dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+               goto out;
+       }
+       if (strcmp(lock_tag, RBD_LOCK_TAG)) {
+               rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
+                        lock_tag);
+               ret = -EBUSY;
+               goto out;
+       }
+       if (lock_type == CEPH_CLS_LOCK_SHARED) {
+               rbd_warn(rbd_dev, "shared lock type detected");
+               ret = -EBUSY;
+               goto out;
+       }
+       if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+                   strlen(RBD_LOCK_COOKIE_PREFIX))) {
+               rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
+                        (*lockers)[0].id.cookie);
+               ret = -EBUSY;
+               goto out;
+       }
+ out:
+       kfree(lock_tag);
+       return ret;
+ }
+ static int find_watcher(struct rbd_device *rbd_dev,
+                       const struct ceph_locker *locker)
+ {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct ceph_watch_item *watchers;
+       u32 num_watchers;
+       u64 cookie;
+       int i;
+       int ret;
+       ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
+                                     &rbd_dev->header_oloc, &watchers,
+                                     &num_watchers);
+       if (ret)
+               return ret;
+       sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
+       for (i = 0; i < num_watchers; i++) {
+               if (!memcmp(&watchers[i].addr, &locker->info.addr,
+                           sizeof(locker->info.addr)) &&
+                   watchers[i].cookie == cookie) {
+                       struct rbd_client_id cid = {
+                               .gid = le64_to_cpu(watchers[i].name.num),
+                               .handle = cookie,
+                       };
+                       dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
+                            rbd_dev, cid.gid, cid.handle);
+                       rbd_set_owner_cid(rbd_dev, &cid);
+                       ret = 1;
+                       goto out;
+               }
+       }
+       dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
+       ret = 0;
+ out:
+       kfree(watchers);
+       return ret;
+ }
+ /*
+  * lock_rwsem must be held for write
+  */
+ static int rbd_try_lock(struct rbd_device *rbd_dev)
+ {
+       struct ceph_client *client = rbd_dev->rbd_client->client;
+       struct ceph_locker *lockers;
+       u32 num_lockers;
+       int ret;
+       for (;;) {
+               ret = rbd_lock(rbd_dev);
+               if (ret != -EBUSY)
+                       return ret;
+               /* determine if the current lock holder is still alive */
+               ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
+               if (ret)
+                       return ret;
+               if (num_lockers == 0)
+                       goto again;
+               ret = find_watcher(rbd_dev, lockers);
+               if (ret) {
+                       if (ret > 0)
+                               ret = 0; /* have to request lock */
+                       goto out;
+               }
+               rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+                        ENTITY_NAME(lockers[0].id.name));
+               ret = ceph_monc_blacklist_add(&client->monc,
+                                             &lockers[0].info.addr);
+               if (ret) {
+                       rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
+                                ENTITY_NAME(lockers[0].id.name), ret);
+                       goto out;
+               }
+               ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
+                                         &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                         lockers[0].id.cookie,
+                                         &lockers[0].id.name);
+               if (ret && ret != -ENOENT)
+                       goto out;
+ again:
+               ceph_free_lockers(lockers, num_lockers);
+       }
+ out:
+       ceph_free_lockers(lockers, num_lockers);
+       return ret;
+ }
+ /*
+  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+  */
+ static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
+                                               int *pret)
+ {
+       enum rbd_lock_state lock_state;
+       down_read(&rbd_dev->lock_rwsem);
+       dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+            rbd_dev->lock_state);
+       if (__rbd_is_lock_owner(rbd_dev)) {
+               lock_state = rbd_dev->lock_state;
+               up_read(&rbd_dev->lock_rwsem);
+               return lock_state;
+       }
+       up_read(&rbd_dev->lock_rwsem);
+       down_write(&rbd_dev->lock_rwsem);
+       dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+            rbd_dev->lock_state);
+       if (!__rbd_is_lock_owner(rbd_dev)) {
+               *pret = rbd_try_lock(rbd_dev);
+               if (*pret)
+                       rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+       }
+       lock_state = rbd_dev->lock_state;
+       up_write(&rbd_dev->lock_rwsem);
+       return lock_state;
+ }
+ static void rbd_acquire_lock(struct work_struct *work)
+ {
+       struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+                                           struct rbd_device, lock_dwork);
+       enum rbd_lock_state lock_state;
+       int ret;
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+ again:
+       lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
+       if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
+               if (lock_state == RBD_LOCK_STATE_LOCKED)
+                       wake_requests(rbd_dev, true);
+               dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
+                    rbd_dev, lock_state, ret);
+               return;
+       }
+       ret = rbd_request_lock(rbd_dev);
+       if (ret == -ETIMEDOUT) {
+               goto again; /* treat this as a dead client */
+       } else if (ret < 0) {
+               rbd_warn(rbd_dev, "error requesting lock: %d", ret);
+               mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+                                RBD_RETRY_DELAY);
+       } else {
+               /*
+                * lock owner acked, but resend if we don't see them
+                * release the lock
+                */
+               dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+                    rbd_dev);
+               mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+                   msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
+       }
+ }
+ /*
+  * lock_rwsem must be held for write
+  */
+ static bool rbd_release_lock(struct rbd_device *rbd_dev)
+ {
+       dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+            rbd_dev->lock_state);
+       if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+               return false;
+       rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+       downgrade_write(&rbd_dev->lock_rwsem);
+       /*
+        * Ensure that all in-flight IO is flushed.
+        *
+        * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
+        * may be shared with other devices.
+        */
+       ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+       up_read(&rbd_dev->lock_rwsem);
+       down_write(&rbd_dev->lock_rwsem);
+       dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+            rbd_dev->lock_state);
+       if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
+               return false;
+       if (!rbd_unlock(rbd_dev))
+               /*
+                * Give others a chance to grab the lock - we would re-acquire
+                * almost immediately if we got new IO during ceph_osdc_sync()
+                * otherwise.  We need to ack our own notifications, so this
+                * lock_dwork will be requeued from rbd_wait_state_locked()
+                * after wake_requests() in rbd_handle_released_lock().
+                */
+               cancel_delayed_work(&rbd_dev->lock_dwork);
+       return true;
+ }
+ static void rbd_release_lock_work(struct work_struct *work)
+ {
+       struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+                                                 unlock_work);
+       down_write(&rbd_dev->lock_rwsem);
+       rbd_release_lock(rbd_dev);
+       up_write(&rbd_dev->lock_rwsem);
+ }
+ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                    void **p)
+ {
+       struct rbd_client_id cid = { 0 };
+       if (struct_v >= 2) {
+               cid.gid = ceph_decode_64(p);
+               cid.handle = ceph_decode_64(p);
+       }
+       dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+            cid.handle);
+       if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+               down_write(&rbd_dev->lock_rwsem);
+               if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+                       /*
+                        * we already know that the remote client is
+                        * the owner
+                        */
+                       up_write(&rbd_dev->lock_rwsem);
+                       return;
+               }
+               rbd_set_owner_cid(rbd_dev, &cid);
+               downgrade_write(&rbd_dev->lock_rwsem);
+       } else {
+               down_read(&rbd_dev->lock_rwsem);
+       }
+       if (!__rbd_is_lock_owner(rbd_dev))
+               wake_requests(rbd_dev, false);
+       up_read(&rbd_dev->lock_rwsem);
+ }
+ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                    void **p)
+ {
+       struct rbd_client_id cid = { 0 };
+       if (struct_v >= 2) {
+               cid.gid = ceph_decode_64(p);
+               cid.handle = ceph_decode_64(p);
+       }
+       dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+            cid.handle);
+       if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+               down_write(&rbd_dev->lock_rwsem);
+               if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+                       dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
+                            __func__, rbd_dev, cid.gid, cid.handle,
+                            rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
+                       up_write(&rbd_dev->lock_rwsem);
+                       return;
+               }
+               rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+               downgrade_write(&rbd_dev->lock_rwsem);
+       } else {
+               down_read(&rbd_dev->lock_rwsem);
+       }
+       if (!__rbd_is_lock_owner(rbd_dev))
+               wake_requests(rbd_dev, false);
+       up_read(&rbd_dev->lock_rwsem);
+ }
+ static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+                                   void **p)
+ {
+       struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
+       struct rbd_client_id cid = { 0 };
+       bool need_to_send;
+       if (struct_v >= 2) {
+               cid.gid = ceph_decode_64(p);
+               cid.handle = ceph_decode_64(p);
+       }
+       dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+            cid.handle);
+       if (rbd_cid_equal(&cid, &my_cid))
+               return false;
+       down_read(&rbd_dev->lock_rwsem);
+       need_to_send = __rbd_is_lock_owner(rbd_dev);
+       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+               if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
+                       dout("%s rbd_dev %p queueing unlock_work\n", __func__,
+                            rbd_dev);
+                       queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+               }
+       }
+       up_read(&rbd_dev->lock_rwsem);
+       return need_to_send;
+ }
+ static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
+                                    u64 notify_id, u64 cookie, s32 *result)
+ {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
+       char buf[buf_size];
+       int ret;
+       if (result) {
+               void *p = buf;
+               /* encode ResponseMessage */
+               ceph_start_encoding(&p, 1, 1,
+                                   buf_size - CEPH_ENCODING_START_BLK_LEN);
+               ceph_encode_32(&p, *result);
+       } else {
+               buf_size = 0;
+       }
+       ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
+                                  &rbd_dev->header_oloc, notify_id, cookie,
+                                  buf, buf_size);
+       if (ret)
+               rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
+ }
+ static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
+                                  u64 cookie)
+ {
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
+ }
+ static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
+                                         u64 notify_id, u64 cookie, s32 result)
+ {
+       dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+       __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
+ }
+ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
+                        u64 notifier_id, void *data, size_t data_len)
+ {
+       struct rbd_device *rbd_dev = arg;
+       void *p = data;
+       void *const end = p + data_len;
+       u8 struct_v;
+       u32 len;
+       u32 notify_op;
+       int ret;
+       dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
+            __func__, rbd_dev, cookie, notify_id, data_len);
+       if (data_len) {
+               ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
+                                         &struct_v, &len);
+               if (ret) {
+                       rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
+                                ret);
+                       return;
+               }
+               notify_op = ceph_decode_32(&p);
+       } else {
+               /* legacy notification for header updates */
+               notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
+               len = 0;
+       }
+       dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
+       switch (notify_op) {
+       case RBD_NOTIFY_OP_ACQUIRED_LOCK:
+               rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
+               rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+               break;
+       case RBD_NOTIFY_OP_RELEASED_LOCK:
+               rbd_handle_released_lock(rbd_dev, struct_v, &p);
+               rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+               break;
+       case RBD_NOTIFY_OP_REQUEST_LOCK:
+               if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
+                       /*
+                        * send ResponseMessage(0) back so the client
+                        * can detect a missing owner
+                        */
+                       rbd_acknowledge_notify_result(rbd_dev, notify_id,
+                                                     cookie, 0);
+               else
+                       rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+               break;
+       case RBD_NOTIFY_OP_HEADER_UPDATE:
+               ret = rbd_dev_refresh(rbd_dev);
+               if (ret)
+                       rbd_warn(rbd_dev, "refresh failed: %d", ret);
+               rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+               break;
+       default:
+               if (rbd_is_lock_owner(rbd_dev))
+                       rbd_acknowledge_notify_result(rbd_dev, notify_id,
+                                                     cookie, -EOPNOTSUPP);
+               else
+                       rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+               break;
+       }
+ }
+ static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
+ static void rbd_watch_errcb(void *arg, u64 cookie, int err)
+ {
+       struct rbd_device *rbd_dev = arg;
+       rbd_warn(rbd_dev, "encountered watch error: %d", err);
+       down_write(&rbd_dev->lock_rwsem);
+       rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+       up_write(&rbd_dev->lock_rwsem);
+       mutex_lock(&rbd_dev->watch_mutex);
+       if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
+               __rbd_unregister_watch(rbd_dev);
+               rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
+               queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
+       }
+       mutex_unlock(&rbd_dev->watch_mutex);
+ }
+ /*
+  * watch_mutex must be locked
+  */
+ static int __rbd_register_watch(struct rbd_device *rbd_dev)
+ {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct ceph_osd_linger_request *handle;
+       rbd_assert(!rbd_dev->watch_handle);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
+                                &rbd_dev->header_oloc, rbd_watch_cb,
+                                rbd_watch_errcb, rbd_dev);
+       if (IS_ERR(handle))
+               return PTR_ERR(handle);
+       rbd_dev->watch_handle = handle;
+       return 0;
+ }
+ /*
+  * watch_mutex must be locked
+  */
+ static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
+ {
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       int ret;
+       rbd_assert(rbd_dev->watch_handle);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
+       if (ret)
+               rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
+       rbd_dev->watch_handle = NULL;
+ }
+ static int rbd_register_watch(struct rbd_device *rbd_dev)
+ {
+       int ret;
+       mutex_lock(&rbd_dev->watch_mutex);
+       rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
+       ret = __rbd_register_watch(rbd_dev);
+       if (ret)
+               goto out;
+       rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+       rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+ out:
+       mutex_unlock(&rbd_dev->watch_mutex);
+       return ret;
+ }
+ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
+ {
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+       cancel_work_sync(&rbd_dev->acquired_lock_work);
+       cancel_work_sync(&rbd_dev->released_lock_work);
+       cancel_delayed_work_sync(&rbd_dev->lock_dwork);
+       cancel_work_sync(&rbd_dev->unlock_work);
+ }
+ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
+ {
+       WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
+       cancel_tasks_sync(rbd_dev);
+       mutex_lock(&rbd_dev->watch_mutex);
+       if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
+               __rbd_unregister_watch(rbd_dev);
+       rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+       mutex_unlock(&rbd_dev->watch_mutex);
+       ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
+ }
+ static void rbd_reregister_watch(struct work_struct *work)
+ {
+       struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+                                           struct rbd_device, watch_dwork);
+       bool was_lock_owner = false;
+       int ret;
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       down_write(&rbd_dev->lock_rwsem);
+       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+               was_lock_owner = rbd_release_lock(rbd_dev);
+       mutex_lock(&rbd_dev->watch_mutex);
+       if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
+               goto fail_unlock;
+       ret = __rbd_register_watch(rbd_dev);
+       if (ret) {
+               rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+               if (ret != -EBLACKLISTED)
+                       queue_delayed_work(rbd_dev->task_wq,
+                                          &rbd_dev->watch_dwork,
+                                          RBD_RETRY_DELAY);
+               goto fail_unlock;
+       }
+       rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+       rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+       mutex_unlock(&rbd_dev->watch_mutex);
+       ret = rbd_dev_refresh(rbd_dev);
+       if (ret)
+               rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+       if (was_lock_owner) {
+               ret = rbd_try_lock(rbd_dev);
+               if (ret)
+                       rbd_warn(rbd_dev, "reregisteration lock failed: %d",
+                                ret);
+       }
+       up_write(&rbd_dev->lock_rwsem);
+       wake_requests(rbd_dev, true);
+       return;
+ fail_unlock:
+       mutex_unlock(&rbd_dev->watch_mutex);
+       up_write(&rbd_dev->lock_rwsem);
+ }
+ /*
+  * Synchronous osd object method call.  Returns the number of bytes
+  * returned in the outbound buffer, or a negative error code.
+  */
+ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
+                            const char *object_name,
+                            const char *class_name,
+                            const char *method_name,
+                            const void *outbound,
+                            size_t outbound_size,
+                            void *inbound,
+                            size_t inbound_size)
+ {
+       struct rbd_obj_request *obj_request;
+       struct page **pages;
+       u32 page_count;
+       int ret;
+       /*
+        * Method calls are ultimately read operations.  The result
+        * should placed into the inbound buffer provided.  They
+        * also supply outbound data--parameters for the object
+        * method.  Currently if this is present it will be a
+        * snapshot id.
+        */
+       page_count = (u32)calc_pages_for(0, inbound_size);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
+                                                       OBJ_REQUEST_PAGES);
+       if (!obj_request)
+               goto out;
+       obj_request->pages = pages;
+       obj_request->page_count = page_count;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
+                                                 obj_request);
+       if (!obj_request->osd_req)
+               goto out;
+       osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
+                                       class_name, method_name);
+       if (outbound_size) {
+               struct ceph_pagelist *pagelist;
+               pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
                if (!pagelist)
                        goto out;
  
        osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
                                        obj_request->pages, inbound_size,
                                        0, false, false);
-       rbd_osd_req_format_read(obj_request);
  
-       ret = rbd_obj_request_submit(osdc, obj_request);
-       if (ret)
-               goto out;
+       rbd_obj_request_submit(obj_request);
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out;
        return ret;
  }
  
+ /*
+  * lock_rwsem must be held for read
+  */
+ static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
+ {
+       DEFINE_WAIT(wait);
+       do {
+               /*
+                * Note the use of mod_delayed_work() in rbd_acquire_lock()
+                * and cancel_delayed_work() in wake_requests().
+                */
+               dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+               queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+               prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
+                                         TASK_UNINTERRUPTIBLE);
+               up_read(&rbd_dev->lock_rwsem);
+               schedule();
+               down_read(&rbd_dev->lock_rwsem);
+       } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+       finish_wait(&rbd_dev->lock_waitq, &wait);
+ }
  static void rbd_queue_workfn(struct work_struct *work)
  {
        struct request *rq = blk_mq_rq_from_pdu(work);
        u64 length = blk_rq_bytes(rq);
        enum obj_operation_type op_type;
        u64 mapping_size;
+       bool must_be_locked;
        int result;
  
        if (rq->cmd_type != REQ_TYPE_FS) {
        if (op_type != OBJ_OP_READ) {
                snapc = rbd_dev->header.snapc;
                ceph_get_snap_context(snapc);
+               must_be_locked = rbd_is_lock_supported(rbd_dev);
+       } else {
+               must_be_locked = rbd_dev->opts->lock_on_read &&
+                                       rbd_is_lock_supported(rbd_dev);
        }
        up_read(&rbd_dev->header_rwsem);
  
                goto err_rq;
        }
  
+       if (must_be_locked) {
+               down_read(&rbd_dev->lock_rwsem);
+               if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+                       rbd_wait_state_locked(rbd_dev);
+       }
        img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
                                             snapc);
        if (!img_request) {
                result = -ENOMEM;
-               goto err_rq;
+               goto err_unlock;
        }
        img_request->rq = rq;
        snapc = NULL; /* img_request consumes a ref */
        if (result)
                goto err_img_request;
  
+       if (must_be_locked)
+               up_read(&rbd_dev->lock_rwsem);
        return;
  
  err_img_request:
        rbd_img_request_put(img_request);
+ err_unlock:
+       if (must_be_locked)
+               up_read(&rbd_dev->lock_rwsem);
  err_rq:
        if (result)
                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@@ -3415,7 -4242,6 +4242,6 @@@ static int rbd_obj_read_sync(struct rbd
                                u64 offset, u64 length, void *buf)
  
  {
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        struct page **pages = NULL;
        u32 page_count;
                                        obj_request->length,
                                        obj_request->offset & ~PAGE_MASK,
                                        false, false);
-       rbd_osd_req_format_read(obj_request);
  
-       ret = rbd_obj_request_submit(osdc, obj_request);
-       if (ret)
-               goto out;
+       rbd_obj_request_submit(obj_request);
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                goto out;
@@@ -3621,6 -4444,7 +4444,6 @@@ static int rbd_init_request(void *data
  
  static struct blk_mq_ops rbd_mq_ops = {
        .queue_rq       = rbd_queue_rq,
 -      .map_queue      = blk_mq_map_queue,
        .init_request   = rbd_init_request,
  };
  
@@@ -3751,13 -4575,40 +4574,40 @@@ static ssize_t rbd_minor_show(struct de
        return sprintf(buf, "%d\n", rbd_dev->minor);
  }
  
+ static ssize_t rbd_client_addr_show(struct device *dev,
+                                   struct device_attribute *attr, char *buf)
+ {
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       struct ceph_entity_addr *client_addr =
+           ceph_client_addr(rbd_dev->rbd_client->client);
+       return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
+                      le32_to_cpu(client_addr->nonce));
+ }
  static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
  {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
  
        return sprintf(buf, "client%lld\n",
-                       ceph_client_id(rbd_dev->rbd_client->client));
+                      ceph_client_gid(rbd_dev->rbd_client->client));
+ }
+ static ssize_t rbd_cluster_fsid_show(struct device *dev,
+                                    struct device_attribute *attr, char *buf)
+ {
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
+ }
+ static ssize_t rbd_config_info_show(struct device *dev,
+                                   struct device_attribute *attr, char *buf)
+ {
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       return sprintf(buf, "%s\n", rbd_dev->config_info);
  }
  
  static ssize_t rbd_pool_show(struct device *dev,
@@@ -3809,6 -4660,14 +4659,14 @@@ static ssize_t rbd_snap_show(struct dev
        return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
  }
  
+ static ssize_t rbd_snap_id_show(struct device *dev,
+                               struct device_attribute *attr, char *buf)
+ {
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
+ }
  /*
   * For a v2 image, shows the chain of parent images, separated by empty
   * lines.  For v1 images or if there is no parent, shows "(no parent
@@@ -3861,13 -4720,17 +4719,17 @@@ static DEVICE_ATTR(size, S_IRUGO, rbd_s
  static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
  static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
  static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
+ static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
  static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
+ static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
+ static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
  static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
  static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
  static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
  static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
  static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
  static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+ static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
  static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
  
  static struct attribute *rbd_attrs[] = {
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_minor.attr,
+       &dev_attr_client_addr.attr,
        &dev_attr_client_id.attr,
+       &dev_attr_cluster_fsid.attr,
+       &dev_attr_config_info.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
+       &dev_attr_snap_id.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
@@@ -3943,18 -4810,32 +4809,32 @@@ static void rbd_spec_free(struct kref *
        kfree(spec);
  }
  
- static void rbd_dev_release(struct device *dev)
+ static void rbd_dev_free(struct rbd_device *rbd_dev)
  {
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       bool need_put = !!rbd_dev->opts;
+       WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+       WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
  
        ceph_oid_destroy(&rbd_dev->header_oid);
        ceph_oloc_destroy(&rbd_dev->header_oloc);
+       kfree(rbd_dev->config_info);
  
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev->opts);
        kfree(rbd_dev);
+ }
+ static void rbd_dev_release(struct device *dev)
+ {
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       bool need_put = !!rbd_dev->opts;
+       if (need_put) {
+               destroy_workqueue(rbd_dev->task_wq);
+               ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+       }
+       rbd_dev_free(rbd_dev);
  
        /*
         * This is racy, but way better than putting module outside of
                module_put(THIS_MODULE);
  }
  
- static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
-                                        struct rbd_spec *spec,
-                                        struct rbd_options *opts)
+ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
+                                          struct rbd_spec *spec)
  {
        struct rbd_device *rbd_dev;
  
-       rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+       rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                return NULL;
  
        spin_lock_init(&rbd_dev->lock);
-       rbd_dev->flags = 0;
-       atomic_set(&rbd_dev->parent_ref, 0);
        INIT_LIST_HEAD(&rbd_dev->node);
        init_rwsem(&rbd_dev->header_rwsem);
  
        ceph_oid_init(&rbd_dev->header_oid);
        ceph_oloc_init(&rbd_dev->header_oloc);
  
+       mutex_init(&rbd_dev->watch_mutex);
+       rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+       INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
+       init_rwsem(&rbd_dev->lock_rwsem);
+       rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+       INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
+       INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
+       INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
+       INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
+       init_waitqueue_head(&rbd_dev->lock_waitq);
        rbd_dev->dev.bus = &rbd_bus_type;
        rbd_dev->dev.type = &rbd_device_type;
        rbd_dev->dev.parent = &rbd_root_dev;
  
        rbd_dev->rbd_client = rbdc;
        rbd_dev->spec = spec;
-       rbd_dev->opts = opts;
-       /* Initialize the layout used for all rbd requests */
  
        rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
        rbd_dev->layout.stripe_count = 1;
        rbd_dev->layout.pool_id = spec->pool_id;
        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
  
-       /*
-        * If this is a mapping rbd_dev (as opposed to a parent one),
-        * pin our module.  We have a ref from do_rbd_add(), so use
-        * __module_get().
-        */
-       if (rbd_dev->opts)
-               __module_get(THIS_MODULE);
+       return rbd_dev;
+ }
+ /*
+  * Create a mapping rbd_dev.
+  */
+ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+                                        struct rbd_spec *spec,
+                                        struct rbd_options *opts)
+ {
+       struct rbd_device *rbd_dev;
+       rbd_dev = __rbd_dev_create(rbdc, spec);
+       if (!rbd_dev)
+               return NULL;
+       rbd_dev->opts = opts;
+       /* get an id and fill in device name */
+       rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
+                                        minor_to_rbd_dev_id(1 << MINORBITS),
+                                        GFP_KERNEL);
+       if (rbd_dev->dev_id < 0)
+               goto fail_rbd_dev;
+       sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
+       rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
+                                                  rbd_dev->name);
+       if (!rbd_dev->task_wq)
+               goto fail_dev_id;
+       /* we have a ref from do_rbd_add() */
+       __module_get(THIS_MODULE);
  
+       dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
        return rbd_dev;
+ fail_dev_id:
+       ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+ fail_rbd_dev:
+       rbd_dev_free(rbd_dev);
+       return NULL;
  }
  
  static void rbd_dev_destroy(struct rbd_device *rbd_dev)
@@@ -4644,46 -5564,6 +5563,6 @@@ static int rbd_dev_header_info(struct r
        return rbd_dev_v2_header_info(rbd_dev);
  }
  
- /*
-  * Get a unique rbd identifier for the given new rbd_dev, and add
-  * the rbd_dev to the global list.
-  */
- static int rbd_dev_id_get(struct rbd_device *rbd_dev)
- {
-       int new_dev_id;
-       new_dev_id = ida_simple_get(&rbd_dev_id_ida,
-                                   0, minor_to_rbd_dev_id(1 << MINORBITS),
-                                   GFP_KERNEL);
-       if (new_dev_id < 0)
-               return new_dev_id;
-       rbd_dev->dev_id = new_dev_id;
-       spin_lock(&rbd_dev_list_lock);
-       list_add_tail(&rbd_dev->node, &rbd_dev_list);
-       spin_unlock(&rbd_dev_list_lock);
-       dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
-       return 0;
- }
- /*
-  * Remove an rbd_dev from the global list, and record that its
-  * identifier is no longer in use.
-  */
- static void rbd_dev_id_put(struct rbd_device *rbd_dev)
- {
-       spin_lock(&rbd_dev_list_lock);
-       list_del_init(&rbd_dev->node);
-       spin_unlock(&rbd_dev_list_lock);
-       ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
-       dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
- }
  /*
   * Skips over white space at *buf, and updates *buf to point to the
   * first found non-space character (if any). Returns the length of
@@@ -4859,6 -5739,7 +5738,7 @@@ static int rbd_add_parse_args(const cha
  
        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
        rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
+       rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
  
        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
@@@ -5076,8 -5957,7 +5956,7 @@@ static int rbd_dev_probe_parent(struct 
                goto out_err;
        }
  
-       parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
-                               NULL);
+       parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
        if (!parent) {
                ret = -ENOMEM;
                goto out_err;
@@@ -5112,22 -5992,12 +5991,12 @@@ static int rbd_dev_device_setup(struct 
  {
        int ret;
  
-       /* Get an id and fill in device name. */
-       ret = rbd_dev_id_get(rbd_dev);
-       if (ret)
-               goto err_out_unlock;
-       BUILD_BUG_ON(DEV_NAME_LEN
-                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
        /* Record our major and minor device numbers. */
  
        if (!single_major) {
                ret = register_blkdev(0, rbd_dev->name);
                if (ret < 0)
-                       goto err_out_id;
+                       goto err_out_unlock;
  
                rbd_dev->major = ret;
                rbd_dev->minor = 0;
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        up_write(&rbd_dev->header_rwsem);
  
+       spin_lock(&rbd_dev_list_lock);
+       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       spin_unlock(&rbd_dev_list_lock);
        add_disk(rbd_dev->disk);
-       pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
-               (unsigned long long) rbd_dev->mapping.size);
+       pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+               (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+               rbd_dev->header.features);
  
        return ret;
  
@@@ -5172,8 -6047,6 +6046,6 @@@ err_out_disk
  err_out_blkdev:
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
- err_out_id:
-       rbd_dev_id_put(rbd_dev);
  err_out_unlock:
        up_write(&rbd_dev->header_rwsem);
        return ret;
@@@ -5234,7 -6107,7 +6106,7 @@@ static int rbd_dev_image_probe(struct r
                goto err_out_format;
  
        if (!depth) {
-               ret = rbd_dev_header_watch_sync(rbd_dev);
+               ret = rbd_register_watch(rbd_dev);
                if (ret) {
                        if (ret == -ENOENT)
                                pr_info("image %s/%s does not exist\n",
@@@ -5293,7 -6166,7 +6165,7 @@@ err_out_probe
        rbd_dev_unprobe(rbd_dev);
  err_out_watch:
        if (!depth)
-               rbd_dev_header_unwatch_sync(rbd_dev);
+               rbd_unregister_watch(rbd_dev);
  err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
@@@ -5345,10 -6218,18 +6217,18 @@@ static ssize_t do_rbd_add(struct bus_ty
        spec = NULL;            /* rbd_dev now owns this */
        rbd_opts = NULL;        /* rbd_dev now owns this */
  
+       rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
+       if (!rbd_dev->config_info) {
+               rc = -ENOMEM;
+               goto err_out_rbd_dev;
+       }
        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_image_probe(rbd_dev, 0);
-       if (rc < 0)
+       if (rc < 0) {
+               up_write(&rbd_dev->header_rwsem);
                goto err_out_rbd_dev;
+       }
  
        /* If we are mapping a snapshot it must be marked read-only */
  
        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /*
-                * rbd_dev_header_unwatch_sync() can't be moved into
+                * rbd_unregister_watch() can't be moved into
                 * rbd_dev_image_release() without refactoring, see
                 * commit 1f3ef78861ac.
                 */
-               rbd_dev_header_unwatch_sync(rbd_dev);
+               rbd_unregister_watch(rbd_dev);
                rbd_dev_image_release(rbd_dev);
                goto out;
        }
@@@ -5375,7 -6256,6 +6255,6 @@@ out
        return rc;
  
  err_out_rbd_dev:
-       up_write(&rbd_dev->header_rwsem);
        rbd_dev_destroy(rbd_dev);
  err_out_client:
        rbd_put_client(rbdc);
@@@ -5405,12 -6285,16 +6284,16 @@@ static ssize_t rbd_add_single_major(str
  static void rbd_dev_device_release(struct rbd_device *rbd_dev)
  {
        rbd_free_disk(rbd_dev);
+       spin_lock(&rbd_dev_list_lock);
+       list_del_init(&rbd_dev->node);
+       spin_unlock(&rbd_dev_list_lock);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        device_del(&rbd_dev->dev);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
-       rbd_dev_id_put(rbd_dev);
  }
  
  static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
@@@ -5446,18 -6330,26 +6329,26 @@@ static ssize_t do_rbd_remove(struct bus
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
-       unsigned long ul;
+       char opt_buf[6];
        bool already = false;
+       bool force = false;
        int ret;
  
-       ret = kstrtoul(buf, 10, &ul);
-       if (ret)
-               return ret;
-       /* convert to int; abort if we lost anything in the conversion */
-       dev_id = (int)ul;
-       if (dev_id != ul)
+       dev_id = -1;
+       opt_buf[0] = '\0';
+       sscanf(buf, "%d %5s", &dev_id, opt_buf);
+       if (dev_id < 0) {
+               pr_err("dev_id out of range\n");
                return -EINVAL;
+       }
+       if (opt_buf[0] != '\0') {
+               if (!strcmp(opt_buf, "force")) {
+                       force = true;
+               } else {
+                       pr_err("bad remove option at '%s'\n", opt_buf);
+                       return -EINVAL;
+               }
+       }
  
        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
-               if (rbd_dev->open_count)
+               if (rbd_dev->open_count && !force)
                        ret = -EBUSY;
                else
                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
        if (ret < 0 || already)
                return ret;
  
-       rbd_dev_header_unwatch_sync(rbd_dev);
+       if (force) {
+               /*
+                * Prevent new IO from being queued and wait for existing
+                * IO to complete/fail.
+                */
+               blk_mq_freeze_queue(rbd_dev->disk->queue);
+               blk_set_queue_dying(rbd_dev->disk->queue);
+       }
+       down_write(&rbd_dev->lock_rwsem);
+       if (__rbd_is_lock_owner(rbd_dev))
+               rbd_unlock(rbd_dev);
+       up_write(&rbd_dev->lock_rwsem);
+       rbd_unregister_watch(rbd_dev);
  
        /*
         * Don't free anything from rbd_dev->disk until after all