2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
104 struct list_head node;
108 * a request completion status
110 struct rbd_req_status {
117 * a collection of requests
119 struct rbd_req_coll {
123 struct rbd_req_status status[0];
127 * a single io request
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
135 struct rbd_req_coll *coll;
142 struct list_head node;
150 int dev_id; /* blkdev unique id */
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
156 struct rbd_client *rbd_client;
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock; /* queue lock */
162 struct rbd_image_header header;
164 size_t image_name_len;
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node;
184 /* list of snapshots */
185 struct list_head snaps;
191 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
193 static LIST_HEAD(rbd_dev_list); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196 static LIST_HEAD(rbd_client_list); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 static struct bus_type rbd_bus_type = {
220 .bus_attrs = rbd_bus_attrs,
223 static void rbd_root_dev_release(struct device *dev)
227 static struct device rbd_root_dev = {
229 .release = rbd_root_dev_release,
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 return get_device(&rbd_dev->dev);
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev);
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 rbd_get_dev(rbd_dev);
251 set_device_ro(bdev, rbd_dev->read_only);
253 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
261 struct rbd_device *rbd_dev = disk->private_data;
263 rbd_put_dev(rbd_dev);
268 static const struct block_device_operations rbd_bd_ops = {
269 .owner = THIS_MODULE,
271 .release = rbd_release,
275 * Initialize an rbd client instance.
/* NOTE(review): partial extract -- allocation-failure checks, error
 * labels and return statements are missing from this view; do not
 * assume they are absent upstream. */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279 struct rbd_options *rbd_opts)
281 struct rbd_client *rbdc;
284 dout("rbd_client_create\n");
285 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
289 kref_init(&rbdc->kref);
290 INIT_LIST_HEAD(&rbdc->node);
/* ctl_mutex serializes client setup against open/close/teardown */
292 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295 if (IS_ERR(rbdc->client))
297 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
299 ret = ceph_open_session(rbdc->client);
303 rbdc->rbd_opts = rbd_opts;
/* publish the new client on the shared list */
305 spin_lock(&rbd_client_list_lock);
306 list_add_tail(&rbdc->node, &rbd_client_list);
307 spin_unlock(&rbd_client_list_lock);
309 mutex_unlock(&ctl_mutex);
311 dout("rbd_client_create created %p\n", rbdc);
/* error unwind: destroy the partially constructed client */
315 ceph_destroy_client(rbdc->client);
317 mutex_unlock(&ctl_mutex);
/* only reached if ownership of ceph_opts was never transferred */
321 ceph_destroy_options(ceph_opts);
326 * Find a ceph client with specific addr and configuration.
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
330 struct rbd_client *client_node;
332 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
335 list_for_each_entry(client_node, &rbd_client_list, node)
336 if (!ceph_compare_options(ceph_opts, client_node->client))
349 /* string args above */
352 static match_table_t rbd_opts_tokens = {
353 {Opt_notify_timeout, "notify_timeout=%d"},
355 /* string args above */
/*
 * Parse one option token from the rbd "add" string; integer-valued
 * tokens are decoded into intval before the switch below.
 * NOTE(review): partial extract -- error returns, braces and the
 * default case are missing from this view.
 */
359 static int parse_rbd_opts_token(char *c, void *private)
361 struct rbd_options *rbd_opts = private;
362 substring_t argstr[MAX_OPT_ARGS];
363 int token, intval, ret;
365 token = match_token(c, rbd_opts_tokens, argstr);
/* tokens below Opt_last_int carry an integer argument */
369 if (token < Opt_last_int) {
370 ret = match_int(&argstr[0], &intval);
372 pr_err("bad mount option arg (not int) "
376 dout("got int token %d val %d\n", token, intval);
377 } else if (token > Opt_last_int && token < Opt_last_string) {
378 dout("got string token %d val %s\n", token,
381 dout("got token %d\n", token);
385 case Opt_notify_timeout:
386 rbd_opts->notify_timeout = intval;
395 * Get a ceph client with specific addr and configuration, if one does
396 * not exist create it.
/* NOTE(review): partial extract -- some parameters, error labels and
 * returns are missing from this view. */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
402 struct rbd_client *rbdc;
403 struct ceph_options *ceph_opts;
404 struct rbd_options *rbd_opts;
406 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
408 return ERR_PTR(-ENOMEM);
410 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
/* parse libceph options and rbd-specific tokens in one pass */
412 ceph_opts = ceph_parse_options(options, mon_addr,
413 mon_addr + mon_addr_len,
414 parse_rbd_opts_token, rbd_opts);
415 if (IS_ERR(ceph_opts)) {
417 return ERR_CAST(ceph_opts);
/* reuse a compatible existing client when possible */
420 spin_lock(&rbd_client_list_lock);
421 rbdc = __rbd_client_find(ceph_opts);
423 /* using an existing client */
424 kref_get(&rbdc->kref);
425 spin_unlock(&rbd_client_list_lock);
/* the shared client already owns its own copy of the options */
427 ceph_destroy_options(ceph_opts);
432 spin_unlock(&rbd_client_list_lock);
434 rbdc = rbd_client_create(ceph_opts, rbd_opts);
443 * Destroy ceph client
445 * Caller must hold rbd_client_list_lock.
447 static void rbd_client_release(struct kref *kref)
449 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451 dout("rbd_release_client %p\n", rbdc);
452 spin_lock(&rbd_client_list_lock);
453 list_del(&rbdc->node);
454 spin_unlock(&rbd_client_list_lock);
456 ceph_destroy_client(rbdc->client);
457 kfree(rbdc->rbd_opts);
462 * Drop reference to ceph client node. If it's not referenced anymore, release
465 static void rbd_put_client(struct rbd_device *rbd_dev)
467 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468 rbd_dev->rbd_client = NULL;
472 * Destroy requests collection
474 static void rbd_coll_release(struct kref *kref)
476 struct rbd_req_coll *coll =
477 container_of(kref, struct rbd_req_coll, kref);
479 dout("rbd_coll_release %p\n", coll);
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
485 return !memcmp(&ondisk->text,
486 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
490 * Create a new header structure, translate header format from the on-disk
/* NOTE(review): partial extract -- overflow checks, error labels and
 * intermediate returns are missing from this view. */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494 struct rbd_image_header_ondisk *ondisk,
499 if (!rbd_dev_ondisk_valid(ondisk))
/* guard against snap_count overflowing the snapc allocation below */
502 snap_count = le32_to_cpu(ondisk->snap_count);
503 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
506 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507 snap_count * sizeof(u64),
512 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
514 header->snap_names = kmalloc(header->snap_names_len,
516 if (!header->snap_names)
518 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
520 if (!header->snap_sizes)
523 header->snap_names = NULL;
524 header->snap_sizes = NULL;
/* object_prefix gets an extra byte for a forced NUL terminator */
527 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
529 if (!header->object_prefix)
532 memcpy(header->object_prefix, ondisk->block_name,
533 sizeof(ondisk->block_name));
534 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
/* scalar fields are converted from little-endian wire format */
536 header->image_size = le64_to_cpu(ondisk->image_size);
537 header->obj_order = ondisk->options.order;
538 header->crypt_type = ondisk->options.crypt_type;
539 header->comp_type = ondisk->options.comp_type;
541 atomic_set(&header->snapc->nref, 1);
542 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
543 header->snapc->num_snaps = snap_count;
544 header->total_snaps = snap_count;
/* snapshot ids/sizes are copied only when the caller allocated room */
546 if (snap_count && allocated_snaps == snap_count) {
547 for (i = 0; i < snap_count; i++) {
548 header->snapc->snaps[i] =
549 le64_to_cpu(ondisk->snaps[i].id);
550 header->snap_sizes[i] =
551 le64_to_cpu(ondisk->snaps[i].image_size);
554 /* copy snapshot names */
555 memcpy(header->snap_names, &ondisk->snaps[i],
556 header->snap_names_len);
/* error unwind: free everything allocated above */
562 kfree(header->snap_sizes);
564 kfree(header->snap_names);
566 kfree(header->snapc);
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
574 char *p = header->snap_names;
576 for (i = 0; i < header->total_snaps; i++) {
577 if (!strcmp(snap_name, p)) {
579 /* Found it. Pass back its id and/or size */
582 *seq = header->snapc->snaps[i];
584 *size = header->snap_sizes[i];
587 p += strlen(p) + 1; /* Skip ahead to the next name */
/*
 * Record which snapshot (or the live head) this mapping reads from
 * and report the corresponding image size through *size.
 * NOTE(review): partial extract -- error handling and the final
 * return are missing from this view.
 */
592 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
596 down_write(&rbd_dev->header_rwsem);
/* mapping the head: writable, no snapshot id */
598 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
599 sizeof (RBD_SNAP_HEAD_NAME))) {
600 rbd_dev->snap_id = CEPH_NOSNAP;
601 rbd_dev->snap_exists = false;
602 rbd_dev->read_only = 0;
604 *size = rbd_dev->header.image_size;
/* mapping a named snapshot: look it up; mapping becomes read-only */
608 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
612 rbd_dev->snap_id = snap_id;
613 rbd_dev->snap_exists = true;
614 rbd_dev->read_only = 1;
619 up_write(&rbd_dev->header_rwsem);
623 static void rbd_header_free(struct rbd_image_header *header)
625 kfree(header->object_prefix);
626 kfree(header->snap_sizes);
627 kfree(header->snap_names);
628 ceph_put_snap_context(header->snapc);
632 * get the actual striped segment name, offset and length
/* NOTE(review): partial extract -- the full parameter list and the
 * return of the clamped segment length are missing from this view. */
634 static u64 rbd_get_segment(struct rbd_image_header *header,
635 const char *object_prefix,
637 char *seg_name, u64 *segofs)
/* segment index = image offset / object size (object size = 2^obj_order) */
639 u64 seg = ofs >> header->obj_order;
/* object name is "<prefix>.<segment index as 12 hex digits>" */
642 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
643 "%s.%012llx", object_prefix, seg);
/* offset within the segment, and length clamped to the segment end */
645 ofs = ofs & ((1 << header->obj_order) - 1);
646 len = min_t(u64, len, (1 << header->obj_order) - ofs);
654 static int rbd_get_num_segments(struct rbd_image_header *header,
657 u64 start_seg = ofs >> header->obj_order;
658 u64 end_seg = (ofs + len - 1) >> header->obj_order;
659 return end_seg - start_seg + 1;
663 * returns the size of an object in the image
665 static u64 rbd_obj_bytes(struct rbd_image_header *header)
667 return 1 << header->obj_order;
674 static void bio_chain_put(struct bio *chain)
680 chain = chain->bi_next;
686 * zeros a bio chain, starting at specific offset
688 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 bio_for_each_segment(bv, chain, i) {
698 if (pos + bv->bv_len > start_ofs) {
699 int remainder = max(start_ofs - pos, 0);
700 buf = bvec_kmap_irq(bv, &flags);
701 memset(buf + remainder, 0,
702 bv->bv_len - remainder);
703 bvec_kunmap_irq(buf, &flags);
708 chain = chain->bi_next;
713 * bio_chain_clone - clone a chain of bios up to a certain length.
714 * might return a bio_pair that will need to be released.
/* NOTE(review): partial extract -- loop braces, some assignments and
 * the success/error returns are missing from this view. */
716 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
717 struct bio_pair **bp,
718 int len, gfp_t gfpmask)
720 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
/* release a leftover split pair from a previous call */
724 bio_pair_release(*bp);
728 while (old_chain && (total < len)) {
729 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
/* this bio would cross the requested boundary: split it */
733 if (total + old_chain->bi_size > len) {
737 * this split can only happen with a single paged bio,
738 * split_bio will BUG_ON if this is not the case
740 dout("bio_chain_clone split! total=%d remaining=%d"
742 total, len - total, old_chain->bi_size);
744 /* split the bio. We'll release it either in the next
745 call, or it will have to be released outside */
746 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
750 __bio_clone(tmp, &bp->bio1);
754 __bio_clone(tmp, old_chain);
755 *next = old_chain->bi_next;
/* after the first allocation, avoid sleeping allocations */
759 gfpmask &= ~__GFP_WAIT;
763 new_chain = tail = tmp;
768 old_chain = old_chain->bi_next;
770 total += tmp->bi_size;
776 tail->bi_next = NULL;
/* error path: free the partially built chain */
783 dout("bio_chain_clone with err\n");
784 bio_chain_put(new_chain);
789 * helpers for osd request op vectors.
791 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
792 int opcode, u32 payload_len)
794 struct ceph_osd_req_op *ops;
796 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
803 * op extent offset and length will be set later on
804 * in calc_raw_layout()
806 ops[0].payload_len = payload_len;
811 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Complete one sub-request of a collection; once a contiguous prefix
 * of the collection is done, the corresponding byte ranges of the
 * block request are completed in order.
 * NOTE(review): partial extract -- the no-coll early path, some
 * increments and braces are missing from this view.
 */
816 static void rbd_coll_end_req_index(struct request *rq,
817 struct rbd_req_coll *coll,
821 struct request_queue *q;
824 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
825 coll, index, ret, (unsigned long long) len);
/* with no collection, complete the blk request directly */
831 blk_end_request(rq, ret, len);
/* coll->status[] is protected by the queue lock */
837 spin_lock_irq(q->queue_lock);
838 coll->status[index].done = 1;
839 coll->status[index].rc = ret;
840 coll->status[index].bytes = len;
841 max = min = coll->num_done;
/* advance past every finished entry at the front of the collection */
842 while (max < coll->total && coll->status[max].done)
845 for (i = min; i<max; i++) {
846 __blk_end_request(rq, coll->status[i].rc,
847 coll->status[i].bytes);
849 kref_put(&coll->kref, rbd_coll_release);
851 spin_unlock_irq(q->queue_lock);
854 static void rbd_coll_end_req(struct rbd_request *req,
857 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
861 * Send ceph osd request
/* NOTE(review): partial extract -- several parameters, locals, error
 * labels and returns are missing from this view. */
863 static int rbd_do_request(struct request *rq,
864 struct rbd_device *rbd_dev,
865 struct ceph_snap_context *snapc,
867 const char *object_name, u64 ofs, u64 len,
872 struct ceph_osd_req_op *ops,
873 struct rbd_req_coll *coll,
875 void (*rbd_cb)(struct ceph_osd_request *req,
876 struct ceph_msg *msg),
877 struct ceph_osd_request **linger_req,
880 struct ceph_osd_request *req;
881 struct ceph_file_layout *layout;
884 struct timespec mtime = CURRENT_TIME;
885 struct rbd_request *req_data;
886 struct ceph_osd_request_head *reqhead;
887 struct ceph_osd_client *osdc;
889 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* on allocation failure, still complete the collection slot */
892 rbd_coll_end_req_index(rq, coll, coll_index,
898 req_data->coll = coll;
899 req_data->coll_index = coll_index;
902 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
903 (unsigned long long) ofs, (unsigned long long) len);
905 osdc = &rbd_dev->rbd_client->client->osdc;
906 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
907 false, GFP_NOIO, pages, bio);
913 req->r_callback = rbd_cb;
917 req_data->pages = pages;
920 req->r_priv = req_data;
922 reqhead = req->r_request->front.iov_base;
923 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
925 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
926 req->r_oid_len = strlen(req->r_oid);
/* single-object layout: one stripe covering the whole object */
928 layout = &req->r_file_layout;
929 memset(layout, 0, sizeof(*layout));
930 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_stripe_count = cpu_to_le32(1);
932 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
934 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
937 ceph_osdc_build_request(req, ofs, &len,
941 req->r_oid, req->r_oid_len);
/* keep the request registered for resend across osdmap changes */
944 ceph_osdc_set_request_linger(osdc, req);
948 ret = ceph_osdc_start_request(osdc, req, false);
/* synchronous callers (no rbd_cb) wait for completion here */
953 ret = ceph_osdc_wait_request(osdc, req);
955 *ver = le64_to_cpu(req->r_reassert_version.version);
956 dout("reassert_ver=%llu\n",
958 le64_to_cpu(req->r_reassert_version.version));
959 ceph_osdc_put_request(req);
/* error path: drop bio chain and request, complete the slot */
964 bio_chain_put(req_data->bio);
965 ceph_osdc_put_request(req);
967 rbd_coll_end_req(req_data, ret, len);
973 * Ceph osd op callback
/* Completion handler for async osd ops: decodes the reply, fixes up
 * missing/short reads by zero-filling, then completes the collection
 * slot and releases resources taken in rbd_do_request(). */
975 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
977 struct rbd_request *req_data = req->r_priv;
978 struct ceph_osd_reply_head *replyhead;
979 struct ceph_osd_op *op;
985 replyhead = msg->front.iov_base;
986 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
987 op = (void *)(replyhead + 1);
988 rc = le32_to_cpu(replyhead->result);
989 bytes = le64_to_cpu(op->extent.length);
990 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
992 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
993 (unsigned long long) bytes, read_op, (int) rc);
/* a read of a nonexistent object yields zeroes, not an error */
995 if (rc == -ENOENT && read_op) {
996 zero_bio_chain(req_data->bio, 0);
/* short read: zero the remainder and report the full length */
998 } else if (rc == 0 && read_op && bytes < req_data->len) {
999 zero_bio_chain(req_data->bio, bytes);
1000 bytes = req_data->len;
1003 rbd_coll_end_req(req_data, rc, bytes);
1006 bio_chain_put(req_data->bio);
1008 ceph_osdc_put_request(req);
/* Minimal completion callback: just drop the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1018 * Do a synchronous ceph osd operation
/* NOTE(review): partial extract -- the full parameter list, locals
 * and error handling are missing from this view. */
1020 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1021 struct ceph_snap_context *snapc,
1024 struct ceph_osd_req_op *ops,
1025 const char *object_name,
1028 struct ceph_osd_request **linger_req,
1032 struct page **pages;
1035 BUG_ON(ops == NULL);
/* stage the data in a page vector; copied out for reads below */
1037 num_pages = calc_pages_for(ofs , len);
1038 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1040 return PTR_ERR(pages);
/* rq == NULL and rbd_cb == NULL makes rbd_do_request synchronous */
1042 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1043 object_name, ofs, len, NULL,
1053 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1054 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1057 ceph_release_page_vector(pages, num_pages);
1062 * Do an asynchronous ceph osd operation
/* Maps one segment-aligned chunk of a block request onto a single osd
 * object and fires the request; completion runs rbd_req_cb().
 * NOTE(review): partial extract -- some parameters, locals and error
 * handling are missing from this view. */
1064 static int rbd_do_op(struct request *rq,
1065 struct rbd_device *rbd_dev,
1066 struct ceph_snap_context *snapc,
1068 int opcode, int flags,
1071 struct rbd_req_coll *coll,
1078 struct ceph_osd_req_op *ops;
1081 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1085 seg_len = rbd_get_segment(&rbd_dev->header,
1086 rbd_dev->header.object_prefix,
1088 seg_name, &seg_ofs);
/* only writes carry a payload */
1090 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1093 ops = rbd_create_rw_ops(1, opcode, payload_len);
1097 /* we've taken care of segment sizes earlier when we
1098 cloned the bios. We should never have a segment
1099 truncated at this point */
1100 BUG_ON(seg_len < len);
1102 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1103 seg_name, seg_ofs, seg_len,
1109 rbd_req_cb, 0, NULL);
1111 rbd_destroy_ops(ops);
1118 * Request async osd write
1120 static int rbd_req_write(struct request *rq,
1121 struct rbd_device *rbd_dev,
1122 struct ceph_snap_context *snapc,
1125 struct rbd_req_coll *coll,
1128 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1130 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1131 ofs, len, bio, coll, coll_index);
1135 * Request async osd read
1137 static int rbd_req_read(struct request *rq,
1138 struct rbd_device *rbd_dev,
1142 struct rbd_req_coll *coll,
1145 return rbd_do_op(rq, rbd_dev, NULL,
1149 ofs, len, bio, coll, coll_index);
1153 * Request sync osd read
1155 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1157 const char *object_name,
1162 struct ceph_osd_req_op *ops;
1165 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1169 ret = rbd_req_sync_op(rbd_dev, NULL,
1172 ops, object_name, ofs, len, buf, NULL, ver);
1173 rbd_destroy_ops(ops);
/*
 * Acknowledge a watch notification on the header object.
 * (The original comment said "Request sync osd watch", which was a
 * stale copy-paste; this function sends a NOTIFY_ACK.)
 * NOTE(review): partial extract -- parameters, locals and returns are
 * missing from this view.
 */
1181 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1185 struct ceph_osd_req_op *ops;
1188 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1192 ops[0].watch.ver = cpu_to_le64(ver);
1193 ops[0].watch.cookie = notify_id;
1194 ops[0].watch.flag = 0;
1196 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1197 rbd_dev->header_name, 0, 0, NULL,
/* fire-and-forget: completion just drops the request reference */
1202 rbd_simple_req_cb, 0, NULL);
1204 rbd_destroy_ops(ops);
/*
 * Callback invoked when the watched header object changes: refresh
 * the in-memory header under ctl_mutex, then ack the notification.
 * NOTE(review): partial extract -- locals, the NULL-data early return
 * and braces are missing from this view.
 */
1208 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1210 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1217 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1218 rbd_dev->header_name, (unsigned long long) notify_id,
1219 (unsigned int) opcode);
1220 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1221 rc = __rbd_refresh_header(rbd_dev);
1222 hver = rbd_dev->header.obj_version;
1223 mutex_unlock(&ctl_mutex);
/* refresh failure is logged, but the notification is still acked */
1225 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1226 " update snaps: %d\n", rbd_dev->major, rc);
1228 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1232 * Request sync osd watch
/* Register a watch on the header object so the device is notified
 * when image metadata (e.g. the snapshot list) changes.
 * NOTE(review): partial extract -- error labels and returns are
 * missing from this view. */
1234 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1236 struct ceph_osd_req_op *ops;
1237 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1240 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1244 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1245 (void *)rbd_dev, &rbd_dev->watch_event);
1249 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1250 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1251 ops[0].watch.flag = 1;
/* the request lingers (watch_request) so it survives osd reconnects */
1253 ret = rbd_req_sync_op(rbd_dev, NULL,
1255 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1257 rbd_dev->header_name,
1259 &rbd_dev->watch_request, NULL);
1264 rbd_destroy_ops(ops);
/* error unwind: cancel the event and free the ops vector */
1268 ceph_osdc_cancel_event(rbd_dev->watch_event);
1269 rbd_dev->watch_event = NULL;
1271 rbd_destroy_ops(ops);
1276 * Request sync osd unwatch
/* Tear down the watch registered by rbd_req_sync_watch(). */
1278 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1280 struct ceph_osd_req_op *ops;
1283 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
/* flag = 0 turns the WATCH op into an unwatch */
1287 ops[0].watch.ver = 0;
1288 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1289 ops[0].watch.flag = 0;
1291 ret = rbd_req_sync_op(rbd_dev, NULL,
1293 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1295 rbd_dev->header_name,
1296 0, 0, NULL, NULL, NULL);
1299 rbd_destroy_ops(ops);
1300 ceph_osdc_cancel_event(rbd_dev->watch_event);
1301 rbd_dev->watch_event = NULL;
/* Context handed to rbd_notify_cb() while waiting for a notify ack. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1309 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1311 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1315 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1316 rbd_dev->header_name, (unsigned long long) notify_id,
1317 (unsigned int) opcode);
1321 * Request sync osd notify
/* Send a notify on the header object (after a local change such as
 * snapshot creation) and wait for watchers to acknowledge it.
 * NOTE(review): partial extract -- error labels and returns are
 * missing from this view. */
1323 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1325 struct ceph_osd_req_op *ops;
1326 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1327 struct ceph_osd_event *event;
1328 struct rbd_notify_info info;
1329 int payload_len = sizeof(u32) + sizeof(u32);
1332 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1336 info.rbd_dev = rbd_dev;
1338 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1339 (void *)&info, &event);
1343 ops[0].watch.ver = 1;
1344 ops[0].watch.flag = 1;
1345 ops[0].watch.cookie = event->cookie;
1346 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1347 ops[0].watch.timeout = 12;
1349 ret = rbd_req_sync_op(rbd_dev, NULL,
1351 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1353 rbd_dev->header_name,
1354 0, 0, NULL, NULL, NULL);
/* wait for watchers to ack (or time out) */
1358 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1359 dout("ceph_osdc_wait_event returned %d\n", ret);
1360 rbd_destroy_ops(ops);
/* error unwind */
1364 ceph_osdc_cancel_event(event);
1366 rbd_destroy_ops(ops);
/*
 * Synchronously execute an OSD class method (CEPH_OSD_OP_CALL) on an
 * object.  (The original comment said "Request sync osd read", which
 * was a stale copy-paste.)
 * NOTE(review): partial extract -- some parameters and the early
 * return on allocation failure are missing from this view.
 */
1373 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1374 const char *object_name,
1375 const char *class_name,
1376 const char *method_name,
1381 struct ceph_osd_req_op *ops;
1382 int class_name_len = strlen(class_name);
1383 int method_name_len = strlen(method_name);
/* payload carries class name, method name and input data */
1386 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1387 class_name_len + method_name_len + len);
1391 ops[0].cls.class_name = class_name;
1392 ops[0].cls.class_len = (__u8) class_name_len;
1393 ops[0].cls.method_name = method_name;
1394 ops[0].cls.method_len = (__u8) method_name_len;
1395 ops[0].cls.argc = 0;
1396 ops[0].cls.indata = data;
1397 ops[0].cls.indata_len = len;
1399 ret = rbd_req_sync_op(rbd_dev, NULL,
1401 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1403 object_name, 0, 0, NULL, NULL, ver);
1405 rbd_destroy_ops(ops);
1407 dout("cls_exec returned %d\n", ret);
1411 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1413 struct rbd_req_coll *coll =
1414 kzalloc(sizeof(struct rbd_req_coll) +
1415 sizeof(struct rbd_req_status) * num_reqs,
1420 coll->total = num_reqs;
1421 kref_init(&coll->kref);
1426 * block device queue callback
/* Main request-queue worker: splits each block request into
 * per-object segments, clones the bio chain per segment and issues an
 * async osd read or write for each.
 * NOTE(review): partial extract -- loop braces, continue paths and
 * several statements are missing from this view. */
1428 static void rbd_rq_fn(struct request_queue *q)
1430 struct rbd_device *rbd_dev = q->queuedata;
1432 struct bio_pair *bp = NULL;
1434 while ((rq = blk_fetch_request(q))) {
1436 struct bio *rq_bio, *next_bio = NULL;
1441 int num_segs, cur_seg = 0;
1442 struct rbd_req_coll *coll;
1443 struct ceph_snap_context *snapc;
1445 /* peek at request from block layer */
1449 dout("fetched request\n");
1451 /* filter out block requests we don't understand */
1452 if ((rq->cmd_type != REQ_TYPE_FS)) {
1453 __blk_end_request_all(rq, 0);
1457 /* deduce our operation (read, write) */
1458 do_write = (rq_data_dir(rq) == WRITE);
1460 size = blk_rq_bytes(rq);
1461 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1463 if (do_write && rbd_dev->read_only) {
1464 __blk_end_request_all(rq, -EROFS);
/* drop the queue lock while talking to the osd layer */
1468 spin_unlock_irq(q->queue_lock);
1470 down_read(&rbd_dev->header_rwsem);
/* fail I/O to a snapshot that has been deleted since mapping */
1472 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1473 up_read(&rbd_dev->header_rwsem);
1474 dout("request for non-existent snapshot");
1475 spin_lock_irq(q->queue_lock);
1476 __blk_end_request_all(rq, -ENXIO);
/* pin the snap context for the duration of this request */
1480 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1482 up_read(&rbd_dev->header_rwsem);
1484 dout("%s 0x%x bytes at 0x%llx\n",
1485 do_write ? "write" : "read",
1486 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1488 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1489 coll = rbd_alloc_coll(num_segs);
1491 spin_lock_irq(q->queue_lock);
1492 __blk_end_request_all(rq, -ENOMEM);
1493 ceph_put_snap_context(snapc);
1498 /* a bio clone to be passed down to OSD req */
1499 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1500 op_size = rbd_get_segment(&rbd_dev->header,
1501 rbd_dev->header.object_prefix,
/* one collection ref per in-flight segment */
1504 kref_get(&coll->kref);
1505 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1506 op_size, GFP_ATOMIC);
1508 rbd_coll_end_req_index(rq, coll, cur_seg,
1514 /* init OSD command: write or read */
1516 rbd_req_write(rq, rbd_dev,
1522 rbd_req_read(rq, rbd_dev,
1535 kref_put(&coll->kref, rbd_coll_release);
1538 bio_pair_release(bp);
1539 spin_lock_irq(q->queue_lock);
1541 ceph_put_snap_context(snapc);
1546 * a queue callback. Makes sure that we don't create a bio that spans across
1547 * multiple osd objects. One exception would be with a single page bios,
1548 * which we handle later at bio_chain_clone
1550 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1551 struct bio_vec *bvec)
1553 struct rbd_device *rbd_dev = q->queuedata;
1554 unsigned int chunk_sectors;
1556 unsigned int bio_sectors;
1559 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1560 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1561 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1563 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1564 + bio_sectors)) << SECTOR_SHIFT;
1566 max = 0; /* bio_add cannot handle a negative return */
1567 if (max <= bvec->bv_len && bio_sectors == 0)
1568 return bvec->bv_len;
/*
 * Free the gendisk/queue and in-memory header of a device being torn
 * down.
 * NOTE(review): partial extract -- a NULL-disk early return,
 * del_gendisk/put_disk calls and braces are missing from this view.
 */
1572 static void rbd_free_disk(struct rbd_device *rbd_dev)
1574 struct gendisk *disk = rbd_dev->disk;
1579 rbd_header_free(&rbd_dev->header);
1581 if (disk->flags & GENHD_FL_UP)
1584 blk_cleanup_queue(disk->queue);
1589 * reload the ondisk the header
/* Reads the header object, retrying with a larger buffer until the
 * snapshot count observed matches what was allocated for.
 * NOTE(review): partial extract -- the retry-loop structure, error
 * paths and frees are missing from this view. */
1591 static int rbd_read_header(struct rbd_device *rbd_dev,
1592 struct rbd_image_header *header)
1595 struct rbd_image_header_ondisk *dh;
1601 * First reads the fixed-size header to determine the number
1602 * of snapshots, then re-reads it, along with all snapshot
1603 * records as well as their stored names.
1607 dh = kmalloc(len, GFP_KERNEL)
1611 rc = rbd_req_sync_read(rbd_dev,
1613 rbd_dev->header_name,
1619 rc = rbd_header_from_disk(header, dh, snap_count);
1622 pr_warning("unrecognized header format"
1624 rbd_dev->image_name);
/* done when the snapshot count did not change between reads */
1628 if (snap_count == header->total_snaps)
1631 snap_count = header->total_snaps;
1632 len = sizeof (*dh) +
1633 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1634 header->snap_names_len;
1636 rbd_header_free(header);
1639 header->obj_version = ver;
/*
 * Create a snapshot of the mapped image: allocate a new snap id from
 * the monitor, then invoke the rbd class "snap_add" method on the
 * header object.
 * NOTE(review): partial extract -- some parameters, error labels and
 * the "bad" encoding-failure path are missing from this view.
 */
1649 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1650 const char *snap_name,
1653 int name_len = strlen(snap_name);
1657 struct ceph_mon_client *monc;
1659 /* we should create a snapshot only if we're pointing at the head */
1660 if (rbd_dev->snap_id != CEPH_NOSNAP)
1663 monc = &rbd_dev->rbd_client->client->monc;
1664 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1665 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* buffer holds the encoded name string plus the 64-bit snap id */
1669 data = kmalloc(name_len + 16, gfp_flags);
1674 e = data + name_len + 16;
1676 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1677 ceph_encode_64_safe(&p, e, new_snapid, bad);
1679 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1681 data, p - data, NULL);
1685 return ret < 0 ? ret : 0;
1690 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1692 struct rbd_snap *snap;
1693 struct rbd_snap *next;
1695 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1696 __rbd_remove_snap_dev(snap);
1700 * only read the first part of the ondisk header, without the snaps info
/* Re-reads the header and swaps the fresh fields into rbd_dev->header
 * under header_rwsem, then rebuilds the snapshot device list.
 * NOTE(review): partial extract -- error returns and braces are
 * missing from this view. */
1702 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1705 struct rbd_image_header h;
1707 ret = rbd_read_header(rbd_dev, &h);
1711 down_write(&rbd_dev->header_rwsem);
/* resize the block device only when mapping the live head */
1714 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1715 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1717 dout("setting size to %llu sectors", (unsigned long long) size);
1718 set_capacity(rbd_dev->disk, size);
1721 /* rbd_dev->header.object_prefix shouldn't change */
1722 kfree(rbd_dev->header.snap_sizes);
1723 kfree(rbd_dev->header.snap_names);
1724 /* osd requests may still refer to snapc */
1725 ceph_put_snap_context(rbd_dev->header.snapc);
/* swap in the freshly read header fields */
1727 rbd_dev->header.obj_version = h.obj_version;
1728 rbd_dev->header.image_size = h.image_size;
1729 rbd_dev->header.total_snaps = h.total_snaps;
1730 rbd_dev->header.snapc = h.snapc;
1731 rbd_dev->header.snap_names = h.snap_names;
1732 rbd_dev->header.snap_names_len = h.snap_names_len;
1733 rbd_dev->header.snap_sizes = h.snap_sizes;
1734 /* Free the extra copy of the object prefix */
1735 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1736 kfree(h.object_prefix);
1738 ret = __rbd_init_snaps_header(rbd_dev);
1740 up_write(&rbd_dev->header_rwsem);
/*
 * Allocate and configure the gendisk and request queue for this
 * mapping, then set its capacity and announce it. (Error checks after
 * several calls are elided in this listing.)
 */
1745 static int rbd_init_disk(struct rbd_device *rbd_dev)
1747 struct gendisk *disk;
1748 struct request_queue *q;
1753 /* contact OSD, request size info about the object being mapped */
1754 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1758 /* no need to lock here, as rbd_dev is not registered yet */
1759 rc = __rbd_init_snaps_header(rbd_dev);
1763 rc = rbd_header_set_snap(rbd_dev, &total_size);
1767 /* create gendisk info */
1769 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1773 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1775 disk->major = rbd_dev->major;
1776 disk->first_minor = 0;
1777 disk->fops = &rbd_bd_ops;
1778 disk->private_data = rbd_dev;
/* Request-function driven queue; requests serialized by rbd_dev->lock. */
1782 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1786 /* We use the default size, but let's be explicit about it. */
1787 blk_queue_physical_block_size(q, SECTOR_SIZE);
1789 /* set io sizes to object size */
1790 segment_size = rbd_obj_bytes(&rbd_dev->header);
1791 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1792 blk_queue_max_segment_size(q, segment_size);
1793 blk_queue_io_min(q, segment_size);
1794 blk_queue_io_opt(q, segment_size);
1796 blk_queue_merge_bvec(q, rbd_merge_bvec);
1799 q->queuedata = rbd_dev;
1801 rbd_dev->disk = disk;
1804 /* finally, announce the disk to the world */
1805 set_capacity(disk, total_size / SECTOR_SIZE);
1808 pr_info("%s: added with size 0x%llx\n",
1809 disk->disk_name, (unsigned long long)total_size);
/* Map a struct device embedded in an rbd_device back to its owner. */
1822 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1824 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped size in bytes (capacity sectors * SECTOR_SIZE),
 * read under the header rwsem so it is consistent with refreshes. */
1827 static ssize_t rbd_size_show(struct device *dev,
1828 struct device_attribute *attr, char *buf)
1830 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1833 down_read(&rbd_dev->header_rwsem);
1834 size = get_capacity(rbd_dev->disk);
1835 up_read(&rbd_dev->header_rwsem);
1837 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs "major": block device major number for this mapping. */
1840 static ssize_t rbd_major_show(struct device *dev,
1841 struct device_attribute *attr, char *buf)
1843 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1845 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": ceph client instance id, printed as "client<N>". */
1848 static ssize_t rbd_client_id_show(struct device *dev,
1849 struct device_attribute *attr, char *buf)
1851 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1853 return sprintf(buf, "client%lld\n",
1854 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool holding the image. */
1857 static ssize_t rbd_pool_show(struct device *dev,
1858 struct device_attribute *attr, char *buf)
1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of the rados pool. */
1865 static ssize_t rbd_pool_id_show(struct device *dev,
1866 struct device_attribute *attr, char *buf)
1868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": the rbd image name. */
1873 static ssize_t rbd_name_show(struct device *dev,
1874 struct device_attribute *attr, char *buf)
1876 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap": mapped snapshot name ("-" when at the head). */
1881 static ssize_t rbd_snap_show(struct device *dev,
1882 struct device_attribute *attr,
1885 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs "refresh" (write-only): re-read the header under ctl_mutex. */
1890 static ssize_t rbd_image_refresh(struct device *dev,
1891 struct device_attribute *attr,
1895 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1899 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1901 rc = __rbd_refresh_header(rbd_dev);
1905 mutex_unlock(&ctl_mutex);
/*
 * Per-device sysfs attributes and the device type that exposes them
 * (documented in Documentation/ABI/testing/sysfs-bus-rbd).
 */
1909 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1910 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1911 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1912 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1913 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1914 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1915 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1916 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1917 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1919 static struct attribute *rbd_attrs[] = {
1920 &dev_attr_size.attr,
1921 &dev_attr_major.attr,
1922 &dev_attr_client_id.attr,
1923 &dev_attr_pool.attr,
1924 &dev_attr_pool_id.attr,
1925 &dev_attr_name.attr,
1926 &dev_attr_current_snap.attr,
1927 &dev_attr_refresh.attr,
1928 &dev_attr_create_snap.attr,
1932 static struct attribute_group rbd_attr_group = {
1936 static const struct attribute_group *rbd_attr_groups[] = {
/* Release is a no-op here; real teardown happens in rbd_dev_release(). */
1941 static void rbd_sysfs_dev_release(struct device *dev)
1945 static struct device_type rbd_device_type = {
1947 .groups = rbd_attr_groups,
1948 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size": the snapshot's recorded size. */
1956 static ssize_t rbd_snap_size_show(struct device *dev,
1957 struct device_attribute *attr,
1960 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1962 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id": the snapshot's rados snapshot id. */
1965 static ssize_t rbd_snap_id_show(struct device *dev,
1966 struct device_attribute *attr,
1969 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1971 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes and the snapshot device type. */
1974 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1975 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1977 static struct attribute *rbd_snap_attrs[] = {
1978 &dev_attr_snap_size.attr,
1979 &dev_attr_snap_id.attr,
1983 static struct attribute_group rbd_snap_attr_group = {
1984 .attrs = rbd_snap_attrs,
/* Release callback for a snapshot device; the body that frees the
 * rbd_snap is elided in this listing. */
1987 static void rbd_snap_dev_release(struct device *dev)
1989 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1994 static const struct attribute_group *rbd_snap_attr_groups[] = {
1995 &rbd_snap_attr_group,
1999 static struct device_type rbd_snap_device_type = {
2000 .groups = rbd_snap_attr_groups,
2001 .release = rbd_snap_dev_release,
/* Unlink a snapshot from the device's list and unregister its sysfs
 * device; the device release callback then frees it (presumably via
 * rbd_snap_dev_release() -- its body is not visible here). */
2004 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2006 list_del(&snap->node);
2007 device_unregister(&snap->dev);
/* Register the sysfs device ("snap_<name>") for one snapshot under
 * the given parent device. */
2010 static int rbd_register_snap_dev(struct rbd_snap *snap,
2011 struct device *parent)
2013 struct device *dev = &snap->dev;
2016 dev->type = &rbd_snap_device_type;
2017 dev->parent = parent;
2018 dev->release = rbd_snap_dev_release;
2019 dev_set_name(dev, "snap_%s", snap->name);
2020 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header snapshot slot i, copy its name, size
 * and id, and register its sysfs device if the parent rbd device is
 * already registered. Returns ERR_PTR() on failure.
 */
2025 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2026 int i, const char *name)
2028 struct rbd_snap *snap;
2031 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2033 return ERR_PTR(-ENOMEM);
2036 snap->name = kstrdup(name, GFP_KERNEL);
2040 snap->size = rbd_dev->header.snap_sizes[i];
2041 snap->id = rbd_dev->header.snapc->snaps[i];
2042 if (device_is_registered(&rbd_dev->dev)) {
2043 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2054 return ERR_PTR(ret);
2058 * search for the previous snap in a null delimited string list
2060 const char *rbd_prev_snap_name(const char *name, const char *start)
/* Need at least one NUL-terminated name before "name" to step back to;
 * the remainder of the function body is elided in this listing. */
2062 if (name < start + 2)
2075 * compare the old list of snapshots that we have to what's in the header
2076 * and update it accordingly. Note that the header holds the snapshots
2077 * in a reverse order (from newest to oldest) and we need to go from
2078 * older to new so that we don't get a duplicate snap name when
2079 * doing the process (e.g., removed snapshot and recreated a new
2080 * one with the same name).
2082 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2084 const char *name, *first_name;
2085 int i = rbd_dev->header.total_snaps;
2086 struct rbd_snap *snap, *old_snap = NULL;
2087 struct list_head *p, *n;
/* Names are packed oldest-last; start one past the end and walk back. */
2089 first_name = rbd_dev->header.snap_names;
2090 name = first_name + rbd_dev->header.snap_names_len;
/* Walk our existing list backward (oldest first) against the header's
 * newest-first snap id array, removing and inserting as needed. */
2092 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2095 old_snap = list_entry(p, struct rbd_snap, node);
2098 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2100 if (!i || old_snap->id < cur_id) {
2102 * old_snap->id was skipped, thus was
2103 * removed. If this rbd_dev is mapped to
2104 * the removed snapshot, record that it no
2105 * longer exists, to prevent further I/O.
2107 if (rbd_dev->snap_id == old_snap->id)
2108 rbd_dev->snap_exists = false;
2109 __rbd_remove_snap_dev(old_snap);
2112 if (old_snap->id == cur_id) {
2113 /* we have this snapshot already */
2115 name = rbd_prev_snap_name(name, first_name);
2119 i--, name = rbd_prev_snap_name(name, first_name)) {
2124 cur_id = rbd_dev->header.snapc->snaps[i];
2125 /* snapshot removal? handle it above */
2126 if (cur_id >= old_snap->id)
2128 /* a new snapshot */
2129 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2131 return PTR_ERR(snap);
2133 /* note that we add it backward so using n and not p */
2134 list_add(&snap->node, n);
2138 /* we're done going over the old snap list, just add what's left */
2139 for (; i > 0; i--) {
2140 name = rbd_prev_snap_name(name, first_name);
2145 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2147 return PTR_ERR(snap);
2148 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Register the rbd device on the rbd bus, then register devices for
 * all already-known snapshots; runs under ctl_mutex.
 */
2154 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2158 struct rbd_snap *snap;
2160 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2161 dev = &rbd_dev->dev;
2163 dev->bus = &rbd_bus_type;
2164 dev->type = &rbd_device_type;
2165 dev->parent = &rbd_root_dev;
2166 dev->release = rbd_dev_release;
2167 dev_set_name(dev, "%d", rbd_dev->dev_id);
2168 ret = device_register(dev);
2172 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2173 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2178 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device; remaining teardown happens in the
 * rbd_dev_release() callback installed by rbd_bus_add_dev(). */
2182 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2184 device_unregister(&rbd_dev->dev);
/*
 * Set up the watch on the header object. On -ERANGE, refresh the
 * header and retry (the loop is do/while; its opening "do" and the
 * error checks are elided in this listing).
 */
2187 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2192 ret = rbd_req_sync_watch(rbd_dev);
2193 if (ret == -ERANGE) {
2194 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2195 rc = __rbd_refresh_header(rbd_dev);
2196 mutex_unlock(&ctl_mutex);
2200 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
2205 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2208 * Get a unique rbd identifier for the given new rbd_dev, and add
2209 * the rbd_dev to the global list. The minimum rbd id is 1.
2211 static void rbd_id_get(struct rbd_device *rbd_dev)
2213 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2215 spin_lock(&rbd_dev_list_lock);
2216 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2217 spin_unlock(&rbd_dev_list_lock);
2221 * Remove an rbd_dev from the global list, and record that its
2222 * identifier is no longer in use.
2224 static void rbd_id_put(struct rbd_device *rbd_dev)
2226 struct list_head *tmp;
2227 int rbd_id = rbd_dev->dev_id;
2232 spin_lock(&rbd_dev_list_lock);
2233 list_del_init(&rbd_dev->node);
2236 * If the id being "put" is not the current maximum, there
2237 * is nothing special we need to do.
2239 if (rbd_id != atomic64_read(&rbd_id_max)) {
2240 spin_unlock(&rbd_dev_list_lock);
2245 * We need to update the current maximum id. Search the
2246 * list to find out what it is. We're more likely to find
2247 * the maximum at the end, so search the list backward.
2250 list_for_each_prev(tmp, &rbd_dev_list) {
2251 struct rbd_device *rbd_dev;
2253 rbd_dev = list_entry(tmp, struct rbd_device, node);
/* NOTE(review): this compares rbd_id (the id being released) rather
 * than the list entry's rbd_dev->dev_id; since the goal is to find the
 * largest remaining id, this looks wrong -- verify against upstream. */
2254 if (rbd_id > max_id)
2257 spin_unlock(&rbd_dev_list_lock);
2260 * The max id could have been updated by rbd_id_get(), in
2261 * which case it now accurately reflects the new maximum.
2262 * Be careful not to overwrite the maximum value in that
2265 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2269 * Skips over white space at *buf, and updates *buf to point to the
2270 * first found non-space character (if any). Returns the length of
2271 * the token (string of non-white space characters) found. Note
2272 * that *buf must be terminated with '\0'.
2274 static inline size_t next_token(const char **buf)
2277 * These are the characters that produce nonzero for
2278 * isspace() in the "C" and "POSIX" locales.
2280 const char *spaces = " \f\n\r\t\v";
2282 *buf += strspn(*buf, spaces); /* Find start of token */
/* *buf is left at the token start; callers advance past it themselves. */
2284 return strcspn(*buf, spaces); /* Return token length */
2288 * Finds the next token in *buf, and if the provided token buffer is
2289 * big enough, copies the found token into it. The result, if
2290 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2291 * must be terminated with '\0' on entry.
2293 * Returns the length of the token found (not including the '\0').
2294 * Return value will be 0 if no token is found, and it will be >=
2295 * token_size if the token would not fit.
2297 * The *buf pointer will be updated to point beyond the end of the
2298 * found token. Note that this occurs even if the token buffer is
2299 * too small to hold it.
2301 static inline size_t copy_token(const char **buf,
2307 len = next_token(buf);
/* Copy only when the token fits with room for the terminating NUL. */
2308 if (len < token_size) {
2309 memcpy(token, *buf, len);
2310 *(token + len) = '\0';
2318 * Finds the next token in *buf, dynamically allocates a buffer big
2319 * enough to hold a copy of it, and copies the token into the new
2320 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2321 * that a duplicate buffer is created even for a zero-length token.
2323 * Returns a pointer to the newly-allocated duplicate, or a null
2324 * pointer if memory for the duplicate was not available. If
2325 * the lenp argument is a non-null pointer, the length of the token
2326 * (not including the '\0') is returned in *lenp.
2328 * If successful, the *buf pointer will be updated to point beyond
2329 * the end of the found token.
2331 * Note: uses GFP_KERNEL for allocation.
2333 static inline char *dup_token(const char **buf, size_t *lenp)
2338 len = next_token(buf);
/* +1 for the terminating NUL; a zero-length token yields "". */
2339 dup = kmalloc(len + 1, GFP_KERNEL);
2343 memcpy(dup, *buf, len);
2344 *(dup + len) = '\0';
2354 * This fills in the pool_name, image_name, image_name_len, snap_name,
2355 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2356 * on the list of monitor addresses and other options provided via
2359 * Note: rbd_dev is assumed to have been initially zero-filled.
2361 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2363 const char **mon_addrs,
2364 size_t *mon_addrs_size,
2366 size_t options_size)
2371 /* The first four tokens are required */
2373 len = next_token(&buf)
2376 *mon_addrs_size = len + 1;
2381 len = copy_token(&buf, options, options_size);
2382 if (!len || len >= options_size)
2386 rbd_dev->pool_name = dup_token(&buf, NULL);
2387 if (!rbd_dev->pool_name)
2390 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2391 if (!rbd_dev->image_name)
2394 /* Create the name of the header object */
2396 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2397 + sizeof (RBD_SUFFIX),
2399 if (!rbd_dev->header_name)
2401 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2404 * The snapshot name is optional. If none is supplied,
2405 * we use the default value.
2407 rbd_dev->snap_name = dup_token(&buf, &len);
2408 if (!rbd_dev->snap_name)
2411 /* Replace the empty name with the default */
2412 kfree(rbd_dev->snap_name);
2414 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2415 if (!rbd_dev->snap_name)
2418 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2419 sizeof (RBD_SNAP_HEAD_NAME));
/* Error path: free everything allocated above, newest first. */
2425 kfree(rbd_dev->header_name);
2426 kfree(rbd_dev->image_name);
2427 kfree(rbd_dev->pool_name);
2428 rbd_dev->pool_name = NULL;
/*
 * sysfs "add" handler for the rbd bus: parse the user-supplied spec,
 * get a ceph client, resolve the pool, allocate a blkdev major,
 * register the device, and bring up the disk mapping. (Error-check
 * lines after several calls, and the goto labels, are elided in this
 * listing.)
 */
2433 static ssize_t rbd_add(struct bus_type *bus,
2438 struct rbd_device *rbd_dev = NULL;
2439 const char *mon_addrs = NULL;
2440 size_t mon_addrs_size = 0;
2441 struct ceph_osd_client *osdc;
2444 if (!try_module_get(THIS_MODULE))
2447 options = kmalloc(count, GFP_KERNEL);
2450 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2454 /* static rbd_device initialization */
2455 spin_lock_init(&rbd_dev->lock);
2456 INIT_LIST_HEAD(&rbd_dev->node);
2457 INIT_LIST_HEAD(&rbd_dev->snaps);
2458 init_rwsem(&rbd_dev->header_rwsem);
2460 /* generate unique id: find highest unique id, add one */
2461 rbd_id_get(rbd_dev);
2463 /* Fill in the device name, now that we have its id. */
2464 BUILD_BUG_ON(DEV_NAME_LEN
2465 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2466 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2468 /* parse add command */
2469 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2474 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2476 if (IS_ERR(rbd_dev->rbd_client)) {
2477 rc = PTR_ERR(rbd_dev->rbd_client);
/* Resolve the pool name to a pool id via the current osdmap. */
2482 osdc = &rbd_dev->rbd_client->client->osdc;
2483 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2485 goto err_out_client;
2486 rbd_dev->pool_id = rc;
2488 /* register our block device */
2489 rc = register_blkdev(0, rbd_dev->name);
2491 goto err_out_client;
2492 rbd_dev->major = rc;
2494 rc = rbd_bus_add_dev(rbd_dev);
2496 goto err_out_blkdev;
2499 * At this point cleanup in the event of an error is the job
2500 * of the sysfs code (initiated by rbd_bus_del_dev()).
2502 * Set up and announce blkdev mapping.
2504 rc = rbd_init_disk(rbd_dev);
2508 rc = rbd_init_watch_dev(rbd_dev);
/* Error unwinding below runs in reverse order of the setup above. */
2515 /* this will also clean up rest of rbd_dev stuff */
2517 rbd_bus_del_dev(rbd_dev);
2522 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2524 rbd_put_client(rbd_dev);
2526 if (rbd_dev->pool_name) {
2527 kfree(rbd_dev->snap_name);
2528 kfree(rbd_dev->header_name);
2529 kfree(rbd_dev->image_name);
2530 kfree(rbd_dev->pool_name);
2532 rbd_id_put(rbd_dev);
2537 dout("Error adding device %s\n", buf);
2538 module_put(THIS_MODULE);
2540 return (ssize_t) rc;
/*
 * Look up an rbd_device by its id on the global device list, under
 * rbd_dev_list_lock. (The not-found return path is elided here.)
 */
2543 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2545 struct list_head *tmp;
2546 struct rbd_device *rbd_dev;
2548 spin_lock(&rbd_dev_list_lock);
2549 list_for_each(tmp, &rbd_dev_list) {
2550 rbd_dev = list_entry(tmp, struct rbd_device, node);
2551 if (rbd_dev->dev_id == dev_id) {
2552 spin_unlock(&rbd_dev_list_lock);
2556 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: drop the header watch, the ceph
 * client, the disk/blkdev, and all names and the id owned by this
 * rbd_dev, then release the module reference taken in rbd_add().
 */
2560 static void rbd_dev_release(struct device *dev)
2562 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2564 if (rbd_dev->watch_request) {
2565 struct ceph_client *client = rbd_dev->rbd_client->client;
2567 ceph_osdc_unregister_linger_request(&client->osdc,
2568 rbd_dev->watch_request);
2570 if (rbd_dev->watch_event)
2571 rbd_req_sync_unwatch(rbd_dev);
2573 rbd_put_client(rbd_dev);
2575 /* clean up and free blkdev */
2576 rbd_free_disk(rbd_dev);
2577 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2579 /* done with the id, and with the rbd_dev */
2580 kfree(rbd_dev->snap_name);
2581 kfree(rbd_dev->header_name);
2582 kfree(rbd_dev->pool_name);
2583 kfree(rbd_dev->image_name);
2584 rbd_id_put(rbd_dev);
2587 /* release module ref */
2588 module_put(THIS_MODULE);
/*
 * sysfs "remove" handler: parse the target device id, look the device
 * up, and tear down its snapshots and device under ctl_mutex.
 */
2591 static ssize_t rbd_remove(struct bus_type *bus,
2595 struct rbd_device *rbd_dev = NULL;
2600 rc = strict_strtoul(buf, 10, &ul);
2604 /* convert to int; abort if we lost anything in the conversion */
2605 target_id = (int) ul;
2606 if (target_id != ul)
2609 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2611 rbd_dev = __rbd_get_dev(target_id);
2617 __rbd_remove_all_snaps(rbd_dev);
2618 rbd_bus_del_dev(rbd_dev);
2621 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" handler: create the named snapshot on the
 * cluster, refresh the header, then notify watchers after dropping
 * ctl_mutex.
 */
2625 static ssize_t rbd_snap_add(struct device *dev,
2626 struct device_attribute *attr,
2630 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2632 char *name = kmalloc(count + 1, GFP_KERNEL);
2636 snprintf(name, count, "%s", buf);
2638 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2640 ret = rbd_header_add_snap(rbd_dev,
2645 ret = __rbd_refresh_header(rbd_dev);
2649 /* shouldn't hold ctl_mutex when notifying.. notify might
2650 trigger a watch callback that would need to get that mutex */
2651 mutex_unlock(&ctl_mutex);
2653 /* make a best effort, don't error if failed */
2654 rbd_req_sync_notify(rbd_dev);
2661 mutex_unlock(&ctl_mutex);
2667 * create control files in sysfs
2670 static int rbd_sysfs_init(void)
2674 ret = device_register(&rbd_root_dev);
/* If the bus fails to register, undo the root device registration. */
2678 ret = bus_register(&rbd_bus_type);
2680 device_unregister(&rbd_root_dev);
/* Remove the sysfs control files: bus first, then the root device. */
2685 static void rbd_sysfs_cleanup(void)
2687 bus_unregister(&rbd_bus_type);
2688 device_unregister(&rbd_root_dev);
/* Module init: set up the sysfs bus/root device, then log the banner. */
2691 int __init rbd_init(void)
2695 rc = rbd_sysfs_init();
2698 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: tear down the sysfs bus and root device. */
2702 void __exit rbd_exit(void)
2704 rbd_sysfs_cleanup();
/* Module registration and metadata. */
2707 module_init(rbd_init);
2708 module_exit(rbd_exit);
2710 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2711 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2712 MODULE_DESCRIPTION("rados block device");
2714 /* following authorship retained from original osdblk.c */
2715 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2717 MODULE_LICENSE("GPL");