Merge tag 'dm-4.8-fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/device...
[cascardo/linux.git] / net / ceph / osd_client.c
index 8946959..b5ec096 100644 (file)
@@ -387,7 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
 static void target_destroy(struct ceph_osd_request_target *t)
 {
        ceph_oid_destroy(&t->base_oid);
+       ceph_oloc_destroy(&t->base_oloc);
        ceph_oid_destroy(&t->target_oid);
+       ceph_oloc_destroy(&t->target_oloc);
 }
 
 /*
@@ -533,6 +535,11 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_alloc_request);
 
+static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
+{
+       return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
+}
+
 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
@@ -540,11 +547,13 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        int msg_size;
 
        WARN_ON(ceph_oid_empty(&req->r_base_oid));
+       WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
        /* create request message */
        msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
        msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
-       msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+       msg_size += CEPH_ENCODING_START_BLK_LEN +
+                       ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
        msg_size += 1 + 8 + 4 + 4; /* pgid */
        msg_size += 4 + req->r_base_oid.name_len; /* oid */
        msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
@@ -932,7 +941,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
                osd_req_op_init(req, which, opcode, 0);
        } else {
-               u32 object_size = le32_to_cpu(layout->fl_object_size);
+               u32 object_size = layout->object_size;
                u32 object_base = off - objoff;
                if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
                        if (truncate_size <= object_base) {
@@ -948,7 +957,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        }
 
        req->r_flags = flags;
-       req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
+       req->r_base_oloc.pool = layout->pool_id;
+       req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
        ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
        req->r_snapid = vino.snap;
@@ -1489,12 +1499,16 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
        p += sizeof(req->r_replay_version);
 
        /* oloc */
-       ceph_encode_8(&p, 4);
-       ceph_encode_8(&p, 4);
-       ceph_encode_32(&p, 8 + 4 + 4);
+       ceph_start_encoding(&p, 5, 4,
+                           ceph_oloc_encoding_size(&req->r_t.target_oloc));
        ceph_encode_64(&p, req->r_t.target_oloc.pool);
        ceph_encode_32(&p, -1); /* preferred */
        ceph_encode_32(&p, 0); /* key len */
+       if (req->r_t.target_oloc.pool_ns)
+               ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
+                                  req->r_t.target_oloc.pool_ns->len);
+       else
+               ceph_encode_32(&p, 0);
 
        /* pgid */
        ceph_encode_8(&p, 1);
@@ -2594,9 +2608,22 @@ static int ceph_oloc_decode(void **p, void *end,
        }
 
        if (struct_v >= 5) {
+               bool changed = false;
+
                len = ceph_decode_32(p);
                if (len > 0) {
-                       pr_warn("ceph_object_locator::nspace is set\n");
+                       ceph_decode_need(p, end, len, e_inval);
+                       if (!oloc->pool_ns ||
+                           ceph_compare_string(oloc->pool_ns, *p, len))
+                               changed = true;
+                       *p += len;
+               } else {
+                       if (oloc->pool_ns)
+                               changed = true;
+               }
+               if (changed) {
+                       /* redirect changes namespace */
+                       pr_warn("ceph_object_locator::nspace is changed\n");
                        goto e_inval;
                }
        }
@@ -2806,7 +2833,9 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
                goto out_unlock_session;
        }
 
+       m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
        ret = decode_MOSDOpReply(msg, &m);
+       m.redirect.oloc.pool_ns = NULL;
        if (ret) {
                pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
                       req->r_tid, ret);
@@ -2835,7 +2864,11 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
                unlink_request(osd, req);
                mutex_unlock(&osd->lock);
 
-               ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
+               /*
+                * Not ceph_oloc_copy() - changing pool_ns is not
+                * supported.
+                */
+               req->r_t.target_oloc.pool = m.redirect.oloc.pool;
                req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
                req->r_tid = 0;
                __submit_request(req, false);