libceph: pool deletion detection
author Ilya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:27 +0000 (16:07 +0200)
committer Ilya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:15:29 +0000 (01:15 +0200)
This adds the "map check" infrastructure: when calc_target() returns
CALC_TARGET_POOL_DNE, an osdmap version check is sent to the monitors
and in-flight requests (both regular and lingering) are completed with
-ENOENT once it is certain that the target pool doesn't exist or has
just been deleted.
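
For reference, a minimal sketch (not part of the patch) of the
epoch-bound test that check_pool_dne() and check_linger_pool_dne()
apply; pool_known_dne() and its parameter names are illustrative:

	/*
	 * "dne_bound" is the newest osdmap epoch reported by the monitors
	 * (greq->u.newest) when the map check was sent, or the current
	 * epoch if the pool vanished while requests were in flight.  Once
	 * the client has caught up to that epoch and the pool is still
	 * absent, the request can safely be failed with -ENOENT.
	 */
	static bool pool_known_dne(u32 have_epoch, u32 dne_bound)
	{
		return dne_bound && have_epoch >= dne_bound;
	}

Using the monitors' newest epoch as an upper bound keeps requests from
being failed while the client's osdmap is merely stale.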

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2ae7cfd..3e7bf72 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -151,6 +151,7 @@ struct ceph_osd_request_target {
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
        struct rb_node  r_node;
+       struct rb_node  r_mc_node;          /* map check */
        struct ceph_osd *r_osd;
 
        struct ceph_osd_request_target r_t;
@@ -191,6 +192,7 @@ struct ceph_osd_request {
        int r_attempts;
        struct ceph_eversion r_replay_version; /* aka reassert_version */
        u32 r_last_force_resend;
+       u32 r_map_dne_bound;
 
        struct ceph_osd_req_op r_ops[];
 };
@@ -218,6 +220,7 @@ struct ceph_osd_linger_request {
 
        struct ceph_osd_request_target t;
        u32 last_force_resend;
+       u32 map_dne_bound;
 
        struct timespec mtime;
 
@@ -225,6 +228,7 @@ struct ceph_osd_linger_request {
        struct mutex lock;
        struct rb_node node;            /* osd */
        struct rb_node osdc_node;       /* osdc */
+       struct rb_node mc_node;         /* map check */
        struct list_head scan_item;
 
        struct completion reg_commit_wait;
@@ -257,6 +261,8 @@ struct ceph_osd_client {
        atomic64_t             last_tid;      /* tid of last request */
        u64                    last_linger_id;
        struct rb_root         linger_requests; /* lingering requests */
+       struct rb_root         map_checks;
+       struct rb_root         linger_map_checks;
        atomic_t               num_requests;
        atomic_t               num_homeless;
        struct delayed_work    timeout_work;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5ac6dce..ece2d10 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -396,6 +396,7 @@ static void target_destroy(struct ceph_osd_request_target *t)
 static void request_release_checks(struct ceph_osd_request *req)
 {
        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
+       WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
        WARN_ON(!list_empty(&req->r_unsafe_item));
        WARN_ON(req->r_osd);
 }
@@ -456,6 +457,7 @@ static void request_init(struct ceph_osd_request *req)
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        RB_CLEAR_NODE(&req->r_node);
+       RB_CLEAR_NODE(&req->r_mc_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
 
        target_init(&req->r_t);
@@ -969,6 +971,7 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
  * We keep osd requests in an rbtree, sorted by ->r_tid.
  */
 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
+DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
 
 static bool osd_homeless(struct ceph_osd *osd)
 {
@@ -1601,10 +1604,13 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
                ceph_monc_renew_subs(&osdc->client->monc);
 }
 
+static void send_map_check(struct ceph_osd_request *req);
+
 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_osd *osd;
+       enum calc_target_result ct_res;
        bool need_send = false;
        bool promoted = false;
 
@@ -1612,7 +1618,10 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
        dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
 
 again:
-       calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+       ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+       if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
+               goto promote;
+
        osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
        if (IS_ERR(osd)) {
                WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
@@ -1656,6 +1665,9 @@ again:
                send_request(req);
        mutex_unlock(&osd->lock);
 
+       if (ct_res == CALC_TARGET_POOL_DNE)
+               send_map_check(req);
+
        if (promoted)
                downgrade_write(&osdc->lock);
        return;
@@ -1699,6 +1711,7 @@ static void __finish_request(struct ceph_osd_request *req)
        verify_osd_locked(osd);
        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 
+       WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
        unlink_request(osd, req);
        atomic_dec(&osdc->num_requests);
 
@@ -1726,13 +1739,127 @@ static void __complete_request(struct ceph_osd_request *req)
                complete_all(&req->r_completion);
 }
 
+/*
+ * Note that this is open-coded in handle_reply(), which has to deal
+ * with ack vs commit, dup acks, etc.
+ */
+static void complete_request(struct ceph_osd_request *req, int err)
+{
+       dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
+
+       req->r_result = err;
+       __finish_request(req);
+       __complete_request(req);
+       complete_all(&req->r_safe_completion);
+       ceph_osdc_put_request(req);
+}
+
+static void cancel_map_check(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd_request *lookup_req;
+
+       verify_osdc_wrlocked(osdc);
+
+       lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
+       if (!lookup_req)
+               return;
+
+       WARN_ON(lookup_req != req);
+       erase_request_mc(&osdc->map_checks, req);
+       ceph_osdc_put_request(req);
+}
+
 static void cancel_request(struct ceph_osd_request *req)
 {
        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 
+       cancel_map_check(req);
        finish_request(req);
 }
 
+static void check_pool_dne(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osdmap *map = osdc->osdmap;
+
+       verify_osdc_wrlocked(osdc);
+       WARN_ON(!map->epoch);
+
+       if (req->r_attempts) {
+               /*
+                * We sent a request earlier, which means that
+                * previously the pool existed, and now it does not
+                * (i.e., it was deleted).
+                */
+               req->r_map_dne_bound = map->epoch;
+               dout("%s req %p tid %llu pool disappeared\n", __func__, req,
+                    req->r_tid);
+       } else {
+               dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
+                    req, req->r_tid, req->r_map_dne_bound, map->epoch);
+       }
+
+       if (req->r_map_dne_bound) {
+               if (map->epoch >= req->r_map_dne_bound) {
+                       /* we had a new enough map */
+                       pr_info_ratelimited("tid %llu pool does not exist\n",
+                                           req->r_tid);
+                       complete_request(req, -ENOENT);
+               }
+       } else {
+               send_map_check(req);
+       }
+}
+
+static void map_check_cb(struct ceph_mon_generic_request *greq)
+{
+       struct ceph_osd_client *osdc = &greq->monc->client->osdc;
+       struct ceph_osd_request *req;
+       u64 tid = greq->private_data;
+
+       WARN_ON(greq->result || !greq->u.newest);
+
+       down_write(&osdc->lock);
+       req = lookup_request_mc(&osdc->map_checks, tid);
+       if (!req) {
+               dout("%s tid %llu dne\n", __func__, tid);
+               goto out_unlock;
+       }
+
+       dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
+            req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
+       if (!req->r_map_dne_bound)
+               req->r_map_dne_bound = greq->u.newest;
+       erase_request_mc(&osdc->map_checks, req);
+       check_pool_dne(req);
+
+       ceph_osdc_put_request(req);
+out_unlock:
+       up_write(&osdc->lock);
+}
+
+static void send_map_check(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd_request *lookup_req;
+       int ret;
+
+       verify_osdc_wrlocked(osdc);
+
+       lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
+       if (lookup_req) {
+               WARN_ON(lookup_req != req);
+               return;
+       }
+
+       ceph_osdc_get_request(req);
+       insert_request_mc(&osdc->map_checks, req);
+       ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
+                                         map_check_cb, req->r_tid);
+       WARN_ON(ret);
+}
+
 /*
  * lingering requests, watch/notify v2 infrastructure
  */
@@ -1745,6 +1872,7 @@ static void linger_release(struct kref *kref)
             lreq->reg_req, lreq->ping_req);
        WARN_ON(!RB_EMPTY_NODE(&lreq->node));
        WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
+       WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
        WARN_ON(!list_empty(&lreq->scan_item));
        WARN_ON(!list_empty(&lreq->pending_lworks));
        WARN_ON(lreq->osd);
@@ -1783,6 +1911,7 @@ linger_alloc(struct ceph_osd_client *osdc)
        mutex_init(&lreq->lock);
        RB_CLEAR_NODE(&lreq->node);
        RB_CLEAR_NODE(&lreq->osdc_node);
+       RB_CLEAR_NODE(&lreq->mc_node);
        INIT_LIST_HEAD(&lreq->scan_item);
        INIT_LIST_HEAD(&lreq->pending_lworks);
        init_completion(&lreq->reg_commit_wait);
@@ -1797,6 +1926,7 @@ linger_alloc(struct ceph_osd_client *osdc)
 
 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
+DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
 
 /*
  * Create linger request <-> OSD session relation.
@@ -2193,6 +2323,23 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
        send_linger(lreq);
 }
 
+static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
+{
+       struct ceph_osd_client *osdc = lreq->osdc;
+       struct ceph_osd_linger_request *lookup_lreq;
+
+       verify_osdc_wrlocked(osdc);
+
+       lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
+                                      lreq->linger_id);
+       if (!lookup_lreq)
+               return;
+
+       WARN_ON(lookup_lreq != lreq);
+       erase_linger_mc(&osdc->linger_map_checks, lreq);
+       linger_put(lreq);
+}
+
 /*
  * @lreq has to be both registered and linked.
  */
@@ -2202,6 +2349,7 @@ static void __linger_cancel(struct ceph_osd_linger_request *lreq)
                cancel_linger_request(lreq->ping_req);
        if (lreq->reg_req->r_osd)
                cancel_linger_request(lreq->reg_req);
+       cancel_linger_map_check(lreq);
        unlink_linger(lreq->osd, lreq);
        linger_unregister(lreq);
 }
@@ -2216,6 +2364,89 @@ static void linger_cancel(struct ceph_osd_linger_request *lreq)
        up_write(&osdc->lock);
 }
 
+static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
+
+static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
+{
+       struct ceph_osd_client *osdc = lreq->osdc;
+       struct ceph_osdmap *map = osdc->osdmap;
+
+       verify_osdc_wrlocked(osdc);
+       WARN_ON(!map->epoch);
+
+       if (lreq->register_gen) {
+               lreq->map_dne_bound = map->epoch;
+               dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
+                    lreq, lreq->linger_id);
+       } else {
+               dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
+                    __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
+                    map->epoch);
+       }
+
+       if (lreq->map_dne_bound) {
+               if (map->epoch >= lreq->map_dne_bound) {
+                       /* we had a new enough map */
+                       pr_info("linger_id %llu pool does not exist\n",
+                               lreq->linger_id);
+                       linger_reg_commit_complete(lreq, -ENOENT);
+                       __linger_cancel(lreq);
+               }
+       } else {
+               send_linger_map_check(lreq);
+       }
+}
+
+static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
+{
+       struct ceph_osd_client *osdc = &greq->monc->client->osdc;
+       struct ceph_osd_linger_request *lreq;
+       u64 linger_id = greq->private_data;
+
+       WARN_ON(greq->result || !greq->u.newest);
+
+       down_write(&osdc->lock);
+       lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
+       if (!lreq) {
+               dout("%s linger_id %llu dne\n", __func__, linger_id);
+               goto out_unlock;
+       }
+
+       dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
+            __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
+            greq->u.newest);
+       if (!lreq->map_dne_bound)
+               lreq->map_dne_bound = greq->u.newest;
+       erase_linger_mc(&osdc->linger_map_checks, lreq);
+       check_linger_pool_dne(lreq);
+
+       linger_put(lreq);
+out_unlock:
+       up_write(&osdc->lock);
+}
+
+static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
+{
+       struct ceph_osd_client *osdc = lreq->osdc;
+       struct ceph_osd_linger_request *lookup_lreq;
+       int ret;
+
+       verify_osdc_wrlocked(osdc);
+
+       lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
+                                      lreq->linger_id);
+       if (lookup_lreq) {
+               WARN_ON(lookup_lreq != lreq);
+               return;
+       }
+
+       linger_get(lreq);
+       insert_linger_mc(&osdc->linger_map_checks, lreq);
+       ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
+                                         linger_map_check_cb, lreq->linger_id);
+       WARN_ON(ret);
+}
+
 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
 {
        int ret;
@@ -2677,10 +2908,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
        return;
 
 fail_request:
-       req->r_result = -EIO;
-       __finish_request(req);
-       __complete_request(req);
-       complete_all(&req->r_safe_completion);
+       complete_request(req, -EIO);
 out_unlock_session:
        mutex_unlock(&osd->lock);
 out_unlock_osdc:
@@ -2764,6 +2992,7 @@ static void scan_requests(struct ceph_osd *osd,
 
                        /* fall through */
                case CALC_TARGET_NEED_RESEND:
+                       cancel_linger_map_check(lreq);
                        /*
                         * scan_requests() for the previous epoch(s)
                         * may have already added it to the list, since
@@ -2773,6 +3002,7 @@ static void scan_requests(struct ceph_osd *osd,
                                list_add_tail(&lreq->scan_item, need_resend_linger);
                        break;
                case CALC_TARGET_POOL_DNE:
+                       check_linger_pool_dne(lreq);
                        break;
                }
        }
@@ -2782,7 +3012,7 @@ static void scan_requests(struct ceph_osd *osd,
                    rb_entry(n, struct ceph_osd_request, r_node);
                enum calc_target_result ct_res;
 
-               n = rb_next(n); /* unlink_request() */
+               n = rb_next(n); /* unlink_request(), check_pool_dne() */
 
                dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
                ct_res = calc_target(osdc, &req->r_t,
@@ -2799,10 +3029,12 @@ static void scan_requests(struct ceph_osd *osd,
 
                        /* fall through */
                case CALC_TARGET_NEED_RESEND:
+                       cancel_map_check(req);
                        unlink_request(osd, req);
                        insert_request(need_resend, req);
                        break;
                case CALC_TARGET_POOL_DNE:
+                       check_pool_dne(req);
                        break;
                }
        }
@@ -3655,6 +3887,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        osdc->homeless_osd.o_osdc = osdc;
        osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
        osdc->linger_requests = RB_ROOT;
+       osdc->map_checks = RB_ROOT;
+       osdc->linger_map_checks = RB_ROOT;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
 
@@ -3720,6 +3954,8 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 
        WARN_ON(!list_empty(&osdc->osd_lru));
        WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
+       WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
+       WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
        WARN_ON(atomic_read(&osdc->num_requests));
        WARN_ON(atomic_read(&osdc->num_homeless));