+
#include <linux/ceph/ceph_debug.h>
#include <linux/module.h>
#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512
+static struct kmem_cache *ceph_osd_request_cache;
+
static const struct ceph_connection_operations osd_con_ops;
static void __send_queued(struct ceph_osd_client *osdc);
return 0;
}
+static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
+{
+ memset(osd_data, 0, sizeof (*osd_data));
+ osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
+}
+
+static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
+ struct page **pages, u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
+ osd_data->pages = pages;
+ osd_data->length = length;
+ osd_data->alignment = alignment;
+ osd_data->pages_from_pool = pages_from_pool;
+ osd_data->own_pages = own_pages;
+}
+
+static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
+ struct ceph_pagelist *pagelist)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
+ osd_data->pagelist = pagelist;
+}
+
+#ifdef CONFIG_BLOCK
+static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
+ struct bio *bio, size_t bio_length)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
+ osd_data->bio = bio;
+ osd_data->bio_length = bio_length;
+}
+#endif /* CONFIG_BLOCK */
+
+#define osd_req_op_data(oreq, whch, typ, fld) \
+ ({ \
+ BUG_ON(whch >= (oreq)->r_num_ops); \
+ &(oreq)->r_ops[whch].typ.fld; \
+ })
+
+static struct ceph_osd_data *
+osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
+{
+ BUG_ON(which >= osd_req->r_num_ops);
+
+ return &osd_req->r_ops[which].raw_data_in;
+}
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ return osd_req_op_data(osd_req, which, extent, osd_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+
+struct ceph_osd_data *
+osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ return osd_req_op_data(osd_req, which, cls, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */
+
+void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_raw_data_in(osd_req, which);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
+
+void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
+
+void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
+
+#ifdef CONFIG_BLOCK
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+ unsigned int which, struct bio *bio, size_t bio_length)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_bio_init(osd_data, bio, bio_length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
+#endif /* CONFIG_BLOCK */
+
+static void osd_req_op_cls_request_info_pagelist(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+
+void osd_req_op_cls_request_data_pagelist(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
+
+void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
+
+void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, response_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
+
+static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
+{
+ switch (osd_data->type) {
+ case CEPH_OSD_DATA_TYPE_NONE:
+ return 0;
+ case CEPH_OSD_DATA_TYPE_PAGES:
+ return osd_data->length;
+ case CEPH_OSD_DATA_TYPE_PAGELIST:
+ return (u64)osd_data->pagelist->length;
+#ifdef CONFIG_BLOCK
+ case CEPH_OSD_DATA_TYPE_BIO:
+ return (u64)osd_data->bio_length;
+#endif /* CONFIG_BLOCK */
+ default:
+ WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
+ return 0;
+ }
+}
+
+static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
+{
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
+ int num_pages;
+
+ num_pages = calc_pages_for((u64)osd_data->alignment,
+ (u64)osd_data->length);
+ ceph_release_page_vector(osd_data->pages, num_pages);
+ }
+ ceph_osd_data_init(osd_data);
+}
+
+static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ struct ceph_osd_req_op *op;
+
+ BUG_ON(which >= osd_req->r_num_ops);
+ op = &osd_req->r_ops[which];
+
+ switch (op->op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_WRITE:
+ ceph_osd_data_release(&op->extent.osd_data);
+ break;
+ case CEPH_OSD_OP_CALL:
+ ceph_osd_data_release(&op->cls.request_info);
+ ceph_osd_data_release(&op->cls.request_data);
+ ceph_osd_data_release(&op->cls.response_data);
+ break;
+ default:
+ break;
+ }
+}
+
/*
* requests
*/
void ceph_osdc_release_request(struct kref *kref)
{
- int num_pages;
- struct ceph_osd_request *req = container_of(kref,
- struct ceph_osd_request,
- r_kref);
+ struct ceph_osd_request *req;
+ unsigned int which;
+ req = container_of(kref, struct ceph_osd_request, r_kref);
if (req->r_request)
ceph_msg_put(req->r_request);
if (req->r_reply) {
ceph_msg_put(req->r_reply);
}
- if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES &&
- req->r_data_in.own_pages) {
- num_pages = calc_pages_for((u64)req->r_data_in.alignment,
- (u64)req->r_data_in.length);
- ceph_release_page_vector(req->r_data_in.pages, num_pages);
- }
- if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES &&
- req->r_data_out.own_pages) {
- num_pages = calc_pages_for((u64)req->r_data_out.alignment,
- (u64)req->r_data_out.length);
- ceph_release_page_vector(req->r_data_out.pages, num_pages);
- }
+ for (which = 0; which < req->r_num_ops; which++)
+ osd_req_op_data_release(req, which);
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
- kfree(req);
+ kmem_cache_free(ceph_osd_request_cache, req);
+
}
EXPORT_SYMBOL(ceph_osdc_release_request);
struct ceph_msg *msg;
size_t msg_size;
+ BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
+ BUG_ON(num_ops > CEPH_OSD_MAX_OP);
+
msg_size = 4 + 4 + 8 + 8 + 4+8;
msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pg_t */
req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req));
} else {
- req = kzalloc(sizeof(*req), gfp_flags);
+ req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
}
if (req == NULL)
return NULL;
req->r_osdc = osdc;
req->r_mempool = use_mempool;
+ req->r_num_ops = num_ops;
kref_init(&req->r_kref);
init_completion(&req->r_completion);
}
req->r_reply = msg;
- req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE;
- req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE;
-
/* create request message; allow space for oid */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
* other information associated with them. It also serves as a
* common init routine for all the other init functions, below.
*/
-void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode)
+static struct ceph_osd_req_op *
+_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+ u16 opcode)
{
+ struct ceph_osd_req_op *op;
+
+ BUG_ON(which >= osd_req->r_num_ops);
BUG_ON(!osd_req_opcode_valid(opcode));
+ op = &osd_req->r_ops[which];
memset(op, 0, sizeof (*op));
-
op->op = opcode;
+
+ return op;
+}
+
+void osd_req_op_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode)
+{
+ (void)_osd_req_op_init(osd_req, which, opcode);
}
+EXPORT_SYMBOL(osd_req_op_init);
-void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
+void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
- osd_req_op_init(op, opcode);
-
op->extent.offset = offset;
op->extent.length = length;
op->extent.truncate_size = truncate_size;
}
EXPORT_SYMBOL(osd_req_op_extent_init);
-void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length)
+void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
+ unsigned int which, u64 length)
{
- u64 previous = op->extent.length;
+ struct ceph_osd_req_op *op;
+ u64 previous;
+
+ BUG_ON(which >= osd_req->r_num_ops);
+ op = &osd_req->r_ops[which];
+ previous = op->extent.length;
if (length == previous)
return; /* Nothing to do */
}
EXPORT_SYMBOL(osd_req_op_extent_update);
-void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
- const char *class, const char *method,
- const void *request_data, size_t request_data_size)
+void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+ u16 opcode, const char *class, const char *method)
{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_pagelist *pagelist;
size_t payload_len = 0;
size_t size;
BUG_ON(opcode != CEPH_OSD_OP_CALL);
- osd_req_op_init(op, opcode);
+ pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+ BUG_ON(!pagelist);
+ ceph_pagelist_init(pagelist);
op->cls.class_name = class;
size = strlen(class);
BUG_ON(size > (size_t) U8_MAX);
op->cls.class_len = size;
+ ceph_pagelist_append(pagelist, class, size);
payload_len += size;
op->cls.method_name = method;
size = strlen(method);
BUG_ON(size > (size_t) U8_MAX);
op->cls.method_len = size;
+ ceph_pagelist_append(pagelist, method, size);
payload_len += size;
- op->cls.indata = request_data;
- BUG_ON(request_data_size > (size_t) U32_MAX);
- op->cls.indata_len = (u32) request_data_size;
- payload_len += request_data_size;
+ osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
op->cls.argc = 0; /* currently unused */
}
EXPORT_SYMBOL(osd_req_op_cls_init);
-void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
{
- BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
- osd_req_op_init(op, opcode);
+ BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
op->watch.cookie = cookie;
- /* op->watch.ver = version; */ /* XXX 3847 */
- op->watch.ver = cpu_to_le64(version);
+ op->watch.ver = version;
if (opcode == CEPH_OSD_OP_WATCH && flag)
- op->watch.flag = (u8) 1;
+ op->watch.flag = (u8)1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
+ struct ceph_osd_data *osd_data)
+{
+ u64 length = ceph_osd_data_length(osd_data);
+
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+ BUG_ON(length > (u64) SIZE_MAX);
+ if (length)
+ ceph_msg_data_add_pages(msg, osd_data->pages,
+ length, osd_data->alignment);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+ BUG_ON(!length);
+ ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
+#ifdef CONFIG_BLOCK
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+ ceph_msg_data_add_bio(msg, osd_data->bio, length);
+#endif
+ } else {
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+ }
+}
+
static u64 osd_req_encode_op(struct ceph_osd_request *req,
- struct ceph_osd_op *dst,
- struct ceph_osd_req_op *src)
+ struct ceph_osd_op *dst, unsigned int which)
{
- u64 out_data_len = 0;
- struct ceph_pagelist *pagelist;
+ struct ceph_osd_req_op *src;
+ struct ceph_osd_data *osd_data;
+ u64 request_data_len = 0;
+ u64 data_length;
+ BUG_ON(which >= req->r_num_ops);
+ src = &req->r_ops[which];
if (WARN_ON(!osd_req_opcode_valid(src->op))) {
pr_err("unrecognized osd opcode %d\n", src->op);
switch (src->op) {
case CEPH_OSD_OP_STAT:
+ osd_data = &src->raw_data_in;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
if (src->op == CEPH_OSD_OP_WRITE)
- out_data_len = src->extent.length;
+ request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
dst->extent.length = cpu_to_le64(src->extent.length);
dst->extent.truncate_size =
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
+ osd_data = &src->extent.osd_data;
+ if (src->op == CEPH_OSD_OP_WRITE)
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ else
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_CALL:
- pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
- BUG_ON(!pagelist);
- ceph_pagelist_init(pagelist);
-
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
- dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
- ceph_pagelist_append(pagelist, src->cls.class_name,
- src->cls.class_len);
- ceph_pagelist_append(pagelist, src->cls.method_name,
- src->cls.method_len);
- ceph_pagelist_append(pagelist, src->cls.indata,
- src->cls.indata_len);
-
- req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST;
- req->r_data_out.pagelist = pagelist;
- out_data_len = pagelist->length;
+ osd_data = &src->cls.request_info;
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+ request_data_len = osd_data->pagelist->length;
+
+ osd_data = &src->cls.request_data;
+ data_length = ceph_osd_data_length(osd_data);
+ if (data_length) {
+ BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+ dst->cls.indata_len = cpu_to_le32(data_length);
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ src->payload_len += data_length;
+ request_data_len += data_length;
+ }
+ osd_data = &src->cls.response_data;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_STARTSYNC:
break;
dst->op = cpu_to_le16(src->op);
dst->payload_len = cpu_to_le32(src->payload_len);
- return out_data_len;
+ return request_data_len;
}
-/*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req,
- u64 off, unsigned int num_ops,
- struct ceph_osd_req_op *src_ops,
- struct ceph_snap_context *snapc, u64 snap_id,
- struct timespec *mtime)
-{
- struct ceph_msg *msg = req->r_request;
- struct ceph_osd_req_op *src_op;
- void *p;
- size_t msg_size;
- int flags = req->r_flags;
- u64 data_len;
- int i;
-
- req->r_num_ops = num_ops;
- req->r_snapid = snap_id;
- req->r_snapc = ceph_get_snap_context(snapc);
-
- /* encode request */
- msg->hdr.version = cpu_to_le16(4);
-
- p = msg->front.iov_base;
- ceph_encode_32(&p, 1); /* client_inc is always 1 */
- req->r_request_osdmap_epoch = p;
- p += 4;
- req->r_request_flags = p;
- p += 4;
- if (req->r_flags & CEPH_OSD_FLAG_WRITE)
- ceph_encode_timespec(p, mtime);
- p += sizeof(struct ceph_timespec);
- req->r_request_reassert_version = p;
- p += sizeof(struct ceph_eversion); /* will get filled in */
-
- /* oloc */
- ceph_encode_8(&p, 4);
- ceph_encode_8(&p, 4);
- ceph_encode_32(&p, 8 + 4 + 4);
- req->r_request_pool = p;
- p += 8;
- ceph_encode_32(&p, -1); /* preferred */
- ceph_encode_32(&p, 0); /* key len */
-
- ceph_encode_8(&p, 1);
- req->r_request_pgid = p;
- p += 8 + 4;
- ceph_encode_32(&p, -1); /* preferred */
-
- /* oid */
- ceph_encode_32(&p, req->r_oid_len);
- memcpy(p, req->r_oid, req->r_oid_len);
- dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
- p += req->r_oid_len;
-
- /* ops--can imply data */
- ceph_encode_16(&p, num_ops);
- src_op = src_ops;
- req->r_request_ops = p;
- data_len = 0;
- for (i = 0; i < num_ops; i++, src_op++) {
- data_len += osd_req_encode_op(req, p, src_op);
- p += sizeof(struct ceph_osd_op);
- }
-
- /* snaps */
- ceph_encode_64(&p, req->r_snapid);
- ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
- ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
- if (req->r_snapc) {
- for (i = 0; i < snapc->num_snaps; i++) {
- ceph_encode_64(&p, req->r_snapc->snaps[i]);
- }
- }
-
- req->r_request_attempts = p;
- p += 4;
-
- /* data */
- if (flags & CEPH_OSD_FLAG_WRITE) {
- u16 data_off;
-
- /*
- * The header "data_off" is a hint to the receiver
- * allowing it to align received data into its
- * buffers such that there's no need to re-copy
- * it before writing it to disk (direct I/O).
- */
- data_off = (u16) (off & 0xffff);
- req->r_request->hdr.data_off = cpu_to_le16(data_off);
- }
- req->r_request->hdr.data_len = cpu_to_le32(data_len);
-
- BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
- msg_size = p - msg->front.iov_base;
- msg->front.iov_len = msg_size;
- msg->hdr.front_len = cpu_to_le32(msg_size);
-
- dout("build_request msg_size was %d\n", (int)msg_size);
-}
-EXPORT_SYMBOL(ceph_osdc_build_request);
-
/*
* build new request AND message, calculate layout, and adjust file
* extent as needed.
struct ceph_file_layout *layout,
struct ceph_vino vino,
u64 off, u64 *plen, int num_ops,
- struct ceph_osd_req_op *ops,
int opcode, int flags,
struct ceph_snap_context *snapc,
u32 truncate_seq,
GFP_NOFS);
if (!req)
return ERR_PTR(-ENOMEM);
+
req->r_flags = flags;
/* calculate max write size */
truncate_size = object_size;
}
- osd_req_op_extent_init(&ops[0], opcode, objoff, objlen,
+ osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
truncate_size, truncate_seq);
+
/*
* A second op in the ops array means the caller wants to
* also issue a include a 'startsync' command so that the
* osd will flush data quickly.
*/
if (num_ops > 1)
- osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC);
+ osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
req->r_file_layout = *layout; /* keep a copy */
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
ceph_msg_get(req->r_request); /* send consumes a ref */
- ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+ /* Mark the request unsafe if this is the first timet's being sent. */
+
+ if (!req->r_sent && req->r_unsafe_callback)
+ req->r_unsafe_callback(req, true);
req->r_sent = req->r_osd->o_incarnation;
+
+ ceph_con_send(&req->r_osd->o_con, req->r_request);
}
/*
static void complete_request(struct ceph_osd_request *req)
{
- if (req->r_safe_callback)
- req->r_safe_callback(req, NULL);
+ if (req->r_unsafe_callback)
+ req->r_unsafe_callback(req, false);
complete_all(&req->r_safe_completion); /* fsync waiter */
}
struct ceph_osd_request *req;
u64 tid;
int object_len;
- int numops, payload_len, flags;
+ unsigned int numops;
+ int payload_len, flags;
s32 result;
s32 retry_attempt;
struct ceph_pg pg;
u64 reassert_version;
u32 osdmap_epoch;
int already_completed;
- int i;
+ u32 bytes;
+ unsigned int i;
tid = le64_to_cpu(msg->hdr.tid);
dout("handle_reply %p tid %llu\n", msg, tid);
payload_len += len;
p += sizeof(*op);
}
- if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+ bytes = le32_to_cpu(msg->hdr.data_len);
+ if (payload_len != bytes) {
pr_warning("sum of op payload lens %d != data_len %d",
- payload_len, le32_to_cpu(msg->hdr.data_len));
+ payload_len, bytes);
goto bad_put;
}
req->r_reply_op_result[i] = ceph_decode_32(&p);
if (!req->r_got_reply) {
- unsigned int bytes;
req->r_result = result;
- bytes = le32_to_cpu(msg->hdr.data_len);
dout("handle_reply result %d bytes %d\n", req->r_result,
bytes);
if (req->r_result == 0)
return;
}
-static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
- struct ceph_osd_data *osd_data)
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
+ struct ceph_snap_context *snapc, u64 snap_id,
+ struct timespec *mtime)
{
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
- BUG_ON(osd_data->length > (u64) SIZE_MAX);
- if (osd_data->length)
- ceph_msg_data_set_pages(msg, osd_data->pages,
- osd_data->length, osd_data->alignment);
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
- BUG_ON(!osd_data->pagelist->length);
- ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
-#ifdef CONFIG_BLOCK
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
- ceph_msg_data_set_bio(msg, osd_data->bio, osd_data->bio_length);
-#endif
- } else {
- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+ struct ceph_msg *msg = req->r_request;
+ void *p;
+ size_t msg_size;
+ int flags = req->r_flags;
+ u64 data_len;
+ unsigned int i;
+
+ req->r_snapid = snap_id;
+ req->r_snapc = ceph_get_snap_context(snapc);
+
+ /* encode request */
+ msg->hdr.version = cpu_to_le16(4);
+
+ p = msg->front.iov_base;
+ ceph_encode_32(&p, 1); /* client_inc is always 1 */
+ req->r_request_osdmap_epoch = p;
+ p += 4;
+ req->r_request_flags = p;
+ p += 4;
+ if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+ ceph_encode_timespec(p, mtime);
+ p += sizeof(struct ceph_timespec);
+ req->r_request_reassert_version = p;
+ p += sizeof(struct ceph_eversion); /* will get filled in */
+
+ /* oloc */
+ ceph_encode_8(&p, 4);
+ ceph_encode_8(&p, 4);
+ ceph_encode_32(&p, 8 + 4 + 4);
+ req->r_request_pool = p;
+ p += 8;
+ ceph_encode_32(&p, -1); /* preferred */
+ ceph_encode_32(&p, 0); /* key len */
+
+ ceph_encode_8(&p, 1);
+ req->r_request_pgid = p;
+ p += 8 + 4;
+ ceph_encode_32(&p, -1); /* preferred */
+
+ /* oid */
+ ceph_encode_32(&p, req->r_oid_len);
+ memcpy(p, req->r_oid, req->r_oid_len);
+ dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+ p += req->r_oid_len;
+
+ /* ops--can imply data */
+ ceph_encode_16(&p, (u16)req->r_num_ops);
+ data_len = 0;
+ for (i = 0; i < req->r_num_ops; i++) {
+ data_len += osd_req_encode_op(req, p, i);
+ p += sizeof(struct ceph_osd_op);
}
+
+ /* snaps */
+ ceph_encode_64(&p, req->r_snapid);
+ ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+ ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+ if (req->r_snapc) {
+ for (i = 0; i < snapc->num_snaps; i++) {
+ ceph_encode_64(&p, req->r_snapc->snaps[i]);
+ }
+ }
+
+ req->r_request_attempts = p;
+ p += 4;
+
+ /* data */
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ u16 data_off;
+
+ /*
+ * The header "data_off" is a hint to the receiver
+ * allowing it to align received data into its
+ * buffers such that there's no need to re-copy
+ * it before writing it to disk (direct I/O).
+ */
+ data_off = (u16) (off & 0xffff);
+ req->r_request->hdr.data_off = cpu_to_le16(data_off);
+ }
+ req->r_request->hdr.data_len = cpu_to_le32(data_len);
+
+ BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+ msg_size = p - msg->front.iov_base;
+ msg->front.iov_len = msg_size;
+ msg->hdr.front_len = cpu_to_le32(msg_size);
+
+ dout("build_request msg_size was %d\n", (int)msg_size);
}
+EXPORT_SYMBOL(ceph_osdc_build_request);
/*
* Register request, send initial attempt.
{
int rc = 0;
- /* Set up response incoming data and request outgoing data fields */
-
- ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
- ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);
-
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
__register_request(osdc, req);
struct page **pages, int num_pages, int page_align)
{
struct ceph_osd_request *req;
- struct ceph_osd_data *osd_data;
- struct ceph_osd_req_op op;
int rc = 0;
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, *plen);
- req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, &op,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, truncate_seq, truncate_size,
false);
/* it may be a short read due to an object boundary */
- osd_data = &req->r_data_in;
- osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
- osd_data->pages = pages;
- osd_data->length = *plen;
- osd_data->alignment = page_align;
+ osd_req_op_extent_osd_data_pages(req, 0,
+ pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
- off, *plen, osd_data->length, page_align);
+ off, *plen, *plen, page_align);
- ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL);
+ ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
struct page **pages, int num_pages)
{
struct ceph_osd_request *req;
- struct ceph_osd_data *osd_data;
- struct ceph_osd_req_op op;
int rc = 0;
int page_align = off & ~PAGE_MASK;
BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, &op,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
snapc, truncate_seq, truncate_size,
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
- osd_data = &req->r_data_out;
- osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
- osd_data->pages = pages;
- osd_data->length = len;
- osd_data->alignment = page_align;
- dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length);
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+ false, false);
+ dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
- ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime);
+ ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
}
EXPORT_SYMBOL(ceph_osdc_writepages);
+int ceph_osdc_setup(void)
+{
+ BUG_ON(ceph_osd_request_cache);
+ ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
+ sizeof (struct ceph_osd_request),
+ __alignof__(struct ceph_osd_request),
+ 0, NULL);
+
+ return ceph_osd_request_cache ? 0 : -ENOMEM;
+}
+EXPORT_SYMBOL(ceph_osdc_setup);
+
+void ceph_osdc_cleanup(void)
+{
+ BUG_ON(!ceph_osd_request_cache);
+ kmem_cache_destroy(ceph_osd_request_cache);
+ ceph_osd_request_cache = NULL;
+}
+EXPORT_SYMBOL(ceph_osdc_cleanup);
+
/*
* handle incoming message
*/
m = ceph_msg_get(req->r_reply);
if (data_len > 0) {
- struct ceph_osd_data *osd_data = &req->r_data_in;
+ struct ceph_osd_data *osd_data;
+ /*
+ * XXX This is assuming there is only one op containing
+ * XXX page data. Probably OK for reads, but this
+ * XXX ought to be done more generally.
+ */
+ osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
if (osd_data->pages &&
unlikely(osd_data->length < data_len)) {