rbd: retry watch re-registration periodically
authorIlya Dryomov <idryomov@gmail.com>
Fri, 12 Aug 2016 14:11:41 +0000 (16:11 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 24 Aug 2016 21:49:16 +0000 (23:49 +0200)
Revamp watch code to support retrying watch re-registration:

- add rbd_dev->watch_state for more robust errcb handling
- store watch cookie separately to avoid dereferencing watch_handle
  which is set to NULL on unwatch
- move re-register code into a delayed work and retry re-registration
  every second, unless the client is blacklisted

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Mike Christie <mchristi@redhat.com>
Tested-by: Mike Christie <mchristi@redhat.com>
drivers/block/rbd.c
net/ceph/osd_client.c

index 1c805ee..cb96fb1 100644 (file)
@@ -114,6 +114,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 
 #define RBD_OBJ_PREFIX_LEN_MAX 64
 
+#define RBD_RETRY_DELAY                msecs_to_jiffies(1000)
+
 /* Feature bits */
 
 #define RBD_FEATURE_LAYERING   (1<<0)
@@ -319,6 +321,12 @@ struct rbd_img_request {
 #define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
+enum rbd_watch_state {
+       RBD_WATCH_STATE_UNREGISTERED,
+       RBD_WATCH_STATE_REGISTERED,
+       RBD_WATCH_STATE_ERROR,
+};
+
 struct rbd_mapping {
        u64                     size;
        u64                     features;
@@ -352,7 +360,11 @@ struct rbd_device {
 
        struct ceph_file_layout layout;         /* used for all rbd requests */
 
+       struct mutex            watch_mutex;
+       enum rbd_watch_state    watch_state;
        struct ceph_osd_linger_request *watch_handle;
+       u64                     watch_cookie;
+       struct delayed_work     watch_dwork;
 
        struct workqueue_struct *task_wq;
 
@@ -3083,9 +3095,6 @@ out_err:
        obj_request_done_set(obj_request);
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
-
 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
                         u64 notifier_id, void *data, size_t data_len)
 {
@@ -3113,35 +3122,34 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
                rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
+
 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 {
        struct rbd_device *rbd_dev = arg;
-       int ret;
 
        rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
-       __rbd_dev_header_unwatch_sync(rbd_dev);
+       mutex_lock(&rbd_dev->watch_mutex);
+       if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
+               __rbd_unregister_watch(rbd_dev);
+               rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
 
-       ret = rbd_dev_header_watch_sync(rbd_dev);
-       if (ret) {
-               rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-               return;
+               queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
        }
-
-       ret = rbd_dev_refresh(rbd_dev);
-       if (ret)
-               rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+       mutex_unlock(&rbd_dev->watch_mutex);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * watch_mutex must be locked
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static int __rbd_register_watch(struct rbd_device *rbd_dev)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_linger_request *handle;
 
        rbd_assert(!rbd_dev->watch_handle);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
        handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
                                 &rbd_dev->header_oloc, rbd_watch_cb,
@@ -3153,13 +3161,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
        return 0;
 }
 
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+/*
+ * watch_mutex must be locked
+ */
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;
 
-       if (!rbd_dev->watch_handle)
-               return;
+       rbd_assert(rbd_dev->watch_handle);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
        ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
        if (ret)
@@ -3168,17 +3179,80 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
        rbd_dev->watch_handle = NULL;
 }
 
-/*
- * Tear down a watch request, synchronously.
- */
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_register_watch(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       mutex_lock(&rbd_dev->watch_mutex);
+       rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
+       ret = __rbd_register_watch(rbd_dev);
+       if (ret)
+               goto out;
+
+       rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+       rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+
+out:
+       mutex_unlock(&rbd_dev->watch_mutex);
+       return ret;
+}
+
+static void cancel_tasks_sync(struct rbd_device *rbd_dev)
 {
-       __rbd_dev_header_unwatch_sync(rbd_dev);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+       cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+}
+
+static void rbd_unregister_watch(struct rbd_device *rbd_dev)
+{
+       cancel_tasks_sync(rbd_dev);
+
+       mutex_lock(&rbd_dev->watch_mutex);
+       if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
+               __rbd_unregister_watch(rbd_dev);
+       rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+       mutex_unlock(&rbd_dev->watch_mutex);
 
-       dout("%s flushing notifies\n", __func__);
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+static void rbd_reregister_watch(struct work_struct *work)
+{
+       struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+                                           struct rbd_device, watch_dwork);
+       int ret;
+
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+       mutex_lock(&rbd_dev->watch_mutex);
+       if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
+               goto fail_unlock;
+
+       ret = __rbd_register_watch(rbd_dev);
+       if (ret) {
+               rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+               if (ret != -EBLACKLISTED)
+                       queue_delayed_work(rbd_dev->task_wq,
+                                          &rbd_dev->watch_dwork,
+                                          RBD_RETRY_DELAY);
+               goto fail_unlock;
+       }
+
+       rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+       rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+       mutex_unlock(&rbd_dev->watch_mutex);
+
+       ret = rbd_dev_refresh(rbd_dev);
+       if (ret)
+               rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+
+       return;
+
+fail_unlock:
+       mutex_unlock(&rbd_dev->watch_mutex);
+}
+
 /*
  * Synchronous osd object method call.  Returns the number of bytes
  * returned in the outbound buffer, or a negative error code.
@@ -3945,6 +4019,8 @@ static void rbd_spec_free(struct kref *kref)
 
 static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
+       WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+
        ceph_oid_destroy(&rbd_dev->header_oid);
        ceph_oloc_destroy(&rbd_dev->header_oloc);
 
@@ -3991,6 +4067,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
        ceph_oid_init(&rbd_dev->header_oid);
        ceph_oloc_init(&rbd_dev->header_oloc);
 
+       mutex_init(&rbd_dev->watch_mutex);
+       rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+       INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
+
        rbd_dev->dev.bus = &rbd_bus_type;
        rbd_dev->dev.type = &rbd_device_type;
        rbd_dev->dev.parent = &rbd_root_dev;
@@ -5222,7 +5302,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
                goto err_out_format;
 
        if (!depth) {
-               ret = rbd_dev_header_watch_sync(rbd_dev);
+               ret = rbd_register_watch(rbd_dev);
                if (ret) {
                        if (ret == -ENOENT)
                                pr_info("image %s/%s does not exist\n",
@@ -5281,7 +5361,7 @@ err_out_probe:
        rbd_dev_unprobe(rbd_dev);
 err_out_watch:
        if (!depth)
-               rbd_dev_header_unwatch_sync(rbd_dev);
+               rbd_unregister_watch(rbd_dev);
 err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
@@ -5348,11 +5428,11 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /*
-                * rbd_dev_header_unwatch_sync() can't be moved into
+                * rbd_unregister_watch() can't be moved into
                 * rbd_dev_image_release() without refactoring, see
                 * commit 1f3ef78861ac.
                 */
-               rbd_dev_header_unwatch_sync(rbd_dev);
+               rbd_unregister_watch(rbd_dev);
                rbd_dev_image_release(rbd_dev);
                goto out;
        }
@@ -5473,7 +5553,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
        if (ret < 0 || already)
                return ret;
 
-       rbd_dev_header_unwatch_sync(rbd_dev);
+       rbd_unregister_watch(rbd_dev);
 
        /*
         * Don't free anything from rbd_dev->disk until after all
index fbc6b70..d9bf7a1 100644 (file)
@@ -4014,6 +4014,7 @@ EXPORT_SYMBOL(ceph_osdc_list_watchers);
  */
 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
 {
+       dout("%s osdc %p\n", __func__, osdc);
        flush_workqueue(osdc->notify_wq);
 }
 EXPORT_SYMBOL(ceph_osdc_flush_notifies);