libceph: resend lingering requests with a new tid
[cascardo/linux.git] / net / ceph / osd_client.c
index 648a215..84b0224 100644 (file)
@@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
+static void __unregister_request(struct ceph_osd_client *osdc,
+                                struct ceph_osd_request *req);
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
+static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
@@ -892,6 +895,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
        return NULL;
 }
 
+static void __kick_linger_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd *osd = req->r_osd;
+
+       /*
+        * Linger requests need to be resent with a new tid to avoid
+        * the dup op detection logic on the OSDs.  Achieve this with
+        * a re-register dance instead of open-coding.
+        */
+       ceph_osdc_get_request(req);
+       if (!list_empty(&req->r_linger_item))
+               __unregister_linger_request(osdc, req);
+       else
+               __unregister_request(osdc, req);
+       __register_request(osdc, req);
+       ceph_osdc_put_request(req);
+
+       /*
+        * Unless request has been registered as both normal and
+        * lingering, __unregister{,_linger}_request clears r_osd.
+        * However, here we need to preserve r_osd to make sure we
+        * requeue on the same OSD.
+        */
+       WARN_ON(req->r_osd || !osd);
+       req->r_osd = osd;
+
+       dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
+       __enqueue_request(req);
+}
+
 /*
  * Resubmit requests pending on the given osd.
  */
@@ -900,12 +934,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req, *nreq;
        LIST_HEAD(resend);
+       LIST_HEAD(resend_linger);
        int err;
 
-       dout("__kick_osd_requests osd%d\n", osd->o_osd);
+       dout("%s osd%d\n", __func__, osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;
+
        /*
         * Build up a list of requests to resend by traversing the
         * osd's list of requests.  Requests for a given object are
@@ -926,33 +962,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
                if (!req->r_sent)
                        break;
-               list_move_tail(&req->r_req_lru_item, &resend);
-               dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
-                    osd->o_osd);
-               if (!req->r_linger)
+
+               if (!req->r_linger) {
+                       dout("%s requeueing %p tid %llu\n", __func__, req,
+                            req->r_tid);
+                       list_move_tail(&req->r_req_lru_item, &resend);
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               } else {
+                       list_move_tail(&req->r_req_lru_item, &resend_linger);
+               }
        }
        list_splice(&resend, &osdc->req_unsent);
 
        /*
-        * Linger requests are re-registered before sending, which
-        * sets up a new tid for each.  We add them to the unsent
-        * list at the end to keep things in tid order.
+        * Both registered and not yet registered linger requests are
+        * enqueued with a new tid on the same OSD.  We add/move them
+        * to req_unsent/o_requests at the end to keep things in tid
+        * order.
         */
        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd_item) {
-               /*
-                * reregister request prior to unregistering linger so
-                * that r_osd is preserved.
-                */
-               BUG_ON(!list_empty(&req->r_req_lru_item));
-               __register_request(osdc, req);
-               list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               __unregister_linger_request(osdc, req);
-               dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
-                    osd->o_osd);
+               WARN_ON(!list_empty(&req->r_req_lru_item));
+               __kick_linger_request(req);
        }
+
+       list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
+               __kick_linger_request(req);
 }
 
 /*