libceph: support for sending notifies
author Ilya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:27 +0000 (16:07 +0200)
committer Ilya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:15:28 +0000 (01:15 +0200)
Implement ceph_osdc_notify() for sending notifies.

Because the current messenger can't do read-in into pagelists (it can
only do write-out from them), I had to go with a page vector for the
NOTIFY_COMPLETE payload, for now.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
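
As a usage illustration only (not part of this patch): a hypothetical caller
my_notify_example() could drive ceph_osdc_notify() roughly as below.  The
osdc/oid/oloc/buf arguments, the 10-second timeout and the pr_err() message
are made up for the sketch; only ceph_osdc_notify(), ceph_release_page_vector()
and calc_pages_for() come from libceph (<linux/ceph/osd_client.h>,
<linux/ceph/libceph.h>).

/* Hypothetical caller: send a notify, then consume and free the reply. */
static int my_notify_example(struct ceph_osd_client *osdc,
			     struct ceph_object_id *oid,
			     struct ceph_object_locator *oloc,
			     void *buf, size_t buf_len)
{
	struct page **reply_pages = NULL;
	size_t reply_len = 0;
	int ret;

	/* 10s timeout; the NOTIFY_COMPLETE payload comes back as a
	 * page vector in reply_pages/reply_len */
	ret = ceph_osdc_notify(osdc, oid, oloc, buf, buf_len, 10,
			       &reply_pages, &reply_len);
	if (ret)
		pr_err("notify failed: %d\n", ret);

	/* reply_pages/reply_len are initialized on success and error
	 * alike, so the caller always releases the page vector */
	if (reply_pages)
		ceph_release_page_vector(reply_pages,
					 calc_pages_for(0, reply_len));
	return ret;
}
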
include/linux/ceph/osd_client.h
include/linux/ceph/rados.h
net/ceph/debugfs.c
net/ceph/osd_client.c

index cd2dcb8..63054fa 100644 (file)
@@ -114,6 +114,11 @@ struct ceph_osd_req_op {
                struct {
                        struct ceph_osd_data request_data;
                } notify_ack;
+               struct {
+                       u64 cookie;
+                       struct ceph_osd_data request_data;
+                       struct ceph_osd_data response_data;
+               } notify;
                struct {
                        u64 expected_object_size;
                        u64 expected_write_size;
@@ -202,6 +207,7 @@ struct ceph_osd_linger_request {
        struct ceph_osd_client *osdc;
        u64 linger_id;
        bool committed;
+       bool is_watch;                  /* watch or notify */
 
        struct ceph_osd *osd;
        struct ceph_osd_request *reg_req;
@@ -220,14 +226,20 @@ struct ceph_osd_linger_request {
        struct list_head scan_item;
 
        struct completion reg_commit_wait;
+       struct completion notify_finish_wait;
        int reg_commit_error;
+       int notify_finish_error;
        int last_error;
 
        u32 register_gen;
+       u64 notify_id;
 
        rados_watchcb2_t wcb;
        rados_watcherrcb_t errcb;
        void *data;
+
+       struct page ***preply_pages;
+       size_t *preply_len;
 };
 
 struct ceph_osd_client {
@@ -397,5 +409,13 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
                         u64 cookie,
                         void *payload,
                         size_t payload_len);
+int ceph_osdc_notify(struct ceph_osd_client *osdc,
+                    struct ceph_object_id *oid,
+                    struct ceph_object_locator *oloc,
+                    void *payload,
+                    size_t payload_len,
+                    u32 timeout,
+                    struct page ***preply_pages,
+                    size_t *preply_len);
 #endif
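
For context only (not part of this diff): a notify sent with the new
ceph_osdc_notify() completes once the watchers acknowledge it, which they do
through the existing ceph_osdc_notify_ack() declared just above.  A
watcher-side callback matching rados_watchcb2_t might look roughly like this
sketch; struct my_watcher and its fields are hypothetical:

static void my_watch_cb(void *arg, u64 notify_id, u64 cookie,
			u64 notifier_id, void *data, size_t data_len)
{
	struct my_watcher *w = arg;	/* hypothetical per-watch data */

	/* handle the notify payload in data/data_len, then ack so the
	 * sender's ceph_osdc_notify() can finish */
	ceph_osdc_notify_ack(w->osdc, &w->oid, &w->oloc,
			     notify_id, cookie, NULL, 0);
}
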
 
index 204c8c9..5c0da61 100644 (file)
@@ -476,6 +476,9 @@ struct ceph_osd_op {
                        __u8 op;        /* CEPH_OSD_WATCH_OP_* */
                        __le32 gen;     /* registration generation */
                } __attribute__ ((packed)) watch;
+               struct {
+                       __le64 cookie;
+               } __attribute__ ((packed)) notify;
                struct {
                        __le64 offset, length;
                        __le64 src_offset;
index e64cb85..39f91c7 100644 (file)
@@ -206,8 +206,9 @@ static void dump_linger_request(struct seq_file *s,
        seq_printf(s, "%llu\t", lreq->linger_id);
        dump_target(s, &lreq->t);
 
-       seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen,
-                  lreq->committed ? "C" : "", lreq->last_error);
+       seq_printf(s, "\t%u\t%s%s/%d\n", lreq->register_gen,
+                  lreq->is_watch ? "W" : "N", lreq->committed ? "C" : "",
+                  lreq->last_error);
 }
 
 static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
index ca0a7b5..e6e3ab4 100644 (file)
@@ -334,6 +334,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        case CEPH_OSD_OP_NOTIFY_ACK:
                ceph_osd_data_release(&op->notify_ack.request_data);
                break;
+       case CEPH_OSD_OP_NOTIFY:
+               ceph_osd_data_release(&op->notify.request_data);
+               ceph_osd_data_release(&op->notify.response_data);
+               break;
        default:
                break;
        }
@@ -845,6 +849,9 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
                break;
+       case CEPH_OSD_OP_NOTIFY:
+               dst->notify.cookie = cpu_to_le64(src->notify.cookie);
+               break;
        case CEPH_OSD_OP_SETALLOCHINT:
                dst->alloc_hint.expected_object_size =
                    cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1439,6 +1446,12 @@ static void setup_request_data(struct ceph_osd_request *req,
                        ceph_osdc_msg_data_add(req->r_reply,
                                               &op->cls.response_data);
                        break;
+               case CEPH_OSD_OP_NOTIFY:
+                       ceph_osdc_msg_data_add(msg,
+                                              &op->notify.request_data);
+                       ceph_osdc_msg_data_add(req->r_reply,
+                                              &op->notify.response_data);
+                       break;
                }
 
                data_len += op->indata_len;
@@ -1771,6 +1784,7 @@ linger_alloc(struct ceph_osd_client *osdc)
        RB_CLEAR_NODE(&lreq->osdc_node);
        INIT_LIST_HEAD(&lreq->scan_item);
        init_completion(&lreq->reg_commit_wait);
+       init_completion(&lreq->notify_finish_wait);
 
        lreq->osdc = osdc;
        target_init(&lreq->t);
@@ -1934,6 +1948,7 @@ static void do_watch_notify(struct work_struct *w)
                goto out;
        }
 
+       WARN_ON(!lreq->is_watch);
        dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
             __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
             lwork->notify.payload_len);
@@ -1997,6 +2012,24 @@ static void linger_commit_cb(struct ceph_osd_request *req)
        linger_reg_commit_complete(lreq, req->r_result);
        lreq->committed = true;
 
+       if (!lreq->is_watch) {
+               struct ceph_osd_data *osd_data =
+                   osd_req_op_data(req, 0, notify, response_data);
+               void *p = page_address(osd_data->pages[0]);
+
+               WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
+                       osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+
+               /* make note of the notify_id */
+               if (req->r_ops[0].outdata_len >= sizeof(u64)) {
+                       lreq->notify_id = ceph_decode_64(&p);
+                       dout("lreq %p notify_id %llu\n", lreq,
+                            lreq->notify_id);
+               } else {
+                       dout("lreq %p no notify_id\n", lreq);
+               }
+       }
+
        mutex_unlock(&lreq->lock);
        linger_put(lreq);
 }
@@ -2050,7 +2083,7 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
        req->r_mtime = lreq->mtime;
 
        mutex_lock(&lreq->lock);
-       if (lreq->committed) {
+       if (lreq->is_watch && lreq->committed) {
                WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
                        op->watch.cookie != lreq->linger_id);
                op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
@@ -2059,7 +2092,10 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
                     op->watch.gen);
                req->r_callback = linger_reconnect_cb;
        } else {
-               WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
+               if (!lreq->is_watch)
+                       lreq->notify_id = 0;
+               else
+                       WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
                dout("lreq %p register\n", lreq);
                req->r_callback = linger_commit_cb;
        }
@@ -2147,7 +2183,7 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
  */
 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
 {
-       if (lreq->ping_req->r_osd)
+       if (lreq->is_watch && lreq->ping_req->r_osd)
                cancel_linger_request(lreq->ping_req);
        if (lreq->reg_req->r_osd)
                cancel_linger_request(lreq->reg_req);
@@ -2174,6 +2210,15 @@ static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
        return ret ?: lreq->reg_commit_error;
 }
 
+static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
+{
+       int ret;
+
+       dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
+       ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
+       return ret ?: lreq->notify_finish_error;
+}
+
 /*
  * Timeout callback, called every N seconds.  When 1 or more OSD
  * requests has been active for more than N seconds, we send a keepalive
@@ -2220,7 +2265,7 @@ static void handle_timeout(struct work_struct *work)
                        found = true;
 
                        mutex_lock(&lreq->lock);
-                       if (lreq->committed && !lreq->last_error)
+                       if (lreq->is_watch && lreq->committed && !lreq->last_error)
                                send_linger_ping(lreq);
                        mutex_unlock(&lreq->lock);
                }
@@ -3032,6 +3077,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
        u8 proto_ver, opcode;
        u64 cookie, notify_id;
        u64 notifier_id = 0;
+       s32 return_code = 0;
        void *payload = NULL;
        u32 payload_len = 0;
 
@@ -3049,7 +3095,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
        }
 
        if (le16_to_cpu(msg->hdr.version) >= 2)
-               p += 4; /* skip return_code */
+               ceph_decode_32_safe(&p, end, return_code, bad);
 
        if (le16_to_cpu(msg->hdr.version) >= 3)
                ceph_decode_64_safe(&p, end, notifier_id, bad);
@@ -3063,13 +3109,38 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
        }
 
        mutex_lock(&lreq->lock);
-       dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie,
-            lreq);
+       dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
+            opcode, cookie, lreq, lreq->is_watch);
        if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
                if (!lreq->last_error) {
                        lreq->last_error = -ENOTCONN;
                        queue_watch_error(lreq);
                }
+       } else if (!lreq->is_watch) {
+               /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
+               if (lreq->notify_id && lreq->notify_id != notify_id) {
+                       dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
+                            lreq->notify_id, notify_id);
+               } else if (!completion_done(&lreq->notify_finish_wait)) {
+                       struct ceph_msg_data *data =
+                           list_first_entry_or_null(&msg->data,
+                                                    struct ceph_msg_data,
+                                                    links);
+
+                       if (data) {
+                               if (lreq->preply_pages) {
+                                       WARN_ON(data->type !=
+                                                       CEPH_MSG_DATA_PAGES);
+                                       *lreq->preply_pages = data->pages;
+                                       *lreq->preply_len = data->length;
+                               } else {
+                                       ceph_release_page_vector(data->pages,
+                                              calc_pages_for(0, data->length));
+                               }
+                       }
+                       lreq->notify_finish_error = return_code;
+                       complete_all(&lreq->notify_finish_wait);
+               }
        } else {
                /* CEPH_WATCH_EVENT_NOTIFY */
                lwork = lwork_alloc(lreq, do_watch_notify);
@@ -3241,6 +3312,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
        if (!lreq)
                return ERR_PTR(-ENOMEM);
 
+       lreq->is_watch = true;
        lreq->wcb = wcb;
        lreq->errcb = errcb;
        lreq->data = data;
@@ -3395,6 +3467,116 @@ out_put_req:
 }
 EXPORT_SYMBOL(ceph_osdc_notify_ack);
 
+static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
+                                 u64 cookie, u32 prot_ver, u32 timeout,
+                                 void *payload, size_t payload_len)
+{
+       struct ceph_osd_req_op *op;
+       struct ceph_pagelist *pl;
+       int ret;
+
+       op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
+       op->notify.cookie = cookie;
+
+       pl = kmalloc(sizeof(*pl), GFP_NOIO);
+       if (!pl)
+               return -ENOMEM;
+
+       ceph_pagelist_init(pl);
+       ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
+       ret |= ceph_pagelist_encode_32(pl, timeout);
+       ret |= ceph_pagelist_encode_32(pl, payload_len);
+       ret |= ceph_pagelist_append(pl, payload, payload_len);
+       if (ret) {
+               ceph_pagelist_release(pl);
+               return -ENOMEM;
+       }
+
+       ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
+       op->indata_len = pl->length;
+       return 0;
+}
+
+/*
+ * @timeout: in seconds
+ *
+ * @preply_{pages,len} are initialized both on success and error.
+ * The caller is responsible for:
+ *
+ *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
+ */
+int ceph_osdc_notify(struct ceph_osd_client *osdc,
+                    struct ceph_object_id *oid,
+                    struct ceph_object_locator *oloc,
+                    void *payload,
+                    size_t payload_len,
+                    u32 timeout,
+                    struct page ***preply_pages,
+                    size_t *preply_len)
+{
+       struct ceph_osd_linger_request *lreq;
+       struct page **pages;
+       int ret;
+
+       WARN_ON(!timeout);
+       if (preply_pages) {
+               *preply_pages = NULL;
+               *preply_len = 0;
+       }
+
+       lreq = linger_alloc(osdc);
+       if (!lreq)
+               return -ENOMEM;
+
+       lreq->preply_pages = preply_pages;
+       lreq->preply_len = preply_len;
+
+       ceph_oid_copy(&lreq->t.base_oid, oid);
+       ceph_oloc_copy(&lreq->t.base_oloc, oloc);
+       lreq->t.flags = CEPH_OSD_FLAG_READ;
+
+       lreq->reg_req = alloc_linger_request(lreq);
+       if (!lreq->reg_req) {
+               ret = -ENOMEM;
+               goto out_put_lreq;
+       }
+
+       /* for notify_id */
+       pages = ceph_alloc_page_vector(1, GFP_NOIO);
+       if (IS_ERR(pages)) {
+               ret = PTR_ERR(pages);
+               goto out_put_lreq;
+       }
+
+       down_write(&osdc->lock);
+       linger_register(lreq); /* before osd_req_op_* */
+       ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
+                                    timeout, payload, payload_len);
+       if (ret) {
+               linger_unregister(lreq);
+               up_write(&osdc->lock);
+               ceph_release_page_vector(pages, 1);
+               goto out_put_lreq;
+       }
+       ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
+                                                response_data),
+                                pages, PAGE_SIZE, 0, false, true);
+       linger_submit(lreq);
+       up_write(&osdc->lock);
+
+       ret = linger_reg_commit_wait(lreq);
+       if (!ret)
+               ret = linger_notify_finish_wait(lreq);
+       else
+               dout("lreq %p failed to initiate notify %d\n", lreq, ret);
+
+       linger_cancel(lreq);
+out_put_lreq:
+       linger_put(lreq);
+       return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_notify);
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked
@@ -3693,19 +3875,51 @@ out_unlock_osdc:
        return m;
 }
 
+/*
+ * TODO: switch to a msg-owned pagelist
+ */
+static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
+{
+       struct ceph_msg *m;
+       int type = le16_to_cpu(hdr->type);
+       u32 front_len = le32_to_cpu(hdr->front_len);
+       u32 data_len = le32_to_cpu(hdr->data_len);
+
+       m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+       if (!m)
+               return NULL;
+
+       if (data_len) {
+               struct page **pages;
+               struct ceph_osd_data osd_data;
+
+               pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
+                                              GFP_NOIO);
+               if (IS_ERR(pages)) {
+                       ceph_msg_put(m);
+                       return NULL;
+               }
+
+               ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
+                                        false);
+               ceph_osdc_msg_data_add(m, &osd_data);
+       }
+
+       return m;
+}
+
 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
 {
        struct ceph_osd *osd = con->private;
        int type = le16_to_cpu(hdr->type);
-       int front = le32_to_cpu(hdr->front_len);
 
        *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
-               return ceph_msg_new(type, front, GFP_NOFS, false);
+               return alloc_msg_with_page_vector(hdr);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);
        default: