Merge tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client
[cascardo/linux.git] / net / ceph / osd_client.c
index a97e7b5..d9bf7a1 100644 (file)
@@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
                ceph_osd_data_release(&op->notify.request_data);
                ceph_osd_data_release(&op->notify.response_data);
                break;
+       case CEPH_OSD_OP_LIST_WATCHERS:
+               ceph_osd_data_release(&op->list_watchers.response_data);
+               break;
        default:
                break;
        }
@@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
        case CEPH_OSD_OP_NOTIFY:
                dst->notify.cookie = cpu_to_le64(src->notify.cookie);
                break;
+       case CEPH_OSD_OP_LIST_WATCHERS:
+               break;
        case CEPH_OSD_OP_SETALLOCHINT:
                dst->alloc_hint.expected_object_size =
                    cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
                        ceph_osdc_msg_data_add(req->r_reply,
                                               &op->extent.osd_data);
                        break;
+               case CEPH_OSD_OP_LIST_WATCHERS:
+                       ceph_osdc_msg_data_add(req->r_reply,
+                                              &op->list_watchers.response_data);
+                       break;
 
                /* both */
                case CEPH_OSD_OP_CALL:
@@ -3891,12 +3900,121 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
        return ret;
 }
 
+static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
+{
+       u8 struct_v;
+       u32 struct_len;
+       int ret;
+
+       ret = ceph_start_decoding(p, end, 2, "watch_item_t",
+                                 &struct_v, &struct_len);
+       if (ret)
+               return ret;
+
+       ceph_decode_copy(p, &item->name, sizeof(item->name));
+       item->cookie = ceph_decode_64(p);
+       *p += 4; /* skip timeout_seconds */
+       if (struct_v >= 2) {
+               ceph_decode_copy(p, &item->addr, sizeof(item->addr));
+               ceph_decode_addr(&item->addr);
+       }
+
+       dout("%s %s%llu cookie %llu addr %s\n", __func__,
+            ENTITY_NAME(item->name), item->cookie,
+            ceph_pr_addr(&item->addr.in_addr));
+       return 0;
+}
+
+static int decode_watchers(void **p, void *end,
+                          struct ceph_watch_item **watchers,
+                          u32 *num_watchers)
+{
+       u8 struct_v;
+       u32 struct_len;
+       int i;
+       int ret;
+
+       ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
+                                 &struct_v, &struct_len);
+       if (ret)
+               return ret;
+
+       *num_watchers = ceph_decode_32(p);
+       *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
+       if (!*watchers)
+               return -ENOMEM;
+
+       for (i = 0; i < *num_watchers; i++) {
+               ret = decode_watcher(p, end, *watchers + i);
+               if (ret) {
+                       kfree(*watchers);
+                       return ret;
+               }
+       }
+
+       return 0;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(watchers);
+ */
+int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
+                           struct ceph_object_id *oid,
+                           struct ceph_object_locator *oloc,
+                           struct ceph_watch_item **watchers,
+                           u32 *num_watchers)
+{
+       struct ceph_osd_request *req;
+       struct page **pages;
+       int ret;
+
+       req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+       if (!req)
+               return -ENOMEM;
+
+       ceph_oid_copy(&req->r_base_oid, oid);
+       ceph_oloc_copy(&req->r_base_oloc, oloc);
+       req->r_flags = CEPH_OSD_FLAG_READ;
+
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
+       pages = ceph_alloc_page_vector(1, GFP_NOIO);
+       if (IS_ERR(pages)) {
+               ret = PTR_ERR(pages);
+               goto out_put_req;
+       }
+
+       osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
+       ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
+                                                response_data),
+                                pages, PAGE_SIZE, 0, false, true);
+
+       ceph_osdc_start_request(osdc, req, false);
+       ret = ceph_osdc_wait_request(osdc, req);
+       if (ret >= 0) {
+               void *p = page_address(pages[0]);
+               void *const end = p + req->r_ops[0].outdata_len;
+
+               ret = decode_watchers(&p, end, watchers, num_watchers);
+       }
+
+out_put_req:
+       ceph_osdc_put_request(req);
+       return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_list_watchers);
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked
  */
 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
 {
+       dout("%s osdc %p\n", __func__, osdc);
        flush_workqueue(osdc->notify_wq);
 }
 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
@@ -3909,6 +4027,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
 }
 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
 
+/*
+ * Execute an OSD class method on an object.
+ *
+ * @flags: CEPH_OSD_FLAG_*
+ * @resp_len: out param for reply length
+ */
+int ceph_osdc_call(struct ceph_osd_client *osdc,
+                  struct ceph_object_id *oid,
+                  struct ceph_object_locator *oloc,
+                  const char *class, const char *method,
+                  unsigned int flags,
+                  struct page *req_page, size_t req_len,
+                  struct page *resp_page, size_t *resp_len)
+{
+       struct ceph_osd_request *req;
+       int ret;
+
+       req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+       if (!req)
+               return -ENOMEM;
+
+       ceph_oid_copy(&req->r_base_oid, oid);
+       ceph_oloc_copy(&req->r_base_oloc, oloc);
+       req->r_flags = flags;
+
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
+       osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+       if (req_page)
+               osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
+                                                 0, false, false);
+       if (resp_page)
+               osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+                                                  PAGE_SIZE, 0, false, false);
+
+       ceph_osdc_start_request(osdc, req, false);
+       ret = ceph_osdc_wait_request(osdc, req);
+       if (ret >= 0) {
+               ret = req->r_ops[0].rval;
+               if (resp_page)
+                       *resp_len = req->r_ops[0].outdata_len;
+       }
+
+out_put_req:
+       ceph_osdc_put_request(req);
+       return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_call);
+
 /*
  * init, shutdown
  */