libceph: redo callbacks and factor out MOSDOpReply decoding
authorIlya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:24 +0000 (16:07 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 22:36:28 +0000 (00:36 +0200)
If you specify ACK | ONDISK and set ->r_unsafe_callback, both
->r_callback and ->r_unsafe_callback(true) are called on ack.  This is
very confusing.  Redo this so that only one of them is called:

    ->r_unsafe_callback(true), on ack
    ->r_unsafe_callback(false), on commit

or

    ->r_callback, on ack|commit

Decode everything in decode_MOSDOpReply() to reduce clutter.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/addr.c
fs/ceph/file.c
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index a11756a..f474184 100644 (file)
@@ -1765,8 +1765,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
                goto out_unlock;
        }
 
-       wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
-                         CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+       wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
        osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
        ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
        ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);
index e75fd0b..30fd49e 100644 (file)
@@ -770,6 +770,8 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
                list_add_tail(&req->r_unsafe_item,
                              &ci->i_unsafe_writes);
                spin_unlock(&ci->i_unsafe_lock);
+
+               complete_all(&req->r_completion);
        } else {
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_item);
index 3bebd60..2415dc0 100644 (file)
@@ -162,13 +162,14 @@ struct ceph_osd_request {
        unsigned int            r_num_ops;
 
        int               r_result;
-       int               r_got_reply;
+       bool              r_got_reply;
        int               r_linger;
 
        struct ceph_osd_client *r_osdc;
        struct kref       r_kref;
        bool              r_mempool;
-       struct completion r_completion, r_safe_completion;
+       struct completion r_completion;
+       struct completion r_safe_completion;  /* fsync waiter */
        ceph_osdc_callback_t r_callback;
        ceph_osdc_unsafe_callback_t r_unsafe_callback;
        struct list_head  r_unsafe_item;
index 2a30c0b..baf2844 100644 (file)
@@ -1693,6 +1693,14 @@ static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
        return 0;
 }
 
+static void __complete_request(struct ceph_osd_request *req)
+{
+       if (req->r_callback)
+               req->r_callback(req);
+       else
+               complete_all(&req->r_completion);
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -1875,107 +1883,76 @@ e_inval:
        goto out;
 }
 
-static void complete_request(struct ceph_osd_request *req)
-{
-       complete_all(&req->r_safe_completion);  /* fsync waiter */
-}
+struct MOSDOpReply {
+       struct ceph_pg pgid;
+       u64 flags;
+       int result;
+       u32 epoch;
+       int num_ops;
+       u32 outdata_len[CEPH_OSD_MAX_OPS];
+       s32 rval[CEPH_OSD_MAX_OPS];
+       int retry_attempt;
+       struct ceph_eversion replay_version;
+       u64 user_version;
+       struct ceph_request_redirect redirect;
+};
 
-/*
- * handle osd op reply.  either call the callback if it is specified,
- * or do the completion to wake up the waiting thread.
- */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
 {
-       void *p, *end;
-       struct ceph_osd_request *req;
-       struct ceph_request_redirect redir;
-       u64 tid;
-       int object_len;
-       unsigned int numops;
-       int payload_len, flags;
-       s32 result;
-       s32 retry_attempt;
-       struct ceph_pg pg;
-       int err;
-       u32 reassert_epoch;
-       u64 reassert_version;
-       u32 osdmap_epoch;
-       int already_completed;
-       u32 bytes;
+       void *p = msg->front.iov_base;
+       void *const end = p + msg->front.iov_len;
+       u16 version = le16_to_cpu(msg->hdr.version);
+       struct ceph_eversion bad_replay_version;
        u8 decode_redir;
-       unsigned int i;
-
-       tid = le64_to_cpu(msg->hdr.tid);
-       dout("handle_reply %p tid %llu\n", msg, tid);
-
-       p = msg->front.iov_base;
-       end = p + msg->front.iov_len;
+       u32 len;
+       int ret;
+       int i;
 
-       ceph_decode_need(&p, end, 4, bad);
-       object_len = ceph_decode_32(&p);
-       ceph_decode_need(&p, end, object_len, bad);
-       p += object_len;
+       ceph_decode_32_safe(&p, end, len, e_inval);
+       ceph_decode_need(&p, end, len, e_inval);
+       p += len; /* skip oid */
 
-       err = ceph_decode_pgid(&p, end, &pg);
-       if (err)
-               goto bad;
+       ret = ceph_decode_pgid(&p, end, &m->pgid);
+       if (ret)
+               return ret;
 
-       ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
-       flags = ceph_decode_64(&p);
-       result = ceph_decode_32(&p);
-       reassert_epoch = ceph_decode_32(&p);
-       reassert_version = ceph_decode_64(&p);
-       osdmap_epoch = ceph_decode_32(&p);
+       ceph_decode_64_safe(&p, end, m->flags, e_inval);
+       ceph_decode_32_safe(&p, end, m->result, e_inval);
+       ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
+       memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
+       p += sizeof(bad_replay_version);
+       ceph_decode_32_safe(&p, end, m->epoch, e_inval);
 
-       /* lookup */
-       down_read(&osdc->map_sem);
-       mutex_lock(&osdc->request_mutex);
-       req = lookup_request(&osdc->requests, tid);
-       if (req == NULL) {
-               dout("handle_reply tid %llu dne\n", tid);
-               goto bad_mutex;
-       }
-       ceph_osdc_get_request(req);
+       ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
+       if (m->num_ops > ARRAY_SIZE(m->outdata_len))
+               goto e_inval;
 
-       dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
-            req, result);
-
-       ceph_decode_need(&p, end, 4, bad_put);
-       numops = ceph_decode_32(&p);
-       if (numops > CEPH_OSD_MAX_OPS)
-               goto bad_put;
-       if (numops != req->r_num_ops)
-               goto bad_put;
-       payload_len = 0;
-       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
-       for (i = 0; i < numops; i++) {
+       ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
+                        e_inval);
+       for (i = 0; i < m->num_ops; i++) {
                struct ceph_osd_op *op = p;
-               int len;
 
-               len = le32_to_cpu(op->payload_len);
-               req->r_ops[i].outdata_len = len;
-               dout(" op %d has %d bytes\n", i, len);
-               payload_len += len;
+               m->outdata_len[i] = le32_to_cpu(op->payload_len);
                p += sizeof(*op);
        }
-       bytes = le32_to_cpu(msg->hdr.data_len);
-       if (payload_len != bytes) {
-               pr_warn("sum of op payload lens %d != data_len %d\n",
-                       payload_len, bytes);
-               goto bad_put;
-       }
 
-       ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
-       retry_attempt = ceph_decode_32(&p);
-       for (i = 0; i < numops; i++)
-               req->r_ops[i].rval = ceph_decode_32(&p);
+       ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
+       for (i = 0; i < m->num_ops; i++)
+               ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
 
-       if (le16_to_cpu(msg->hdr.version) >= 6) {
-               p += 8 + 4; /* skip replay_version */
-               p += 8; /* skip user_version */
+       if (version >= 5) {
+               ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
+               memcpy(&m->replay_version, p, sizeof(m->replay_version));
+               p += sizeof(m->replay_version);
+               ceph_decode_64_safe(&p, end, m->user_version, e_inval);
+       } else {
+               m->replay_version = bad_replay_version; /* struct */
+               m->user_version = le64_to_cpu(m->replay_version.version);
+       }
 
-               if (le16_to_cpu(msg->hdr.version) >= 7)
-                       ceph_decode_8_safe(&p, end, decode_redir, bad_put);
+       if (version >= 6) {
+               if (version >= 7)
+                       ceph_decode_8_safe(&p, end, decode_redir, e_inval);
                else
                        decode_redir = 1;
        } else {
@@ -1983,19 +1960,96 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        }
 
        if (decode_redir) {
-               err = ceph_redirect_decode(&p, end, &redir);
-               if (err)
-                       goto bad_put;
+               ret = ceph_redirect_decode(&p, end, &m->redirect);
+               if (ret)
+                       return ret;
        } else {
-               redir.oloc.pool = -1;
+               ceph_oloc_init(&m->redirect.oloc);
        }
 
-       if (!ceph_oloc_empty(&redir.oloc)) {
-               dout("redirect pool %lld\n", redir.oloc.pool);
+       return 0;
+
+e_inval:
+       return -EINVAL;
+}
+
+/*
+ * We are done with @req if
+ *   - @m is a safe reply, or
+ *   - @m is an unsafe reply and we didn't want a safe one
+ */
+static bool done_request(const struct ceph_osd_request *req,
+                        const struct MOSDOpReply *m)
+{
+       return (m->result < 0 ||
+               (m->flags & CEPH_OSD_FLAG_ONDISK) ||
+               !(req->r_flags & CEPH_OSD_FLAG_ONDISK));
+}
 
+/*
+ * handle osd op reply.  either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
+ *
+ * ->r_unsafe_callback is set? yes                     no
+ *
+ * first reply is OK (needed   r_cb/r_completion,      r_cb/r_completion,
+ * any or needed/got safe)     r_safe_completion       r_safe_completion
+ *
+ * first reply is unsafe       r_unsafe_cb(true)       (nothing)
+ *
+ * when we get the safe reply  r_unsafe_cb(false),     r_cb/r_completion,
+ *                             r_safe_completion       r_safe_completion
+ */
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+       struct ceph_osd_request *req;
+       struct MOSDOpReply m;
+       u64 tid = le64_to_cpu(msg->hdr.tid);
+       u32 data_len = 0;
+       bool already_acked;
+       int ret;
+       int i;
+
+       dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+       down_read(&osdc->map_sem);
+       mutex_lock(&osdc->request_mutex);
+       req = lookup_request(&osdc->requests, tid);
+       if (!req) {
+               dout("%s no tid %llu\n", __func__, tid);
+               goto out_unlock;
+       }
+       ceph_osdc_get_request(req);
+
+       ret = decode_MOSDOpReply(msg, &m);
+       if (ret) {
+               pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
+                      req->r_tid, ret);
+               ceph_msg_dump(msg);
+               goto fail_request;
+       }
+       dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
+            __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
+            m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
+            le64_to_cpu(m.replay_version.version), m.user_version);
+
+       if (m.retry_attempt >= 0) {
+               if (m.retry_attempt != req->r_attempts - 1) {
+                       dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
+                            req, req->r_tid, m.retry_attempt,
+                            req->r_attempts - 1);
+                       goto out_put;
+               }
+       } else {
+               WARN_ON(1); /* MOSDOpReply v4 is assumed */
+       }
+
+       if (!ceph_oloc_empty(&m.redirect.oloc)) {
+               dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
+                    m.redirect.oloc.pool);
                __unregister_request(osdc, req);
 
-               ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc);
+               ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
 
                /*
                 * Start redirect requests with nofail=true.  If
@@ -2005,85 +2059,85 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                 * successfully.  In the future we might want to follow
                 * original request's nofail setting here.
                 */
-               err = __ceph_osdc_start_request(osdc, req, true);
-               BUG_ON(err);
+               ret = __ceph_osdc_start_request(osdc, req, true);
+               BUG_ON(ret);
 
-               goto out_unlock;
+               goto out_put;
        }
 
-       already_completed = req->r_got_reply;
-       if (!req->r_got_reply) {
-               req->r_result = result;
-               dout("handle_reply result %d bytes %d\n", req->r_result,
-                    bytes);
-               if (req->r_result == 0)
-                       req->r_result = bytes;
-
-               /* in case this is a write and we need to replay, */
-               req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
-               req->r_replay_version.version = cpu_to_le64(reassert_version);
-
-               req->r_got_reply = 1;
-       } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
-               dout("handle_reply tid %llu dup ack\n", tid);
-               goto out_unlock;
+       if (m.num_ops != req->r_num_ops) {
+               pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
+                      req->r_num_ops, req->r_tid);
+               goto fail_request;
        }
-
-       dout("handle_reply tid %llu flags %d\n", tid, flags);
-
-       if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
-               __register_linger_request(osdc, req);
-
-       /* either this is a read, or we got the safe response */
-       if (result < 0 ||
-           (flags & CEPH_OSD_FLAG_ONDISK) ||
-           ((flags & CEPH_OSD_FLAG_WRITE) == 0))
+       for (i = 0; i < req->r_num_ops; i++) {
+               dout(" req %p tid %llu op %d rval %d len %u\n", req,
+                    req->r_tid, i, m.rval[i], m.outdata_len[i]);
+               req->r_ops[i].rval = m.rval[i];
+               req->r_ops[i].outdata_len = m.outdata_len[i];
+               data_len += m.outdata_len[i];
+       }
+       if (data_len != le32_to_cpu(msg->hdr.data_len)) {
+               pr_err("sum of lens %u != %u for tid %llu\n", data_len,
+                      le32_to_cpu(msg->hdr.data_len), req->r_tid);
+               goto fail_request;
+       }
+       dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
+            req, req->r_tid, req->r_got_reply, m.result, data_len);
+
+       already_acked = req->r_got_reply;
+       if (!already_acked) {
+               req->r_result = m.result ?: data_len;
+               req->r_replay_version = m.replay_version; /* struct */
+               req->r_got_reply = true;
+       } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
+               dout("req %p tid %llu dup ack\n", req, req->r_tid);
+               goto out_put;
+       }
+
+       if (done_request(req, &m)) {
                __unregister_request(osdc, req);
+               if (req->r_linger) {
+                       WARN_ON(req->r_unsafe_callback);
+                       __register_linger_request(osdc, req);
+               }
+       }
 
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
 
-       if (!already_completed) {
-               if (req->r_unsafe_callback &&
-                   result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
-                       req->r_unsafe_callback(req, true);
-               if (req->r_callback)
-                       req->r_callback(req);
-               else
-                       complete_all(&req->r_completion);
-       }
-
-       if (flags & CEPH_OSD_FLAG_ONDISK) {
-               if (req->r_unsafe_callback && already_completed)
+       if (done_request(req, &m)) {
+               if (already_acked && req->r_unsafe_callback) {
+                       dout("req %p tid %llu safe-cb\n", req, req->r_tid);
                        req->r_unsafe_callback(req, false);
-               complete_request(req);
+               } else {
+                       dout("req %p tid %llu cb\n", req, req->r_tid);
+                       __complete_request(req);
+               }
+       } else {
+               if (req->r_unsafe_callback) {
+                       dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
+                       req->r_unsafe_callback(req, true);
+               } else {
+                       WARN_ON(1);
+               }
        }
+       if (m.flags & CEPH_OSD_FLAG_ONDISK)
+               complete_all(&req->r_safe_completion);
 
-out:
-       dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;
-out_unlock:
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
-       goto out;
 
-bad_put:
+fail_request:
        req->r_result = -EIO;
        __unregister_request(osdc, req);
-       if (req->r_callback)
-               req->r_callback(req);
-       else
-               complete_all(&req->r_completion);
-       complete_request(req);
+       __complete_request(req);
+       complete_all(&req->r_safe_completion);
+out_put:
        ceph_osdc_put_request(req);
-bad_mutex:
+out_unlock:
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
-bad:
-       pr_err("corrupt osd_op_reply got %d %d\n",
-              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
-       ceph_msg_dump(msg);
 }
 
 static void reset_changed_osds(struct ceph_osd_client *osdc)
@@ -2591,7 +2645,9 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
        if (rc < 0) {
                dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
                ceph_osdc_cancel_request(req);
-               complete_request(req);
+
+               /* kludge - need to to wake ceph_osdc_sync() */
+               complete_all(&req->r_safe_completion);
                return rc;
        }