rbd.c -- Export ceph rados objects as a Linux block device
based on drivers/block/osdblk.c:
Copyright 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; see the file COPYING. If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
For usage instructions, please refer to:
Documentation/ABI/testing/sysfs-bus-rbd
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>
#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include "rbd_types.h"
#define RBD_DEBUG /* Activate rbd_assert() calls */
* The basic unit of block I/O is a sector. It is interpreted in a
* number of contexts in Linux (blk, bio, genhd), but the default is
* universally 512 bytes. These symbols are just slightly more
* meaningful than the bare numbers they represent.
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"
#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
#define RBD_MAX_SNAP_NAME_LEN \
(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
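/*
 * Worked example (illustrative, assuming the usual NAME_MAX of 255):
 * the "snap_" prefix is 5 bytes, so RBD_MAX_SNAP_NAME_LEN works out to
 * 255 - 5 = 250 bytes for a snapshot name.
 */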
#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
#define RBD_SNAP_HEAD_NAME "-"
#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX 64
#define RBD_OBJ_PREFIX_LEN_MAX 64
#define RBD_FEATURE_LAYERING (1<<0)
#define RBD_FEATURE_STRIPINGV2 (1<<1)
#define RBD_FEATURES_ALL \
(RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
/* Features supported by this (client software) implementation. */
#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
* MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
* enough to hold all possible device names.
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
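/*
 * Worked derivation (illustrative): for a 4-byte int the formula gives
 * (5 * 4) / 2 + 1 = 11 characters, enough for the 10 digits of
 * 2147483647 plus a sign, so any "rbd#" name fits in DEV_NAME_LEN (32).
 * A minimal sketch of the formatting this sizing is meant to cover:
 *
 *	char name[DEV_NAME_LEN];
 *
 *	snprintf(name, sizeof (name), "%s%d", RBD_DRV_NAME, dev_id);
 */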
* block device image metadata (in-memory version)
struct rbd_image_header {
/* These four fields never change for a given rbd image */
/* The remaining fields need to be updated occasionally */
struct ceph_snap_context *snapc;
* An rbd image specification.
* The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
* identify an image. Each rbd_dev structure includes a pointer to
* an rbd_spec structure that encapsulates this identity.
* Each of the id's in an rbd_spec has an associated name. For a
* user-mapped image, the names are supplied and the id's associated
* with them are looked up. For a layered image, a parent image is
* defined by the tuple, and the names are looked up.
* An rbd_dev structure contains a parent_spec pointer which is
* non-null if the image it represents is a child in a layered
* image. This pointer will refer to the rbd_spec structure used
* by the parent rbd_dev for its own identity (i.e., the structure
* is shared between the parent and child).
* Since these structures are populated once, during the discovery
* phase of image construction, they are effectively immutable so
* we make no effort to synchronize access to them.
* Note that code herein does not assume the image name is known (it
* could be a null pointer).
const char *pool_name;
const char *image_id;
const char *image_name;
const char *snap_name;
* An instance of the client; multiple devices may share an rbd client.
struct ceph_client *client;
struct list_head node;
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
enum obj_request_type {
OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
struct rbd_obj_request {
const char *object_name;
u64 offset; /* object start byte */
u64 length; /* bytes from offset */
* An object request associated with an image will have its
* img_data flag set; a standalone object request will not.
* A standalone object request will have which == BAD_WHICH
* and a null obj_request pointer.
* An object request initiated in support of a layered image
* object (to check for its existence before a write) will
* have which == BAD_WHICH and a non-null obj_request pointer.
* Finally, an object request for rbd image data will have
* which != BAD_WHICH, and will have a non-null img_request
* pointer. The value of which will be in the range
* 0..(img_request->obj_request_count-1).
struct rbd_obj_request *obj_request; /* STAT op */
struct rbd_img_request *img_request;
/* links for img_request->obj_requests list */
struct list_head links;
u32 which; /* posn image request list */
enum obj_request_type type;
struct bio *bio_list;
struct page **copyup_pages;
struct ceph_osd_request *osd_req;
u64 xferred; /* bytes transferred */
rbd_obj_callback_t callback;
struct completion completion;
IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
struct rbd_img_request {
struct rbd_device *rbd_dev;
u64 offset; /* starting image byte offset */
u64 length; /* byte count from offset */
u64 snap_id; /* for reads */
struct ceph_snap_context *snapc; /* for writes */
struct request *rq; /* block request */
struct rbd_obj_request *obj_request; /* obj req initiator */
struct page **copyup_pages;
spinlock_t completion_lock; /* protects next_completion */
rbd_img_callback_t callback;
u64 xferred; /* aggregate bytes transferred */
int result; /* first nonzero obj_request result */
u32 obj_request_count;
struct list_head obj_requests; /* rbd_obj_request structs */
#define for_each_obj_request(ireq, oreq) \
list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
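/*
 * Usage sketch (illustrative): the _safe (reverse) variant permits
 * deleting entries while walking the list, which is how an image
 * request's object requests are torn down:
 *
 *	struct rbd_obj_request *obj_req;
 *	struct rbd_obj_request *next_obj_req;
 *
 *	for_each_obj_request_safe(img_req, obj_req, next_obj_req)
 *		rbd_img_obj_request_del(img_req, obj_req);
 */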
int dev_id; /* blkdev unique id */
int major; /* blkdev assigned major */
struct gendisk *disk; /* blkdev's gendisk and rq */
u32 image_format; /* Either 1 or 2 */
struct rbd_client *rbd_client;
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
spinlock_t lock; /* queue, flags, open_count */
struct rbd_image_header header;
unsigned long flags; /* possibly lock protected */
struct rbd_spec *spec;
struct ceph_file_layout layout;
struct ceph_osd_event *watch_event;
struct rbd_obj_request *watch_request;
struct rbd_spec *parent_spec;
struct rbd_device *parent;
/* protects updating the header */
struct rw_semaphore header_rwsem;
struct rbd_mapping mapping;
struct list_head node;
unsigned long open_count; /* protected by lock */
* Flag bits for rbd_dev->flags. If atomicity is required,
* rbd_dev->lock is used to protect access.
* Currently, only the "removing" flag (which is coupled with the
* "open_count" field) requires atomic access.
RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list); /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);
static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Slab caches for frequently-allocated structures */
static struct kmem_cache *rbd_img_request_cache;
static struct kmem_cache *rbd_obj_request_cache;
static struct kmem_cache *rbd_segment_name_cache;
static int rbd_img_request_submit(struct rbd_img_request *img_request);
static void rbd_dev_device_release(struct device *dev);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
__ATTR(remove, S_IWUSR, NULL, rbd_remove),
static struct bus_type rbd_bus_type = {
.bus_attrs = rbd_bus_attrs,
static void rbd_root_dev_release(struct device *dev)
static struct device rbd_root_dev = {
.release = rbd_root_dev_release,
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
struct va_format vaf;
printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
else if (rbd_dev->disk)
printk(KERN_WARNING "%s: %s: %pV\n",
RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_name)
printk(KERN_WARNING "%s: image %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_id)
printk(KERN_WARNING "%s: id %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
RBD_DRV_NAME, rbd_dev, &vaf);
#define rbd_assert(expr) \
if (unlikely(!(expr))) { \
printk(KERN_ERR "\nAssertion failure in %s() " \
"\trbd_assert(%s);\n\n", \
__func__, __LINE__, #expr); \
#else /* !RBD_DEBUG */
# define rbd_assert(expr) ((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
bool removing = false;
if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
spin_lock_irq(&rbd_dev->lock);
if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
rbd_dev->open_count++;
spin_unlock_irq(&rbd_dev->lock);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
(void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
mutex_unlock(&ctl_mutex);
static int rbd_release(struct gendisk *disk, fmode_t mode)
struct rbd_device *rbd_dev = disk->private_data;
unsigned long open_count_before;
spin_lock_irq(&rbd_dev->lock);
open_count_before = rbd_dev->open_count--;
spin_unlock_irq(&rbd_dev->lock);
rbd_assert(open_count_before > 0);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
put_device(&rbd_dev->dev);
mutex_unlock(&ctl_mutex);
static const struct block_device_operations rbd_bd_ops = {
.owner = THIS_MODULE,
.release = rbd_release,
* Initialize an rbd client instance.
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
struct rbd_client *rbdc;
dout("%s:\n", __func__);
rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
if (IS_ERR(rbdc->client))
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
ret = ceph_open_session(rbdc->client);
spin_lock(&rbd_client_list_lock);
list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&rbd_client_list_lock);
mutex_unlock(&ctl_mutex);
dout("%s: rbdc %p\n", __func__, rbdc);
ceph_destroy_client(rbdc->client);
mutex_unlock(&ctl_mutex);
ceph_destroy_options(ceph_opts);
dout("%s: error %d\n", __func__, ret);
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
kref_get(&rbdc->kref);
* Find a ceph client with specific addr and configuration. If
* found, bump its reference count.
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
struct rbd_client *client_node;
if (ceph_opts->flags & CEPH_OPT_NOSHARE)
spin_lock(&rbd_client_list_lock);
list_for_each_entry(client_node, &rbd_client_list, node) {
if (!ceph_compare_options(ceph_opts, client_node->client)) {
__rbd_get_client(client_node);
spin_unlock(&rbd_client_list_lock);
return found ? client_node : NULL;
/* string args above */
/* Boolean args above */
static match_table_t rbd_opts_tokens = {
/* string args above */
{Opt_read_only, "read_only"},
{Opt_read_only, "ro"}, /* Alternate spelling */
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
/* Boolean args above */
#define RBD_READ_ONLY_DEFAULT false
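/*
 * Example (illustrative): an option string containing "ro" (or its
 * long form "read_only") causes parse_rbd_opts_token() below to set
 * rbd_opts->read_only to true; with no option given the field keeps
 * RBD_READ_ONLY_DEFAULT.
 */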
static int parse_rbd_opts_token(char *c, void *private)
struct rbd_options *rbd_opts = private;
substring_t argstr[MAX_OPT_ARGS];
int token, intval, ret;
token = match_token(c, rbd_opts_tokens, argstr);
if (token < Opt_last_int) {
ret = match_int(&argstr[0], &intval);
pr_err("bad mount option arg (not int) "
dout("got int token %d val %d\n", token, intval);
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
} else if (token > Opt_last_string && token < Opt_last_bool) {
dout("got Boolean token %d\n", token);
dout("got token %d\n", token);
rbd_opts->read_only = true;
rbd_opts->read_only = false;
* Get a ceph client with specific addr and configuration; if one does
* not exist, create it.
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
struct rbd_client *rbdc;
rbdc = rbd_client_find(ceph_opts);
if (rbdc) /* using an existing client */
ceph_destroy_options(ceph_opts);
rbdc = rbd_client_create(ceph_opts);
* Destroy ceph client
* Caller must hold rbd_client_list_lock.
static void rbd_client_release(struct kref *kref)
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
dout("%s: rbdc %p\n", __func__, rbdc);
spin_lock(&rbd_client_list_lock);
list_del(&rbdc->node);
spin_unlock(&rbd_client_list_lock);
ceph_destroy_client(rbdc->client);
* Drop reference to ceph client node. If it's not referenced anymore, release it.
static void rbd_put_client(struct rbd_client *rbdc)
kref_put(&rbdc->kref, rbd_client_release);
static bool rbd_image_format_valid(u32 image_format)
return image_format == 1 || image_format == 2;
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
/* The header has to start with the magic rbd header text */
if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
/* The bio layer requires at least sector-sized I/O */
if (ondisk->options.order < SECTOR_SHIFT)
/* If we use u64 in a few spots we may be able to loosen this */
if (ondisk->options.order > 8 * sizeof (int) - 1)
* The size of a snapshot header has to fit in a size_t, and
* that limits the number of snapshots.
snap_count = le32_to_cpu(ondisk->snap_count);
size = SIZE_MAX - sizeof (struct ceph_snap_context);
if (snap_count > size / sizeof (__le64))
* Not only that, but the size of the entire snapshot
* header must also be representable in a size_t.
size -= snap_count * sizeof (__le64);
if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
* Create a new header structure, translate header format from the on-disk
* header.
static int rbd_header_from_disk(struct rbd_image_header *header,
struct rbd_image_header_ondisk *ondisk)
memset(header, 0, sizeof (*header));
snap_count = le32_to_cpu(ondisk->snap_count);
len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
if (!header->object_prefix)
memcpy(header->object_prefix, ondisk->object_prefix, len);
header->object_prefix[len] = '\0';
u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
/* Save a copy of the snapshot names */
if (snap_names_len > (u64) SIZE_MAX)
header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
if (!header->snap_names)
* Note that rbd_dev_v1_header_read() guarantees
* the ondisk buffer we're working with has
* snap_names_len bytes beyond the end of the
* snapshot id array, this memcpy() is safe.
memcpy(header->snap_names, &ondisk->snaps[snap_count],
/* Record each snapshot's size */
size = snap_count * sizeof (*header->snap_sizes);
header->snap_sizes = kmalloc(size, GFP_KERNEL);
if (!header->snap_sizes)
for (i = 0; i < snap_count; i++)
header->snap_sizes[i] =
le64_to_cpu(ondisk->snaps[i].image_size);
header->snap_names = NULL;
header->snap_sizes = NULL;
header->features = 0; /* No features support in v1 images */
header->obj_order = ondisk->options.order;
header->crypt_type = ondisk->options.crypt_type;
header->comp_type = ondisk->options.comp_type;
/* Allocate and fill in the snapshot context */
header->image_size = le64_to_cpu(ondisk->image_size);
header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
for (i = 0; i < snap_count; i++)
header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
kfree(header->snap_sizes);
header->snap_sizes = NULL;
kfree(header->snap_names);
header->snap_names = NULL;
kfree(header->object_prefix);
header->object_prefix = NULL;
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
const char *snap_name;
rbd_assert(which < rbd_dev->header.snapc->num_snaps);
/* Skip over names until we find the one we are looking for */
snap_name = rbd_dev->header.snap_names;
snap_name += strlen(snap_name) + 1;
return kstrdup(snap_name, GFP_KERNEL);
* Snapshot id comparison function for use with qsort()/bsearch().
* Note that result is for snapshots in *descending* order.
static int snapid_compare_reverse(const void *s1, const void *s2)
u64 snap_id1 = *(u64 *)s1;
u64 snap_id2 = *(u64 *)s2;
if (snap_id1 < snap_id2)
return snap_id1 == snap_id2 ? 0 : -1;
* Search a snapshot context to see if the given snapshot id is
* present.
* Returns the position of the snapshot id in the array if it's found,
* or BAD_SNAP_INDEX otherwise.
* Note: The snapshot array is kept sorted (by the osd) in
* reverse order, highest snapshot id first.
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
struct ceph_snap_context *snapc = rbd_dev->header.snapc;
found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
sizeof (snap_id), snapid_compare_reverse);
return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
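/*
 * Worked example (illustrative): with snapc->snaps = { 12, 7, 3 }
 * (descending, as the osd keeps it), looking up snap_id 7 yields
 * index 1, while looking up snap_id 5 yields BAD_SNAP_INDEX.
 */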
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
which = rbd_dev_snap_index(rbd_dev, snap_id);
if (which == BAD_SNAP_INDEX)
return _rbd_dev_v1_snap_name(rbd_dev, which);
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
if (snap_id == CEPH_NOSNAP)
return RBD_SNAP_HEAD_NAME;
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
if (rbd_dev->image_format == 1)
return rbd_dev_v1_snap_name(rbd_dev, snap_id);
return rbd_dev_v2_snap_name(rbd_dev, snap_id);
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
if (snap_id == CEPH_NOSNAP) {
*snap_size = rbd_dev->header.image_size;
} else if (rbd_dev->image_format == 1) {
which = rbd_dev_snap_index(rbd_dev, snap_id);
if (which == BAD_SNAP_INDEX)
*snap_size = rbd_dev->header.snap_sizes[which];
ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
if (snap_id == CEPH_NOSNAP) {
*snap_features = rbd_dev->header.features;
} else if (rbd_dev->image_format == 1) {
*snap_features = 0; /* No features for format 1 */
ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
*snap_features = features;
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
u64 snap_id = rbd_dev->spec->snap_id;
ret = rbd_snap_size(rbd_dev, snap_id, &size);
ret = rbd_snap_features(rbd_dev, snap_id, &features);
rbd_dev->mapping.size = size;
rbd_dev->mapping.features = features;
/* If we are mapping a snapshot it must be marked read-only */
if (snap_id != CEPH_NOSNAP)
rbd_dev->mapping.read_only = true;
static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
rbd_dev->mapping.size = 0;
rbd_dev->mapping.features = 0;
rbd_dev->mapping.read_only = true;
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
segment = offset >> rbd_dev->header.obj_order;
ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
rbd_dev->header.object_prefix, segment);
if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
pr_err("error formatting segment name for #%llu (%d)\n",
static void rbd_segment_name_free(const char *name)
/* The explicit cast here is needed to drop the const qualifier */
kmem_cache_free(rbd_segment_name_cache, (void *)name);
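/*
 * Worked example (illustrative, with a hypothetical object prefix):
 * for prefix "rb.0.1234" and the default object order of 22 (4 MiB
 * objects), image offset 0x1234567 lies in segment 0x1234567 >> 22 = 4,
 * so rbd_segment_name() formats "rb.0.1234.000000000004".
 */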
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
return offset & (segment_size - 1);
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
u64 offset, u64 length)
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
offset &= segment_size - 1;
rbd_assert(length <= U64_MAX - offset);
if (offset + length > segment_size)
length = segment_size - offset;
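/*
 * Worked example (illustrative): with order-22 (4 MiB) objects, a
 * request at image offset 0x3ff000 for 0x2000 bytes starts 0x1000
 * bytes short of a segment boundary, so rbd_segment_length() trims
 * the length to 0x1000; the remainder is issued against the next
 * object.
 */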
* returns the size of an object in the image
static u64 rbd_obj_bytes(struct rbd_image_header *header)
return 1 << header->obj_order;
static void bio_chain_put(struct bio *chain)
chain = chain->bi_next;
* zeros a bio chain, starting at a specific offset
static void zero_bio_chain(struct bio *chain, int start_ofs)
unsigned long flags;
bio_for_each_segment(bv, chain, i) {
if (pos + bv->bv_len > start_ofs) {
int remainder = max(start_ofs - pos, 0);
buf = bvec_kmap_irq(bv, &flags);
memset(buf + remainder, 0,
bv->bv_len - remainder);
bvec_kunmap_irq(buf, &flags);
chain = chain->bi_next;
* similar to zero_bio_chain(), zeros data defined by a page array,
* starting at the given byte offset from the start of the array and
* continuing up to the given end offset. The pages array is
* assumed to be big enough to hold all bytes up to the end.
static void zero_pages(struct page **pages, u64 offset, u64 end)
struct page **page = &pages[offset >> PAGE_SHIFT];
rbd_assert(end > offset);
rbd_assert(end - offset <= (u64)SIZE_MAX);
while (offset < end) {
unsigned long flags;
page_offset = (size_t)(offset & ~PAGE_MASK);
length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
local_irq_save(flags);
kaddr = kmap_atomic(*page);
memset(kaddr + page_offset, 0, length);
kunmap_atomic(kaddr);
local_irq_restore(flags);
* Clone a portion of a bio, starting at the given byte offset
* and continuing for the number of bytes indicated.
static struct bio *bio_clone_range(struct bio *bio_src,
unsigned int offset,
unsigned short end_idx;
unsigned short vcnt;
/* Handle the easy case for the caller */
if (!offset && len == bio_src->bi_size)
return bio_clone(bio_src, gfpmask);
if (WARN_ON_ONCE(!len))
if (WARN_ON_ONCE(len > bio_src->bi_size))
if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
/* Find first affected segment... */
__bio_for_each_segment(bv, bio_src, idx, 0) {
if (resid < bv->bv_len)
resid -= bv->bv_len;
/* ...and the last affected segment */
__bio_for_each_segment(bv, bio_src, end_idx, idx) {
if (resid <= bv->bv_len)
resid -= bv->bv_len;
vcnt = end_idx - idx + 1;
/* Build the clone */
bio = bio_alloc(gfpmask, (unsigned int) vcnt);
return NULL; /* ENOMEM */
bio->bi_bdev = bio_src->bi_bdev;
bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
bio->bi_rw = bio_src->bi_rw;
bio->bi_flags |= 1 << BIO_CLONED;
* Copy over our part of the bio_vec, then update the first
* and last (or only) entries.
memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
vcnt * sizeof (struct bio_vec));
bio->bi_io_vec[0].bv_offset += voff;
bio->bi_io_vec[0].bv_len -= voff;
bio->bi_io_vec[vcnt - 1].bv_len = resid;
bio->bi_io_vec[0].bv_len = len;
bio->bi_vcnt = vcnt;
* Clone a portion of a bio chain, starting at the given byte offset
* into the first bio in the source chain and continuing for the
* number of bytes indicated. The result is another bio chain of
* exactly the given length, or a null pointer on error.
* The bio_src and offset parameters are both in-out. On entry they
* refer to the first source bio and the offset into that bio where
* the start of data to be cloned is located.
* On return, bio_src is updated to refer to the bio in the source
* chain that contains first un-cloned byte, and *offset will
* contain the offset of that byte within that bio.
static struct bio *bio_chain_clone_range(struct bio **bio_src,
unsigned int *offset,
struct bio *bi = *bio_src;
unsigned int off = *offset;
struct bio *chain = NULL;
/* Build up a chain of clone bios up to the limit */
if (!bi || off >= bi->bi_size || !len)
return NULL; /* Nothing to clone */
unsigned int bi_size;
rbd_warn(NULL, "bio_chain exhausted with %u left", len);
goto out_err; /* EINVAL; ran out of bio's */
bi_size = min_t(unsigned int, bi->bi_size - off, len);
bio = bio_clone_range(bi, off, bi_size, gfpmask);
goto out_err; /* ENOMEM */
end = &bio->bi_next;
if (off == bi->bi_size) {
bio_chain_put(chain);
* The default/initial value for all object request flags is 0. For
* each flag, once its value is set to 1 it is never reset to 0
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
struct rbd_device *rbd_dev;
rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
static void obj_request_done_set(struct rbd_obj_request *obj_request)
if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
struct rbd_device *rbd_dev = NULL;
if (obj_request_img_data_test(obj_request))
rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked done\n",
static bool obj_request_done_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
* This sets the KNOWN flag after (possibly) setting the EXISTS
* flag. The latter is set based on the "exists" value provided.
* Note that for our purposes once an object exists it never goes
* away again. It's possible that the responses from two existence
* checks are separated by the creation of the target object, and
* the first ("doesn't exist") response arrives *after* the second
* ("does exist"). In that case we ignore the second one.
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
static bool obj_request_known_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
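/*
 * Example of the resulting guarantee (illustrative): if a "does exist"
 * response is processed first, OBJ_REQ_EXISTS and OBJ_REQ_KNOWN are
 * both set; a stale "doesn't exist" response processed later sets
 * nothing, so the earlier positive answer stands.
 */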
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
dout("%s: obj %p (was %d)\n", __func__, obj_request,
atomic_read(&obj_request->kref.refcount));
kref_get(&obj_request->kref);
static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
rbd_assert(obj_request != NULL);
dout("%s: obj %p (was %d)\n", __func__, obj_request,
atomic_read(&obj_request->kref.refcount));
kref_put(&obj_request->kref, rbd_obj_request_destroy);
static void rbd_img_request_get(struct rbd_img_request *img_request)
dout("%s: img %p (was %d)\n", __func__, img_request,
atomic_read(&img_request->kref.refcount));
kref_get(&img_request->kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
rbd_assert(img_request != NULL);
dout("%s: img %p (was %d)\n", __func__, img_request,
atomic_read(&img_request->kref.refcount));
kref_put(&img_request->kref, rbd_img_request_destroy);
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
rbd_assert(obj_request->img_request == NULL);
/* Image request now owns object's original reference */
obj_request->img_request = img_request;
obj_request->which = img_request->obj_request_count;
rbd_assert(!obj_request_img_data_test(obj_request));
obj_request_img_data_set(obj_request);
rbd_assert(obj_request->which != BAD_WHICH);
img_request->obj_request_count++;
list_add_tail(&obj_request->links, &img_request->obj_requests);
dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
obj_request->which);
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
rbd_assert(obj_request->which != BAD_WHICH);
dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
obj_request->which);
list_del(&obj_request->links);
rbd_assert(img_request->obj_request_count > 0);
img_request->obj_request_count--;
rbd_assert(obj_request->which == img_request->obj_request_count);
obj_request->which = BAD_WHICH;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request->img_request == img_request);
obj_request->img_request = NULL;
obj_request->callback = NULL;
rbd_obj_request_put(obj_request);
static bool obj_request_type_valid(enum obj_request_type type)
case OBJ_REQUEST_NODATA:
case OBJ_REQUEST_BIO:
case OBJ_REQUEST_PAGES:
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
static void rbd_img_request_complete(struct rbd_img_request *img_request)
dout("%s: img %p\n", __func__, img_request);
* If no error occurred, compute the aggregate transfer
* count for the image request. We could instead use
* atomic64_cmpxchg() to update it as each object request
* completes; not clear which way is better off hand.
if (!img_request->result) {
struct rbd_obj_request *obj_request;
for_each_obj_request(img_request, obj_request)
xferred += obj_request->xferred;
img_request->xferred = xferred;
if (img_request->callback)
img_request->callback(img_request);
rbd_img_request_put(img_request);
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
dout("%s: obj %p\n", __func__, obj_request);
return wait_for_completion_interruptible(&obj_request->completion);
* The default/initial value for all image request flags is 0. Each
* is conditionally set to 1 at image request initialization time
* and currently never changes thereafter.
static void img_request_write_set(struct rbd_img_request *img_request)
set_bit(IMG_REQ_WRITE, &img_request->flags);
static bool img_request_write_test(struct rbd_img_request *img_request)
return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
static void img_request_child_set(struct rbd_img_request *img_request)
set_bit(IMG_REQ_CHILD, &img_request->flags);
static bool img_request_child_test(struct rbd_img_request *img_request)
return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
static void img_request_layered_set(struct rbd_img_request *img_request)
set_bit(IMG_REQ_LAYERED, &img_request->flags);
static bool img_request_layered_test(struct rbd_img_request *img_request)
return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
u64 xferred = obj_request->xferred;
u64 length = obj_request->length;
dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
obj_request, obj_request->img_request, obj_request->result,
* ENOENT means a hole in the image. We zero-fill the
* entire length of the request. A short read also implies
* zero-fill to the end of the request. Either way we
* update the xferred count to indicate the whole request
rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
if (obj_request->result == -ENOENT) {
if (obj_request->type == OBJ_REQUEST_BIO)
zero_bio_chain(obj_request->bio_list, 0);
zero_pages(obj_request->pages, 0, length);
obj_request->result = 0;
obj_request->xferred = length;
} else if (xferred < length && !obj_request->result) {
if (obj_request->type == OBJ_REQUEST_BIO)
zero_bio_chain(obj_request->bio_list, xferred);
zero_pages(obj_request->pages, xferred, length);
obj_request->xferred = length;
obj_request_done_set(obj_request);
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
dout("%s: obj %p cb %p\n", __func__, obj_request,
obj_request->callback);
if (obj_request->callback)
obj_request->callback(obj_request);
complete_all(&obj_request->completion);
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
dout("%s: obj %p\n", __func__, obj_request);
obj_request_done_set(obj_request);
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = NULL;
struct rbd_device *rbd_dev = NULL;
bool layered = false;
if (obj_request_img_data_test(obj_request)) {
img_request = obj_request->img_request;
layered = img_request && img_request_layered_test(img_request);
rbd_dev = img_request->rbd_dev;
dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
obj_request, img_request, obj_request->result,
obj_request->xferred, obj_request->length);
if (layered && obj_request->result == -ENOENT &&
obj_request->img_offset < rbd_dev->parent_overlap)
rbd_img_parent_read(obj_request);
else if (img_request)
rbd_img_obj_request_read_callback(obj_request);
obj_request_done_set(obj_request);
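/*
 * Example (illustrative): for a clone with an 8 MiB parent overlap, a
 * read of an object at img_offset 0 that fails with -ENOENT (the child
 * has never been written there) is redirected to the parent image via
 * rbd_img_parent_read(); the same -ENOENT beyond the overlap is simply
 * zero-filled by rbd_img_obj_request_read_callback() above.
 */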
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
dout("%s: obj %p result %d %llu\n", __func__, obj_request,
obj_request->result, obj_request->length);
* There is no such thing as a successful short write. Set
* it to our originally-requested length.
obj_request->xferred = obj_request->length;
obj_request_done_set(obj_request);
* For a simple stat call there's nothing to do. We'll do more if
* this is part of a write sequence for a layered image.
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
dout("%s: obj %p\n", __func__, obj_request);
obj_request_done_set(obj_request);
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
struct rbd_obj_request *obj_request = osd_req->r_priv;
dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
rbd_assert(osd_req == obj_request->osd_req);
if (obj_request_img_data_test(obj_request)) {
rbd_assert(obj_request->img_request);
rbd_assert(obj_request->which != BAD_WHICH);
rbd_assert(obj_request->which == BAD_WHICH);
if (osd_req->r_result < 0)
obj_request->result = osd_req->r_result;
BUG_ON(osd_req->r_num_ops > 2);
* We support a 64-bit length, but ultimately it has to be
* passed to blk_end_request(), which takes an unsigned int.
obj_request->xferred = osd_req->r_reply_op_len[0];
rbd_assert(obj_request->xferred < (u64)UINT_MAX);
opcode = osd_req->r_ops[0].op;
case CEPH_OSD_OP_READ:
rbd_osd_read_callback(obj_request);
case CEPH_OSD_OP_WRITE:
rbd_osd_write_callback(obj_request);
case CEPH_OSD_OP_STAT:
rbd_osd_stat_callback(obj_request);
case CEPH_OSD_OP_CALL:
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
rbd_osd_trivial_callback(obj_request);
rbd_warn(NULL, "%s: unsupported op %hu\n",
obj_request->object_name, (unsigned short) opcode);
if (obj_request_done_test(obj_request))
rbd_obj_request_complete(obj_request);
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
rbd_assert(osd_req != NULL);
snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
ceph_osdc_build_request(osd_req, obj_request->offset,
NULL, snap_id, NULL);
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
struct ceph_snap_context *snapc;
struct timespec mtime = CURRENT_TIME;
rbd_assert(osd_req != NULL);
snapc = img_request ? img_request->snapc : NULL;
ceph_osdc_build_request(osd_req, obj_request->offset,
snapc, CEPH_NOSNAP, &mtime);
static struct ceph_osd_request *rbd_osd_req_create(
struct rbd_device *rbd_dev,
struct rbd_obj_request *obj_request)
struct ceph_snap_context *snapc = NULL;
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;
if (obj_request_img_data_test(obj_request)) {
struct rbd_img_request *img_request = obj_request->img_request;
rbd_assert(write_request ==
img_request_write_test(img_request));
snapc = img_request->snapc;
/* Allocate and initialize the request, for the single op */
osdc = &rbd_dev->rbd_client->client->osdc;
osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
return NULL; /* ENOMEM */
osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
osd_req->r_flags = CEPH_OSD_FLAG_READ;
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;
osd_req->r_oid_len = strlen(obj_request->object_name);
rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
osd_req->r_file_layout = rbd_dev->layout; /* struct */
* Create a copyup osd request based on the information in the
* object request supplied. A copyup request has two osd ops:
* a copyup method call, and a "normal" write request.
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
struct ceph_snap_context *snapc;
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
rbd_assert(img_request_write_test(img_request));
/* Allocate and initialize the request, for the two ops */
snapc = img_request->snapc;
rbd_dev = img_request->rbd_dev;
osdc = &rbd_dev->rbd_client->client->osdc;
osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
return NULL; /* ENOMEM */
osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;
osd_req->r_oid_len = strlen(obj_request->object_name);
rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
osd_req->r_file_layout = rbd_dev->layout; /* struct */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
ceph_osdc_put_request(osd_req);
/* object_name is assumed to be a non-null pointer and NUL-terminated */
static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
u64 offset, u64 length,
enum obj_request_type type)
struct rbd_obj_request *obj_request;
rbd_assert(obj_request_type_valid(type));
size = strlen(object_name) + 1;
name = kmalloc(size, GFP_KERNEL);
obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
obj_request->object_name = memcpy(name, object_name, size);
obj_request->offset = offset;
obj_request->length = length;
obj_request->flags = 0;
obj_request->which = BAD_WHICH;
obj_request->type = type;
INIT_LIST_HEAD(&obj_request->links);
init_completion(&obj_request->completion);
kref_init(&obj_request->kref);
dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
offset, length, (int)type, obj_request);
static void rbd_obj_request_destroy(struct kref *kref)
struct rbd_obj_request *obj_request;
obj_request = container_of(kref, struct rbd_obj_request, kref);
dout("%s: obj %p\n", __func__, obj_request);
rbd_assert(obj_request->img_request == NULL);
rbd_assert(obj_request->which == BAD_WHICH);
if (obj_request->osd_req)
rbd_osd_req_destroy(obj_request->osd_req);
rbd_assert(obj_request_type_valid(obj_request->type));
switch (obj_request->type) {
case OBJ_REQUEST_NODATA:
break; /* Nothing to do */
case OBJ_REQUEST_BIO:
if (obj_request->bio_list)
bio_chain_put(obj_request->bio_list);
case OBJ_REQUEST_PAGES:
if (obj_request->pages)
ceph_release_page_vector(obj_request->pages,
obj_request->page_count);
kfree(obj_request->object_name);
obj_request->object_name = NULL;
kmem_cache_free(rbd_obj_request_cache, obj_request);
* Caller is responsible for filling in the list of object requests
* that comprises the image request, and the Linux request pointer
* (if there is one).
static struct rbd_img_request *rbd_img_request_create(
struct rbd_device *rbd_dev,
u64 offset, u64 length,
struct rbd_img_request *img_request;
img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
if (write_request) {
down_read(&rbd_dev->header_rwsem);
ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);
img_request->rq = NULL;
img_request->rbd_dev = rbd_dev;
img_request->offset = offset;
img_request->length = length;
img_request->flags = 0;
if (write_request) {
img_request_write_set(img_request);
img_request->snapc = rbd_dev->header.snapc;
img_request->snap_id = rbd_dev->spec->snap_id;
img_request_child_set(img_request);
if (rbd_dev->parent_spec)
img_request_layered_set(img_request);
spin_lock_init(&img_request->completion_lock);
img_request->next_completion = 0;
img_request->callback = NULL;
img_request->result = 0;
img_request->obj_request_count = 0;
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);
rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */
dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
write_request ? "write" : "read", offset, length,
static void rbd_img_request_destroy(struct kref *kref)
struct rbd_img_request *img_request;
struct rbd_obj_request *obj_request;
struct rbd_obj_request *next_obj_request;
img_request = container_of(kref, struct rbd_img_request, kref);
dout("%s: img %p\n", __func__, img_request);
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
rbd_assert(img_request->obj_request_count == 0);
if (img_request_write_test(img_request))
ceph_put_snap_context(img_request->snapc);
if (img_request_child_test(img_request))
rbd_obj_request_put(img_request->obj_request);
kmem_cache_free(rbd_img_request_cache, img_request);
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
unsigned int xferred;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
xferred = (unsigned int)obj_request->xferred;
result = obj_request->result;
struct rbd_device *rbd_dev = img_request->rbd_dev;
rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
img_request_write_test(img_request) ? "write" : "read",
obj_request->length, obj_request->img_offset,
obj_request->offset);
rbd_warn(rbd_dev, " result %d xferred %x\n",
if (!img_request->result)
img_request->result = result;
/* Image object requests don't own their page array */
if (obj_request->type == OBJ_REQUEST_PAGES) {
obj_request->pages = NULL;
obj_request->page_count = 0;
if (img_request_child_test(img_request)) {
rbd_assert(img_request->obj_request != NULL);
more = obj_request->which < img_request->obj_request_count - 1;
rbd_assert(img_request->rq != NULL);
more = blk_end_request(img_request->rq, result, xferred);
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
u32 which = obj_request->which;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
rbd_assert(img_request != NULL);
rbd_assert(img_request->obj_request_count > 0);
rbd_assert(which != BAD_WHICH);
rbd_assert(which < img_request->obj_request_count);
rbd_assert(which >= img_request->next_completion);
spin_lock_irq(&img_request->completion_lock);
if (which != img_request->next_completion)
for_each_obj_request_from(img_request, obj_request) {
rbd_assert(which < img_request->obj_request_count);
if (!obj_request_done_test(obj_request))
more = rbd_img_obj_end_request(obj_request);
rbd_assert(more ^ (which == img_request->obj_request_count));
img_request->next_completion = which;
spin_unlock_irq(&img_request->completion_lock);
rbd_img_request_complete(img_request);
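/*
 * Completion-ordering example (illustrative): if object requests 0..2
 * finish in the order 2, 0, 1, the callback for request 2 stops at the
 * next_completion check (still 0); the callback for request 0 then
 * sweeps forward through 0, 1 and 2, ending the block-layer request in
 * order.
 */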
* Split up an image request into one or more object requests, each
* to a different object. The "type" parameter indicates whether
* "data_desc" is the pointer to the head of a list of bio
* structures, or the base of a page array. In either case this
* function assumes data_desc describes memory sufficient to hold
* all data described by the image request.
static int rbd_img_request_fill(struct rbd_img_request *img_request,
enum obj_request_type type,
struct rbd_device *rbd_dev = img_request->rbd_dev;
struct rbd_obj_request *obj_request = NULL;
struct rbd_obj_request *next_obj_request;
bool write_request = img_request_write_test(img_request);
struct bio *bio_list;
unsigned int bio_offset = 0;
struct page **pages;
dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
(int)type, data_desc);
opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
img_offset = img_request->offset;
resid = img_request->length;
rbd_assert(resid > 0);
if (type == OBJ_REQUEST_BIO) {
bio_list = data_desc;
rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
rbd_assert(type == OBJ_REQUEST_PAGES);
struct ceph_osd_request *osd_req;
const char *object_name;
object_name = rbd_segment_name(rbd_dev, img_offset);
offset = rbd_segment_offset(rbd_dev, img_offset);
length = rbd_segment_length(rbd_dev, img_offset, resid);
obj_request = rbd_obj_request_create(object_name,
offset, length, type);
/* object request has its own copy of the object name */
rbd_segment_name_free(object_name);
if (type == OBJ_REQUEST_BIO) {
unsigned int clone_size;
rbd_assert(length <= (u64)UINT_MAX);
clone_size = (unsigned int)length;
obj_request->bio_list =
bio_chain_clone_range(&bio_list,
if (!obj_request->bio_list)
unsigned int page_count;
obj_request->pages = pages;
page_count = (u32)calc_pages_for(offset, length);
obj_request->page_count = page_count;
if ((offset + length) & ~PAGE_MASK)
page_count--; /* more on last page */
pages += page_count;
osd_req = rbd_osd_req_create(rbd_dev, write_request,
obj_request->osd_req = osd_req;
obj_request->callback = rbd_img_obj_callback;
osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
if (type == OBJ_REQUEST_BIO)
osd_req_op_extent_osd_data_bio(osd_req, 0,
obj_request->bio_list, length);
osd_req_op_extent_osd_data_pages(osd_req, 0,
obj_request->pages, length,
offset & ~PAGE_MASK, false, false);
rbd_osd_req_format_write(obj_request);
rbd_osd_req_format_read(obj_request);
obj_request->img_offset = img_offset;
rbd_img_obj_request_add(img_request, obj_request);
img_offset += length;
rbd_obj_request_put(obj_request);
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_obj_request_put(obj_request);
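/*
 * Worked example (illustrative): with 4 MiB objects, a 6 MiB write at
 * image offset 2 MiB is filled as two object requests: 2 MiB at offset
 * 2 MiB within the first object, then 4 MiB at offset 0 of the next
 * one.
 */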
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request;
struct rbd_device *rbd_dev;
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
length = (u64)1 << rbd_dev->header.obj_order;
page_count = (u32)calc_pages_for(0, length);
rbd_assert(obj_request->copyup_pages);
ceph_release_page_vector(obj_request->copyup_pages, page_count);
obj_request->copyup_pages = NULL;
* We want the transfer count to reflect the size of the
* original write request. There is no such thing as a
* successful short write, so if the request was successful
* we can just set it to the originally-requested length.
if (!obj_request->result)
obj_request->xferred = obj_request->length;
/* Finish up with the normal image object callback */
rbd_img_obj_callback(obj_request);
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
struct rbd_obj_request *orig_request;
struct ceph_osd_request *osd_req;
struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev;
struct page **pages;
rbd_assert(img_request_child_test(img_request));
/* First get what we need from the image request */
pages = img_request->copyup_pages;
rbd_assert(pages != NULL);
img_request->copyup_pages = NULL;
orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL);
rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
result = img_request->result;
obj_size = img_request->length;
xferred = img_request->xferred;
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
rbd_img_request_put(img_request);
/* Allocate the new copyup osd request for the original request */
rbd_assert(!orig_request->osd_req);
osd_req = rbd_osd_req_create_copyup(orig_request);
orig_request->osd_req = osd_req;
orig_request->copyup_pages = pages;
/* Initialize the copyup op */
osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
/* Then the original write request op */
osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
orig_request->offset,
orig_request->length, 0, 0);
osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
orig_request->length);
rbd_osd_req_format_write(orig_request);
/* All set, send it off. */
orig_request->callback = rbd_img_obj_copyup_callback;
osdc = &rbd_dev->rbd_client->client->osdc;
result = rbd_obj_request_submit(osdc, orig_request);
/* Record the error code and complete the request */
orig_request->result = result;
orig_request->xferred = 0;
obj_request_done_set(orig_request);
rbd_obj_request_complete(orig_request);
* Read from the parent image the range of data that covers the
* entire target of the given object request. This is used for
* satisfying a layered image write request when the target of an
* object request from the image request does not exist.
* A page array big enough to hold the returned data is allocated
* and supplied to rbd_img_request_fill() as the "data descriptor."
* When the read completes, this page array will be transferred to
* the original object request for the copyup operation.
* If an error occurs, record it as the result of the original
* object request and mark it done so it gets completed.
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
struct rbd_img_request *img_request = NULL;
struct rbd_img_request *parent_request = NULL;
struct rbd_device *rbd_dev;
struct page **pages = NULL;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
img_request = obj_request->img_request;
rbd_assert(img_request != NULL);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL);
* First things first. The original osd request is of no
* use to us any more; we'll need a new one that can hold
2248 * the two ops in a copyup request. We'll get that later,
2249 * but for now we can release the old one.
2251 rbd_osd_req_destroy(obj_request->osd_req);
2252 obj_request->osd_req = NULL;
2255 * Determine the byte range covered by the object in the
2256 * child image to which the original request was to be sent.
2258 img_offset = obj_request->img_offset - obj_request->offset;
2259 length = (u64)1 << rbd_dev->header.obj_order;
2262 * There is no defined parent data beyond the parent
2263 * overlap, so limit what we read at that boundary if
2264 * necessary.
2266 if (img_offset + length > rbd_dev->parent_overlap) {
2267 rbd_assert(img_offset < rbd_dev->parent_overlap);
2268 length = rbd_dev->parent_overlap - img_offset;
2272 * Allocate a page array big enough to receive the data read
2273 * from the parent.
2275 page_count = (u32)calc_pages_for(0, length);
2276 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2277 if (IS_ERR(pages)) {
2278 result = PTR_ERR(pages);
2284 parent_request = rbd_img_request_create(rbd_dev->parent,
2287 if (!parent_request)
2289 rbd_obj_request_get(obj_request);
2290 parent_request->obj_request = obj_request;
2292 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2295 parent_request->copyup_pages = pages;
2297 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2298 result = rbd_img_request_submit(parent_request);
2302 parent_request->copyup_pages = NULL;
2303 parent_request->obj_request = NULL;
2304 rbd_obj_request_put(obj_request);
2307 ceph_release_page_vector(pages, page_count);
2309 rbd_img_request_put(parent_request);
2310 obj_request->result = result;
2311 obj_request->xferred = 0;
2312 obj_request_done_set(obj_request);
2317 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2319 struct rbd_obj_request *orig_request;
2322 rbd_assert(!obj_request_img_data_test(obj_request));
2325 * All we need from the object request is the original
2326 * request and the result of the STAT op. Grab those, then
2327 * we're done with the request.
2329 orig_request = obj_request->obj_request;
2330 obj_request->obj_request = NULL;
2331 rbd_assert(orig_request);
2332 rbd_assert(orig_request->img_request);
2334 result = obj_request->result;
2335 obj_request->result = 0;
2337 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2338 obj_request, orig_request, result,
2339 obj_request->xferred, obj_request->length);
2340 rbd_obj_request_put(obj_request);
2342 rbd_assert(orig_request);
2343 rbd_assert(orig_request->img_request);
2346 * Our only purpose here is to determine whether the object
2347 * exists, and we don't want to treat the non-existence as
2348 * an error. If something else comes back, transfer the
2349 * error to the original request and complete it now.
2352 obj_request_existence_set(orig_request, true);
2353 } else if (result == -ENOENT) {
2354 obj_request_existence_set(orig_request, false);
2355 } else if (result) {
2356 orig_request->result = result;
2361 * Resubmit the original request now that we have recorded
2362 * whether the target object exists.
2364 orig_request->result = rbd_img_obj_request_submit(orig_request);
2366 if (orig_request->result)
2367 rbd_obj_request_complete(orig_request);
2368 rbd_obj_request_put(orig_request);
2371 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2373 struct rbd_obj_request *stat_request;
2374 struct rbd_device *rbd_dev;
2375 struct ceph_osd_client *osdc;
2376 struct page **pages = NULL;
2382 * The response data for a STAT call consists of:
2383 *     le64 length;
2384 *     struct {
2385 *         le32 tv_sec;
2386 *         le32 tv_nsec;
2387 *     } mtime;
2389 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2390 page_count = (u32)calc_pages_for(0, size);
2391 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2393 return PTR_ERR(pages);
2396 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2401 rbd_obj_request_get(obj_request);
2402 stat_request->obj_request = obj_request;
2403 stat_request->pages = pages;
2404 stat_request->page_count = page_count;
2406 rbd_assert(obj_request->img_request);
2407 rbd_dev = obj_request->img_request->rbd_dev;
2408 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2410 if (!stat_request->osd_req)
2412 stat_request->callback = rbd_img_obj_exists_callback;
2414 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2415 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2417 rbd_osd_req_format_read(stat_request);
2419 osdc = &rbd_dev->rbd_client->client->osdc;
2420 ret = rbd_obj_request_submit(osdc, stat_request);
2423 rbd_obj_request_put(obj_request);
2428 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2430 struct rbd_img_request *img_request;
2431 struct rbd_device *rbd_dev;
2434 rbd_assert(obj_request_img_data_test(obj_request));
2436 img_request = obj_request->img_request;
2437 rbd_assert(img_request);
2438 rbd_dev = img_request->rbd_dev;
2441 * Only writes to layered images need special handling.
2442 * Reads and non-layered writes are simple object requests.
2443 * Layered writes that start beyond the end of the overlap
2444 * with the parent have no parent data, so they too are
2445 * simple object requests. Finally, if the target object is
2446 * known to already exist, its parent data has already been
2447 * copied, so a write to the object can also be handled as a
2448 * simple object request.
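 *
 * As a summary sketch of the test below:
 *
 *	read request			-> submit directly
 *	write, image not layered	-> submit directly
 *	write beyond parent overlap	-> submit directly
 *	target known to exist		-> submit directly
 *	target known not to exist	-> parent read + copyup
 *	target existence unknown	-> STAT first, then resubmit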
2450 if (!img_request_write_test(img_request) ||
2451 !img_request_layered_test(img_request) ||
2452 rbd_dev->parent_overlap <= obj_request->img_offset ||
2453 ((known = obj_request_known_test(obj_request)) &&
2454 obj_request_exists_test(obj_request))) {
2457 struct ceph_osd_client *osdc;
2460 osdc = &rbd_dev->rbd_client->client->osdc;
2462 return rbd_obj_request_submit(osdc, obj_request);
2466 * It's a layered write. The target object might exist but
2467 * we may not know that yet. If we know it doesn't exist,
2468 * start by reading the data for the full target object from
2469 * the parent so we can use it for a copyup to the target.
2472 return rbd_img_obj_parent_read_full(obj_request);
2474 /* We don't know whether the target exists. Go find out. */
2476 return rbd_img_obj_exists_submit(obj_request);
2479 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2481 struct rbd_obj_request *obj_request;
2482 struct rbd_obj_request *next_obj_request;
2484 dout("%s: img %p\n", __func__, img_request);
2485 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2488 ret = rbd_img_obj_request_submit(obj_request);
2496 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2498 struct rbd_obj_request *obj_request;
2499 struct rbd_device *rbd_dev;
2502 rbd_assert(img_request_child_test(img_request));
2504 obj_request = img_request->obj_request;
2505 rbd_assert(obj_request);
2506 rbd_assert(obj_request->img_request);
2508 obj_request->result = img_request->result;
2509 if (obj_request->result)
2513 * We need to zero anything beyond the parent overlap
2514 * boundary. Since rbd_img_obj_request_read_callback()
2515 * will zero anything beyond the end of a short read, an
2516 * easy way to do this is to pretend the data from the
2517 * parent came up short--ending at the overlap boundary.
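 *
 * Worked example with assumed numbers: if the parent overlap
 * is 6 MB and a 1 MB child read starts at img_offset 5.5 MB,
 * obj_end is 6.5 MB.  xferred is then clipped to 6 MB - 5.5 MB
 * = 0.5 MB, and the read callback zeroes the final 0.5 MB.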
2519 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2520 obj_end = obj_request->img_offset + obj_request->length;
2521 rbd_dev = obj_request->img_request->rbd_dev;
2522 if (obj_end > rbd_dev->parent_overlap) {
2525 if (obj_request->img_offset < rbd_dev->parent_overlap)
2526 xferred = rbd_dev->parent_overlap -
2527 obj_request->img_offset;
2529 obj_request->xferred = min(img_request->xferred, xferred);
2531 obj_request->xferred = img_request->xferred;
2534 rbd_img_request_put(img_request);
2535 rbd_img_obj_request_read_callback(obj_request);
2536 rbd_obj_request_complete(obj_request);
2539 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2541 struct rbd_device *rbd_dev;
2542 struct rbd_img_request *img_request;
2545 rbd_assert(obj_request_img_data_test(obj_request));
2546 rbd_assert(obj_request->img_request != NULL);
2547 rbd_assert(obj_request->result == (s32) -ENOENT);
2548 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2550 rbd_dev = obj_request->img_request->rbd_dev;
2551 rbd_assert(rbd_dev->parent != NULL);
2553 img_request = rbd_img_request_create(rbd_dev->parent,
2554 obj_request->img_offset,
2555 obj_request->length,
2561 rbd_obj_request_get(obj_request);
2562 img_request->obj_request = obj_request;
2564 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2565 obj_request->bio_list);
2569 img_request->callback = rbd_img_parent_read_callback;
2570 result = rbd_img_request_submit(img_request);
2577 rbd_img_request_put(img_request);
2578 obj_request->result = result;
2579 obj_request->xferred = 0;
2580 obj_request_done_set(obj_request);
2583 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2585 struct rbd_obj_request *obj_request;
2586 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2589 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2590 OBJ_REQUEST_NODATA);
2595 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2596 if (!obj_request->osd_req)
2598 obj_request->callback = rbd_obj_request_put;
2600 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2602 rbd_osd_req_format_read(obj_request);
2604 ret = rbd_obj_request_submit(osdc, obj_request);
2607 rbd_obj_request_put(obj_request);
2612 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2614 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2620 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2621 rbd_dev->header_name, (unsigned long long)notify_id,
2622 (unsigned int)opcode);
2623 ret = rbd_dev_refresh(rbd_dev);
2625 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2627 rbd_obj_notify_ack(rbd_dev, notify_id);
2631 * Request sync osd watch/unwatch. The value of "start" determines
2632 * whether a watch request is being initiated or torn down.
2634 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2636 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2637 struct rbd_obj_request *obj_request;
2640 rbd_assert(start ^ !!rbd_dev->watch_event);
2641 rbd_assert(start ^ !!rbd_dev->watch_request);
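	/*
	 * The asserts above encode the expected state: when starting
	 * a watch, neither the event nor the lingering watch request
	 * may exist yet; when tearing one down, both must already be
	 * present.
	 */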
2644 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2645 &rbd_dev->watch_event);
2648 rbd_assert(rbd_dev->watch_event != NULL);
2652 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2653 OBJ_REQUEST_NODATA);
2657 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2658 if (!obj_request->osd_req)
2662 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2664 ceph_osdc_unregister_linger_request(osdc,
2665 rbd_dev->watch_request->osd_req);
2667 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2668 rbd_dev->watch_event->cookie, 0, start);
2669 rbd_osd_req_format_write(obj_request);
2671 ret = rbd_obj_request_submit(osdc, obj_request);
2674 ret = rbd_obj_request_wait(obj_request);
2677 ret = obj_request->result;
2682 * A watch request is set to linger, so the underlying osd
2683 * request won't go away until we unregister it. We retain
2684 * a pointer to the object request during that time (in
2685 * rbd_dev->watch_request), so we'll keep a reference to
2686 * it. We'll drop that reference (below) after we've
2687 * unregistered it.
2690 rbd_dev->watch_request = obj_request;
2695 /* We have successfully torn down the watch request */
2697 rbd_obj_request_put(rbd_dev->watch_request);
2698 rbd_dev->watch_request = NULL;
2700 /* Cancel the event if we're tearing down, or on error */
2701 ceph_osdc_cancel_event(rbd_dev->watch_event);
2702 rbd_dev->watch_event = NULL;
2704 rbd_obj_request_put(obj_request);
2710 * Synchronous osd object method call. Returns the number of bytes
2711 * returned in the inbound buffer, or a negative error code.
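 *
 * A typical call, sketched from the v2 "get_size" query (the
 * exact buffer types vary by method):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				  "rbd", "get_size",
 *				  &snapid, sizeof (snapid),
 *				  &size_buf, sizeof (size_buf));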
2713 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2714 const char *object_name,
2715 const char *class_name,
2716 const char *method_name,
2717 const void *outbound,
2718 size_t outbound_size,
2720 size_t inbound_size)
2722 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2723 struct rbd_obj_request *obj_request;
2724 struct page **pages;
2729 * Method calls are ultimately read operations. The result
2730 * should be placed into the inbound buffer provided. They
2731 * also supply outbound data--parameters for the object
2732 * method. Currently if this is present it will be a
2733 * snapshot id.
2735 page_count = (u32)calc_pages_for(0, inbound_size);
2736 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2738 return PTR_ERR(pages);
2741 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2746 obj_request->pages = pages;
2747 obj_request->page_count = page_count;
2749 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2750 if (!obj_request->osd_req)
2753 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2754 class_name, method_name);
2755 if (outbound_size) {
2756 struct ceph_pagelist *pagelist;
2758 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2762 ceph_pagelist_init(pagelist);
2763 ceph_pagelist_append(pagelist, outbound, outbound_size);
2764 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2767 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2768 obj_request->pages, inbound_size,
2770 rbd_osd_req_format_read(obj_request);
2772 ret = rbd_obj_request_submit(osdc, obj_request);
2775 ret = rbd_obj_request_wait(obj_request);
2779 ret = obj_request->result;
2783 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2784 ret = (int)obj_request->xferred;
2785 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2788 rbd_obj_request_put(obj_request);
2790 ceph_release_page_vector(pages, page_count);
2795 static void rbd_request_fn(struct request_queue *q)
2796 __releases(q->queue_lock) __acquires(q->queue_lock)
2798 struct rbd_device *rbd_dev = q->queuedata;
2799 bool read_only = rbd_dev->mapping.read_only;
2803 while ((rq = blk_fetch_request(q))) {
2804 bool write_request = rq_data_dir(rq) == WRITE;
2805 struct rbd_img_request *img_request;
2809 /* Ignore any non-FS requests that filter through. */
2811 if (rq->cmd_type != REQ_TYPE_FS) {
2812 dout("%s: non-fs request type %d\n", __func__,
2813 (int) rq->cmd_type);
2814 __blk_end_request_all(rq, 0);
2818 /* Ignore/skip any zero-length requests */
2820 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2821 length = (u64) blk_rq_bytes(rq);
2824 dout("%s: zero-length request\n", __func__);
2825 __blk_end_request_all(rq, 0);
2829 spin_unlock_irq(q->queue_lock);
2831 /* Disallow writes to a read-only device */
2833 if (write_request) {
2837 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2841 * Quit early if the mapped snapshot no longer
2842 * exists. It's still possible the snapshot will
2843 * have disappeared by the time our request arrives
2844 * at the osd, but there's no sense in sending it if
2845 * we already know.
2847 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2848 dout("request for non-existent snapshot");
2849 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2855 if (offset && length > U64_MAX - offset + 1) {
2856 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2858 goto end_request; /* Shouldn't happen */
2862 if (offset + length > rbd_dev->mapping.size) {
2863 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2864 offset, length, rbd_dev->mapping.size);
2869 img_request = rbd_img_request_create(rbd_dev, offset, length,
2870 write_request, false);
2874 img_request->rq = rq;
2876 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2879 result = rbd_img_request_submit(img_request);
2881 rbd_img_request_put(img_request);
2883 spin_lock_irq(q->queue_lock);
2885 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2886 write_request ? "write" : "read",
2887 length, offset, result);
2889 __blk_end_request_all(rq, result);
2895 * a queue callback. Makes sure that we don't create a bio that spans across
2896 * multiple osd objects. One exception would be a single-page bio,
2897 * which we handle later at bio_chain_clone_range()
2899 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2900 struct bio_vec *bvec)
2902 struct rbd_device *rbd_dev = q->queuedata;
2903 sector_t sector_offset;
2904 sector_t sectors_per_obj;
2905 sector_t obj_sector_offset;
2909 * Find how far into its rbd object the partition-relative
2910 * bio start sector falls; the offset must be computed
2911 * relative to the enclosing (whole) device.
2913 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2914 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2915 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2918 * Compute the number of bytes from that offset to the end
2919 * of the object. Account for what's already used by the bio.
2921 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2922 if (ret > bmd->bi_size)
2923 ret -= bmd->bi_size;
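	/*
	 * Worked example, assuming the default 4 MB objects
	 * (obj_order 22): sectors_per_obj = 1 << 13 = 8192.  A bio
	 * starting at absolute sector 12000 is 3808 sectors into
	 * its object, so at most (8192 - 3808) << 9 = 2244608 bytes
	 * remain before the object boundary.
	 */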
2928 * Don't send back more than was asked for. And if the bio
2929 * was empty, let the whole thing through because: "Note
2930 * that a block device *must* allow a single page to be
2931 * added to an empty bio."
2933 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2934 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2935 ret = (int) bvec->bv_len;
2940 static void rbd_free_disk(struct rbd_device *rbd_dev)
2942 struct gendisk *disk = rbd_dev->disk;
2947 rbd_dev->disk = NULL;
2948 if (disk->flags & GENHD_FL_UP) {
2951 blk_cleanup_queue(disk->queue);
2956 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2957 const char *object_name,
2958 u64 offset, u64 length, void *buf)
2961 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2962 struct rbd_obj_request *obj_request;
2963 struct page **pages = NULL;
2968 page_count = (u32) calc_pages_for(offset, length);
2969 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2971 ret = PTR_ERR(pages);
2974 obj_request = rbd_obj_request_create(object_name, offset, length,
2979 obj_request->pages = pages;
2980 obj_request->page_count = page_count;
2982 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2983 if (!obj_request->osd_req)
2986 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2987 offset, length, 0, 0);
2988 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2990 obj_request->length,
2991 obj_request->offset & ~PAGE_MASK,
2993 rbd_osd_req_format_read(obj_request);
2995 ret = rbd_obj_request_submit(osdc, obj_request);
2998 ret = rbd_obj_request_wait(obj_request);
3002 ret = obj_request->result;
3006 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3007 size = (size_t) obj_request->xferred;
3008 ceph_copy_from_page_vector(pages, buf, 0, size);
3009 rbd_assert(size <= (size_t)INT_MAX);
3013 rbd_obj_request_put(obj_request);
3015 ceph_release_page_vector(pages, page_count);
3021 * Read the complete header for the given rbd device.
3023 * Returns a pointer to a dynamically-allocated buffer containing
3024 * the complete and validated header.
3028 * Returns a pointer-coded errno if a failure occurs.
3030 static struct rbd_image_header_ondisk *
3031 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3033 struct rbd_image_header_ondisk *ondisk = NULL;
3040 * The complete header will include an array of its 64-bit
3041 * snapshot ids, followed by the names of those snapshots as
3042 * a contiguous block of NUL-terminated strings. Note that
3043 * the number of snapshots could change by the time we read
3044 * it in, in which case we re-read it.
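 *
 * As a sketch, the on-disk layout being read is:
 *
 *	struct rbd_image_header_ondisk		(fixed-size part)
 *	struct rbd_image_snap_ondisk[snap_count]
 *	NUL-terminated snapshot names		(snap_names_len bytes)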
3051 size = sizeof (*ondisk);
3052 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3054 ondisk = kmalloc(size, GFP_KERNEL);
3056 return ERR_PTR(-ENOMEM);
3058 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3062 if ((size_t)ret < size) {
3064 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3068 if (!rbd_dev_ondisk_valid(ondisk)) {
3070 rbd_warn(rbd_dev, "invalid header");
3074 names_size = le64_to_cpu(ondisk->snap_names_len);
3075 want_count = snap_count;
3076 snap_count = le32_to_cpu(ondisk->snap_count);
3077 } while (snap_count != want_count);
3084 return ERR_PTR(ret);
3088 * reload the on-disk header
3090 static int rbd_read_header(struct rbd_device *rbd_dev,
3091 struct rbd_image_header *header)
3093 struct rbd_image_header_ondisk *ondisk;
3096 ondisk = rbd_dev_v1_header_read(rbd_dev);
3098 return PTR_ERR(ondisk);
3099 ret = rbd_header_from_disk(header, ondisk);
3106 * Re-read the complete on-disk header and update the in-memory copy.
3108 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3111 struct rbd_image_header h;
3113 ret = rbd_read_header(rbd_dev, &h);
3117 down_write(&rbd_dev->header_rwsem);
3119 /* Update image size, and check for resize of mapped image */
3120 rbd_dev->header.image_size = h.image_size;
3121 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3122 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3123 rbd_dev->mapping.size = rbd_dev->header.image_size;
3125 /* rbd_dev->header.object_prefix shouldn't change */
3126 kfree(rbd_dev->header.snap_sizes);
3127 kfree(rbd_dev->header.snap_names);
3128 /* osd requests may still refer to snapc */
3129 ceph_put_snap_context(rbd_dev->header.snapc);
3132 rbd_dev->header.snapc = h.snapc;
3133 rbd_dev->header.snap_names = h.snap_names;
3134 rbd_dev->header.snap_sizes = h.snap_sizes;
3135 /* Free the extra copy of the object prefix */
3136 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3137 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3138 kfree(h.object_prefix);
3140 up_write(&rbd_dev->header_rwsem);
3146 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3147 * has disappeared from the (just updated) snapshot context.
3149 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3153 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3156 snap_id = rbd_dev->spec->snap_id;
3157 if (snap_id == CEPH_NOSNAP)
3160 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3161 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3164 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3169 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3170 mapping_size = rbd_dev->mapping.size;
3171 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3172 if (rbd_dev->image_format == 1)
3173 ret = rbd_dev_v1_refresh(rbd_dev);
3175 ret = rbd_dev_v2_refresh(rbd_dev);
3177 /* If it's a mapped snapshot, validate its EXISTS flag */
3179 rbd_exists_validate(rbd_dev);
3180 mutex_unlock(&ctl_mutex);
3181 if (mapping_size != rbd_dev->mapping.size) {
3184 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3185 dout("setting size to %llu sectors", (unsigned long long)size);
3186 set_capacity(rbd_dev->disk, size);
3187 revalidate_disk(rbd_dev->disk);
3193 static int rbd_init_disk(struct rbd_device *rbd_dev)
3195 struct gendisk *disk;
3196 struct request_queue *q;
3199 /* create gendisk info */
3200 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3204 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3206 disk->major = rbd_dev->major;
3207 disk->first_minor = 0;
3208 disk->fops = &rbd_bd_ops;
3209 disk->private_data = rbd_dev;
3211 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3215 /* We use the default size, but let's be explicit about it. */
3216 blk_queue_physical_block_size(q, SECTOR_SIZE);
3218 /* set io sizes to object size */
3219 segment_size = rbd_obj_bytes(&rbd_dev->header);
3220 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3221 blk_queue_max_segment_size(q, segment_size);
3222 blk_queue_io_min(q, segment_size);
3223 blk_queue_io_opt(q, segment_size);
3225 blk_queue_merge_bvec(q, rbd_merge_bvec);
3228 q->queuedata = rbd_dev;
3230 rbd_dev->disk = disk;
3243 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3245 return container_of(dev, struct rbd_device, dev);
3248 static ssize_t rbd_size_show(struct device *dev,
3249 struct device_attribute *attr, char *buf)
3251 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3253 return sprintf(buf, "%llu\n",
3254 (unsigned long long)rbd_dev->mapping.size);
3258 * Note this shows the features for whatever's mapped, which is not
3259 * necessarily the base image.
3261 static ssize_t rbd_features_show(struct device *dev,
3262 struct device_attribute *attr, char *buf)
3264 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3266 return sprintf(buf, "0x%016llx\n",
3267 (unsigned long long)rbd_dev->mapping.features);
3270 static ssize_t rbd_major_show(struct device *dev,
3271 struct device_attribute *attr, char *buf)
3273 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3276 return sprintf(buf, "%d\n", rbd_dev->major);
3278 return sprintf(buf, "(none)\n");
3282 static ssize_t rbd_client_id_show(struct device *dev,
3283 struct device_attribute *attr, char *buf)
3285 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287 return sprintf(buf, "client%lld\n",
3288 ceph_client_id(rbd_dev->rbd_client->client));
3291 static ssize_t rbd_pool_show(struct device *dev,
3292 struct device_attribute *attr, char *buf)
3294 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3296 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3299 static ssize_t rbd_pool_id_show(struct device *dev,
3300 struct device_attribute *attr, char *buf)
3302 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3304 return sprintf(buf, "%llu\n",
3305 (unsigned long long) rbd_dev->spec->pool_id);
3308 static ssize_t rbd_name_show(struct device *dev,
3309 struct device_attribute *attr, char *buf)
3311 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3313 if (rbd_dev->spec->image_name)
3314 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3316 return sprintf(buf, "(unknown)\n");
3319 static ssize_t rbd_image_id_show(struct device *dev,
3320 struct device_attribute *attr, char *buf)
3322 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3324 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3328 * Shows the name of the currently-mapped snapshot (or
3329 * RBD_SNAP_HEAD_NAME for the base image).
3331 static ssize_t rbd_snap_show(struct device *dev,
3332 struct device_attribute *attr,
3335 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3337 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3341 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3342 * for the parent image. If there is no parent, simply shows
3343 * "(no parent image)".
3345 static ssize_t rbd_parent_show(struct device *dev,
3346 struct device_attribute *attr,
3349 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3350 struct rbd_spec *spec = rbd_dev->parent_spec;
3355 return sprintf(buf, "(no parent image)\n");
3357 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3358 (unsigned long long) spec->pool_id, spec->pool_name);
3363 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3364 spec->image_name ? spec->image_name : "(unknown)");
3369 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3370 (unsigned long long) spec->snap_id, spec->snap_name);
3375 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3380 return (ssize_t) (bufp - buf);
3383 static ssize_t rbd_image_refresh(struct device *dev,
3384 struct device_attribute *attr,
3388 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3391 ret = rbd_dev_refresh(rbd_dev);
3393 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3395 return ret < 0 ? ret : size;
3398 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3399 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3400 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3401 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3402 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3403 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3404 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3405 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3406 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3407 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3408 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3410 static struct attribute *rbd_attrs[] = {
3411 &dev_attr_size.attr,
3412 &dev_attr_features.attr,
3413 &dev_attr_major.attr,
3414 &dev_attr_client_id.attr,
3415 &dev_attr_pool.attr,
3416 &dev_attr_pool_id.attr,
3417 &dev_attr_name.attr,
3418 &dev_attr_image_id.attr,
3419 &dev_attr_current_snap.attr,
3420 &dev_attr_parent.attr,
3421 &dev_attr_refresh.attr,
3425 static struct attribute_group rbd_attr_group = {
3429 static const struct attribute_group *rbd_attr_groups[] = {
3434 static void rbd_sysfs_dev_release(struct device *dev)
3438 static struct device_type rbd_device_type = {
3440 .groups = rbd_attr_groups,
3441 .release = rbd_sysfs_dev_release,
3444 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3446 kref_get(&spec->kref);
3451 static void rbd_spec_free(struct kref *kref);
3452 static void rbd_spec_put(struct rbd_spec *spec)
3455 kref_put(&spec->kref, rbd_spec_free);
3458 static struct rbd_spec *rbd_spec_alloc(void)
3460 struct rbd_spec *spec;
3462 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3465 kref_init(&spec->kref);
3470 static void rbd_spec_free(struct kref *kref)
3472 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3474 kfree(spec->pool_name);
3475 kfree(spec->image_id);
3476 kfree(spec->image_name);
3477 kfree(spec->snap_name);
3481 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3482 struct rbd_spec *spec)
3484 struct rbd_device *rbd_dev;
3486 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3490 spin_lock_init(&rbd_dev->lock);
3492 INIT_LIST_HEAD(&rbd_dev->node);
3493 init_rwsem(&rbd_dev->header_rwsem);
3495 rbd_dev->spec = spec;
3496 rbd_dev->rbd_client = rbdc;
3498 /* Initialize the layout used for all rbd requests */
3500 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3501 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3502 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3503 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3508 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3510 rbd_put_client(rbd_dev->rbd_client);
3511 rbd_spec_put(rbd_dev->spec);
3516 * Get the size and object order for an image snapshot, or if
3517 * snap_id is CEPH_NOSNAP, get this information for the base
3518 * image.
3520 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3521 u8 *order, u64 *snap_size)
3523 __le64 snapid = cpu_to_le64(snap_id);
3528 } __attribute__ ((packed)) size_buf = { 0 };
3530 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3532 &snapid, sizeof (snapid),
3533 &size_buf, sizeof (size_buf));
3534 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3537 if (ret < sizeof (size_buf))
3541 *order = size_buf.order;
3542 *snap_size = le64_to_cpu(size_buf.size);
3544 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3545 (unsigned long long)snap_id, (unsigned int)*order,
3546 (unsigned long long)*snap_size);
3551 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3553 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3554 &rbd_dev->header.obj_order,
3555 &rbd_dev->header.image_size);
3558 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3564 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3568 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3569 "rbd", "get_object_prefix", NULL, 0,
3570 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3571 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3576 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3577 p + ret, NULL, GFP_NOIO);
3580 if (IS_ERR(rbd_dev->header.object_prefix)) {
3581 ret = PTR_ERR(rbd_dev->header.object_prefix);
3582 rbd_dev->header.object_prefix = NULL;
3584 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3592 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3595 __le64 snapid = cpu_to_le64(snap_id);
3599 } __attribute__ ((packed)) features_buf = { 0 };
3603 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3604 "rbd", "get_features",
3605 &snapid, sizeof (snapid),
3606 &features_buf, sizeof (features_buf));
3607 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3610 if (ret < sizeof (features_buf))
3613 incompat = le64_to_cpu(features_buf.incompat);
3614 if (incompat & ~RBD_FEATURES_SUPPORTED)
3617 *snap_features = le64_to_cpu(features_buf.features);
3619 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3620 (unsigned long long)snap_id,
3621 (unsigned long long)*snap_features,
3622 (unsigned long long)le64_to_cpu(features_buf.incompat));
3627 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3629 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3630 &rbd_dev->header.features);
3633 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3635 struct rbd_spec *parent_spec;
3637 void *reply_buf = NULL;
3645 parent_spec = rbd_spec_alloc();
3649 size = sizeof (__le64) + /* pool_id */
3650 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3651 sizeof (__le64) + /* snap_id */
3652 sizeof (__le64); /* overlap */
3653 reply_buf = kmalloc(size, GFP_KERNEL);
3659 snapid = cpu_to_le64(CEPH_NOSNAP);
3660 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3661 "rbd", "get_parent",
3662 &snapid, sizeof (snapid),
3664 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3669 end = reply_buf + ret;
3671 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3672 if (parent_spec->pool_id == CEPH_NOPOOL)
3673 goto out; /* No parent? No problem. */
3675 /* The ceph file layout needs to fit pool id in 32 bits */
3678 if (parent_spec->pool_id > (u64)U32_MAX) {
3679 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3680 (unsigned long long)parent_spec->pool_id, U32_MAX);
3684 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3685 if (IS_ERR(image_id)) {
3686 ret = PTR_ERR(image_id);
3689 parent_spec->image_id = image_id;
3690 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3691 ceph_decode_64_safe(&p, end, overlap, out_err);
3693 rbd_dev->parent_overlap = overlap;
3694 rbd_dev->parent_spec = parent_spec;
3695 parent_spec = NULL; /* rbd_dev now owns this */
3700 rbd_spec_put(parent_spec);
3705 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3709 __le64 stripe_count;
3710 } __attribute__ ((packed)) striping_info_buf = { 0 };
3711 size_t size = sizeof (striping_info_buf);
3718 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3719 "rbd", "get_stripe_unit_count", NULL, 0,
3720 (char *)&striping_info_buf, size);
3721 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3728 * We don't actually support the "fancy striping" feature
3729 * (STRIPINGV2) yet, but if the striping sizes are the
3730 * defaults the behavior is the same as before. So find
3731 * out, and only fail if the image has non-default values.
3734 obj_size = (u64)1 << rbd_dev->header.obj_order;
3735 p = &striping_info_buf;
3736 stripe_unit = ceph_decode_64(&p);
3737 if (stripe_unit != obj_size) {
3738 rbd_warn(rbd_dev, "unsupported stripe unit "
3739 "(got %llu want %llu)",
3740 stripe_unit, obj_size);
3743 stripe_count = ceph_decode_64(&p);
3744 if (stripe_count != 1) {
3745 rbd_warn(rbd_dev, "unsupported stripe count "
3746 "(got %llu want 1)", stripe_count);
3749 rbd_dev->header.stripe_unit = stripe_unit;
3750 rbd_dev->header.stripe_count = stripe_count;
3755 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3757 size_t image_id_size;
3762 void *reply_buf = NULL;
3764 char *image_name = NULL;
3767 rbd_assert(!rbd_dev->spec->image_name);
3769 len = strlen(rbd_dev->spec->image_id);
3770 image_id_size = sizeof (__le32) + len;
3771 image_id = kmalloc(image_id_size, GFP_KERNEL);
3776 end = image_id + image_id_size;
3777 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3779 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3780 reply_buf = kmalloc(size, GFP_KERNEL);
3784 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3785 "rbd", "dir_get_name",
3786 image_id, image_id_size,
3791 end = reply_buf + ret;
3793 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3794 if (IS_ERR(image_name))
3797 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3805 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3807 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3808 const char *snap_name;
3811 /* Skip over names until we find the one we are looking for */
3813 snap_name = rbd_dev->header.snap_names;
3814 while (which < snapc->num_snaps) {
3815 if (!strcmp(name, snap_name))
3816 return snapc->snaps[which];
3817 snap_name += strlen(snap_name) + 1;
3823 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3825 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3830 for (which = 0; !found && which < snapc->num_snaps; which++) {
3831 const char *snap_name;
3833 snap_id = snapc->snaps[which];
3834 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3835 if (IS_ERR(snap_name))
3837 found = !strcmp(name, snap_name);
3840 return found ? snap_id : CEPH_NOSNAP;
3844 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3845 * no snapshot by that name is found, or if an error occurs.
3847 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3849 if (rbd_dev->image_format == 1)
3850 return rbd_v1_snap_id_by_name(rbd_dev, name);
3852 return rbd_v2_snap_id_by_name(rbd_dev, name);
3856 * When an rbd image has a parent image, it is identified by the
3857 * pool, image, and snapshot ids (not names). This function fills
3858 * in the names for those ids. (It's OK if we can't figure out the
3859 * name for an image id, but the pool and snapshot ids should always
3860 * exist and have names.) All names in an rbd spec are dynamically
3861 * allocated.
3863 * When an image being mapped (not a parent) is probed, we have the
3864 * pool name and pool id, image name and image id, and the snapshot
3865 * name. The only thing we're missing is the snapshot id.
3867 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3869 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3870 struct rbd_spec *spec = rbd_dev->spec;
3871 const char *pool_name;
3872 const char *image_name;
3873 const char *snap_name;
3877 * An image being mapped will have the pool name (etc.), but
3878 * we need to look up the snapshot id.
3880 if (spec->pool_name) {
3881 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3884 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3885 if (snap_id == CEPH_NOSNAP)
3887 spec->snap_id = snap_id;
3889 spec->snap_id = CEPH_NOSNAP;
3895 /* Get the pool name; we have to make our own copy of this */
3897 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3899 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3902 pool_name = kstrdup(pool_name, GFP_KERNEL);
3906 /* Fetch the image name; tolerate failure here */
3908 image_name = rbd_dev_image_name(rbd_dev);
3910 rbd_warn(rbd_dev, "unable to get image name");
3912 /* Look up the snapshot name, and make a copy */
3914 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3920 spec->pool_name = pool_name;
3921 spec->image_name = image_name;
3922 spec->snap_name = snap_name;
3932 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3941 struct ceph_snap_context *snapc;
3945 * We'll need room for the seq value (maximum snapshot id),
3946 * snapshot count, and array of that many snapshot ids.
3947 * For now we have a fixed upper limit on the number we're
3948 * prepared to receive.
3950 size = sizeof (__le64) + sizeof (__le32) +
3951 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3952 reply_buf = kzalloc(size, GFP_KERNEL);
3956 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3957 "rbd", "get_snapcontext", NULL, 0,
3959 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3964 end = reply_buf + ret;
3966 ceph_decode_64_safe(&p, end, seq, out);
3967 ceph_decode_32_safe(&p, end, snap_count, out);
3970 * Make sure the reported number of snapshot ids wouldn't go
3971 * beyond the end of our buffer. But before checking that,
3972 * make sure the computed size of the snapshot context we
3973 * allocate is representable in a size_t.
3975 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3980 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3984 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3990 for (i = 0; i < snap_count; i++)
3991 snapc->snaps[i] = ceph_decode_64(&p);
3993 ceph_put_snap_context(rbd_dev->header.snapc);
3994 rbd_dev->header.snapc = snapc;
3996 dout(" snap context seq = %llu, snap_count = %u\n",
3997 (unsigned long long)seq, (unsigned int)snap_count);
4004 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4015 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4016 reply_buf = kmalloc(size, GFP_KERNEL);
4018 return ERR_PTR(-ENOMEM);
4020 snapid = cpu_to_le64(snap_id);
4021 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4022 "rbd", "get_snapshot_name",
4023 &snapid, sizeof (snapid),
4025 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4027 snap_name = ERR_PTR(ret);
4032 end = reply_buf + ret;
4033 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4034 if (IS_ERR(snap_name))
4037 dout(" snap_id 0x%016llx snap_name = %s\n",
4038 (unsigned long long)snap_id, snap_name);
4045 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4049 down_write(&rbd_dev->header_rwsem);
4051 ret = rbd_dev_v2_image_size(rbd_dev);
4054 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4055 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4056 rbd_dev->mapping.size = rbd_dev->header.image_size;
4058 ret = rbd_dev_v2_snap_context(rbd_dev);
4059 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4063 up_write(&rbd_dev->header_rwsem);
4068 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4073 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4075 dev = &rbd_dev->dev;
4076 dev->bus = &rbd_bus_type;
4077 dev->type = &rbd_device_type;
4078 dev->parent = &rbd_root_dev;
4079 dev->release = rbd_dev_device_release;
4080 dev_set_name(dev, "%d", rbd_dev->dev_id);
4081 ret = device_register(dev);
4083 mutex_unlock(&ctl_mutex);
4088 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4090 device_unregister(&rbd_dev->dev);
4093 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4096 * Get a unique rbd identifier for the given new rbd_dev, and add
4097 * the rbd_dev to the global list. The minimum rbd id is 1.
4099 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4101 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4103 spin_lock(&rbd_dev_list_lock);
4104 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4105 spin_unlock(&rbd_dev_list_lock);
4106 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4107 (unsigned long long) rbd_dev->dev_id);
4111 * Remove an rbd_dev from the global list, and record that its
4112 * identifier is no longer in use.
4114 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4116 struct list_head *tmp;
4117 int rbd_id = rbd_dev->dev_id;
4120 rbd_assert(rbd_id > 0);
4122 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4123 (unsigned long long) rbd_dev->dev_id);
4124 spin_lock(&rbd_dev_list_lock);
4125 list_del_init(&rbd_dev->node);
4128 * If the id being "put" is not the current maximum, there
4129 * is nothing special we need to do.
4131 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4132 spin_unlock(&rbd_dev_list_lock);
4137 * We need to update the current maximum id. Search the
4138 * list to find out what it is. We're more likely to find
4139 * the maximum at the end, so search the list backward.
4142 list_for_each_prev(tmp, &rbd_dev_list) {
4143 struct rbd_device *rbd_dev;
4145 rbd_dev = list_entry(tmp, struct rbd_device, node);
4146 if (rbd_dev->dev_id > max_id)
4147 max_id = rbd_dev->dev_id;
4149 spin_unlock(&rbd_dev_list_lock);
4152 * The max id could have been updated by rbd_dev_id_get(), in
4153 * which case it now accurately reflects the new maximum.
4154 * Be careful not to overwrite the maximum value in that
4157 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4158 dout(" max dev id has been reset\n");
4162 * Skips over white space at *buf, and updates *buf to point to the
4163 * first found non-space character (if any). Returns the length of
4164 * the token (string of non-white space characters) found. Note
4165 * that *buf must be terminated with '\0'.
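 *
 * Example: with *buf pointing at "  pool0 img0", next_token()
 * advances *buf past the two spaces and returns 5, the length
 * of "pool0".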
4167 static inline size_t next_token(const char **buf)
4170 * These are the characters that produce nonzero for
4171 * isspace() in the "C" and "POSIX" locales.
4173 const char *spaces = " \f\n\r\t\v";
4175 *buf += strspn(*buf, spaces); /* Find start of token */
4177 return strcspn(*buf, spaces); /* Return token length */
4181 * Finds the next token in *buf, and if the provided token buffer is
4182 * big enough, copies the found token into it. The result, if
4183 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4184 * must be terminated with '\0' on entry.
4186 * Returns the length of the token found (not including the '\0').
4187 * Return value will be 0 if no token is found, and it will be >=
4188 * token_size if the token would not fit.
4190 * The *buf pointer will be updated to point beyond the end of the
4191 * found token. Note that this occurs even if the token buffer is
4192 * too small to hold it.
4194 static inline size_t copy_token(const char **buf,
4200 len = next_token(buf);
4201 if (len < token_size) {
4202 memcpy(token, *buf, len);
4203 *(token + len) = '\0';
4211 * Finds the next token in *buf, dynamically allocates a buffer big
4212 * enough to hold a copy of it, and copies the token into the new
4213 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4214 * that a duplicate buffer is created even for a zero-length token.
4216 * Returns a pointer to the newly-allocated duplicate, or a null
4217 * pointer if memory for the duplicate was not available. If
4218 * the lenp argument is a non-null pointer, the length of the token
4219 * (not including the '\0') is returned in *lenp.
4221 * If successful, the *buf pointer will be updated to point beyond
4222 * the end of the found token.
4224 * Note: uses GFP_KERNEL for allocation.
4226 static inline char *dup_token(const char **buf, size_t *lenp)
4231 len = next_token(buf);
4232 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4235 *(dup + len) = '\0';
4245 * Parse the options provided for an "rbd add" (i.e., rbd image
4246 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4247 * and the data written is passed here via a NUL-terminated buffer.
4248 * Returns 0 if successful or an error code otherwise.
4250 * The information extracted from these options is recorded in
4251 * the other parameters which return dynamically-allocated
4252 * structures:
4253 *  ceph_opts
4254 * The address of a pointer that will refer to a ceph options
4255 * structure. Caller must release the returned pointer using
4256 * ceph_destroy_options() when it is no longer needed.
4257 *  rbd_opts
4258 * Address of an rbd options pointer. Fully initialized by
4259 * this function; caller must release with kfree().
4260 *  rbd_spec
4261 * Address of an rbd image specification pointer. Fully
4262 * initialized by this function based on parsed options.
4263 * Caller must release with rbd_spec_put().
4265 * The options passed take this form:
4266 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4269 * A comma-separated list of one or more monitor addresses.
4270 * A monitor address is an ip address, optionally followed
4271 * by a port number (separated by a colon).
4272 * I.e.: ip1[:port1][,ip2[:port2]...]
4273 *  <options>
4274 * A comma-separated list of ceph and/or rbd options.
4275 *  <pool_name>
4276 * The name of the rados pool containing the rbd image.
4277 *  <image_name>
4278 * The name of the image in that pool to map.
4279 *  <snap_name>
4280 * An optional snapshot name. If provided, the mapping will
4281 * present data from the image at the time that snapshot was
4282 * created. The image head is used if no snapshot name is
4283 * provided. Snapshot mappings are always read-only.
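 *
 * For example (hypothetical monitor address and image name):
 *
 *	# echo "1.2.3.4:6789 name=admin rbd myimage" \
 *	#	> /sys/bus/rbd/add
 *
 * maps the head of image "myimage" in pool "rbd".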
4285 static int rbd_add_parse_args(const char *buf,
4286 struct ceph_options **ceph_opts,
4287 struct rbd_options **opts,
4288 struct rbd_spec **rbd_spec)
4292 const char *mon_addrs;
4294 size_t mon_addrs_size;
4295 struct rbd_spec *spec = NULL;
4296 struct rbd_options *rbd_opts = NULL;
4297 struct ceph_options *copts;
4300 /* The first four tokens are required */
4302 len = next_token(&buf);
4304 rbd_warn(NULL, "no monitor address(es) provided");
4308 mon_addrs_size = len + 1;
4312 options = dup_token(&buf, NULL);
4316 rbd_warn(NULL, "no options provided");
4320 spec = rbd_spec_alloc();
4324 spec->pool_name = dup_token(&buf, NULL);
4325 if (!spec->pool_name)
4327 if (!*spec->pool_name) {
4328 rbd_warn(NULL, "no pool name provided");
4332 spec->image_name = dup_token(&buf, NULL);
4333 if (!spec->image_name)
4335 if (!*spec->image_name) {
4336 rbd_warn(NULL, "no image name provided");
4341 * Snapshot name is optional; default is to use "-"
4342 * (indicating the head/no snapshot).
4344 len = next_token(&buf);
4346 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4347 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4348 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4349 ret = -ENAMETOOLONG;
4352 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4355 *(snap_name + len) = '\0';
4356 spec->snap_name = snap_name;
4358 /* Initialize all rbd options to the defaults */
4360 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4364 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4366 copts = ceph_parse_options(options, mon_addrs,
4367 mon_addrs + mon_addrs_size - 1,
4368 parse_rbd_opts_token, rbd_opts);
4369 if (IS_ERR(copts)) {
4370 ret = PTR_ERR(copts);
4391 * An rbd format 2 image has a unique identifier, distinct from the
4392 * name given to it by the user. Internally, that identifier is
4393 * what's used to specify the names of objects related to the image.
4395 * A special "rbd id" object is used to map an rbd image name to its
4396 * id. If that object doesn't exist, then there is no v2 rbd image
4397 * with the supplied name.
4399 * This function will record the given rbd_dev's image_id field if
4400 * it can be determined, and in that case will return 0. If any
4401 * errors occur a negative errno will be returned and the rbd_dev's
4402 * image_id field will be unchanged (and should be NULL).
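 *
 * For example (hypothetical names): mapping image "foo" first
 * reads object "rbd_id.foo"; if that yields image id
 * "10056b8b4567", the image's metadata is found from then on
 * in object "rbd_header.10056b8b4567".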
4404 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4413 * When probing a parent image, the image id is already
4414 * known (and the image name likely is not). There's no
4415 * need to fetch the image id again in this case. We
4416 * do still need to set the image format though.
4418 if (rbd_dev->spec->image_id) {
4419 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4425 * First, see if the format 2 image id object exists, and if
4426 * so, get the image's persistent id from it.
4428 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4429 object_name = kmalloc(size, GFP_NOIO);
4432 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4433 dout("rbd id object name is %s\n", object_name);
4435 /* Response will be an encoded string, which includes a length */
4437 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4438 response = kzalloc(size, GFP_NOIO);
4444 /* If it doesn't exist we'll assume it's a format 1 image */
4446 ret = rbd_obj_method_sync(rbd_dev, object_name,
4447 "rbd", "get_id", NULL, 0,
4448 response, RBD_IMAGE_ID_LEN_MAX);
4449 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4450 if (ret == -ENOENT) {
4451 image_id = kstrdup("", GFP_KERNEL);
4452 ret = image_id ? 0 : -ENOMEM;
4454 rbd_dev->image_format = 1;
4455 } else if (ret > sizeof (__le32)) {
4458 image_id = ceph_extract_encoded_string(&p, p + ret,
4460 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4462 rbd_dev->image_format = 2;
4468 rbd_dev->spec->image_id = image_id;
4469 dout("image_id is %s\n", image_id);
4478 /* Undo whatever state changes are made by v1 or v2 image probe */
4480 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4482 struct rbd_image_header *header;
4484 rbd_dev_remove_parent(rbd_dev);
4485 rbd_spec_put(rbd_dev->parent_spec);
4486 rbd_dev->parent_spec = NULL;
4487 rbd_dev->parent_overlap = 0;
4489 /* Free dynamic fields from the header, then zero it out */
4491 header = &rbd_dev->header;
4492 ceph_put_snap_context(header->snapc);
4493 kfree(header->snap_sizes);
4494 kfree(header->snap_names);
4495 kfree(header->object_prefix);
4496 memset(header, 0, sizeof (*header));
4499 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4503 /* Populate rbd image metadata */
4505 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4509 /* Version 1 images have no parent (no layering) */
4511 rbd_dev->parent_spec = NULL;
4512 rbd_dev->parent_overlap = 0;
4514 dout("discovered version 1 image, header name is %s\n",
4515 rbd_dev->header_name);
4520 kfree(rbd_dev->header_name);
4521 rbd_dev->header_name = NULL;
4522 kfree(rbd_dev->spec->image_id);
4523 rbd_dev->spec->image_id = NULL;
4528 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4532 ret = rbd_dev_v2_image_size(rbd_dev);
4536 /* Get the object prefix (a.k.a. block_name) for the image */
4538 ret = rbd_dev_v2_object_prefix(rbd_dev);
4542 /* Get and check the features for the image */
4544 ret = rbd_dev_v2_features(rbd_dev);
4548 /* If the image supports layering, get the parent info */
4550 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4551 ret = rbd_dev_v2_parent_info(rbd_dev);
4555 * Print a warning if this image has a parent.
4556 * Don't print it if the image now being probed
4557 * is itself a parent. We can tell at this point
4558 * because we won't know its pool name yet (just its
4559 * pool id).
4561 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4562 rbd_warn(rbd_dev, "WARNING: kernel layering "
4563 "is EXPERIMENTAL!");
4566 /* If the image supports fancy striping, get its parameters */
4568 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4569 ret = rbd_dev_v2_striping_info(rbd_dev);
4574 /* crypto and compression type aren't (yet) supported for v2 images */
4576 rbd_dev->header.crypt_type = 0;
4577 rbd_dev->header.comp_type = 0;
4579 /* Get the snapshot context */
4581 ret = rbd_dev_v2_snap_context(rbd_dev);
4585 dout("discovered version 2 image, header name is %s\n",
4586 rbd_dev->header_name);
4590 rbd_dev->parent_overlap = 0;
4591 rbd_spec_put(rbd_dev->parent_spec);
4592 rbd_dev->parent_spec = NULL;
4593 kfree(rbd_dev->header_name);
4594 rbd_dev->header_name = NULL;
4595 kfree(rbd_dev->header.object_prefix);
4596 rbd_dev->header.object_prefix = NULL;
4601 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4603 struct rbd_device *parent = NULL;
4604 struct rbd_spec *parent_spec;
4605 struct rbd_client *rbdc;
4608 if (!rbd_dev->parent_spec)
4611 * We need to pass a reference to the client and the parent
4612 * spec when creating the parent rbd_dev. Images related by
4613 * parent/child relationships always share both.
4615 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4616 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4619 parent = rbd_dev_create(rbdc, parent_spec);
4623 ret = rbd_dev_image_probe(parent);
4626 rbd_dev->parent = parent;
4631 rbd_spec_put(rbd_dev->parent_spec);
4632 kfree(rbd_dev->header_name);
4633 rbd_dev_destroy(parent);
4635 rbd_put_client(rbdc);
4636 rbd_spec_put(parent_spec);
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                return ret;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */
        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */
        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        /* Everything's ready.  Announce the disk to the world. */
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
        rbd_dev_mapping_clear(rbd_dev);

        return ret;
}
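
/*
 * Note: set_capacity() takes a size in 512-byte sectors, which is why
 * mapping.size is divided by SECTOR_SIZE above; for example, a 1 GiB
 * mapping (0x40000000 bytes) is announced as 2097152 sectors.
 */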
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        size_t size;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        if (rbd_dev->image_format == 1)
                size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
        else
                size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;

        if (rbd_dev->image_format == 1)
                sprintf(rbd_dev->header_name, "%s%s",
                        spec->image_name, RBD_SUFFIX);
        else
                sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, spec->image_id);

        return 0;
}
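
/*
 * Example (illustrative names): assuming RBD_SUFFIX is ".rbd" and
 * RBD_HEADER_PREFIX is "rbd_header." (their values in rbd_types.h),
 * a format 1 image named "foo" gets the header object "foo.rbd",
 * while a format 2 image with id "1014b2ae8944a" gets the header
 * object "rbd_header.1014b2ae8944a".
 */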
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        int ret;

        rbd_dev_unprobe(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, 0);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
        int ret;
        int tmp;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;
        rbd_assert(rbd_dev->spec->image_id);
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto out_header_name;

        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
        if (ret)
                goto err_out_watch;

        ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_probe;

        ret = rbd_dev_probe_parent(rbd_dev);
        if (!ret)
                return 0;

err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
        if (tmp)
                rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        dout("probe failed, returning %d\n", ret);
        return ret;
}
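
/*
 * Note on format detection (a sketch of the convention; RBD_ID_PREFIX
 * is defined in rbd_types.h as "rbd_id."): rbd_dev_image_id() reads
 * the object "rbd_id.<image_name>".  If that object doesn't exist the
 * read returns -ENOENT, the image is treated as format 1, and an
 * empty image id is recorded; otherwise the object's contents supply
 * the format 2 image id.
 */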
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;

        /* The ceph file layout needs to fit pool id in 32 bits */
        if (spec->pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "pool id too large (%llu > %u)\n",
                                (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rbd_dev->mapping.read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rc = rbd_dev_image_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;

        rc = rbd_dev_device_setup(rbd_dev);
        if (!rc)
                return count;

        rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);
        return (ssize_t)rc;
}
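
/*
 * Example usage (a sketch; monitor address and key are placeholders):
 * mapping image "foo" from pool "rbd" by writing one line to the bus
 * attribute, as described in Documentation/ABI/testing/sysfs-bus-rbd:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *         > /sys/bus/rbd/add
 *
 * rbd_add_parse_args() splits the line into monitor addresses, ceph
 * options, the pool name, the image name, and an optional snapshot
 * name; on success the write returns count and /dev/rbd<id> appears.
 */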
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}
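
/*
 * Note (an assumption about usage, not stated in the original): the
 * list lock above only protects the walk itself; the returned pointer
 * remains safe to use because the caller (rbd_remove()) holds
 * ctl_mutex across both this lookup and the subsequent teardown.
 */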
static void rbd_dev_device_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
}
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}
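
/*
 * Illustration (assumed three-level chain, for clarity): given
 * dev -> A -> B where B has no parent of its own, the inner loop
 * walks down to B, which is released first; the outer loop then
 * repeats and releases A, so the chain is torn down from the deepest
 * ancestor back toward the mapped device.
 */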
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id;
        unsigned long ul;
        int ret;

        ret = strict_strtoul(buf, 10, &ul);
        if (ret)
                return ret;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
        ret = count;
        rbd_bus_del_dev(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
done:
        mutex_unlock(&ctl_mutex);
        return ret;
}
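
/*
 * Example usage: unmapping the device with id 1 (i.e. /dev/rbd1):
 *
 *   $ echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is still open.
 */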
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = kmem_cache_create("rbd_img_request",
                                        sizeof (struct rbd_img_request),
                                        __alignof__(struct rbd_img_request),
                                        0, NULL);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
                                        sizeof (struct rbd_obj_request),
                                        __alignof__(struct rbd_obj_request),
                                        0, NULL);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        if (rbd_obj_request_cache) {
                kmem_cache_destroy(rbd_obj_request_cache);
                rbd_obj_request_cache = NULL;
        }

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}
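
/*
 * Note: kmem_cache_create(name, size, align, flags, ctor) returns
 * NULL on failure.  The segment name cache uses an alignment of 1
 * because it only holds NUL-terminated object name strings of at
 * most MAX_OBJ_NAME_SIZE bytes.
 */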
static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }
        rc = rbd_slab_init();
        if (rc)
                return rc;
        rc = rbd_sysfs_init();
        if (rc)
                rbd_slab_exit();
        else
                pr_info("loaded " RBD_DRV_NAME_LONG "\n");

        return rc;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
        rbd_slab_exit();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");