regulator: tps65023: Constify struct regmap_config and regulator_ops
[cascardo/linux.git] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44 #include <linux/idr.h>
45 #include <linux/workqueue.h>
46
47 #include "rbd_types.h"
48
49 #define RBD_DEBUG       /* Activate rbd_assert() calls */
50
51 /*
52  * The basic unit of block I/O is a sector.  It is interpreted in a
53  * number of contexts in Linux (blk, bio, genhd), but the default is
54  * universally 512 bytes.  These symbols are just slightly more
55  * meaningful than the bare numbers they represent.
56  */
57 #define SECTOR_SHIFT    9
58 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
59
60 /*
61  * Increment the given counter and return its updated value.
62  * If the counter is already 0 it will not be incremented.
63  * If the counter is already at its maximum value returns
64  * -EINVAL without updating it.
65  */
66 static int atomic_inc_return_safe(atomic_t *v)
67 {
68         unsigned int counter;
69
70         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
71         if (counter <= (unsigned int)INT_MAX)
72                 return (int)counter;
73
74         atomic_dec(v);
75
76         return -EINVAL;
77 }
78
79 /* Decrement the counter.  Return the resulting value, or -EINVAL */
80 static int atomic_dec_return_safe(atomic_t *v)
81 {
82         int counter;
83
84         counter = atomic_dec_return(v);
85         if (counter >= 0)
86                 return counter;
87
88         atomic_inc(v);
89
90         return -EINVAL;
91 }
92
93 #define RBD_DRV_NAME "rbd"
94
95 #define RBD_MINORS_PER_MAJOR            256
96 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
97
98 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
99 #define RBD_MAX_SNAP_NAME_LEN   \
100                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
101
102 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
103
104 #define RBD_SNAP_HEAD_NAME      "-"
105
106 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
107
108 /* This allows a single page to hold an image name sent by OSD */
109 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
110 #define RBD_IMAGE_ID_LEN_MAX    64
111
112 #define RBD_OBJ_PREFIX_LEN_MAX  64
113
114 /* Feature bits */
115
116 #define RBD_FEATURE_LAYERING    (1<<0)
117 #define RBD_FEATURE_STRIPINGV2  (1<<1)
118 #define RBD_FEATURES_ALL \
119             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
120
121 /* Features supported by this (client software) implementation. */
122
123 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
124
125 /*
126  * An RBD device name will be "rbd#", where the "rbd" comes from
127  * RBD_DRV_NAME above, and # is a unique integer identifier.
128  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
129  * enough to hold all possible device names.
130  */
131 #define DEV_NAME_LEN            32
132 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
133
134 /*
135  * block device image metadata (in-memory version)
136  */
137 struct rbd_image_header {
138         /* These six fields never change for a given rbd image */
139         char *object_prefix;
140         __u8 obj_order;
141         __u8 crypt_type;
142         __u8 comp_type;
143         u64 stripe_unit;
144         u64 stripe_count;
145         u64 features;           /* Might be changeable someday? */
146
147         /* The remaining fields need to be updated occasionally */
148         u64 image_size;
149         struct ceph_snap_context *snapc;
150         char *snap_names;       /* format 1 only */
151         u64 *snap_sizes;        /* format 1 only */
152 };
153
154 /*
155  * An rbd image specification.
156  *
157  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
158  * identify an image.  Each rbd_dev structure includes a pointer to
159  * an rbd_spec structure that encapsulates this identity.
160  *
161  * Each of the id's in an rbd_spec has an associated name.  For a
162  * user-mapped image, the names are supplied and the id's associated
163  * with them are looked up.  For a layered image, a parent image is
164  * defined by the tuple, and the names are looked up.
165  *
166  * An rbd_dev structure contains a parent_spec pointer which is
167  * non-null if the image it represents is a child in a layered
168  * image.  This pointer will refer to the rbd_spec structure used
169  * by the parent rbd_dev for its own identity (i.e., the structure
170  * is shared between the parent and child).
171  *
172  * Since these structures are populated once, during the discovery
173  * phase of image construction, they are effectively immutable so
174  * we make no effort to synchronize access to them.
175  *
176  * Note that code herein does not assume the image name is known (it
177  * could be a null pointer).
178  */
179 struct rbd_spec {
180         u64             pool_id;
181         const char      *pool_name;
182
183         const char      *image_id;
184         const char      *image_name;
185
186         u64             snap_id;
187         const char      *snap_name;
188
189         struct kref     kref;
190 };
191
192 /*
193  * an instance of the client.  multiple devices may share an rbd client.
194  */
195 struct rbd_client {
196         struct ceph_client      *client;
197         struct kref             kref;
198         struct list_head        node;
199 };
200
201 struct rbd_img_request;
202 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
203
204 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
205
206 struct rbd_obj_request;
207 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
208
209 enum obj_request_type {
210         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
211 };
212
213 enum obj_operation_type {
214         OBJ_OP_WRITE,
215         OBJ_OP_READ,
216         OBJ_OP_DISCARD,
217 };
218
219 enum obj_req_flags {
220         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
221         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
222         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
223         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
224 };
225
226 struct rbd_obj_request {
227         const char              *object_name;
228         u64                     offset;         /* object start byte */
229         u64                     length;         /* bytes from offset */
230         unsigned long           flags;
231
232         /*
233          * An object request associated with an image will have its
234          * img_data flag set; a standalone object request will not.
235          *
236          * A standalone object request will have which == BAD_WHICH
237          * and a null obj_request pointer.
238          *
239          * An object request initiated in support of a layered image
240          * object (to check for its existence before a write) will
241          * have which == BAD_WHICH and a non-null obj_request pointer.
242          *
243          * Finally, an object request for rbd image data will have
244          * which != BAD_WHICH, and will have a non-null img_request
245          * pointer.  The value of which will be in the range
246          * 0..(img_request->obj_request_count-1).
247          */
248         union {
249                 struct rbd_obj_request  *obj_request;   /* STAT op */
250                 struct {
251                         struct rbd_img_request  *img_request;
252                         u64                     img_offset;
253                         /* links for img_request->obj_requests list */
254                         struct list_head        links;
255                 };
256         };
257         u32                     which;          /* posn image request list */
258
259         enum obj_request_type   type;
260         union {
261                 struct bio      *bio_list;
262                 struct {
263                         struct page     **pages;
264                         u32             page_count;
265                 };
266         };
267         struct page             **copyup_pages;
268         u32                     copyup_page_count;
269
270         struct ceph_osd_request *osd_req;
271
272         u64                     xferred;        /* bytes transferred */
273         int                     result;
274
275         rbd_obj_callback_t      callback;
276         struct completion       completion;
277
278         struct kref             kref;
279 };
280
281 enum img_req_flags {
282         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
283         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
284         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
285         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
286 };
287
288 struct rbd_img_request {
289         struct rbd_device       *rbd_dev;
290         u64                     offset; /* starting image byte offset */
291         u64                     length; /* byte count from offset */
292         unsigned long           flags;
293         union {
294                 u64                     snap_id;        /* for reads */
295                 struct ceph_snap_context *snapc;        /* for writes */
296         };
297         union {
298                 struct request          *rq;            /* block request */
299                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
300         };
301         struct page             **copyup_pages;
302         u32                     copyup_page_count;
303         spinlock_t              completion_lock;/* protects next_completion */
304         u32                     next_completion;
305         rbd_img_callback_t      callback;
306         u64                     xferred;/* aggregate bytes transferred */
307         int                     result; /* first nonzero obj_request result */
308
309         u32                     obj_request_count;
310         struct list_head        obj_requests;   /* rbd_obj_request structs */
311
312         struct kref             kref;
313 };
314
315 #define for_each_obj_request(ireq, oreq) \
316         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
317 #define for_each_obj_request_from(ireq, oreq) \
318         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
319 #define for_each_obj_request_safe(ireq, oreq, n) \
320         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
321
322 struct rbd_mapping {
323         u64                     size;
324         u64                     features;
325         bool                    read_only;
326 };
327
328 /*
329  * a single device
330  */
331 struct rbd_device {
332         int                     dev_id;         /* blkdev unique id */
333
334         int                     major;          /* blkdev assigned major */
335         int                     minor;
336         struct gendisk          *disk;          /* blkdev's gendisk and rq */
337
338         u32                     image_format;   /* Either 1 or 2 */
339         struct rbd_client       *rbd_client;
340
341         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
342
343         struct list_head        rq_queue;       /* incoming rq queue */
344         spinlock_t              lock;           /* queue, flags, open_count */
345         struct work_struct      rq_work;
346
347         struct rbd_image_header header;
348         unsigned long           flags;          /* possibly lock protected */
349         struct rbd_spec         *spec;
350
351         char                    *header_name;
352
353         struct ceph_file_layout layout;
354
355         struct ceph_osd_event   *watch_event;
356         struct rbd_obj_request  *watch_request;
357
358         struct rbd_spec         *parent_spec;
359         u64                     parent_overlap;
360         atomic_t                parent_ref;
361         struct rbd_device       *parent;
362
363         /* protects updating the header */
364         struct rw_semaphore     header_rwsem;
365
366         struct rbd_mapping      mapping;
367
368         struct list_head        node;
369
370         /* sysfs related */
371         struct device           dev;
372         unsigned long           open_count;     /* protected by lock */
373 };
374
375 /*
376  * Flag bits for rbd_dev->flags.  If atomicity is required,
377  * rbd_dev->lock is used to protect access.
378  *
379  * Currently, only the "removing" flag (which is coupled with the
380  * "open_count" field) requires atomic access.
381  */
382 enum rbd_dev_flags {
383         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
384         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
385 };
386
387 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
388
389 static LIST_HEAD(rbd_dev_list);    /* devices */
390 static DEFINE_SPINLOCK(rbd_dev_list_lock);
391
392 static LIST_HEAD(rbd_client_list);              /* clients */
393 static DEFINE_SPINLOCK(rbd_client_list_lock);
394
395 /* Slab caches for frequently-allocated structures */
396
397 static struct kmem_cache        *rbd_img_request_cache;
398 static struct kmem_cache        *rbd_obj_request_cache;
399 static struct kmem_cache        *rbd_segment_name_cache;
400
401 static int rbd_major;
402 static DEFINE_IDA(rbd_dev_id_ida);
403
404 static struct workqueue_struct *rbd_wq;
405
406 /*
407  * Default to false for now, as single-major requires >= 0.75 version of
408  * userspace rbd utility.
409  */
410 static bool single_major = false;
411 module_param(single_major, bool, S_IRUGO);
412 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
413
414 static int rbd_img_request_submit(struct rbd_img_request *img_request);
415
416 static void rbd_dev_device_release(struct device *dev);
417
418 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
419                        size_t count);
420 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
421                           size_t count);
422 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
423                                     size_t count);
424 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
425                                        size_t count);
426 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
427 static void rbd_spec_put(struct rbd_spec *spec);
428
429 static int rbd_dev_id_to_minor(int dev_id)
430 {
431         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
432 }
433
434 static int minor_to_rbd_dev_id(int minor)
435 {
436         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
437 }
438
439 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
440 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
441 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
442 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
443
444 static struct attribute *rbd_bus_attrs[] = {
445         &bus_attr_add.attr,
446         &bus_attr_remove.attr,
447         &bus_attr_add_single_major.attr,
448         &bus_attr_remove_single_major.attr,
449         NULL,
450 };
451
452 static umode_t rbd_bus_is_visible(struct kobject *kobj,
453                                   struct attribute *attr, int index)
454 {
455         if (!single_major &&
456             (attr == &bus_attr_add_single_major.attr ||
457              attr == &bus_attr_remove_single_major.attr))
458                 return 0;
459
460         return attr->mode;
461 }
462
463 static const struct attribute_group rbd_bus_group = {
464         .attrs = rbd_bus_attrs,
465         .is_visible = rbd_bus_is_visible,
466 };
467 __ATTRIBUTE_GROUPS(rbd_bus);
468
469 static struct bus_type rbd_bus_type = {
470         .name           = "rbd",
471         .bus_groups     = rbd_bus_groups,
472 };
473
474 static void rbd_root_dev_release(struct device *dev)
475 {
476 }
477
478 static struct device rbd_root_dev = {
479         .init_name =    "rbd",
480         .release =      rbd_root_dev_release,
481 };
482
483 static __printf(2, 3)
484 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
485 {
486         struct va_format vaf;
487         va_list args;
488
489         va_start(args, fmt);
490         vaf.fmt = fmt;
491         vaf.va = &args;
492
493         if (!rbd_dev)
494                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
495         else if (rbd_dev->disk)
496                 printk(KERN_WARNING "%s: %s: %pV\n",
497                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
498         else if (rbd_dev->spec && rbd_dev->spec->image_name)
499                 printk(KERN_WARNING "%s: image %s: %pV\n",
500                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
501         else if (rbd_dev->spec && rbd_dev->spec->image_id)
502                 printk(KERN_WARNING "%s: id %s: %pV\n",
503                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
504         else    /* punt */
505                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
506                         RBD_DRV_NAME, rbd_dev, &vaf);
507         va_end(args);
508 }
509
510 #ifdef RBD_DEBUG
511 #define rbd_assert(expr)                                                \
512                 if (unlikely(!(expr))) {                                \
513                         printk(KERN_ERR "\nAssertion failure in %s() "  \
514                                                 "at line %d:\n\n"       \
515                                         "\trbd_assert(%s);\n\n",        \
516                                         __func__, __LINE__, #expr);     \
517                         BUG();                                          \
518                 }
519 #else /* !RBD_DEBUG */
520 #  define rbd_assert(expr)      ((void) 0)
521 #endif /* !RBD_DEBUG */
522
523 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
524 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
525 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
526
527 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
528 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
529 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
530 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
531 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
532                                         u64 snap_id);
533 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
534                                 u8 *order, u64 *snap_size);
535 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
536                 u64 *snap_features);
537 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
538
539 static int rbd_open(struct block_device *bdev, fmode_t mode)
540 {
541         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
542         bool removing = false;
543
544         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
545                 return -EROFS;
546
547         spin_lock_irq(&rbd_dev->lock);
548         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
549                 removing = true;
550         else
551                 rbd_dev->open_count++;
552         spin_unlock_irq(&rbd_dev->lock);
553         if (removing)
554                 return -ENOENT;
555
556         (void) get_device(&rbd_dev->dev);
557
558         return 0;
559 }
560
561 static void rbd_release(struct gendisk *disk, fmode_t mode)
562 {
563         struct rbd_device *rbd_dev = disk->private_data;
564         unsigned long open_count_before;
565
566         spin_lock_irq(&rbd_dev->lock);
567         open_count_before = rbd_dev->open_count--;
568         spin_unlock_irq(&rbd_dev->lock);
569         rbd_assert(open_count_before > 0);
570
571         put_device(&rbd_dev->dev);
572 }
573
574 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
575 {
576         int ret = 0;
577         int val;
578         bool ro;
579         bool ro_changed = false;
580
581         /* get_user() may sleep, so call it before taking rbd_dev->lock */
582         if (get_user(val, (int __user *)(arg)))
583                 return -EFAULT;
584
585         ro = val ? true : false;
586         /* Snapshot doesn't allow to write*/
587         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
588                 return -EROFS;
589
590         spin_lock_irq(&rbd_dev->lock);
591         /* prevent others open this device */
592         if (rbd_dev->open_count > 1) {
593                 ret = -EBUSY;
594                 goto out;
595         }
596
597         if (rbd_dev->mapping.read_only != ro) {
598                 rbd_dev->mapping.read_only = ro;
599                 ro_changed = true;
600         }
601
602 out:
603         spin_unlock_irq(&rbd_dev->lock);
604         /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
605         if (ret == 0 && ro_changed)
606                 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
607
608         return ret;
609 }
610
611 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
612                         unsigned int cmd, unsigned long arg)
613 {
614         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
615         int ret = 0;
616
617         switch (cmd) {
618         case BLKROSET:
619                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
620                 break;
621         default:
622                 ret = -ENOTTY;
623         }
624
625         return ret;
626 }
627
628 #ifdef CONFIG_COMPAT
629 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
630                                 unsigned int cmd, unsigned long arg)
631 {
632         return rbd_ioctl(bdev, mode, cmd, arg);
633 }
634 #endif /* CONFIG_COMPAT */
635
636 static const struct block_device_operations rbd_bd_ops = {
637         .owner                  = THIS_MODULE,
638         .open                   = rbd_open,
639         .release                = rbd_release,
640         .ioctl                  = rbd_ioctl,
641 #ifdef CONFIG_COMPAT
642         .compat_ioctl           = rbd_compat_ioctl,
643 #endif
644 };
645
646 /*
647  * Initialize an rbd client instance.  Success or not, this function
648  * consumes ceph_opts.  Caller holds client_mutex.
649  */
650 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
651 {
652         struct rbd_client *rbdc;
653         int ret = -ENOMEM;
654
655         dout("%s:\n", __func__);
656         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
657         if (!rbdc)
658                 goto out_opt;
659
660         kref_init(&rbdc->kref);
661         INIT_LIST_HEAD(&rbdc->node);
662
663         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
664         if (IS_ERR(rbdc->client))
665                 goto out_rbdc;
666         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
667
668         ret = ceph_open_session(rbdc->client);
669         if (ret < 0)
670                 goto out_client;
671
672         spin_lock(&rbd_client_list_lock);
673         list_add_tail(&rbdc->node, &rbd_client_list);
674         spin_unlock(&rbd_client_list_lock);
675
676         dout("%s: rbdc %p\n", __func__, rbdc);
677
678         return rbdc;
679 out_client:
680         ceph_destroy_client(rbdc->client);
681 out_rbdc:
682         kfree(rbdc);
683 out_opt:
684         if (ceph_opts)
685                 ceph_destroy_options(ceph_opts);
686         dout("%s: error %d\n", __func__, ret);
687
688         return ERR_PTR(ret);
689 }
690
691 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
692 {
693         kref_get(&rbdc->kref);
694
695         return rbdc;
696 }
697
698 /*
699  * Find a ceph client with specific addr and configuration.  If
700  * found, bump its reference count.
701  */
702 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
703 {
704         struct rbd_client *client_node;
705         bool found = false;
706
707         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
708                 return NULL;
709
710         spin_lock(&rbd_client_list_lock);
711         list_for_each_entry(client_node, &rbd_client_list, node) {
712                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
713                         __rbd_get_client(client_node);
714
715                         found = true;
716                         break;
717                 }
718         }
719         spin_unlock(&rbd_client_list_lock);
720
721         return found ? client_node : NULL;
722 }
723
724 /*
725  * mount options
726  */
727 enum {
728         Opt_last_int,
729         /* int args above */
730         Opt_last_string,
731         /* string args above */
732         Opt_read_only,
733         Opt_read_write,
734         /* Boolean args above */
735         Opt_last_bool,
736 };
737
738 static match_table_t rbd_opts_tokens = {
739         /* int args above */
740         /* string args above */
741         {Opt_read_only, "read_only"},
742         {Opt_read_only, "ro"},          /* Alternate spelling */
743         {Opt_read_write, "read_write"},
744         {Opt_read_write, "rw"},         /* Alternate spelling */
745         /* Boolean args above */
746         {-1, NULL}
747 };
748
749 struct rbd_options {
750         bool    read_only;
751 };
752
753 #define RBD_READ_ONLY_DEFAULT   false
754
755 static int parse_rbd_opts_token(char *c, void *private)
756 {
757         struct rbd_options *rbd_opts = private;
758         substring_t argstr[MAX_OPT_ARGS];
759         int token, intval, ret;
760
761         token = match_token(c, rbd_opts_tokens, argstr);
762         if (token < 0)
763                 return -EINVAL;
764
765         if (token < Opt_last_int) {
766                 ret = match_int(&argstr[0], &intval);
767                 if (ret < 0) {
768                         pr_err("bad mount option arg (not int) "
769                                "at '%s'\n", c);
770                         return ret;
771                 }
772                 dout("got int token %d val %d\n", token, intval);
773         } else if (token > Opt_last_int && token < Opt_last_string) {
774                 dout("got string token %d val %s\n", token,
775                      argstr[0].from);
776         } else if (token > Opt_last_string && token < Opt_last_bool) {
777                 dout("got Boolean token %d\n", token);
778         } else {
779                 dout("got token %d\n", token);
780         }
781
782         switch (token) {
783         case Opt_read_only:
784                 rbd_opts->read_only = true;
785                 break;
786         case Opt_read_write:
787                 rbd_opts->read_only = false;
788                 break;
789         default:
790                 rbd_assert(false);
791                 break;
792         }
793         return 0;
794 }
795
796 static char* obj_op_name(enum obj_operation_type op_type)
797 {
798         switch (op_type) {
799         case OBJ_OP_READ:
800                 return "read";
801         case OBJ_OP_WRITE:
802                 return "write";
803         case OBJ_OP_DISCARD:
804                 return "discard";
805         default:
806                 return "???";
807         }
808 }
809
810 /*
811  * Get a ceph client with specific addr and configuration, if one does
812  * not exist create it.  Either way, ceph_opts is consumed by this
813  * function.
814  */
815 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
816 {
817         struct rbd_client *rbdc;
818
819         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
820         rbdc = rbd_client_find(ceph_opts);
821         if (rbdc)       /* using an existing client */
822                 ceph_destroy_options(ceph_opts);
823         else
824                 rbdc = rbd_client_create(ceph_opts);
825         mutex_unlock(&client_mutex);
826
827         return rbdc;
828 }
829
830 /*
831  * Destroy ceph client
832  *
833  * Caller must hold rbd_client_list_lock.
834  */
835 static void rbd_client_release(struct kref *kref)
836 {
837         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
838
839         dout("%s: rbdc %p\n", __func__, rbdc);
840         spin_lock(&rbd_client_list_lock);
841         list_del(&rbdc->node);
842         spin_unlock(&rbd_client_list_lock);
843
844         ceph_destroy_client(rbdc->client);
845         kfree(rbdc);
846 }
847
848 /*
849  * Drop reference to ceph client node. If it's not referenced anymore, release
850  * it.
851  */
852 static void rbd_put_client(struct rbd_client *rbdc)
853 {
854         if (rbdc)
855                 kref_put(&rbdc->kref, rbd_client_release);
856 }
857
858 static bool rbd_image_format_valid(u32 image_format)
859 {
860         return image_format == 1 || image_format == 2;
861 }
862
863 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
864 {
865         size_t size;
866         u32 snap_count;
867
868         /* The header has to start with the magic rbd header text */
869         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
870                 return false;
871
872         /* The bio layer requires at least sector-sized I/O */
873
874         if (ondisk->options.order < SECTOR_SHIFT)
875                 return false;
876
877         /* If we use u64 in a few spots we may be able to loosen this */
878
879         if (ondisk->options.order > 8 * sizeof (int) - 1)
880                 return false;
881
882         /*
883          * The size of a snapshot header has to fit in a size_t, and
884          * that limits the number of snapshots.
885          */
886         snap_count = le32_to_cpu(ondisk->snap_count);
887         size = SIZE_MAX - sizeof (struct ceph_snap_context);
888         if (snap_count > size / sizeof (__le64))
889                 return false;
890
891         /*
892          * Not only that, but the size of the entire the snapshot
893          * header must also be representable in a size_t.
894          */
895         size -= snap_count * sizeof (__le64);
896         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
897                 return false;
898
899         return true;
900 }
901
902 /*
903  * Fill an rbd image header with information from the given format 1
904  * on-disk header.
905  */
906 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
907                                  struct rbd_image_header_ondisk *ondisk)
908 {
909         struct rbd_image_header *header = &rbd_dev->header;
910         bool first_time = header->object_prefix == NULL;
911         struct ceph_snap_context *snapc;
912         char *object_prefix = NULL;
913         char *snap_names = NULL;
914         u64 *snap_sizes = NULL;
915         u32 snap_count;
916         size_t size;
917         int ret = -ENOMEM;
918         u32 i;
919
920         /* Allocate this now to avoid having to handle failure below */
921
922         if (first_time) {
923                 size_t len;
924
925                 len = strnlen(ondisk->object_prefix,
926                                 sizeof (ondisk->object_prefix));
927                 object_prefix = kmalloc(len + 1, GFP_KERNEL);
928                 if (!object_prefix)
929                         return -ENOMEM;
930                 memcpy(object_prefix, ondisk->object_prefix, len);
931                 object_prefix[len] = '\0';
932         }
933
934         /* Allocate the snapshot context and fill it in */
935
936         snap_count = le32_to_cpu(ondisk->snap_count);
937         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
938         if (!snapc)
939                 goto out_err;
940         snapc->seq = le64_to_cpu(ondisk->snap_seq);
941         if (snap_count) {
942                 struct rbd_image_snap_ondisk *snaps;
943                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
944
945                 /* We'll keep a copy of the snapshot names... */
946
947                 if (snap_names_len > (u64)SIZE_MAX)
948                         goto out_2big;
949                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
950                 if (!snap_names)
951                         goto out_err;
952
953                 /* ...as well as the array of their sizes. */
954
955                 size = snap_count * sizeof (*header->snap_sizes);
956                 snap_sizes = kmalloc(size, GFP_KERNEL);
957                 if (!snap_sizes)
958                         goto out_err;
959
960                 /*
961                  * Copy the names, and fill in each snapshot's id
962                  * and size.
963                  *
964                  * Note that rbd_dev_v1_header_info() guarantees the
965                  * ondisk buffer we're working with has
966                  * snap_names_len bytes beyond the end of the
967                  * snapshot id array, this memcpy() is safe.
968                  */
969                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
970                 snaps = ondisk->snaps;
971                 for (i = 0; i < snap_count; i++) {
972                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
973                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
974                 }
975         }
976
977         /* We won't fail any more, fill in the header */
978
979         if (first_time) {
980                 header->object_prefix = object_prefix;
981                 header->obj_order = ondisk->options.order;
982                 header->crypt_type = ondisk->options.crypt_type;
983                 header->comp_type = ondisk->options.comp_type;
984                 /* The rest aren't used for format 1 images */
985                 header->stripe_unit = 0;
986                 header->stripe_count = 0;
987                 header->features = 0;
988         } else {
989                 ceph_put_snap_context(header->snapc);
990                 kfree(header->snap_names);
991                 kfree(header->snap_sizes);
992         }
993
994         /* The remaining fields always get updated (when we refresh) */
995
996         header->image_size = le64_to_cpu(ondisk->image_size);
997         header->snapc = snapc;
998         header->snap_names = snap_names;
999         header->snap_sizes = snap_sizes;
1000
1001         return 0;
1002 out_2big:
1003         ret = -EIO;
1004 out_err:
1005         kfree(snap_sizes);
1006         kfree(snap_names);
1007         ceph_put_snap_context(snapc);
1008         kfree(object_prefix);
1009
1010         return ret;
1011 }
1012
1013 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1014 {
1015         const char *snap_name;
1016
1017         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1018
1019         /* Skip over names until we find the one we are looking for */
1020
1021         snap_name = rbd_dev->header.snap_names;
1022         while (which--)
1023                 snap_name += strlen(snap_name) + 1;
1024
1025         return kstrdup(snap_name, GFP_KERNEL);
1026 }
1027
1028 /*
1029  * Snapshot id comparison function for use with qsort()/bsearch().
1030  * Note that result is for snapshots in *descending* order.
1031  */
1032 static int snapid_compare_reverse(const void *s1, const void *s2)
1033 {
1034         u64 snap_id1 = *(u64 *)s1;
1035         u64 snap_id2 = *(u64 *)s2;
1036
1037         if (snap_id1 < snap_id2)
1038                 return 1;
1039         return snap_id1 == snap_id2 ? 0 : -1;
1040 }
1041
1042 /*
1043  * Search a snapshot context to see if the given snapshot id is
1044  * present.
1045  *
1046  * Returns the position of the snapshot id in the array if it's found,
1047  * or BAD_SNAP_INDEX otherwise.
1048  *
1049  * Note: The snapshot array is in kept sorted (by the osd) in
1050  * reverse order, highest snapshot id first.
1051  */
1052 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1053 {
1054         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1055         u64 *found;
1056
1057         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1058                                 sizeof (snap_id), snapid_compare_reverse);
1059
1060         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1061 }
1062
1063 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1064                                         u64 snap_id)
1065 {
1066         u32 which;
1067         const char *snap_name;
1068
1069         which = rbd_dev_snap_index(rbd_dev, snap_id);
1070         if (which == BAD_SNAP_INDEX)
1071                 return ERR_PTR(-ENOENT);
1072
1073         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1074         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1075 }
1076
1077 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1078 {
1079         if (snap_id == CEPH_NOSNAP)
1080                 return RBD_SNAP_HEAD_NAME;
1081
1082         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1083         if (rbd_dev->image_format == 1)
1084                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1085
1086         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1087 }
1088
1089 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1090                                 u64 *snap_size)
1091 {
1092         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1093         if (snap_id == CEPH_NOSNAP) {
1094                 *snap_size = rbd_dev->header.image_size;
1095         } else if (rbd_dev->image_format == 1) {
1096                 u32 which;
1097
1098                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1099                 if (which == BAD_SNAP_INDEX)
1100                         return -ENOENT;
1101
1102                 *snap_size = rbd_dev->header.snap_sizes[which];
1103         } else {
1104                 u64 size = 0;
1105                 int ret;
1106
1107                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1108                 if (ret)
1109                         return ret;
1110
1111                 *snap_size = size;
1112         }
1113         return 0;
1114 }
1115
1116 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1117                         u64 *snap_features)
1118 {
1119         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1120         if (snap_id == CEPH_NOSNAP) {
1121                 *snap_features = rbd_dev->header.features;
1122         } else if (rbd_dev->image_format == 1) {
1123                 *snap_features = 0;     /* No features for format 1 */
1124         } else {
1125                 u64 features = 0;
1126                 int ret;
1127
1128                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1129                 if (ret)
1130                         return ret;
1131
1132                 *snap_features = features;
1133         }
1134         return 0;
1135 }
1136
1137 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1138 {
1139         u64 snap_id = rbd_dev->spec->snap_id;
1140         u64 size = 0;
1141         u64 features = 0;
1142         int ret;
1143
1144         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1145         if (ret)
1146                 return ret;
1147         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1148         if (ret)
1149                 return ret;
1150
1151         rbd_dev->mapping.size = size;
1152         rbd_dev->mapping.features = features;
1153
1154         return 0;
1155 }
1156
1157 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1158 {
1159         rbd_dev->mapping.size = 0;
1160         rbd_dev->mapping.features = 0;
1161 }
1162
1163 static void rbd_segment_name_free(const char *name)
1164 {
1165         /* The explicit cast here is needed to drop the const qualifier */
1166
1167         kmem_cache_free(rbd_segment_name_cache, (void *)name);
1168 }
1169
1170 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1171 {
1172         char *name;
1173         u64 segment;
1174         int ret;
1175         char *name_format;
1176
1177         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1178         if (!name)
1179                 return NULL;
1180         segment = offset >> rbd_dev->header.obj_order;
1181         name_format = "%s.%012llx";
1182         if (rbd_dev->image_format == 2)
1183                 name_format = "%s.%016llx";
1184         ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1185                         rbd_dev->header.object_prefix, segment);
1186         if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1187                 pr_err("error formatting segment name for #%llu (%d)\n",
1188                         segment, ret);
1189                 rbd_segment_name_free(name);
1190                 name = NULL;
1191         }
1192
1193         return name;
1194 }
1195
1196 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1197 {
1198         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1199
1200         return offset & (segment_size - 1);
1201 }
1202
1203 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1204                                 u64 offset, u64 length)
1205 {
1206         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1207
1208         offset &= segment_size - 1;
1209
1210         rbd_assert(length <= U64_MAX - offset);
1211         if (offset + length > segment_size)
1212                 length = segment_size - offset;
1213
1214         return length;
1215 }
1216
1217 /*
1218  * returns the size of an object in the image
1219  */
1220 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1221 {
1222         return 1 << header->obj_order;
1223 }
1224
1225 /*
1226  * bio helpers
1227  */
1228
1229 static void bio_chain_put(struct bio *chain)
1230 {
1231         struct bio *tmp;
1232
1233         while (chain) {
1234                 tmp = chain;
1235                 chain = chain->bi_next;
1236                 bio_put(tmp);
1237         }
1238 }
1239
1240 /*
1241  * zeros a bio chain, starting at specific offset
1242  */
1243 static void zero_bio_chain(struct bio *chain, int start_ofs)
1244 {
1245         struct bio_vec bv;
1246         struct bvec_iter iter;
1247         unsigned long flags;
1248         void *buf;
1249         int pos = 0;
1250
1251         while (chain) {
1252                 bio_for_each_segment(bv, chain, iter) {
1253                         if (pos + bv.bv_len > start_ofs) {
1254                                 int remainder = max(start_ofs - pos, 0);
1255                                 buf = bvec_kmap_irq(&bv, &flags);
1256                                 memset(buf + remainder, 0,
1257                                        bv.bv_len - remainder);
1258                                 flush_dcache_page(bv.bv_page);
1259                                 bvec_kunmap_irq(buf, &flags);
1260                         }
1261                         pos += bv.bv_len;
1262                 }
1263
1264                 chain = chain->bi_next;
1265         }
1266 }
1267
1268 /*
1269  * similar to zero_bio_chain(), zeros data defined by a page array,
1270  * starting at the given byte offset from the start of the array and
1271  * continuing up to the given end offset.  The pages array is
1272  * assumed to be big enough to hold all bytes up to the end.
1273  */
1274 static void zero_pages(struct page **pages, u64 offset, u64 end)
1275 {
1276         struct page **page = &pages[offset >> PAGE_SHIFT];
1277
1278         rbd_assert(end > offset);
1279         rbd_assert(end - offset <= (u64)SIZE_MAX);
1280         while (offset < end) {
1281                 size_t page_offset;
1282                 size_t length;
1283                 unsigned long flags;
1284                 void *kaddr;
1285
1286                 page_offset = offset & ~PAGE_MASK;
1287                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1288                 local_irq_save(flags);
1289                 kaddr = kmap_atomic(*page);
1290                 memset(kaddr + page_offset, 0, length);
1291                 flush_dcache_page(*page);
1292                 kunmap_atomic(kaddr);
1293                 local_irq_restore(flags);
1294
1295                 offset += length;
1296                 page++;
1297         }
1298 }
1299
1300 /*
1301  * Clone a portion of a bio, starting at the given byte offset
1302  * and continuing for the number of bytes indicated.
1303  */
1304 static struct bio *bio_clone_range(struct bio *bio_src,
1305                                         unsigned int offset,
1306                                         unsigned int len,
1307                                         gfp_t gfpmask)
1308 {
1309         struct bio *bio;
1310
1311         bio = bio_clone(bio_src, gfpmask);
1312         if (!bio)
1313                 return NULL;    /* ENOMEM */
1314
1315         bio_advance(bio, offset);
1316         bio->bi_iter.bi_size = len;
1317
1318         return bio;
1319 }
1320
1321 /*
1322  * Clone a portion of a bio chain, starting at the given byte offset
1323  * into the first bio in the source chain and continuing for the
1324  * number of bytes indicated.  The result is another bio chain of
1325  * exactly the given length, or a null pointer on error.
1326  *
1327  * The bio_src and offset parameters are both in-out.  On entry they
1328  * refer to the first source bio and the offset into that bio where
1329  * the start of data to be cloned is located.
1330  *
1331  * On return, bio_src is updated to refer to the bio in the source
1332  * chain that contains first un-cloned byte, and *offset will
1333  * contain the offset of that byte within that bio.
1334  */
1335 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1336                                         unsigned int *offset,
1337                                         unsigned int len,
1338                                         gfp_t gfpmask)
1339 {
1340         struct bio *bi = *bio_src;
1341         unsigned int off = *offset;
1342         struct bio *chain = NULL;
1343         struct bio **end;
1344
1345         /* Build up a chain of clone bios up to the limit */
1346
1347         if (!bi || off >= bi->bi_iter.bi_size || !len)
1348                 return NULL;            /* Nothing to clone */
1349
1350         end = &chain;
1351         while (len) {
1352                 unsigned int bi_size;
1353                 struct bio *bio;
1354
1355                 if (!bi) {
1356                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1357                         goto out_err;   /* EINVAL; ran out of bio's */
1358                 }
1359                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1360                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1361                 if (!bio)
1362                         goto out_err;   /* ENOMEM */
1363
1364                 *end = bio;
1365                 end = &bio->bi_next;
1366
1367                 off += bi_size;
1368                 if (off == bi->bi_iter.bi_size) {
1369                         bi = bi->bi_next;
1370                         off = 0;
1371                 }
1372                 len -= bi_size;
1373         }
1374         *bio_src = bi;
1375         *offset = off;
1376
1377         return chain;
1378 out_err:
1379         bio_chain_put(chain);
1380
1381         return NULL;
1382 }
1383
1384 /*
1385  * The default/initial value for all object request flags is 0.  For
1386  * each flag, once its value is set to 1 it is never reset to 0
1387  * again.
1388  */
1389 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1390 {
1391         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1392                 struct rbd_device *rbd_dev;
1393
1394                 rbd_dev = obj_request->img_request->rbd_dev;
1395                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1396                         obj_request);
1397         }
1398 }
1399
1400 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1401 {
1402         smp_mb();
1403         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1404 }
1405
1406 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1407 {
1408         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1409                 struct rbd_device *rbd_dev = NULL;
1410
1411                 if (obj_request_img_data_test(obj_request))
1412                         rbd_dev = obj_request->img_request->rbd_dev;
1413                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1414                         obj_request);
1415         }
1416 }
1417
1418 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1419 {
1420         smp_mb();
1421         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1422 }
1423
1424 /*
1425  * This sets the KNOWN flag after (possibly) setting the EXISTS
1426  * flag.  The latter is set based on the "exists" value provided.
1427  *
1428  * Note that for our purposes once an object exists it never goes
1429  * away again.  It's possible that the response from two existence
1430  * checks are separated by the creation of the target object, and
1431  * the first ("doesn't exist") response arrives *after* the second
1432  * ("does exist").  In that case we ignore the second one.
1433  */
1434 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1435                                 bool exists)
1436 {
1437         if (exists)
1438                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1439         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1440         smp_mb();
1441 }
1442
1443 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1444 {
1445         smp_mb();
1446         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1447 }
1448
1449 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1450 {
1451         smp_mb();
1452         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1453 }
1454
1455 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1456 {
1457         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1458
1459         return obj_request->img_offset <
1460             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1461 }
1462
1463 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1464 {
1465         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1466                 atomic_read(&obj_request->kref.refcount));
1467         kref_get(&obj_request->kref);
1468 }
1469
1470 static void rbd_obj_request_destroy(struct kref *kref);
1471 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1472 {
1473         rbd_assert(obj_request != NULL);
1474         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1475                 atomic_read(&obj_request->kref.refcount));
1476         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1477 }
1478
1479 static void rbd_img_request_get(struct rbd_img_request *img_request)
1480 {
1481         dout("%s: img %p (was %d)\n", __func__, img_request,
1482              atomic_read(&img_request->kref.refcount));
1483         kref_get(&img_request->kref);
1484 }
1485
1486 static bool img_request_child_test(struct rbd_img_request *img_request);
1487 static void rbd_parent_request_destroy(struct kref *kref);
1488 static void rbd_img_request_destroy(struct kref *kref);
1489 static void rbd_img_request_put(struct rbd_img_request *img_request)
1490 {
1491         rbd_assert(img_request != NULL);
1492         dout("%s: img %p (was %d)\n", __func__, img_request,
1493                 atomic_read(&img_request->kref.refcount));
1494         if (img_request_child_test(img_request))
1495                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1496         else
1497                 kref_put(&img_request->kref, rbd_img_request_destroy);
1498 }
1499
1500 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1501                                         struct rbd_obj_request *obj_request)
1502 {
1503         rbd_assert(obj_request->img_request == NULL);
1504
1505         /* Image request now owns object's original reference */
1506         obj_request->img_request = img_request;
1507         obj_request->which = img_request->obj_request_count;
1508         rbd_assert(!obj_request_img_data_test(obj_request));
1509         obj_request_img_data_set(obj_request);
1510         rbd_assert(obj_request->which != BAD_WHICH);
1511         img_request->obj_request_count++;
1512         list_add_tail(&obj_request->links, &img_request->obj_requests);
1513         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1514                 obj_request->which);
1515 }
1516
1517 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1518                                         struct rbd_obj_request *obj_request)
1519 {
1520         rbd_assert(obj_request->which != BAD_WHICH);
1521
1522         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1523                 obj_request->which);
1524         list_del(&obj_request->links);
1525         rbd_assert(img_request->obj_request_count > 0);
1526         img_request->obj_request_count--;
1527         rbd_assert(obj_request->which == img_request->obj_request_count);
1528         obj_request->which = BAD_WHICH;
1529         rbd_assert(obj_request_img_data_test(obj_request));
1530         rbd_assert(obj_request->img_request == img_request);
1531         obj_request->img_request = NULL;
1532         obj_request->callback = NULL;
1533         rbd_obj_request_put(obj_request);
1534 }
1535
1536 static bool obj_request_type_valid(enum obj_request_type type)
1537 {
1538         switch (type) {
1539         case OBJ_REQUEST_NODATA:
1540         case OBJ_REQUEST_BIO:
1541         case OBJ_REQUEST_PAGES:
1542                 return true;
1543         default:
1544                 return false;
1545         }
1546 }
1547
1548 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1549                                 struct rbd_obj_request *obj_request)
1550 {
1551         dout("%s %p\n", __func__, obj_request);
1552         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1553 }
1554
1555 static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1556 {
1557         dout("%s %p\n", __func__, obj_request);
1558         ceph_osdc_cancel_request(obj_request->osd_req);
1559 }
1560
1561 /*
1562  * Wait for an object request to complete.  If interrupted, cancel the
1563  * underlying osd request.
1564  */
1565 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1566 {
1567         int ret;
1568
1569         dout("%s %p\n", __func__, obj_request);
1570
1571         ret = wait_for_completion_interruptible(&obj_request->completion);
1572         if (ret < 0) {
1573                 dout("%s %p interrupted\n", __func__, obj_request);
1574                 rbd_obj_request_end(obj_request);
1575                 return ret;
1576         }
1577
1578         dout("%s %p done\n", __func__, obj_request);
1579         return 0;
1580 }
1581
1582 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1583 {
1584
1585         dout("%s: img %p\n", __func__, img_request);
1586
1587         /*
1588          * If no error occurred, compute the aggregate transfer
1589          * count for the image request.  We could instead use
1590          * atomic64_cmpxchg() to update it as each object request
1591          * completes; not clear which way is better off hand.
1592          */
1593         if (!img_request->result) {
1594                 struct rbd_obj_request *obj_request;
1595                 u64 xferred = 0;
1596
1597                 for_each_obj_request(img_request, obj_request)
1598                         xferred += obj_request->xferred;
1599                 img_request->xferred = xferred;
1600         }
1601
1602         if (img_request->callback)
1603                 img_request->callback(img_request);
1604         else
1605                 rbd_img_request_put(img_request);
1606 }
1607
1608 /*
1609  * The default/initial value for all image request flags is 0.  Each
1610  * is conditionally set to 1 at image request initialization time
1611  * and currently never change thereafter.
1612  */
1613 static void img_request_write_set(struct rbd_img_request *img_request)
1614 {
1615         set_bit(IMG_REQ_WRITE, &img_request->flags);
1616         smp_mb();
1617 }
1618
1619 static bool img_request_write_test(struct rbd_img_request *img_request)
1620 {
1621         smp_mb();
1622         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1623 }
1624
1625 /*
1626  * Set the discard flag when the img_request is an discard request
1627  */
1628 static void img_request_discard_set(struct rbd_img_request *img_request)
1629 {
1630         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1631         smp_mb();
1632 }
1633
1634 static bool img_request_discard_test(struct rbd_img_request *img_request)
1635 {
1636         smp_mb();
1637         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1638 }
1639
1640 static void img_request_child_set(struct rbd_img_request *img_request)
1641 {
1642         set_bit(IMG_REQ_CHILD, &img_request->flags);
1643         smp_mb();
1644 }
1645
1646 static void img_request_child_clear(struct rbd_img_request *img_request)
1647 {
1648         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1649         smp_mb();
1650 }
1651
1652 static bool img_request_child_test(struct rbd_img_request *img_request)
1653 {
1654         smp_mb();
1655         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1656 }
1657
1658 static void img_request_layered_set(struct rbd_img_request *img_request)
1659 {
1660         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1661         smp_mb();
1662 }
1663
1664 static void img_request_layered_clear(struct rbd_img_request *img_request)
1665 {
1666         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1667         smp_mb();
1668 }
1669
1670 static bool img_request_layered_test(struct rbd_img_request *img_request)
1671 {
1672         smp_mb();
1673         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1674 }
1675
1676 static enum obj_operation_type
1677 rbd_img_request_op_type(struct rbd_img_request *img_request)
1678 {
1679         if (img_request_write_test(img_request))
1680                 return OBJ_OP_WRITE;
1681         else if (img_request_discard_test(img_request))
1682                 return OBJ_OP_DISCARD;
1683         else
1684                 return OBJ_OP_READ;
1685 }
1686
1687 static void
1688 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1689 {
1690         u64 xferred = obj_request->xferred;
1691         u64 length = obj_request->length;
1692
1693         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1694                 obj_request, obj_request->img_request, obj_request->result,
1695                 xferred, length);
1696         /*
1697          * ENOENT means a hole in the image.  We zero-fill the entire
1698          * length of the request.  A short read also implies zero-fill
1699          * to the end of the request.  An error requires the whole
1700          * length of the request to be reported finished with an error
1701          * to the block layer.  In each case we update the xferred
1702          * count to indicate the whole request was satisfied.
1703          */
1704         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1705         if (obj_request->result == -ENOENT) {
1706                 if (obj_request->type == OBJ_REQUEST_BIO)
1707                         zero_bio_chain(obj_request->bio_list, 0);
1708                 else
1709                         zero_pages(obj_request->pages, 0, length);
1710                 obj_request->result = 0;
1711         } else if (xferred < length && !obj_request->result) {
1712                 if (obj_request->type == OBJ_REQUEST_BIO)
1713                         zero_bio_chain(obj_request->bio_list, xferred);
1714                 else
1715                         zero_pages(obj_request->pages, xferred, length);
1716         }
1717         obj_request->xferred = length;
1718         obj_request_done_set(obj_request);
1719 }
1720
1721 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1722 {
1723         dout("%s: obj %p cb %p\n", __func__, obj_request,
1724                 obj_request->callback);
1725         if (obj_request->callback)
1726                 obj_request->callback(obj_request);
1727         else
1728                 complete_all(&obj_request->completion);
1729 }
1730
1731 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1732 {
1733         dout("%s: obj %p\n", __func__, obj_request);
1734         obj_request_done_set(obj_request);
1735 }
1736
1737 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1738 {
1739         struct rbd_img_request *img_request = NULL;
1740         struct rbd_device *rbd_dev = NULL;
1741         bool layered = false;
1742
1743         if (obj_request_img_data_test(obj_request)) {
1744                 img_request = obj_request->img_request;
1745                 layered = img_request && img_request_layered_test(img_request);
1746                 rbd_dev = img_request->rbd_dev;
1747         }
1748
1749         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1750                 obj_request, img_request, obj_request->result,
1751                 obj_request->xferred, obj_request->length);
1752         if (layered && obj_request->result == -ENOENT &&
1753                         obj_request->img_offset < rbd_dev->parent_overlap)
1754                 rbd_img_parent_read(obj_request);
1755         else if (img_request)
1756                 rbd_img_obj_request_read_callback(obj_request);
1757         else
1758                 obj_request_done_set(obj_request);
1759 }
1760
1761 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1762 {
1763         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1764                 obj_request->result, obj_request->length);
1765         /*
1766          * There is no such thing as a successful short write.  Set
1767          * it to our originally-requested length.
1768          */
1769         obj_request->xferred = obj_request->length;
1770         obj_request_done_set(obj_request);
1771 }
1772
1773 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1774 {
1775         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1776                 obj_request->result, obj_request->length);
1777         /*
1778          * There is no such thing as a successful short discard.  Set
1779          * it to our originally-requested length.
1780          */
1781         obj_request->xferred = obj_request->length;
1782         /* discarding a non-existent object is not a problem */
1783         if (obj_request->result == -ENOENT)
1784                 obj_request->result = 0;
1785         obj_request_done_set(obj_request);
1786 }
1787
1788 /*
1789  * For a simple stat call there's nothing to do.  We'll do more if
1790  * this is part of a write sequence for a layered image.
1791  */
1792 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1793 {
1794         dout("%s: obj %p\n", __func__, obj_request);
1795         obj_request_done_set(obj_request);
1796 }
1797
1798 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1799                                 struct ceph_msg *msg)
1800 {
1801         struct rbd_obj_request *obj_request = osd_req->r_priv;
1802         u16 opcode;
1803
1804         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1805         rbd_assert(osd_req == obj_request->osd_req);
1806         if (obj_request_img_data_test(obj_request)) {
1807                 rbd_assert(obj_request->img_request);
1808                 rbd_assert(obj_request->which != BAD_WHICH);
1809         } else {
1810                 rbd_assert(obj_request->which == BAD_WHICH);
1811         }
1812
1813         if (osd_req->r_result < 0)
1814                 obj_request->result = osd_req->r_result;
1815
1816         rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1817
1818         /*
1819          * We support a 64-bit length, but ultimately it has to be
1820          * passed to blk_end_request(), which takes an unsigned int.
1821          */
1822         obj_request->xferred = osd_req->r_reply_op_len[0];
1823         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1824
1825         opcode = osd_req->r_ops[0].op;
1826         switch (opcode) {
1827         case CEPH_OSD_OP_READ:
1828                 rbd_osd_read_callback(obj_request);
1829                 break;
1830         case CEPH_OSD_OP_SETALLOCHINT:
1831                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
1832                 /* fall through */
1833         case CEPH_OSD_OP_WRITE:
1834                 rbd_osd_write_callback(obj_request);
1835                 break;
1836         case CEPH_OSD_OP_STAT:
1837                 rbd_osd_stat_callback(obj_request);
1838                 break;
1839         case CEPH_OSD_OP_DELETE:
1840         case CEPH_OSD_OP_TRUNCATE:
1841         case CEPH_OSD_OP_ZERO:
1842                 rbd_osd_discard_callback(obj_request);
1843                 break;
1844         case CEPH_OSD_OP_CALL:
1845         case CEPH_OSD_OP_NOTIFY_ACK:
1846         case CEPH_OSD_OP_WATCH:
1847                 rbd_osd_trivial_callback(obj_request);
1848                 break;
1849         default:
1850                 rbd_warn(NULL, "%s: unsupported op %hu",
1851                         obj_request->object_name, (unsigned short) opcode);
1852                 break;
1853         }
1854
1855         if (obj_request_done_test(obj_request))
1856                 rbd_obj_request_complete(obj_request);
1857 }
1858
1859 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1860 {
1861         struct rbd_img_request *img_request = obj_request->img_request;
1862         struct ceph_osd_request *osd_req = obj_request->osd_req;
1863         u64 snap_id;
1864
1865         rbd_assert(osd_req != NULL);
1866
1867         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1868         ceph_osdc_build_request(osd_req, obj_request->offset,
1869                         NULL, snap_id, NULL);
1870 }
1871
1872 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1873 {
1874         struct rbd_img_request *img_request = obj_request->img_request;
1875         struct ceph_osd_request *osd_req = obj_request->osd_req;
1876         struct ceph_snap_context *snapc;
1877         struct timespec mtime = CURRENT_TIME;
1878
1879         rbd_assert(osd_req != NULL);
1880
1881         snapc = img_request ? img_request->snapc : NULL;
1882         ceph_osdc_build_request(osd_req, obj_request->offset,
1883                         snapc, CEPH_NOSNAP, &mtime);
1884 }
1885
1886 /*
1887  * Create an osd request.  A read request has one osd op (read).
1888  * A write request has either one (watch) or two (hint+write) osd ops.
1889  * (All rbd data writes are prefixed with an allocation hint op, but
1890  * technically osd watch is a write request, hence this distinction.)
1891  */
1892 static struct ceph_osd_request *rbd_osd_req_create(
1893                                         struct rbd_device *rbd_dev,
1894                                         enum obj_operation_type op_type,
1895                                         unsigned int num_ops,
1896                                         struct rbd_obj_request *obj_request)
1897 {
1898         struct ceph_snap_context *snapc = NULL;
1899         struct ceph_osd_client *osdc;
1900         struct ceph_osd_request *osd_req;
1901
1902         if (obj_request_img_data_test(obj_request) &&
1903                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1904                 struct rbd_img_request *img_request = obj_request->img_request;
1905                 if (op_type == OBJ_OP_WRITE) {
1906                         rbd_assert(img_request_write_test(img_request));
1907                 } else {
1908                         rbd_assert(img_request_discard_test(img_request));
1909                 }
1910                 snapc = img_request->snapc;
1911         }
1912
1913         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1914
1915         /* Allocate and initialize the request, for the num_ops ops */
1916
1917         osdc = &rbd_dev->rbd_client->client->osdc;
1918         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
1919                                           GFP_ATOMIC);
1920         if (!osd_req)
1921                 return NULL;    /* ENOMEM */
1922
1923         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
1924                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1925         else
1926                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1927
1928         osd_req->r_callback = rbd_osd_req_callback;
1929         osd_req->r_priv = obj_request;
1930
1931         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1932         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1933
1934         return osd_req;
1935 }
1936
1937 /*
1938  * Create a copyup osd request based on the information in the object
1939  * request supplied.  A copyup request has two or three osd ops, a
1940  * copyup method call, potentially a hint op, and a write or truncate
1941  * or zero op.
1942  */
1943 static struct ceph_osd_request *
1944 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1945 {
1946         struct rbd_img_request *img_request;
1947         struct ceph_snap_context *snapc;
1948         struct rbd_device *rbd_dev;
1949         struct ceph_osd_client *osdc;
1950         struct ceph_osd_request *osd_req;
1951         int num_osd_ops = 3;
1952
1953         rbd_assert(obj_request_img_data_test(obj_request));
1954         img_request = obj_request->img_request;
1955         rbd_assert(img_request);
1956         rbd_assert(img_request_write_test(img_request) ||
1957                         img_request_discard_test(img_request));
1958
1959         if (img_request_discard_test(img_request))
1960                 num_osd_ops = 2;
1961
1962         /* Allocate and initialize the request, for all the ops */
1963
1964         snapc = img_request->snapc;
1965         rbd_dev = img_request->rbd_dev;
1966         osdc = &rbd_dev->rbd_client->client->osdc;
1967         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
1968                                                 false, GFP_ATOMIC);
1969         if (!osd_req)
1970                 return NULL;    /* ENOMEM */
1971
1972         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1973         osd_req->r_callback = rbd_osd_req_callback;
1974         osd_req->r_priv = obj_request;
1975
1976         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1977         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1978
1979         return osd_req;
1980 }
1981
1982
1983 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1984 {
1985         ceph_osdc_put_request(osd_req);
1986 }
1987
1988 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1989
1990 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1991                                                 u64 offset, u64 length,
1992                                                 enum obj_request_type type)
1993 {
1994         struct rbd_obj_request *obj_request;
1995         size_t size;
1996         char *name;
1997
1998         rbd_assert(obj_request_type_valid(type));
1999
2000         size = strlen(object_name) + 1;
2001         name = kmalloc(size, GFP_KERNEL);
2002         if (!name)
2003                 return NULL;
2004
2005         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
2006         if (!obj_request) {
2007                 kfree(name);
2008                 return NULL;
2009         }
2010
2011         obj_request->object_name = memcpy(name, object_name, size);
2012         obj_request->offset = offset;
2013         obj_request->length = length;
2014         obj_request->flags = 0;
2015         obj_request->which = BAD_WHICH;
2016         obj_request->type = type;
2017         INIT_LIST_HEAD(&obj_request->links);
2018         init_completion(&obj_request->completion);
2019         kref_init(&obj_request->kref);
2020
2021         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2022                 offset, length, (int)type, obj_request);
2023
2024         return obj_request;
2025 }
2026
2027 static void rbd_obj_request_destroy(struct kref *kref)
2028 {
2029         struct rbd_obj_request *obj_request;
2030
2031         obj_request = container_of(kref, struct rbd_obj_request, kref);
2032
2033         dout("%s: obj %p\n", __func__, obj_request);
2034
2035         rbd_assert(obj_request->img_request == NULL);
2036         rbd_assert(obj_request->which == BAD_WHICH);
2037
2038         if (obj_request->osd_req)
2039                 rbd_osd_req_destroy(obj_request->osd_req);
2040
2041         rbd_assert(obj_request_type_valid(obj_request->type));
2042         switch (obj_request->type) {
2043         case OBJ_REQUEST_NODATA:
2044                 break;          /* Nothing to do */
2045         case OBJ_REQUEST_BIO:
2046                 if (obj_request->bio_list)
2047                         bio_chain_put(obj_request->bio_list);
2048                 break;
2049         case OBJ_REQUEST_PAGES:
2050                 if (obj_request->pages)
2051                         ceph_release_page_vector(obj_request->pages,
2052                                                 obj_request->page_count);
2053                 break;
2054         }
2055
2056         kfree(obj_request->object_name);
2057         obj_request->object_name = NULL;
2058         kmem_cache_free(rbd_obj_request_cache, obj_request);
2059 }
2060
2061 /* It's OK to call this for a device with no parent */
2062
2063 static void rbd_spec_put(struct rbd_spec *spec);
2064 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2065 {
2066         rbd_dev_remove_parent(rbd_dev);
2067         rbd_spec_put(rbd_dev->parent_spec);
2068         rbd_dev->parent_spec = NULL;
2069         rbd_dev->parent_overlap = 0;
2070 }
2071
2072 /*
2073  * Parent image reference counting is used to determine when an
2074  * image's parent fields can be safely torn down--after there are no
2075  * more in-flight requests to the parent image.  When the last
2076  * reference is dropped, cleaning them up is safe.
2077  */
2078 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2079 {
2080         int counter;
2081
2082         if (!rbd_dev->parent_spec)
2083                 return;
2084
2085         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2086         if (counter > 0)
2087                 return;
2088
2089         /* Last reference; clean up parent data structures */
2090
2091         if (!counter)
2092                 rbd_dev_unparent(rbd_dev);
2093         else
2094                 rbd_warn(rbd_dev, "parent reference underflow");
2095 }
2096
2097 /*
2098  * If an image has a non-zero parent overlap, get a reference to its
2099  * parent.
2100  *
2101  * We must get the reference before checking for the overlap to
2102  * coordinate properly with zeroing the parent overlap in
2103  * rbd_dev_v2_parent_info() when an image gets flattened.  We
2104  * drop it again if there is no overlap.
2105  *
2106  * Returns true if the rbd device has a parent with a non-zero
2107  * overlap and a reference for it was successfully taken, or
2108  * false otherwise.
2109  */
2110 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2111 {
2112         int counter;
2113
2114         if (!rbd_dev->parent_spec)
2115                 return false;
2116
2117         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2118         if (counter > 0 && rbd_dev->parent_overlap)
2119                 return true;
2120
2121         /* Image was flattened, but parent is not yet torn down */
2122
2123         if (counter < 0)
2124                 rbd_warn(rbd_dev, "parent reference overflow");
2125
2126         return false;
2127 }
2128
2129 /*
2130  * Caller is responsible for filling in the list of object requests
2131  * that comprises the image request, and the Linux request pointer
2132  * (if there is one).
2133  */
2134 static struct rbd_img_request *rbd_img_request_create(
2135                                         struct rbd_device *rbd_dev,
2136                                         u64 offset, u64 length,
2137                                         enum obj_operation_type op_type,
2138                                         struct ceph_snap_context *snapc)
2139 {
2140         struct rbd_img_request *img_request;
2141
2142         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2143         if (!img_request)
2144                 return NULL;
2145
2146         img_request->rq = NULL;
2147         img_request->rbd_dev = rbd_dev;
2148         img_request->offset = offset;
2149         img_request->length = length;
2150         img_request->flags = 0;
2151         if (op_type == OBJ_OP_DISCARD) {
2152                 img_request_discard_set(img_request);
2153                 img_request->snapc = snapc;
2154         } else if (op_type == OBJ_OP_WRITE) {
2155                 img_request_write_set(img_request);
2156                 img_request->snapc = snapc;
2157         } else {
2158                 img_request->snap_id = rbd_dev->spec->snap_id;
2159         }
2160         if (rbd_dev_parent_get(rbd_dev))
2161                 img_request_layered_set(img_request);
2162         spin_lock_init(&img_request->completion_lock);
2163         img_request->next_completion = 0;
2164         img_request->callback = NULL;
2165         img_request->result = 0;
2166         img_request->obj_request_count = 0;
2167         INIT_LIST_HEAD(&img_request->obj_requests);
2168         kref_init(&img_request->kref);
2169
2170         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2171                 obj_op_name(op_type), offset, length, img_request);
2172
2173         return img_request;
2174 }
2175
2176 static void rbd_img_request_destroy(struct kref *kref)
2177 {
2178         struct rbd_img_request *img_request;
2179         struct rbd_obj_request *obj_request;
2180         struct rbd_obj_request *next_obj_request;
2181
2182         img_request = container_of(kref, struct rbd_img_request, kref);
2183
2184         dout("%s: img %p\n", __func__, img_request);
2185
2186         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2187                 rbd_img_obj_request_del(img_request, obj_request);
2188         rbd_assert(img_request->obj_request_count == 0);
2189
2190         if (img_request_layered_test(img_request)) {
2191                 img_request_layered_clear(img_request);
2192                 rbd_dev_parent_put(img_request->rbd_dev);
2193         }
2194
2195         if (img_request_write_test(img_request) ||
2196                 img_request_discard_test(img_request))
2197                 ceph_put_snap_context(img_request->snapc);
2198
2199         kmem_cache_free(rbd_img_request_cache, img_request);
2200 }
2201
2202 static struct rbd_img_request *rbd_parent_request_create(
2203                                         struct rbd_obj_request *obj_request,
2204                                         u64 img_offset, u64 length)
2205 {
2206         struct rbd_img_request *parent_request;
2207         struct rbd_device *rbd_dev;
2208
2209         rbd_assert(obj_request->img_request);
2210         rbd_dev = obj_request->img_request->rbd_dev;
2211
2212         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2213                                                 length, OBJ_OP_READ, NULL);
2214         if (!parent_request)
2215                 return NULL;
2216
2217         img_request_child_set(parent_request);
2218         rbd_obj_request_get(obj_request);
2219         parent_request->obj_request = obj_request;
2220
2221         return parent_request;
2222 }
2223
2224 static void rbd_parent_request_destroy(struct kref *kref)
2225 {
2226         struct rbd_img_request *parent_request;
2227         struct rbd_obj_request *orig_request;
2228
2229         parent_request = container_of(kref, struct rbd_img_request, kref);
2230         orig_request = parent_request->obj_request;
2231
2232         parent_request->obj_request = NULL;
2233         rbd_obj_request_put(orig_request);
2234         img_request_child_clear(parent_request);
2235
2236         rbd_img_request_destroy(kref);
2237 }
2238
2239 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2240 {
2241         struct rbd_img_request *img_request;
2242         unsigned int xferred;
2243         int result;
2244         bool more;
2245
2246         rbd_assert(obj_request_img_data_test(obj_request));
2247         img_request = obj_request->img_request;
2248
2249         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2250         xferred = (unsigned int)obj_request->xferred;
2251         result = obj_request->result;
2252         if (result) {
2253                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2254                 enum obj_operation_type op_type;
2255
2256                 if (img_request_discard_test(img_request))
2257                         op_type = OBJ_OP_DISCARD;
2258                 else if (img_request_write_test(img_request))
2259                         op_type = OBJ_OP_WRITE;
2260                 else
2261                         op_type = OBJ_OP_READ;
2262
2263                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2264                         obj_op_name(op_type), obj_request->length,
2265                         obj_request->img_offset, obj_request->offset);
2266                 rbd_warn(rbd_dev, "  result %d xferred %x",
2267                         result, xferred);
2268                 if (!img_request->result)
2269                         img_request->result = result;
2270         }
2271
2272         /* Image object requests don't own their page array */
2273
2274         if (obj_request->type == OBJ_REQUEST_PAGES) {
2275                 obj_request->pages = NULL;
2276                 obj_request->page_count = 0;
2277         }
2278
2279         if (img_request_child_test(img_request)) {
2280                 rbd_assert(img_request->obj_request != NULL);
2281                 more = obj_request->which < img_request->obj_request_count - 1;
2282         } else {
2283                 rbd_assert(img_request->rq != NULL);
2284                 more = blk_end_request(img_request->rq, result, xferred);
2285         }
2286
2287         return more;
2288 }
2289
2290 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2291 {
2292         struct rbd_img_request *img_request;
2293         u32 which = obj_request->which;
2294         bool more = true;
2295
2296         rbd_assert(obj_request_img_data_test(obj_request));
2297         img_request = obj_request->img_request;
2298
2299         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2300         rbd_assert(img_request != NULL);
2301         rbd_assert(img_request->obj_request_count > 0);
2302         rbd_assert(which != BAD_WHICH);
2303         rbd_assert(which < img_request->obj_request_count);
2304
2305         spin_lock_irq(&img_request->completion_lock);
2306         if (which != img_request->next_completion)
2307                 goto out;
2308
2309         for_each_obj_request_from(img_request, obj_request) {
2310                 rbd_assert(more);
2311                 rbd_assert(which < img_request->obj_request_count);
2312
2313                 if (!obj_request_done_test(obj_request))
2314                         break;
2315                 more = rbd_img_obj_end_request(obj_request);
2316                 which++;
2317         }
2318
2319         rbd_assert(more ^ (which == img_request->obj_request_count));
2320         img_request->next_completion = which;
2321 out:
2322         spin_unlock_irq(&img_request->completion_lock);
2323         rbd_img_request_put(img_request);
2324
2325         if (!more)
2326                 rbd_img_request_complete(img_request);
2327 }
2328
2329 /*
2330  * Add individual osd ops to the given ceph_osd_request and prepare
2331  * them for submission. num_ops is the current number of
2332  * osd operations already to the object request.
2333  */
2334 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2335                                 struct ceph_osd_request *osd_request,
2336                                 enum obj_operation_type op_type,
2337                                 unsigned int num_ops)
2338 {
2339         struct rbd_img_request *img_request = obj_request->img_request;
2340         struct rbd_device *rbd_dev = img_request->rbd_dev;
2341         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2342         u64 offset = obj_request->offset;
2343         u64 length = obj_request->length;
2344         u64 img_end;
2345         u16 opcode;
2346
2347         if (op_type == OBJ_OP_DISCARD) {
2348                 if (!offset && length == object_size &&
2349                     (!img_request_layered_test(img_request) ||
2350                      !obj_request_overlaps_parent(obj_request))) {
2351                         opcode = CEPH_OSD_OP_DELETE;
2352                 } else if ((offset + length == object_size)) {
2353                         opcode = CEPH_OSD_OP_TRUNCATE;
2354                 } else {
2355                         down_read(&rbd_dev->header_rwsem);
2356                         img_end = rbd_dev->header.image_size;
2357                         up_read(&rbd_dev->header_rwsem);
2358
2359                         if (obj_request->img_offset + length == img_end)
2360                                 opcode = CEPH_OSD_OP_TRUNCATE;
2361                         else
2362                                 opcode = CEPH_OSD_OP_ZERO;
2363                 }
2364         } else if (op_type == OBJ_OP_WRITE) {
2365                 opcode = CEPH_OSD_OP_WRITE;
2366                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2367                                         object_size, object_size);
2368                 num_ops++;
2369         } else {
2370                 opcode = CEPH_OSD_OP_READ;
2371         }
2372
2373         if (opcode == CEPH_OSD_OP_DELETE)
2374                 osd_req_op_init(osd_request, num_ops, opcode);
2375         else
2376                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2377                                        offset, length, 0, 0);
2378
2379         if (obj_request->type == OBJ_REQUEST_BIO)
2380                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2381                                         obj_request->bio_list, length);
2382         else if (obj_request->type == OBJ_REQUEST_PAGES)
2383                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2384                                         obj_request->pages, length,
2385                                         offset & ~PAGE_MASK, false, false);
2386
2387         /* Discards are also writes */
2388         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2389                 rbd_osd_req_format_write(obj_request);
2390         else
2391                 rbd_osd_req_format_read(obj_request);
2392 }
2393
2394 /*
2395  * Split up an image request into one or more object requests, each
2396  * to a different object.  The "type" parameter indicates whether
2397  * "data_desc" is the pointer to the head of a list of bio
2398  * structures, or the base of a page array.  In either case this
2399  * function assumes data_desc describes memory sufficient to hold
2400  * all data described by the image request.
2401  */
2402 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2403                                         enum obj_request_type type,
2404                                         void *data_desc)
2405 {
2406         struct rbd_device *rbd_dev = img_request->rbd_dev;
2407         struct rbd_obj_request *obj_request = NULL;
2408         struct rbd_obj_request *next_obj_request;
2409         struct bio *bio_list = NULL;
2410         unsigned int bio_offset = 0;
2411         struct page **pages = NULL;
2412         enum obj_operation_type op_type;
2413         u64 img_offset;
2414         u64 resid;
2415
2416         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2417                 (int)type, data_desc);
2418
2419         img_offset = img_request->offset;
2420         resid = img_request->length;
2421         rbd_assert(resid > 0);
2422         op_type = rbd_img_request_op_type(img_request);
2423
2424         if (type == OBJ_REQUEST_BIO) {
2425                 bio_list = data_desc;
2426                 rbd_assert(img_offset ==
2427                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2428         } else if (type == OBJ_REQUEST_PAGES) {
2429                 pages = data_desc;
2430         }
2431
2432         while (resid) {
2433                 struct ceph_osd_request *osd_req;
2434                 const char *object_name;
2435                 u64 offset;
2436                 u64 length;
2437
2438                 object_name = rbd_segment_name(rbd_dev, img_offset);
2439                 if (!object_name)
2440                         goto out_unwind;
2441                 offset = rbd_segment_offset(rbd_dev, img_offset);
2442                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2443                 obj_request = rbd_obj_request_create(object_name,
2444                                                 offset, length, type);
2445                 /* object request has its own copy of the object name */
2446                 rbd_segment_name_free(object_name);
2447                 if (!obj_request)
2448                         goto out_unwind;
2449
2450                 /*
2451                  * set obj_request->img_request before creating the
2452                  * osd_request so that it gets the right snapc
2453                  */
2454                 rbd_img_obj_request_add(img_request, obj_request);
2455
2456                 if (type == OBJ_REQUEST_BIO) {
2457                         unsigned int clone_size;
2458
2459                         rbd_assert(length <= (u64)UINT_MAX);
2460                         clone_size = (unsigned int)length;
2461                         obj_request->bio_list =
2462                                         bio_chain_clone_range(&bio_list,
2463                                                                 &bio_offset,
2464                                                                 clone_size,
2465                                                                 GFP_ATOMIC);
2466                         if (!obj_request->bio_list)
2467                                 goto out_unwind;
2468                 } else if (type == OBJ_REQUEST_PAGES) {
2469                         unsigned int page_count;
2470
2471                         obj_request->pages = pages;
2472                         page_count = (u32)calc_pages_for(offset, length);
2473                         obj_request->page_count = page_count;
2474                         if ((offset + length) & ~PAGE_MASK)
2475                                 page_count--;   /* more on last page */
2476                         pages += page_count;
2477                 }
2478
2479                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2480                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2481                                         obj_request);
2482                 if (!osd_req)
2483                         goto out_unwind;
2484
2485                 obj_request->osd_req = osd_req;
2486                 obj_request->callback = rbd_img_obj_callback;
2487                 obj_request->img_offset = img_offset;
2488
2489                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2490
2491                 rbd_img_request_get(img_request);
2492
2493                 img_offset += length;
2494                 resid -= length;
2495         }
2496
2497         return 0;
2498
2499 out_unwind:
2500         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2501                 rbd_img_obj_request_del(img_request, obj_request);
2502
2503         return -ENOMEM;
2504 }
2505
2506 static void
2507 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2508 {
2509         struct rbd_img_request *img_request;
2510         struct rbd_device *rbd_dev;
2511         struct page **pages;
2512         u32 page_count;
2513
2514         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2515                 obj_request->type == OBJ_REQUEST_NODATA);
2516         rbd_assert(obj_request_img_data_test(obj_request));
2517         img_request = obj_request->img_request;
2518         rbd_assert(img_request);
2519
2520         rbd_dev = img_request->rbd_dev;
2521         rbd_assert(rbd_dev);
2522
2523         pages = obj_request->copyup_pages;
2524         rbd_assert(pages != NULL);
2525         obj_request->copyup_pages = NULL;
2526         page_count = obj_request->copyup_page_count;
2527         rbd_assert(page_count);
2528         obj_request->copyup_page_count = 0;
2529         ceph_release_page_vector(pages, page_count);
2530
2531         /*
2532          * We want the transfer count to reflect the size of the
2533          * original write request.  There is no such thing as a
2534          * successful short write, so if the request was successful
2535          * we can just set it to the originally-requested length.
2536          */
2537         if (!obj_request->result)
2538                 obj_request->xferred = obj_request->length;
2539
2540         /* Finish up with the normal image object callback */
2541
2542         rbd_img_obj_callback(obj_request);
2543 }
2544
2545 static void
2546 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2547 {
2548         struct rbd_obj_request *orig_request;
2549         struct ceph_osd_request *osd_req;
2550         struct ceph_osd_client *osdc;
2551         struct rbd_device *rbd_dev;
2552         struct page **pages;
2553         enum obj_operation_type op_type;
2554         u32 page_count;
2555         int img_result;
2556         u64 parent_length;
2557
2558         rbd_assert(img_request_child_test(img_request));
2559
2560         /* First get what we need from the image request */
2561
2562         pages = img_request->copyup_pages;
2563         rbd_assert(pages != NULL);
2564         img_request->copyup_pages = NULL;
2565         page_count = img_request->copyup_page_count;
2566         rbd_assert(page_count);
2567         img_request->copyup_page_count = 0;
2568
2569         orig_request = img_request->obj_request;
2570         rbd_assert(orig_request != NULL);
2571         rbd_assert(obj_request_type_valid(orig_request->type));
2572         img_result = img_request->result;
2573         parent_length = img_request->length;
2574         rbd_assert(parent_length == img_request->xferred);
2575         rbd_img_request_put(img_request);
2576
2577         rbd_assert(orig_request->img_request);
2578         rbd_dev = orig_request->img_request->rbd_dev;
2579         rbd_assert(rbd_dev);
2580
2581         /*
2582          * If the overlap has become 0 (most likely because the
2583          * image has been flattened) we need to free the pages
2584          * and re-submit the original write request.
2585          */
2586         if (!rbd_dev->parent_overlap) {
2587                 struct ceph_osd_client *osdc;
2588
2589                 ceph_release_page_vector(pages, page_count);
2590                 osdc = &rbd_dev->rbd_client->client->osdc;
2591                 img_result = rbd_obj_request_submit(osdc, orig_request);
2592                 if (!img_result)
2593                         return;
2594         }
2595
2596         if (img_result)
2597                 goto out_err;
2598
2599         /*
2600          * The original osd request is of no use to use any more.
2601          * We need a new one that can hold the three ops in a copyup
2602          * request.  Allocate the new copyup osd request for the
2603          * original request, and release the old one.
2604          */
2605         img_result = -ENOMEM;
2606         osd_req = rbd_osd_req_create_copyup(orig_request);
2607         if (!osd_req)
2608                 goto out_err;
2609         rbd_osd_req_destroy(orig_request->osd_req);
2610         orig_request->osd_req = osd_req;
2611         orig_request->copyup_pages = pages;
2612         orig_request->copyup_page_count = page_count;
2613
2614         /* Initialize the copyup op */
2615
2616         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2617         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2618                                                 false, false);
2619
2620         /* Add the other op(s) */
2621
2622         op_type = rbd_img_request_op_type(orig_request->img_request);
2623         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2624
2625         /* All set, send it off. */
2626
2627         orig_request->callback = rbd_img_obj_copyup_callback;
2628         osdc = &rbd_dev->rbd_client->client->osdc;
2629         img_result = rbd_obj_request_submit(osdc, orig_request);
2630         if (!img_result)
2631                 return;
2632 out_err:
2633         /* Record the error code and complete the request */
2634
2635         orig_request->result = img_result;
2636         orig_request->xferred = 0;
2637         obj_request_done_set(orig_request);
2638         rbd_obj_request_complete(orig_request);
2639 }
2640
2641 /*
2642  * Read from the parent image the range of data that covers the
2643  * entire target of the given object request.  This is used for
2644  * satisfying a layered image write request when the target of an
2645  * object request from the image request does not exist.
2646  *
2647  * A page array big enough to hold the returned data is allocated
2648  * and supplied to rbd_img_request_fill() as the "data descriptor."
2649  * When the read completes, this page array will be transferred to
2650  * the original object request for the copyup operation.
2651  *
2652  * If an error occurs, record it as the result of the original
2653  * object request and mark it done so it gets completed.
2654  */
2655 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2656 {
2657         struct rbd_img_request *img_request = NULL;
2658         struct rbd_img_request *parent_request = NULL;
2659         struct rbd_device *rbd_dev;
2660         u64 img_offset;
2661         u64 length;
2662         struct page **pages = NULL;
2663         u32 page_count;
2664         int result;
2665
2666         rbd_assert(obj_request_img_data_test(obj_request));
2667         rbd_assert(obj_request_type_valid(obj_request->type));
2668
2669         img_request = obj_request->img_request;
2670         rbd_assert(img_request != NULL);
2671         rbd_dev = img_request->rbd_dev;
2672         rbd_assert(rbd_dev->parent != NULL);
2673
2674         /*
2675          * Determine the byte range covered by the object in the
2676          * child image to which the original request was to be sent.
2677          */
2678         img_offset = obj_request->img_offset - obj_request->offset;
2679         length = (u64)1 << rbd_dev->header.obj_order;
2680
2681         /*
2682          * There is no defined parent data beyond the parent
2683          * overlap, so limit what we read at that boundary if
2684          * necessary.
2685          */
2686         if (img_offset + length > rbd_dev->parent_overlap) {
2687                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2688                 length = rbd_dev->parent_overlap - img_offset;
2689         }
2690
2691         /*
2692          * Allocate a page array big enough to receive the data read
2693          * from the parent.
2694          */
2695         page_count = (u32)calc_pages_for(0, length);
2696         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2697         if (IS_ERR(pages)) {
2698                 result = PTR_ERR(pages);
2699                 pages = NULL;
2700                 goto out_err;
2701         }
2702
2703         result = -ENOMEM;
2704         parent_request = rbd_parent_request_create(obj_request,
2705                                                 img_offset, length);
2706         if (!parent_request)
2707                 goto out_err;
2708
2709         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2710         if (result)
2711                 goto out_err;
2712         parent_request->copyup_pages = pages;
2713         parent_request->copyup_page_count = page_count;
2714
2715         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2716         result = rbd_img_request_submit(parent_request);
2717         if (!result)
2718                 return 0;
2719
2720         parent_request->copyup_pages = NULL;
2721         parent_request->copyup_page_count = 0;
2722         parent_request->obj_request = NULL;
2723         rbd_obj_request_put(obj_request);
2724 out_err:
2725         if (pages)
2726                 ceph_release_page_vector(pages, page_count);
2727         if (parent_request)
2728                 rbd_img_request_put(parent_request);
2729         obj_request->result = result;
2730         obj_request->xferred = 0;
2731         obj_request_done_set(obj_request);
2732
2733         return result;
2734 }
2735
2736 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2737 {
2738         struct rbd_obj_request *orig_request;
2739         struct rbd_device *rbd_dev;
2740         int result;
2741
2742         rbd_assert(!obj_request_img_data_test(obj_request));
2743
2744         /*
2745          * All we need from the object request is the original
2746          * request and the result of the STAT op.  Grab those, then
2747          * we're done with the request.
2748          */
2749         orig_request = obj_request->obj_request;
2750         obj_request->obj_request = NULL;
2751         rbd_obj_request_put(orig_request);
2752         rbd_assert(orig_request);
2753         rbd_assert(orig_request->img_request);
2754
2755         result = obj_request->result;
2756         obj_request->result = 0;
2757
2758         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2759                 obj_request, orig_request, result,
2760                 obj_request->xferred, obj_request->length);
2761         rbd_obj_request_put(obj_request);
2762
2763         /*
2764          * If the overlap has become 0 (most likely because the
2765          * image has been flattened) we need to free the pages
2766          * and re-submit the original write request.
2767          */
2768         rbd_dev = orig_request->img_request->rbd_dev;
2769         if (!rbd_dev->parent_overlap) {
2770                 struct ceph_osd_client *osdc;
2771
2772                 osdc = &rbd_dev->rbd_client->client->osdc;
2773                 result = rbd_obj_request_submit(osdc, orig_request);
2774                 if (!result)
2775                         return;
2776         }
2777
2778         /*
2779          * Our only purpose here is to determine whether the object
2780          * exists, and we don't want to treat the non-existence as
2781          * an error.  If something else comes back, transfer the
2782          * error to the original request and complete it now.
2783          */
2784         if (!result) {
2785                 obj_request_existence_set(orig_request, true);
2786         } else if (result == -ENOENT) {
2787                 obj_request_existence_set(orig_request, false);
2788         } else if (result) {
2789                 orig_request->result = result;
2790                 goto out;
2791         }
2792
2793         /*
2794          * Resubmit the original request now that we have recorded
2795          * whether the target object exists.
2796          */
2797         orig_request->result = rbd_img_obj_request_submit(orig_request);
2798 out:
2799         if (orig_request->result)
2800                 rbd_obj_request_complete(orig_request);
2801 }
2802
2803 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2804 {
2805         struct rbd_obj_request *stat_request;
2806         struct rbd_device *rbd_dev;
2807         struct ceph_osd_client *osdc;
2808         struct page **pages = NULL;
2809         u32 page_count;
2810         size_t size;
2811         int ret;
2812
2813         /*
2814          * The response data for a STAT call consists of:
2815          *     le64 length;
2816          *     struct {
2817          *         le32 tv_sec;
2818          *         le32 tv_nsec;
2819          *     } mtime;
2820          */
2821         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2822         page_count = (u32)calc_pages_for(0, size);
2823         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2824         if (IS_ERR(pages))
2825                 return PTR_ERR(pages);
2826
2827         ret = -ENOMEM;
2828         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2829                                                         OBJ_REQUEST_PAGES);
2830         if (!stat_request)
2831                 goto out;
2832
2833         rbd_obj_request_get(obj_request);
2834         stat_request->obj_request = obj_request;
2835         stat_request->pages = pages;
2836         stat_request->page_count = page_count;
2837
2838         rbd_assert(obj_request->img_request);
2839         rbd_dev = obj_request->img_request->rbd_dev;
2840         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2841                                                    stat_request);
2842         if (!stat_request->osd_req)
2843                 goto out;
2844         stat_request->callback = rbd_img_obj_exists_callback;
2845
2846         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2847         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2848                                         false, false);
2849         rbd_osd_req_format_read(stat_request);
2850
2851         osdc = &rbd_dev->rbd_client->client->osdc;
2852         ret = rbd_obj_request_submit(osdc, stat_request);
2853 out:
2854         if (ret)
2855                 rbd_obj_request_put(obj_request);
2856
2857         return ret;
2858 }
2859
2860 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2861 {
2862         struct rbd_img_request *img_request;
2863         struct rbd_device *rbd_dev;
2864
2865         rbd_assert(obj_request_img_data_test(obj_request));
2866
2867         img_request = obj_request->img_request;
2868         rbd_assert(img_request);
2869         rbd_dev = img_request->rbd_dev;
2870
2871         /* Reads */
2872         if (!img_request_write_test(img_request) &&
2873             !img_request_discard_test(img_request))
2874                 return true;
2875
2876         /* Non-layered writes */
2877         if (!img_request_layered_test(img_request))
2878                 return true;
2879
2880         /*
2881          * Layered writes outside of the parent overlap range don't
2882          * share any data with the parent.
2883          */
2884         if (!obj_request_overlaps_parent(obj_request))
2885                 return true;
2886
2887         /*
2888          * Entire-object layered writes - we will overwrite whatever
2889          * parent data there is anyway.
2890          */
2891         if (!obj_request->offset &&
2892             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2893                 return true;
2894
2895         /*
2896          * If the object is known to already exist, its parent data has
2897          * already been copied.
2898          */
2899         if (obj_request_known_test(obj_request) &&
2900             obj_request_exists_test(obj_request))
2901                 return true;
2902
2903         return false;
2904 }
2905
2906 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2907 {
2908         if (img_obj_request_simple(obj_request)) {
2909                 struct rbd_device *rbd_dev;
2910                 struct ceph_osd_client *osdc;
2911
2912                 rbd_dev = obj_request->img_request->rbd_dev;
2913                 osdc = &rbd_dev->rbd_client->client->osdc;
2914
2915                 return rbd_obj_request_submit(osdc, obj_request);
2916         }
2917
2918         /*
2919          * It's a layered write.  The target object might exist but
2920          * we may not know that yet.  If we know it doesn't exist,
2921          * start by reading the data for the full target object from
2922          * the parent so we can use it for a copyup to the target.
2923          */
2924         if (obj_request_known_test(obj_request))
2925                 return rbd_img_obj_parent_read_full(obj_request);
2926
2927         /* We don't know whether the target exists.  Go find out. */
2928
2929         return rbd_img_obj_exists_submit(obj_request);
2930 }
2931
2932 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2933 {
2934         struct rbd_obj_request *obj_request;
2935         struct rbd_obj_request *next_obj_request;
2936
2937         dout("%s: img %p\n", __func__, img_request);
2938         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2939                 int ret;
2940
2941                 ret = rbd_img_obj_request_submit(obj_request);
2942                 if (ret)
2943                         return ret;
2944         }
2945
2946         return 0;
2947 }
2948
2949 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2950 {
2951         struct rbd_obj_request *obj_request;
2952         struct rbd_device *rbd_dev;
2953         u64 obj_end;
2954         u64 img_xferred;
2955         int img_result;
2956
2957         rbd_assert(img_request_child_test(img_request));
2958
2959         /* First get what we need from the image request and release it */
2960
2961         obj_request = img_request->obj_request;
2962         img_xferred = img_request->xferred;
2963         img_result = img_request->result;
2964         rbd_img_request_put(img_request);
2965
2966         /*
2967          * If the overlap has become 0 (most likely because the
2968          * image has been flattened) we need to re-submit the
2969          * original request.
2970          */
2971         rbd_assert(obj_request);
2972         rbd_assert(obj_request->img_request);
2973         rbd_dev = obj_request->img_request->rbd_dev;
2974         if (!rbd_dev->parent_overlap) {
2975                 struct ceph_osd_client *osdc;
2976
2977                 osdc = &rbd_dev->rbd_client->client->osdc;
2978                 img_result = rbd_obj_request_submit(osdc, obj_request);
2979                 if (!img_result)
2980                         return;
2981         }
2982
2983         obj_request->result = img_result;
2984         if (obj_request->result)
2985                 goto out;
2986
2987         /*
2988          * We need to zero anything beyond the parent overlap
2989          * boundary.  Since rbd_img_obj_request_read_callback()
2990          * will zero anything beyond the end of a short read, an
2991          * easy way to do this is to pretend the data from the
2992          * parent came up short--ending at the overlap boundary.
2993          */
2994         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2995         obj_end = obj_request->img_offset + obj_request->length;
2996         if (obj_end > rbd_dev->parent_overlap) {
2997                 u64 xferred = 0;
2998
2999                 if (obj_request->img_offset < rbd_dev->parent_overlap)
3000                         xferred = rbd_dev->parent_overlap -
3001                                         obj_request->img_offset;
3002
3003                 obj_request->xferred = min(img_xferred, xferred);
3004         } else {
3005                 obj_request->xferred = img_xferred;
3006         }
3007 out:
3008         rbd_img_obj_request_read_callback(obj_request);
3009         rbd_obj_request_complete(obj_request);
3010 }
3011
3012 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3013 {
3014         struct rbd_img_request *img_request;
3015         int result;
3016
3017         rbd_assert(obj_request_img_data_test(obj_request));
3018         rbd_assert(obj_request->img_request != NULL);
3019         rbd_assert(obj_request->result == (s32) -ENOENT);
3020         rbd_assert(obj_request_type_valid(obj_request->type));
3021
3022         /* rbd_read_finish(obj_request, obj_request->length); */
3023         img_request = rbd_parent_request_create(obj_request,
3024                                                 obj_request->img_offset,
3025                                                 obj_request->length);
3026         result = -ENOMEM;
3027         if (!img_request)
3028                 goto out_err;
3029
3030         if (obj_request->type == OBJ_REQUEST_BIO)
3031                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3032                                                 obj_request->bio_list);
3033         else
3034                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3035                                                 obj_request->pages);
3036         if (result)
3037                 goto out_err;
3038
3039         img_request->callback = rbd_img_parent_read_callback;
3040         result = rbd_img_request_submit(img_request);
3041         if (result)
3042                 goto out_err;
3043
3044         return;
3045 out_err:
3046         if (img_request)
3047                 rbd_img_request_put(img_request);
3048         obj_request->result = result;
3049         obj_request->xferred = 0;
3050         obj_request_done_set(obj_request);
3051 }
3052
3053 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
3054 {
3055         struct rbd_obj_request *obj_request;
3056         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3057         int ret;
3058
3059         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3060                                                         OBJ_REQUEST_NODATA);
3061         if (!obj_request)
3062                 return -ENOMEM;
3063
3064         ret = -ENOMEM;
3065         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3066                                                   obj_request);
3067         if (!obj_request->osd_req)
3068                 goto out;
3069
3070         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
3071                                         notify_id, 0, 0);
3072         rbd_osd_req_format_read(obj_request);
3073
3074         ret = rbd_obj_request_submit(osdc, obj_request);
3075         if (ret)
3076                 goto out;
3077         ret = rbd_obj_request_wait(obj_request);
3078 out:
3079         rbd_obj_request_put(obj_request);
3080
3081         return ret;
3082 }
3083
3084 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
3085 {
3086         struct rbd_device *rbd_dev = (struct rbd_device *)data;
3087         int ret;
3088
3089         if (!rbd_dev)
3090                 return;
3091
3092         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
3093                 rbd_dev->header_name, (unsigned long long)notify_id,
3094                 (unsigned int)opcode);
3095
3096         /*
3097          * Until adequate refresh error handling is in place, there is
3098          * not much we can do here, except warn.
3099          *
3100          * See http://tracker.ceph.com/issues/5040
3101          */
3102         ret = rbd_dev_refresh(rbd_dev);
3103         if (ret)
3104                 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3105
3106         ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
3107         if (ret)
3108                 rbd_warn(rbd_dev, "notify_ack ret %d", ret);
3109 }
3110
3111 /*
3112  * Send a (un)watch request and wait for the ack.  Return a request
3113  * with a ref held on success or error.
3114  */
3115 static struct rbd_obj_request *rbd_obj_watch_request_helper(
3116                                                 struct rbd_device *rbd_dev,
3117                                                 bool watch)
3118 {
3119         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3120         struct rbd_obj_request *obj_request;
3121         int ret;
3122
3123         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3124                                              OBJ_REQUEST_NODATA);
3125         if (!obj_request)
3126                 return ERR_PTR(-ENOMEM);
3127
3128         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
3129                                                   obj_request);
3130         if (!obj_request->osd_req) {
3131                 ret = -ENOMEM;
3132                 goto out;
3133         }
3134
3135         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3136                               rbd_dev->watch_event->cookie, 0, watch);
3137         rbd_osd_req_format_write(obj_request);
3138
3139         if (watch)
3140                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3141
3142         ret = rbd_obj_request_submit(osdc, obj_request);
3143         if (ret)
3144                 goto out;
3145
3146         ret = rbd_obj_request_wait(obj_request);
3147         if (ret)
3148                 goto out;
3149
3150         ret = obj_request->result;
3151         if (ret) {
3152                 if (watch)
3153                         rbd_obj_request_end(obj_request);
3154                 goto out;
3155         }
3156
3157         return obj_request;
3158
3159 out:
3160         rbd_obj_request_put(obj_request);
3161         return ERR_PTR(ret);
3162 }
3163
3164 /*
3165  * Initiate a watch request, synchronously.
3166  */
3167 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3168 {
3169         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3170         struct rbd_obj_request *obj_request;
3171         int ret;
3172
3173         rbd_assert(!rbd_dev->watch_event);
3174         rbd_assert(!rbd_dev->watch_request);
3175
3176         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3177                                      &rbd_dev->watch_event);
3178         if (ret < 0)
3179                 return ret;
3180
3181         obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3182         if (IS_ERR(obj_request)) {
3183                 ceph_osdc_cancel_event(rbd_dev->watch_event);
3184                 rbd_dev->watch_event = NULL;
3185                 return PTR_ERR(obj_request);
3186         }
3187
3188         /*
3189          * A watch request is set to linger, so the underlying osd
3190          * request won't go away until we unregister it.  We retain
3191          * a pointer to the object request during that time (in
3192          * rbd_dev->watch_request), so we'll keep a reference to it.
3193          * We'll drop that reference after we've unregistered it in
3194          * rbd_dev_header_unwatch_sync().
3195          */
3196         rbd_dev->watch_request = obj_request;
3197
3198         return 0;
3199 }
3200
3201 /*
3202  * Tear down a watch request, synchronously.
3203  */
3204 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3205 {
3206         struct rbd_obj_request *obj_request;
3207
3208         rbd_assert(rbd_dev->watch_event);
3209         rbd_assert(rbd_dev->watch_request);
3210
3211         rbd_obj_request_end(rbd_dev->watch_request);
3212         rbd_obj_request_put(rbd_dev->watch_request);
3213         rbd_dev->watch_request = NULL;
3214
3215         obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3216         if (!IS_ERR(obj_request))
3217                 rbd_obj_request_put(obj_request);
3218         else
3219                 rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3220                          PTR_ERR(obj_request));
3221
3222         ceph_osdc_cancel_event(rbd_dev->watch_event);
3223         rbd_dev->watch_event = NULL;
3224 }
3225
3226 /*
3227  * Synchronous osd object method call.  Returns the number of bytes
3228  * returned in the outbound buffer, or a negative error code.
3229  */
3230 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3231                              const char *object_name,
3232                              const char *class_name,
3233                              const char *method_name,
3234                              const void *outbound,
3235                              size_t outbound_size,
3236                              void *inbound,
3237                              size_t inbound_size)
3238 {
3239         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3240         struct rbd_obj_request *obj_request;
3241         struct page **pages;
3242         u32 page_count;
3243         int ret;
3244
3245         /*
3246          * Method calls are ultimately read operations.  The result
3247          * should placed into the inbound buffer provided.  They
3248          * also supply outbound data--parameters for the object
3249          * method.  Currently if this is present it will be a
3250          * snapshot id.
3251          */
3252         page_count = (u32)calc_pages_for(0, inbound_size);
3253         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3254         if (IS_ERR(pages))
3255                 return PTR_ERR(pages);
3256
3257         ret = -ENOMEM;
3258         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3259                                                         OBJ_REQUEST_PAGES);
3260         if (!obj_request)
3261                 goto out;
3262
3263         obj_request->pages = pages;
3264         obj_request->page_count = page_count;
3265
3266         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3267                                                   obj_request);
3268         if (!obj_request->osd_req)
3269                 goto out;
3270
3271         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3272                                         class_name, method_name);
3273         if (outbound_size) {
3274                 struct ceph_pagelist *pagelist;
3275
3276                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3277                 if (!pagelist)
3278                         goto out;
3279
3280                 ceph_pagelist_init(pagelist);
3281                 ceph_pagelist_append(pagelist, outbound, outbound_size);
3282                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3283                                                 pagelist);
3284         }
3285         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3286                                         obj_request->pages, inbound_size,
3287                                         0, false, false);
3288         rbd_osd_req_format_read(obj_request);
3289
3290         ret = rbd_obj_request_submit(osdc, obj_request);
3291         if (ret)
3292                 goto out;
3293         ret = rbd_obj_request_wait(obj_request);
3294         if (ret)
3295                 goto out;
3296
3297         ret = obj_request->result;
3298         if (ret < 0)
3299                 goto out;
3300
3301         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3302         ret = (int)obj_request->xferred;
3303         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3304 out:
3305         if (obj_request)
3306                 rbd_obj_request_put(obj_request);
3307         else
3308                 ceph_release_page_vector(pages, page_count);
3309
3310         return ret;
3311 }
3312
3313 static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
3314 {
3315         struct rbd_img_request *img_request;
3316         struct ceph_snap_context *snapc = NULL;
3317         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3318         u64 length = blk_rq_bytes(rq);
3319         enum obj_operation_type op_type;
3320         u64 mapping_size;
3321         int result;
3322
3323         if (rq->cmd_flags & REQ_DISCARD)
3324                 op_type = OBJ_OP_DISCARD;
3325         else if (rq->cmd_flags & REQ_WRITE)
3326                 op_type = OBJ_OP_WRITE;
3327         else
3328                 op_type = OBJ_OP_READ;
3329
3330         /* Ignore/skip any zero-length requests */
3331
3332         if (!length) {
3333                 dout("%s: zero-length request\n", __func__);
3334                 result = 0;
3335                 goto err_rq;
3336         }
3337
3338         /* Only reads are allowed to a read-only device */
3339
3340         if (op_type != OBJ_OP_READ) {
3341                 if (rbd_dev->mapping.read_only) {
3342                         result = -EROFS;
3343                         goto err_rq;
3344                 }
3345                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3346         }
3347
3348         /*
3349          * Quit early if the mapped snapshot no longer exists.  It's
3350          * still possible the snapshot will have disappeared by the
3351          * time our request arrives at the osd, but there's no sense in
3352          * sending it if we already know.
3353          */
3354         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3355                 dout("request for non-existent snapshot");
3356                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3357                 result = -ENXIO;
3358                 goto err_rq;
3359         }
3360
3361         if (offset && length > U64_MAX - offset + 1) {
3362                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3363                          length);
3364                 result = -EINVAL;
3365                 goto err_rq;    /* Shouldn't happen */
3366         }
3367
3368         down_read(&rbd_dev->header_rwsem);
3369         mapping_size = rbd_dev->mapping.size;
3370         if (op_type != OBJ_OP_READ) {
3371                 snapc = rbd_dev->header.snapc;
3372                 ceph_get_snap_context(snapc);
3373         }
3374         up_read(&rbd_dev->header_rwsem);
3375
3376         if (offset + length > mapping_size) {
3377                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3378                          length, mapping_size);
3379                 result = -EIO;
3380                 goto err_rq;
3381         }
3382
3383         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3384                                              snapc);
3385         if (!img_request) {
3386                 result = -ENOMEM;
3387                 goto err_rq;
3388         }
3389         img_request->rq = rq;
3390
3391         if (op_type == OBJ_OP_DISCARD)
3392                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
3393                                               NULL);
3394         else
3395                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3396                                               rq->bio);
3397         if (result)
3398                 goto err_img_request;
3399
3400         result = rbd_img_request_submit(img_request);
3401         if (result)
3402                 goto err_img_request;
3403
3404         return;
3405
3406 err_img_request:
3407         rbd_img_request_put(img_request);
3408 err_rq:
3409         if (result)
3410                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3411                          obj_op_name(op_type), length, offset, result);
3412         ceph_put_snap_context(snapc);
3413         blk_end_request_all(rq, result);
3414 }
3415
3416 static void rbd_request_workfn(struct work_struct *work)
3417 {
3418         struct rbd_device *rbd_dev =
3419             container_of(work, struct rbd_device, rq_work);
3420         struct request *rq, *next;
3421         LIST_HEAD(requests);
3422
3423         spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
3424         list_splice_init(&rbd_dev->rq_queue, &requests);
3425         spin_unlock_irq(&rbd_dev->lock);
3426
3427         list_for_each_entry_safe(rq, next, &requests, queuelist) {
3428                 list_del_init(&rq->queuelist);
3429                 rbd_handle_request(rbd_dev, rq);
3430         }
3431 }
3432
3433 /*
3434  * Called with q->queue_lock held and interrupts disabled, possibly on
3435  * the way to schedule().  Do not sleep here!
3436  */
3437 static void rbd_request_fn(struct request_queue *q)
3438 {
3439         struct rbd_device *rbd_dev = q->queuedata;
3440         struct request *rq;
3441         int queued = 0;
3442
3443         rbd_assert(rbd_dev);
3444
3445         while ((rq = blk_fetch_request(q))) {
3446                 /* Ignore any non-FS requests that filter through. */
3447                 if (rq->cmd_type != REQ_TYPE_FS) {
3448                         dout("%s: non-fs request type %d\n", __func__,
3449                                 (int) rq->cmd_type);
3450                         __blk_end_request_all(rq, 0);
3451                         continue;
3452                 }
3453
3454                 list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
3455                 queued++;
3456         }
3457
3458         if (queued)
3459                 queue_work(rbd_wq, &rbd_dev->rq_work);
3460 }
3461
3462 /*
3463  * a queue callback. Makes sure that we don't create a bio that spans across
3464  * multiple osd objects. One exception would be with a single page bios,
3465  * which we handle later at bio_chain_clone_range()
3466  */
3467 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3468                           struct bio_vec *bvec)
3469 {
3470         struct rbd_device *rbd_dev = q->queuedata;
3471         sector_t sector_offset;
3472         sector_t sectors_per_obj;
3473         sector_t obj_sector_offset;
3474         int ret;
3475
3476         /*
3477          * Find how far into its rbd object the partition-relative
3478          * bio start sector is to offset relative to the enclosing
3479          * device.
3480          */
3481         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3482         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3483         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3484
3485         /*
3486          * Compute the number of bytes from that offset to the end
3487          * of the object.  Account for what's already used by the bio.
3488          */
3489         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3490         if (ret > bmd->bi_size)
3491                 ret -= bmd->bi_size;
3492         else
3493                 ret = 0;
3494
3495         /*
3496          * Don't send back more than was asked for.  And if the bio
3497          * was empty, let the whole thing through because:  "Note
3498          * that a block device *must* allow a single page to be
3499          * added to an empty bio."
3500          */
3501         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3502         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3503                 ret = (int) bvec->bv_len;
3504
3505         return ret;
3506 }
3507
3508 static void rbd_free_disk(struct rbd_device *rbd_dev)
3509 {
3510         struct gendisk *disk = rbd_dev->disk;
3511
3512         if (!disk)
3513                 return;
3514
3515         rbd_dev->disk = NULL;
3516         if (disk->flags & GENHD_FL_UP) {
3517                 del_gendisk(disk);
3518                 if (disk->queue)
3519                         blk_cleanup_queue(disk->queue);
3520         }
3521         put_disk(disk);
3522 }
3523
3524 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3525                                 const char *object_name,
3526                                 u64 offset, u64 length, void *buf)
3527
3528 {
3529         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3530         struct rbd_obj_request *obj_request;
3531         struct page **pages = NULL;
3532         u32 page_count;
3533         size_t size;
3534         int ret;
3535
3536         page_count = (u32) calc_pages_for(offset, length);
3537         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3538         if (IS_ERR(pages))
3539                 return PTR_ERR(pages);
3540
3541         ret = -ENOMEM;
3542         obj_request = rbd_obj_request_create(object_name, offset, length,
3543                                                         OBJ_REQUEST_PAGES);
3544         if (!obj_request)
3545                 goto out;
3546
3547         obj_request->pages = pages;
3548         obj_request->page_count = page_count;
3549
3550         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3551                                                   obj_request);
3552         if (!obj_request->osd_req)
3553                 goto out;
3554
3555         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3556                                         offset, length, 0, 0);
3557         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3558                                         obj_request->pages,
3559                                         obj_request->length,
3560                                         obj_request->offset & ~PAGE_MASK,
3561                                         false, false);
3562         rbd_osd_req_format_read(obj_request);
3563
3564         ret = rbd_obj_request_submit(osdc, obj_request);
3565         if (ret)
3566                 goto out;
3567         ret = rbd_obj_request_wait(obj_request);
3568         if (ret)
3569                 goto out;
3570
3571         ret = obj_request->result;
3572         if (ret < 0)
3573                 goto out;
3574
3575         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3576         size = (size_t) obj_request->xferred;
3577         ceph_copy_from_page_vector(pages, buf, 0, size);
3578         rbd_assert(size <= (size_t)INT_MAX);
3579         ret = (int)size;
3580 out:
3581         if (obj_request)
3582                 rbd_obj_request_put(obj_request);
3583         else
3584                 ceph_release_page_vector(pages, page_count);
3585
3586         return ret;
3587 }
3588
3589 /*
3590  * Read the complete header for the given rbd device.  On successful
3591  * return, the rbd_dev->header field will contain up-to-date
3592  * information about the image.
3593  */
3594 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3595 {
3596         struct rbd_image_header_ondisk *ondisk = NULL;
3597         u32 snap_count = 0;
3598         u64 names_size = 0;
3599         u32 want_count;
3600         int ret;
3601
3602         /*
3603          * The complete header will include an array of its 64-bit
3604          * snapshot ids, followed by the names of those snapshots as
3605          * a contiguous block of NUL-terminated strings.  Note that
3606          * the number of snapshots could change by the time we read
3607          * it in, in which case we re-read it.
3608          */
3609         do {
3610                 size_t size;
3611
3612                 kfree(ondisk);
3613
3614                 size = sizeof (*ondisk);
3615                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3616                 size += names_size;
3617                 ondisk = kmalloc(size, GFP_KERNEL);
3618                 if (!ondisk)
3619                         return -ENOMEM;
3620
3621                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3622                                        0, size, ondisk);
3623                 if (ret < 0)
3624                         goto out;
3625                 if ((size_t)ret < size) {
3626                         ret = -ENXIO;
3627                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3628                                 size, ret);
3629                         goto out;
3630                 }
3631                 if (!rbd_dev_ondisk_valid(ondisk)) {
3632                         ret = -ENXIO;
3633                         rbd_warn(rbd_dev, "invalid header");
3634                         goto out;
3635                 }
3636
3637                 names_size = le64_to_cpu(ondisk->snap_names_len);
3638                 want_count = snap_count;
3639                 snap_count = le32_to_cpu(ondisk->snap_count);
3640         } while (snap_count != want_count);
3641
3642         ret = rbd_header_from_disk(rbd_dev, ondisk);
3643 out:
3644         kfree(ondisk);
3645
3646         return ret;
3647 }
3648
3649 /*
3650  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3651  * has disappeared from the (just updated) snapshot context.
3652  */
3653 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3654 {
3655         u64 snap_id;
3656
3657         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3658                 return;
3659
3660         snap_id = rbd_dev->spec->snap_id;
3661         if (snap_id == CEPH_NOSNAP)
3662                 return;
3663
3664         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3665                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3666 }
3667
3668 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3669 {
3670         sector_t size;
3671         bool removing;
3672
3673         /*
3674          * Don't hold the lock while doing disk operations,
3675          * or lock ordering will conflict with the bdev mutex via:
3676          * rbd_add() -> blkdev_get() -> rbd_open()
3677          */
3678         spin_lock_irq(&rbd_dev->lock);
3679         removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3680         spin_unlock_irq(&rbd_dev->lock);
3681         /*
3682          * If the device is being removed, rbd_dev->disk has
3683          * been destroyed, so don't try to update its size
3684          */
3685         if (!removing) {
3686                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3687                 dout("setting size to %llu sectors", (unsigned long long)size);
3688                 set_capacity(rbd_dev->disk, size);
3689                 revalidate_disk(rbd_dev->disk);
3690         }
3691 }
3692
3693 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3694 {
3695         u64 mapping_size;
3696         int ret;
3697
3698         down_write(&rbd_dev->header_rwsem);
3699         mapping_size = rbd_dev->mapping.size;
3700
3701         ret = rbd_dev_header_info(rbd_dev);
3702         if (ret)
3703                 return ret;
3704
3705         /*
3706          * If there is a parent, see if it has disappeared due to the
3707          * mapped image getting flattened.
3708          */
3709         if (rbd_dev->parent) {
3710                 ret = rbd_dev_v2_parent_info(rbd_dev);
3711                 if (ret)
3712                         return ret;
3713         }
3714
3715         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3716                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3717                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3718         } else {
3719                 /* validate mapped snapshot's EXISTS flag */
3720                 rbd_exists_validate(rbd_dev);
3721         }
3722
3723         up_write(&rbd_dev->header_rwsem);
3724
3725         if (mapping_size != rbd_dev->mapping.size)
3726                 rbd_dev_update_size(rbd_dev);
3727
3728         return 0;
3729 }
3730
3731 static int rbd_init_disk(struct rbd_device *rbd_dev)
3732 {
3733         struct gendisk *disk;
3734         struct request_queue *q;
3735         u64 segment_size;
3736
3737         /* create gendisk info */
3738         disk = alloc_disk(single_major ?
3739                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3740                           RBD_MINORS_PER_MAJOR);
3741         if (!disk)
3742                 return -ENOMEM;
3743
3744         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3745                  rbd_dev->dev_id);
3746         disk->major = rbd_dev->major;
3747         disk->first_minor = rbd_dev->minor;
3748         if (single_major)
3749                 disk->flags |= GENHD_FL_EXT_DEVT;
3750         disk->fops = &rbd_bd_ops;
3751         disk->private_data = rbd_dev;
3752
3753         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3754         if (!q)
3755                 goto out_disk;
3756
3757         /* We use the default size, but let's be explicit about it. */
3758         blk_queue_physical_block_size(q, SECTOR_SIZE);
3759
3760         /* set io sizes to object size */
3761         segment_size = rbd_obj_bytes(&rbd_dev->header);
3762         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3763         blk_queue_max_segment_size(q, segment_size);
3764         blk_queue_io_min(q, segment_size);
3765         blk_queue_io_opt(q, segment_size);
3766
3767         /* enable the discard support */
3768         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3769         q->limits.discard_granularity = segment_size;
3770         q->limits.discard_alignment = segment_size;
3771         q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
3772         q->limits.discard_zeroes_data = 1;
3773
3774         blk_queue_merge_bvec(q, rbd_merge_bvec);
3775         disk->queue = q;
3776
3777         q->queuedata = rbd_dev;
3778
3779         rbd_dev->disk = disk;
3780
3781         return 0;
3782 out_disk:
3783         put_disk(disk);
3784
3785         return -ENOMEM;
3786 }
3787
3788 /*
3789   sysfs
3790 */
3791
3792 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3793 {
3794         return container_of(dev, struct rbd_device, dev);
3795 }
3796
3797 static ssize_t rbd_size_show(struct device *dev,
3798                              struct device_attribute *attr, char *buf)
3799 {
3800         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3801
3802         return sprintf(buf, "%llu\n",
3803                 (unsigned long long)rbd_dev->mapping.size);
3804 }
3805
3806 /*
3807  * Note this shows the features for whatever's mapped, which is not
3808  * necessarily the base image.
3809  */
3810 static ssize_t rbd_features_show(struct device *dev,
3811                              struct device_attribute *attr, char *buf)
3812 {
3813         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3814
3815         return sprintf(buf, "0x%016llx\n",
3816                         (unsigned long long)rbd_dev->mapping.features);
3817 }
3818
3819 static ssize_t rbd_major_show(struct device *dev,
3820                               struct device_attribute *attr, char *buf)
3821 {
3822         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3823
3824         if (rbd_dev->major)
3825                 return sprintf(buf, "%d\n", rbd_dev->major);
3826
3827         return sprintf(buf, "(none)\n");
3828 }
3829
3830 static ssize_t rbd_minor_show(struct device *dev,
3831                               struct device_attribute *attr, char *buf)
3832 {
3833         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3834
3835         return sprintf(buf, "%d\n", rbd_dev->minor);
3836 }
3837
3838 static ssize_t rbd_client_id_show(struct device *dev,
3839                                   struct device_attribute *attr, char *buf)
3840 {
3841         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3842
3843         return sprintf(buf, "client%lld\n",
3844                         ceph_client_id(rbd_dev->rbd_client->client));
3845 }
3846
3847 static ssize_t rbd_pool_show(struct device *dev,
3848                              struct device_attribute *attr, char *buf)
3849 {
3850         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3851
3852         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3853 }
3854
3855 static ssize_t rbd_pool_id_show(struct device *dev,
3856                              struct device_attribute *attr, char *buf)
3857 {
3858         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3859
3860         return sprintf(buf, "%llu\n",
3861                         (unsigned long long) rbd_dev->spec->pool_id);
3862 }
3863
3864 static ssize_t rbd_name_show(struct device *dev,
3865                              struct device_attribute *attr, char *buf)
3866 {
3867         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3868
3869         if (rbd_dev->spec->image_name)
3870                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3871
3872         return sprintf(buf, "(unknown)\n");
3873 }
3874
3875 static ssize_t rbd_image_id_show(struct device *dev,
3876                              struct device_attribute *attr, char *buf)
3877 {
3878         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3879
3880         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3881 }
3882
3883 /*
3884  * Shows the name of the currently-mapped snapshot (or
3885  * RBD_SNAP_HEAD_NAME for the base image).
3886  */
3887 static ssize_t rbd_snap_show(struct device *dev,
3888                              struct device_attribute *attr,
3889                              char *buf)
3890 {
3891         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3892
3893         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3894 }
3895
3896 /*
3897  * For a v2 image, shows the chain of parent images, separated by empty
3898  * lines.  For v1 images or if there is no parent, shows "(no parent
3899  * image)".
3900  */
3901 static ssize_t rbd_parent_show(struct device *dev,
3902                                struct device_attribute *attr,
3903                                char *buf)
3904 {
3905         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3906         ssize_t count = 0;
3907
3908         if (!rbd_dev->parent)
3909                 return sprintf(buf, "(no parent image)\n");
3910
3911         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3912                 struct rbd_spec *spec = rbd_dev->parent_spec;
3913
3914                 count += sprintf(&buf[count], "%s"
3915                             "pool_id %llu\npool_name %s\n"
3916                             "image_id %s\nimage_name %s\n"
3917                             "snap_id %llu\nsnap_name %s\n"
3918                             "overlap %llu\n",
3919                             !count ? "" : "\n", /* first? */
3920                             spec->pool_id, spec->pool_name,
3921                             spec->image_id, spec->image_name ?: "(unknown)",
3922                             spec->snap_id, spec->snap_name,
3923                             rbd_dev->parent_overlap);
3924         }
3925
3926         return count;
3927 }
3928
3929 static ssize_t rbd_image_refresh(struct device *dev,
3930                                  struct device_attribute *attr,
3931                                  const char *buf,
3932                                  size_t size)
3933 {
3934         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3935         int ret;
3936
3937         ret = rbd_dev_refresh(rbd_dev);
3938         if (ret)
3939                 return ret;
3940
3941         return size;
3942 }
3943
3944 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3945 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3946 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3947 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3948 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3949 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3950 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3951 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3952 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3953 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3954 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3955 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3956
3957 static struct attribute *rbd_attrs[] = {
3958         &dev_attr_size.attr,
3959         &dev_attr_features.attr,
3960         &dev_attr_major.attr,
3961         &dev_attr_minor.attr,
3962         &dev_attr_client_id.attr,
3963         &dev_attr_pool.attr,
3964         &dev_attr_pool_id.attr,
3965         &dev_attr_name.attr,
3966         &dev_attr_image_id.attr,
3967         &dev_attr_current_snap.attr,
3968         &dev_attr_parent.attr,
3969         &dev_attr_refresh.attr,
3970         NULL
3971 };
3972
3973 static struct attribute_group rbd_attr_group = {
3974         .attrs = rbd_attrs,
3975 };
3976
3977 static const struct attribute_group *rbd_attr_groups[] = {
3978         &rbd_attr_group,
3979         NULL
3980 };
3981
3982 static void rbd_sysfs_dev_release(struct device *dev)
3983 {
3984 }
3985
3986 static struct device_type rbd_device_type = {
3987         .name           = "rbd",
3988         .groups         = rbd_attr_groups,
3989         .release        = rbd_sysfs_dev_release,
3990 };
3991
3992 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3993 {
3994         kref_get(&spec->kref);
3995
3996         return spec;
3997 }
3998
3999 static void rbd_spec_free(struct kref *kref);
4000 static void rbd_spec_put(struct rbd_spec *spec)
4001 {
4002         if (spec)
4003                 kref_put(&spec->kref, rbd_spec_free);
4004 }
4005
4006 static struct rbd_spec *rbd_spec_alloc(void)
4007 {
4008         struct rbd_spec *spec;
4009
4010         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4011         if (!spec)
4012                 return NULL;
4013
4014         spec->pool_id = CEPH_NOPOOL;
4015         spec->snap_id = CEPH_NOSNAP;
4016         kref_init(&spec->kref);
4017
4018         return spec;
4019 }
4020
4021 static void rbd_spec_free(struct kref *kref)
4022 {
4023         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4024
4025         kfree(spec->pool_name);
4026         kfree(spec->image_id);
4027         kfree(spec->image_name);
4028         kfree(spec->snap_name);
4029         kfree(spec);
4030 }
4031
4032 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4033                                 struct rbd_spec *spec)
4034 {
4035         struct rbd_device *rbd_dev;
4036
4037         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
4038         if (!rbd_dev)
4039                 return NULL;
4040
4041         spin_lock_init(&rbd_dev->lock);
4042         INIT_LIST_HEAD(&rbd_dev->rq_queue);
4043         INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
4044         rbd_dev->flags = 0;
4045         atomic_set(&rbd_dev->parent_ref, 0);
4046         INIT_LIST_HEAD(&rbd_dev->node);
4047         init_rwsem(&rbd_dev->header_rwsem);
4048
4049         rbd_dev->spec = spec;
4050         rbd_dev->rbd_client = rbdc;
4051
4052         /* Initialize the layout used for all rbd requests */
4053
4054         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4055         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
4056         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4057         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
4058
4059         return rbd_dev;
4060 }
4061
4062 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4063 {
4064         rbd_put_client(rbd_dev->rbd_client);
4065         rbd_spec_put(rbd_dev->spec);
4066         kfree(rbd_dev);
4067 }
4068
4069 /*
4070  * Get the size and object order for an image snapshot, or if
4071  * snap_id is CEPH_NOSNAP, gets this information for the base
4072  * image.
4073  */
4074 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4075                                 u8 *order, u64 *snap_size)
4076 {
4077         __le64 snapid = cpu_to_le64(snap_id);
4078         int ret;
4079         struct {
4080                 u8 order;
4081                 __le64 size;
4082         } __attribute__ ((packed)) size_buf = { 0 };
4083
4084         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4085                                 "rbd", "get_size",
4086                                 &snapid, sizeof (snapid),
4087                                 &size_buf, sizeof (size_buf));
4088         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4089         if (ret < 0)
4090                 return ret;
4091         if (ret < sizeof (size_buf))
4092                 return -ERANGE;
4093
4094         if (order) {
4095                 *order = size_buf.order;
4096                 dout("  order %u", (unsigned int)*order);
4097         }
4098         *snap_size = le64_to_cpu(size_buf.size);
4099
4100         dout("  snap_id 0x%016llx snap_size = %llu\n",
4101                 (unsigned long long)snap_id,
4102                 (unsigned long long)*snap_size);
4103
4104         return 0;
4105 }
4106
4107 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4108 {
4109         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4110                                         &rbd_dev->header.obj_order,
4111                                         &rbd_dev->header.image_size);
4112 }
4113
4114 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4115 {
4116         void *reply_buf;
4117         int ret;
4118         void *p;
4119
4120         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4121         if (!reply_buf)
4122                 return -ENOMEM;
4123
4124         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4125                                 "rbd", "get_object_prefix", NULL, 0,
4126                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4127         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4128         if (ret < 0)
4129                 goto out;
4130
4131         p = reply_buf;
4132         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4133                                                 p + ret, NULL, GFP_NOIO);
4134         ret = 0;
4135
4136         if (IS_ERR(rbd_dev->header.object_prefix)) {
4137                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4138                 rbd_dev->header.object_prefix = NULL;
4139         } else {
4140                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4141         }
4142 out:
4143         kfree(reply_buf);
4144
4145         return ret;
4146 }
4147
4148 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4149                 u64 *snap_features)
4150 {
4151         __le64 snapid = cpu_to_le64(snap_id);
4152         struct {
4153                 __le64 features;
4154                 __le64 incompat;
4155         } __attribute__ ((packed)) features_buf = { 0 };
4156         u64 incompat;
4157         int ret;
4158
4159         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4160                                 "rbd", "get_features",
4161                                 &snapid, sizeof (snapid),
4162                                 &features_buf, sizeof (features_buf));
4163         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4164         if (ret < 0)
4165                 return ret;
4166         if (ret < sizeof (features_buf))
4167                 return -ERANGE;
4168
4169         incompat = le64_to_cpu(features_buf.incompat);
4170         if (incompat & ~RBD_FEATURES_SUPPORTED)
4171                 return -ENXIO;
4172
4173         *snap_features = le64_to_cpu(features_buf.features);
4174
4175         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4176                 (unsigned long long)snap_id,
4177                 (unsigned long long)*snap_features,
4178                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4179
4180         return 0;
4181 }
4182
4183 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4184 {
4185         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4186                                                 &rbd_dev->header.features);
4187 }
4188
4189 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4190 {
4191         struct rbd_spec *parent_spec;
4192         size_t size;
4193         void *reply_buf = NULL;
4194         __le64 snapid;
4195         void *p;
4196         void *end;
4197         u64 pool_id;
4198         char *image_id;
4199         u64 snap_id;
4200         u64 overlap;
4201         int ret;
4202
4203         parent_spec = rbd_spec_alloc();
4204         if (!parent_spec)
4205                 return -ENOMEM;
4206
4207         size = sizeof (__le64) +                                /* pool_id */
4208                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
4209                 sizeof (__le64) +                               /* snap_id */
4210                 sizeof (__le64);                                /* overlap */
4211         reply_buf = kmalloc(size, GFP_KERNEL);
4212         if (!reply_buf) {
4213                 ret = -ENOMEM;
4214                 goto out_err;
4215         }
4216
4217         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4218         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4219                                 "rbd", "get_parent",
4220                                 &snapid, sizeof (snapid),
4221                                 reply_buf, size);
4222         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4223         if (ret < 0)
4224                 goto out_err;
4225
4226         p = reply_buf;
4227         end = reply_buf + ret;
4228         ret = -ERANGE;
4229         ceph_decode_64_safe(&p, end, pool_id, out_err);
4230         if (pool_id == CEPH_NOPOOL) {
4231                 /*
4232                  * Either the parent never existed, or we have
4233                  * record of it but the image got flattened so it no
4234                  * longer has a parent.  When the parent of a
4235                  * layered image disappears we immediately set the
4236                  * overlap to 0.  The effect of this is that all new
4237                  * requests will be treated as if the image had no
4238                  * parent.
4239                  */
4240                 if (rbd_dev->parent_overlap) {
4241                         rbd_dev->parent_overlap = 0;
4242                         smp_mb();
4243                         rbd_dev_parent_put(rbd_dev);
4244                         pr_info("%s: clone image has been flattened\n",
4245                                 rbd_dev->disk->disk_name);
4246                 }
4247
4248                 goto out;       /* No parent?  No problem. */
4249         }
4250
4251         /* The ceph file layout needs to fit pool id in 32 bits */
4252
4253         ret = -EIO;
4254         if (pool_id > (u64)U32_MAX) {
4255                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4256                         (unsigned long long)pool_id, U32_MAX);
4257                 goto out_err;
4258         }
4259
4260         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4261         if (IS_ERR(image_id)) {
4262                 ret = PTR_ERR(image_id);
4263                 goto out_err;
4264         }
4265         ceph_decode_64_safe(&p, end, snap_id, out_err);
4266         ceph_decode_64_safe(&p, end, overlap, out_err);
4267
4268         /*
4269          * The parent won't change (except when the clone is
4270          * flattened, already handled that).  So we only need to
4271          * record the parent spec we have not already done so.
4272          */
4273         if (!rbd_dev->parent_spec) {
4274                 parent_spec->pool_id = pool_id;
4275                 parent_spec->image_id = image_id;
4276                 parent_spec->snap_id = snap_id;
4277                 rbd_dev->parent_spec = parent_spec;
4278                 parent_spec = NULL;     /* rbd_dev now owns this */
4279         } else {
4280                 kfree(image_id);
4281         }
4282
4283         /*
4284          * We always update the parent overlap.  If it's zero we
4285          * treat it specially.
4286          */
4287         rbd_dev->parent_overlap = overlap;
4288         smp_mb();
4289         if (!overlap) {
4290
4291                 /* A null parent_spec indicates it's the initial probe */
4292
4293                 if (parent_spec) {
4294                         /*
4295                          * The overlap has become zero, so the clone
4296                          * must have been resized down to 0 at some
4297                          * point.  Treat this the same as a flatten.
4298                          */
4299                         rbd_dev_parent_put(rbd_dev);
4300                         pr_info("%s: clone image now standalone\n",
4301                                 rbd_dev->disk->disk_name);
4302                 } else {
4303                         /*
4304                          * For the initial probe, if we find the
4305                          * overlap is zero we just pretend there was
4306                          * no parent image.
4307                          */
4308                         rbd_warn(rbd_dev, "ignoring parent with overlap 0");
4309                 }
4310         }
4311 out:
4312         ret = 0;
4313 out_err:
4314         kfree(reply_buf);
4315         rbd_spec_put(parent_spec);
4316
4317         return ret;
4318 }
4319
4320 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4321 {
4322         struct {
4323                 __le64 stripe_unit;
4324                 __le64 stripe_count;
4325         } __attribute__ ((packed)) striping_info_buf = { 0 };
4326         size_t size = sizeof (striping_info_buf);
4327         void *p;
4328         u64 obj_size;
4329         u64 stripe_unit;
4330         u64 stripe_count;
4331         int ret;
4332
4333         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4334                                 "rbd", "get_stripe_unit_count", NULL, 0,
4335                                 (char *)&striping_info_buf, size);
4336         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4337         if (ret < 0)
4338                 return ret;
4339         if (ret < size)
4340                 return -ERANGE;
4341
4342         /*
4343          * We don't actually support the "fancy striping" feature
4344          * (STRIPINGV2) yet, but if the striping sizes are the
4345          * defaults the behavior is the same as before.  So find
4346          * out, and only fail if the image has non-default values.
4347          */
4348         ret = -EINVAL;
4349         obj_size = (u64)1 << rbd_dev->header.obj_order;
4350         p = &striping_info_buf;
4351         stripe_unit = ceph_decode_64(&p);
4352         if (stripe_unit != obj_size) {
4353                 rbd_warn(rbd_dev, "unsupported stripe unit "
4354                                 "(got %llu want %llu)",
4355                                 stripe_unit, obj_size);
4356                 return -EINVAL;
4357         }
4358         stripe_count = ceph_decode_64(&p);
4359         if (stripe_count != 1) {
4360                 rbd_warn(rbd_dev, "unsupported stripe count "
4361                                 "(got %llu want 1)", stripe_count);
4362                 return -EINVAL;
4363         }
4364         rbd_dev->header.stripe_unit = stripe_unit;
4365         rbd_dev->header.stripe_count = stripe_count;
4366
4367         return 0;
4368 }
4369
4370 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4371 {
4372         size_t image_id_size;
4373         char *image_id;
4374         void *p;
4375         void *end;
4376         size_t size;
4377         void *reply_buf = NULL;
4378         size_t len = 0;
4379         char *image_name = NULL;
4380         int ret;
4381
4382         rbd_assert(!rbd_dev->spec->image_name);
4383
4384         len = strlen(rbd_dev->spec->image_id);
4385         image_id_size = sizeof (__le32) + len;
4386         image_id = kmalloc(image_id_size, GFP_KERNEL);
4387         if (!image_id)
4388                 return NULL;
4389
4390         p = image_id;
4391         end = image_id + image_id_size;
4392         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4393
4394         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4395         reply_buf = kmalloc(size, GFP_KERNEL);
4396         if (!reply_buf)
4397                 goto out;
4398
4399         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4400                                 "rbd", "dir_get_name",
4401                                 image_id, image_id_size,
4402                                 reply_buf, size);
4403         if (ret < 0)
4404                 goto out;
4405         p = reply_buf;
4406         end = reply_buf + ret;
4407
4408         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4409         if (IS_ERR(image_name))
4410                 image_name = NULL;
4411         else
4412                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4413 out:
4414         kfree(reply_buf);
4415         kfree(image_id);
4416
4417         return image_name;
4418 }
4419
4420 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4421 {
4422         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4423         const char *snap_name;
4424         u32 which = 0;
4425
4426         /* Skip over names until we find the one we are looking for */
4427
4428         snap_name = rbd_dev->header.snap_names;
4429         while (which < snapc->num_snaps) {
4430                 if (!strcmp(name, snap_name))
4431                         return snapc->snaps[which];
4432                 snap_name += strlen(snap_name) + 1;
4433                 which++;
4434         }
4435         return CEPH_NOSNAP;
4436 }
4437
4438 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4439 {
4440         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4441         u32 which;
4442         bool found = false;
4443         u64 snap_id;
4444
4445         for (which = 0; !found && which < snapc->num_snaps; which++) {
4446                 const char *snap_name;
4447
4448                 snap_id = snapc->snaps[which];
4449                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4450                 if (IS_ERR(snap_name)) {
4451                         /* ignore no-longer existing snapshots */
4452                         if (PTR_ERR(snap_name) == -ENOENT)
4453                                 continue;
4454                         else
4455                                 break;
4456                 }
4457                 found = !strcmp(name, snap_name);
4458                 kfree(snap_name);
4459         }
4460         return found ? snap_id : CEPH_NOSNAP;
4461 }
4462
4463 /*
4464  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4465  * no snapshot by that name is found, or if an error occurs.
4466  */
4467 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4468 {
4469         if (rbd_dev->image_format == 1)
4470                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4471
4472         return rbd_v2_snap_id_by_name(rbd_dev, name);
4473 }
4474
4475 /*
4476  * An image being mapped will have everything but the snap id.
4477  */
4478 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4479 {
4480         struct rbd_spec *spec = rbd_dev->spec;
4481
4482         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4483         rbd_assert(spec->image_id && spec->image_name);
4484         rbd_assert(spec->snap_name);
4485
4486         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4487                 u64 snap_id;
4488
4489                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4490                 if (snap_id == CEPH_NOSNAP)
4491                         return -ENOENT;
4492
4493                 spec->snap_id = snap_id;
4494         } else {
4495                 spec->snap_id = CEPH_NOSNAP;
4496         }
4497
4498         return 0;
4499 }
4500
4501 /*
4502  * A parent image will have all ids but none of the names.
4503  *
4504  * All names in an rbd spec are dynamically allocated.  It's OK if we
4505  * can't figure out the name for an image id.
4506  */
4507 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4508 {
4509         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4510         struct rbd_spec *spec = rbd_dev->spec;
4511         const char *pool_name;
4512         const char *image_name;
4513         const char *snap_name;
4514         int ret;
4515
4516         rbd_assert(spec->pool_id != CEPH_NOPOOL);
4517         rbd_assert(spec->image_id);
4518         rbd_assert(spec->snap_id != CEPH_NOSNAP);
4519
4520         /* Get the pool name; we have to make our own copy of this */
4521
4522         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4523         if (!pool_name) {
4524                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4525                 return -EIO;
4526         }
4527         pool_name = kstrdup(pool_name, GFP_KERNEL);
4528         if (!pool_name)
4529                 return -ENOMEM;
4530
4531         /* Fetch the image name; tolerate failure here */
4532
4533         image_name = rbd_dev_image_name(rbd_dev);
4534         if (!image_name)
4535                 rbd_warn(rbd_dev, "unable to get image name");
4536
4537         /* Fetch the snapshot name */
4538
4539         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4540         if (IS_ERR(snap_name)) {
4541                 ret = PTR_ERR(snap_name);
4542                 goto out_err;
4543         }
4544
4545         spec->pool_name = pool_name;
4546         spec->image_name = image_name;
4547         spec->snap_name = snap_name;
4548
4549         return 0;
4550
4551 out_err:
4552         kfree(image_name);
4553         kfree(pool_name);
4554         return ret;
4555 }
4556
4557 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4558 {
4559         size_t size;
4560         int ret;
4561         void *reply_buf;
4562         void *p;
4563         void *end;
4564         u64 seq;
4565         u32 snap_count;
4566         struct ceph_snap_context *snapc;
4567         u32 i;
4568
4569         /*
4570          * We'll need room for the seq value (maximum snapshot id),
4571          * snapshot count, and array of that many snapshot ids.
4572          * For now we have a fixed upper limit on the number we're
4573          * prepared to receive.
4574          */
4575         size = sizeof (__le64) + sizeof (__le32) +
4576                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4577         reply_buf = kzalloc(size, GFP_KERNEL);
4578         if (!reply_buf)
4579                 return -ENOMEM;
4580
4581         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4582                                 "rbd", "get_snapcontext", NULL, 0,
4583                                 reply_buf, size);
4584         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4585         if (ret < 0)
4586                 goto out;
4587
4588         p = reply_buf;
4589         end = reply_buf + ret;
4590         ret = -ERANGE;
4591         ceph_decode_64_safe(&p, end, seq, out);
4592         ceph_decode_32_safe(&p, end, snap_count, out);
4593
4594         /*
4595          * Make sure the reported number of snapshot ids wouldn't go
4596          * beyond the end of our buffer.  But before checking that,
4597          * make sure the computed size of the snapshot context we
4598          * allocate is representable in a size_t.
4599          */
4600         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4601                                  / sizeof (u64)) {
4602                 ret = -EINVAL;
4603                 goto out;
4604         }
4605         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4606                 goto out;
4607         ret = 0;
4608
4609         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4610         if (!snapc) {
4611                 ret = -ENOMEM;
4612                 goto out;
4613         }
4614         snapc->seq = seq;
4615         for (i = 0; i < snap_count; i++)
4616                 snapc->snaps[i] = ceph_decode_64(&p);
4617
4618         ceph_put_snap_context(rbd_dev->header.snapc);
4619         rbd_dev->header.snapc = snapc;
4620
4621         dout("  snap context seq = %llu, snap_count = %u\n",
4622                 (unsigned long long)seq, (unsigned int)snap_count);
4623 out:
4624         kfree(reply_buf);
4625
4626         return ret;
4627 }
4628
4629 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4630                                         u64 snap_id)
4631 {
4632         size_t size;
4633         void *reply_buf;
4634         __le64 snapid;
4635         int ret;
4636         void *p;
4637         void *end;
4638         char *snap_name;
4639
4640         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4641         reply_buf = kmalloc(size, GFP_KERNEL);
4642         if (!reply_buf)
4643                 return ERR_PTR(-ENOMEM);
4644
4645         snapid = cpu_to_le64(snap_id);
4646         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4647                                 "rbd", "get_snapshot_name",
4648                                 &snapid, sizeof (snapid),
4649                                 reply_buf, size);
4650         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4651         if (ret < 0) {
4652                 snap_name = ERR_PTR(ret);
4653                 goto out;
4654         }
4655
4656         p = reply_buf;
4657         end = reply_buf + ret;
4658         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4659         if (IS_ERR(snap_name))
4660                 goto out;
4661
4662         dout("  snap_id 0x%016llx snap_name = %s\n",
4663                 (unsigned long long)snap_id, snap_name);
4664 out:
4665         kfree(reply_buf);
4666
4667         return snap_name;
4668 }
4669
4670 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4671 {
4672         bool first_time = rbd_dev->header.object_prefix == NULL;
4673         int ret;
4674
4675         ret = rbd_dev_v2_image_size(rbd_dev);
4676         if (ret)
4677                 return ret;
4678
4679         if (first_time) {
4680                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4681                 if (ret)
4682                         return ret;
4683         }
4684
4685         ret = rbd_dev_v2_snap_context(rbd_dev);
4686         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4687
4688         return ret;
4689 }
4690
4691 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4692 {
4693         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4694
4695         if (rbd_dev->image_format == 1)
4696                 return rbd_dev_v1_header_info(rbd_dev);
4697
4698         return rbd_dev_v2_header_info(rbd_dev);
4699 }
4700
4701 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4702 {
4703         struct device *dev;
4704         int ret;
4705
4706         dev = &rbd_dev->dev;
4707         dev->bus = &rbd_bus_type;
4708         dev->type = &rbd_device_type;
4709         dev->parent = &rbd_root_dev;
4710         dev->release = rbd_dev_device_release;
4711         dev_set_name(dev, "%d", rbd_dev->dev_id);
4712         ret = device_register(dev);
4713
4714         return ret;
4715 }
4716
4717 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4718 {
4719         device_unregister(&rbd_dev->dev);
4720 }
4721
4722 /*
4723  * Get a unique rbd identifier for the given new rbd_dev, and add
4724  * the rbd_dev to the global list.
4725  */
4726 static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4727 {
4728         int new_dev_id;
4729
4730         new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4731                                     0, minor_to_rbd_dev_id(1 << MINORBITS),
4732                                     GFP_KERNEL);
4733         if (new_dev_id < 0)
4734                 return new_dev_id;
4735
4736         rbd_dev->dev_id = new_dev_id;
4737
4738         spin_lock(&rbd_dev_list_lock);
4739         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4740         spin_unlock(&rbd_dev_list_lock);
4741
4742         dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4743
4744         return 0;
4745 }
4746
4747 /*
4748  * Remove an rbd_dev from the global list, and record that its
4749  * identifier is no longer in use.
4750  */
4751 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4752 {
4753         spin_lock(&rbd_dev_list_lock);
4754         list_del_init(&rbd_dev->node);
4755         spin_unlock(&rbd_dev_list_lock);
4756
4757         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4758
4759         dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4760 }
4761
4762 /*
4763  * Skips over white space at *buf, and updates *buf to point to the
4764  * first found non-space character (if any). Returns the length of
4765  * the token (string of non-white space characters) found.  Note
4766  * that *buf must be terminated with '\0'.
4767  */
4768 static inline size_t next_token(const char **buf)
4769 {
4770         /*
4771         * These are the characters that produce nonzero for
4772         * isspace() in the "C" and "POSIX" locales.
4773         */
4774         const char *spaces = " \f\n\r\t\v";
4775
4776         *buf += strspn(*buf, spaces);   /* Find start of token */
4777
4778         return strcspn(*buf, spaces);   /* Return token length */
4779 }
4780
4781 /*
4782  * Finds the next token in *buf, and if the provided token buffer is
4783  * big enough, copies the found token into it.  The result, if
4784  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4785  * must be terminated with '\0' on entry.
4786  *
4787  * Returns the length of the token found (not including the '\0').
4788  * Return value will be 0 if no token is found, and it will be >=
4789  * token_size if the token would not fit.
4790  *
4791  * The *buf pointer will be updated to point beyond the end of the
4792  * found token.  Note that this occurs even if the token buffer is
4793  * too small to hold it.
4794  */
4795 static inline size_t copy_token(const char **buf,
4796                                 char *token,
4797                                 size_t token_size)
4798 {
4799         size_t len;
4800
4801         len = next_token(buf);
4802         if (len < token_size) {
4803                 memcpy(token, *buf, len);
4804                 *(token + len) = '\0';
4805         }
4806         *buf += len;
4807
4808         return len;
4809 }
4810
4811 /*
4812  * Finds the next token in *buf, dynamically allocates a buffer big
4813  * enough to hold a copy of it, and copies the token into the new
4814  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4815  * that a duplicate buffer is created even for a zero-length token.
4816  *
4817  * Returns a pointer to the newly-allocated duplicate, or a null
4818  * pointer if memory for the duplicate was not available.  If
4819  * the lenp argument is a non-null pointer, the length of the token
4820  * (not including the '\0') is returned in *lenp.
4821  *
4822  * If successful, the *buf pointer will be updated to point beyond
4823  * the end of the found token.
4824  *
4825  * Note: uses GFP_KERNEL for allocation.
4826  */
4827 static inline char *dup_token(const char **buf, size_t *lenp)
4828 {
4829         char *dup;
4830         size_t len;
4831
4832         len = next_token(buf);
4833         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4834         if (!dup)
4835                 return NULL;
4836         *(dup + len) = '\0';
4837         *buf += len;
4838
4839         if (lenp)
4840                 *lenp = len;
4841
4842         return dup;
4843 }
4844
4845 /*
4846  * Parse the options provided for an "rbd add" (i.e., rbd image
4847  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4848  * and the data written is passed here via a NUL-terminated buffer.
4849  * Returns 0 if successful or an error code otherwise.
4850  *
4851  * The information extracted from these options is recorded in
4852  * the other parameters which return dynamically-allocated
4853  * structures:
4854  *  ceph_opts
4855  *      The address of a pointer that will refer to a ceph options
4856  *      structure.  Caller must release the returned pointer using
4857  *      ceph_destroy_options() when it is no longer needed.
4858  *  rbd_opts
4859  *      Address of an rbd options pointer.  Fully initialized by
4860  *      this function; caller must release with kfree().
4861  *  spec
4862  *      Address of an rbd image specification pointer.  Fully
4863  *      initialized by this function based on parsed options.
4864  *      Caller must release with rbd_spec_put().
4865  *
4866  * The options passed take this form:
4867  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4868  * where:
4869  *  <mon_addrs>
4870  *      A comma-separated list of one or more monitor addresses.
4871  *      A monitor address is an ip address, optionally followed
4872  *      by a port number (separated by a colon).
4873  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4874  *  <options>
4875  *      A comma-separated list of ceph and/or rbd options.
4876  *  <pool_name>
4877  *      The name of the rados pool containing the rbd image.
4878  *  <image_name>
4879  *      The name of the image in that pool to map.
4880  *  <snap_id>
4881  *      An optional snapshot id.  If provided, the mapping will
4882  *      present data from the image at the time that snapshot was
4883  *      created.  The image head is used if no snapshot id is
4884  *      provided.  Snapshot mappings are always read-only.
4885  */
4886 static int rbd_add_parse_args(const char *buf,
4887                                 struct ceph_options **ceph_opts,
4888                                 struct rbd_options **opts,
4889                                 struct rbd_spec **rbd_spec)
4890 {
4891         size_t len;
4892         char *options;
4893         const char *mon_addrs;
4894         char *snap_name;
4895         size_t mon_addrs_size;
4896         struct rbd_spec *spec = NULL;
4897         struct rbd_options *rbd_opts = NULL;
4898         struct ceph_options *copts;
4899         int ret;
4900
4901         /* The first four tokens are required */
4902
4903         len = next_token(&buf);
4904         if (!len) {
4905                 rbd_warn(NULL, "no monitor address(es) provided");
4906                 return -EINVAL;
4907         }
4908         mon_addrs = buf;
4909         mon_addrs_size = len + 1;
4910         buf += len;
4911
4912         ret = -EINVAL;
4913         options = dup_token(&buf, NULL);
4914         if (!options)
4915                 return -ENOMEM;
4916         if (!*options) {
4917                 rbd_warn(NULL, "no options provided");
4918                 goto out_err;
4919         }
4920
4921         spec = rbd_spec_alloc();
4922         if (!spec)
4923                 goto out_mem;
4924
4925         spec->pool_name = dup_token(&buf, NULL);
4926         if (!spec->pool_name)
4927                 goto out_mem;
4928         if (!*spec->pool_name) {
4929                 rbd_warn(NULL, "no pool name provided");
4930                 goto out_err;
4931         }
4932
4933         spec->image_name = dup_token(&buf, NULL);
4934         if (!spec->image_name)
4935                 goto out_mem;
4936         if (!*spec->image_name) {
4937                 rbd_warn(NULL, "no image name provided");
4938                 goto out_err;
4939         }
4940
4941         /*
4942          * Snapshot name is optional; default is to use "-"
4943          * (indicating the head/no snapshot).
4944          */
4945         len = next_token(&buf);
4946         if (!len) {
4947                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4948                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4949         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4950                 ret = -ENAMETOOLONG;
4951                 goto out_err;
4952         }
4953         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4954         if (!snap_name)
4955                 goto out_mem;
4956         *(snap_name + len) = '\0';
4957         spec->snap_name = snap_name;
4958
4959         /* Initialize all rbd options to the defaults */
4960
4961         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4962         if (!rbd_opts)
4963                 goto out_mem;
4964
4965         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4966
4967         copts = ceph_parse_options(options, mon_addrs,
4968                                         mon_addrs + mon_addrs_size - 1,
4969                                         parse_rbd_opts_token, rbd_opts);
4970         if (IS_ERR(copts)) {
4971                 ret = PTR_ERR(copts);
4972                 goto out_err;
4973         }
4974         kfree(options);
4975
4976         *ceph_opts = copts;
4977         *opts = rbd_opts;
4978         *rbd_spec = spec;
4979
4980         return 0;
4981 out_mem:
4982         ret = -ENOMEM;
4983 out_err:
4984         kfree(rbd_opts);
4985         rbd_spec_put(spec);
4986         kfree(options);
4987
4988         return ret;
4989 }
4990
4991 /*
4992  * Return pool id (>= 0) or a negative error code.
4993  */
4994 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4995 {
4996         u64 newest_epoch;
4997         unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
4998         int tries = 0;
4999         int ret;
5000
5001 again:
5002         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5003         if (ret == -ENOENT && tries++ < 1) {
5004                 ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
5005                                                &newest_epoch);
5006                 if (ret < 0)
5007                         return ret;
5008
5009                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5010                         ceph_monc_request_next_osdmap(&rbdc->client->monc);
5011                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5012                                                      newest_epoch, timeout);
5013                         goto again;
5014                 } else {
5015                         /* the osdmap we have is new enough */
5016                         return -ENOENT;
5017                 }
5018         }
5019
5020         return ret;
5021 }
5022
5023 /*
5024  * An rbd format 2 image has a unique identifier, distinct from the
5025  * name given to it by the user.  Internally, that identifier is
5026  * what's used to specify the names of objects related to the image.
5027  *
5028  * A special "rbd id" object is used to map an rbd image name to its
5029  * id.  If that object doesn't exist, then there is no v2 rbd image
5030  * with the supplied name.
5031  *
5032  * This function will record the given rbd_dev's image_id field if
5033  * it can be determined, and in that case will return 0.  If any
5034  * errors occur a negative errno will be returned and the rbd_dev's
5035  * image_id field will be unchanged (and should be NULL).
5036  */
5037 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5038 {
5039         int ret;
5040         size_t size;
5041         char *object_name;
5042         void *response;
5043         char *image_id;
5044
5045         /*
5046          * When probing a parent image, the image id is already
5047          * known (and the image name likely is not).  There's no
5048          * need to fetch the image id again in this case.  We
5049          * do still need to set the image format though.
5050          */
5051         if (rbd_dev->spec->image_id) {
5052                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5053
5054                 return 0;
5055         }
5056
5057         /*
5058          * First, see if the format 2 image id file exists, and if
5059          * so, get the image's persistent id from it.
5060          */
5061         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5062         object_name = kmalloc(size, GFP_NOIO);
5063         if (!object_name)
5064                 return -ENOMEM;
5065         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5066         dout("rbd id object name is %s\n", object_name);
5067
5068         /* Response will be an encoded string, which includes a length */
5069
5070         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5071         response = kzalloc(size, GFP_NOIO);
5072         if (!response) {
5073                 ret = -ENOMEM;
5074                 goto out;
5075         }
5076
5077         /* If it doesn't exist we'll assume it's a format 1 image */
5078
5079         ret = rbd_obj_method_sync(rbd_dev, object_name,
5080                                 "rbd", "get_id", NULL, 0,
5081                                 response, RBD_IMAGE_ID_LEN_MAX);
5082         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5083         if (ret == -ENOENT) {
5084                 image_id = kstrdup("", GFP_KERNEL);
5085                 ret = image_id ? 0 : -ENOMEM;
5086                 if (!ret)
5087                         rbd_dev->image_format = 1;
5088         } else if (ret >= 0) {
5089                 void *p = response;
5090
5091                 image_id = ceph_extract_encoded_string(&p, p + ret,
5092                                                 NULL, GFP_NOIO);
5093                 ret = PTR_ERR_OR_ZERO(image_id);
5094                 if (!ret)
5095                         rbd_dev->image_format = 2;
5096         }
5097
5098         if (!ret) {
5099                 rbd_dev->spec->image_id = image_id;
5100                 dout("image_id is %s\n", image_id);
5101         }
5102 out:
5103         kfree(response);
5104         kfree(object_name);
5105
5106         return ret;
5107 }
5108
5109 /*
5110  * Undo whatever state changes are made by v1 or v2 header info
5111  * call.
5112  */
5113 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5114 {
5115         struct rbd_image_header *header;
5116
5117         /* Drop parent reference unless it's already been done (or none) */
5118
5119         if (rbd_dev->parent_overlap)
5120                 rbd_dev_parent_put(rbd_dev);
5121
5122         /* Free dynamic fields from the header, then zero it out */
5123
5124         header = &rbd_dev->header;
5125         ceph_put_snap_context(header->snapc);
5126         kfree(header->snap_sizes);
5127         kfree(header->snap_names);
5128         kfree(header->object_prefix);
5129         memset(header, 0, sizeof (*header));
5130 }
5131
5132 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5133 {
5134         int ret;
5135
5136         ret = rbd_dev_v2_object_prefix(rbd_dev);
5137         if (ret)
5138                 goto out_err;
5139
5140         /*
5141          * Get the and check features for the image.  Currently the
5142          * features are assumed to never change.
5143          */
5144         ret = rbd_dev_v2_features(rbd_dev);
5145         if (ret)
5146                 goto out_err;
5147
5148         /* If the image supports fancy striping, get its parameters */
5149
5150         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5151                 ret = rbd_dev_v2_striping_info(rbd_dev);
5152                 if (ret < 0)
5153                         goto out_err;
5154         }
5155         /* No support for crypto and compression type format 2 images */
5156
5157         return 0;
5158 out_err:
5159         rbd_dev->header.features = 0;
5160         kfree(rbd_dev->header.object_prefix);
5161         rbd_dev->header.object_prefix = NULL;
5162
5163         return ret;
5164 }
5165
5166 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
5167 {
5168         struct rbd_device *parent = NULL;
5169         struct rbd_spec *parent_spec;
5170         struct rbd_client *rbdc;
5171         int ret;
5172
5173         if (!rbd_dev->parent_spec)
5174                 return 0;
5175         /*
5176          * We need to pass a reference to the client and the parent
5177          * spec when creating the parent rbd_dev.  Images related by
5178          * parent/child relationships always share both.
5179          */
5180         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
5181         rbdc = __rbd_get_client(rbd_dev->rbd_client);
5182
5183         ret = -ENOMEM;
5184         parent = rbd_dev_create(rbdc, parent_spec);
5185         if (!parent)
5186                 goto out_err;
5187
5188         ret = rbd_dev_image_probe(parent, false);
5189         if (ret < 0)
5190                 goto out_err;
5191         rbd_dev->parent = parent;
5192         atomic_set(&rbd_dev->parent_ref, 1);
5193
5194         return 0;
5195 out_err:
5196         if (parent) {
5197                 rbd_dev_unparent(rbd_dev);
5198                 kfree(rbd_dev->header_name);
5199                 rbd_dev_destroy(parent);
5200         } else {
5201                 rbd_put_client(rbdc);
5202                 rbd_spec_put(parent_spec);
5203         }
5204
5205         return ret;
5206 }
5207
5208 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5209 {
5210         int ret;
5211
5212         /* Get an id and fill in device name. */
5213
5214         ret = rbd_dev_id_get(rbd_dev);
5215         if (ret)
5216                 return ret;
5217
5218         BUILD_BUG_ON(DEV_NAME_LEN
5219                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5220         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5221
5222         /* Record our major and minor device numbers. */
5223
5224         if (!single_major) {
5225                 ret = register_blkdev(0, rbd_dev->name);
5226                 if (ret < 0)
5227                         goto err_out_id;
5228
5229                 rbd_dev->major = ret;
5230                 rbd_dev->minor = 0;
5231         } else {
5232                 rbd_dev->major = rbd_major;
5233                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5234         }
5235
5236         /* Set up the blkdev mapping. */
5237
5238         ret = rbd_init_disk(rbd_dev);
5239         if (ret)
5240                 goto err_out_blkdev;
5241
5242         ret = rbd_dev_mapping_set(rbd_dev);
5243         if (ret)
5244                 goto err_out_disk;
5245
5246         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5247         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5248
5249         ret = rbd_bus_add_dev(rbd_dev);
5250         if (ret)
5251                 goto err_out_mapping;
5252
5253         /* Everything's ready.  Announce the disk to the world. */
5254
5255         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5256         add_disk(rbd_dev->disk);
5257
5258         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5259                 (unsigned long long) rbd_dev->mapping.size);
5260
5261         return ret;
5262
5263 err_out_mapping:
5264         rbd_dev_mapping_clear(rbd_dev);
5265 err_out_disk:
5266         rbd_free_disk(rbd_dev);
5267 err_out_blkdev:
5268         if (!single_major)
5269                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5270 err_out_id:
5271         rbd_dev_id_put(rbd_dev);
5272         rbd_dev_mapping_clear(rbd_dev);
5273
5274         return ret;
5275 }
5276
5277 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5278 {
5279         struct rbd_spec *spec = rbd_dev->spec;
5280         size_t size;
5281
5282         /* Record the header object name for this rbd image. */
5283
5284         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5285
5286         if (rbd_dev->image_format == 1)
5287                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
5288         else
5289                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
5290
5291         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
5292         if (!rbd_dev->header_name)
5293                 return -ENOMEM;
5294
5295         if (rbd_dev->image_format == 1)
5296                 sprintf(rbd_dev->header_name, "%s%s",
5297                         spec->image_name, RBD_SUFFIX);
5298         else
5299                 sprintf(rbd_dev->header_name, "%s%s",
5300                         RBD_HEADER_PREFIX, spec->image_id);
5301         return 0;
5302 }
5303
5304 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5305 {
5306         rbd_dev_unprobe(rbd_dev);
5307         kfree(rbd_dev->header_name);
5308         rbd_dev->header_name = NULL;
5309         rbd_dev->image_format = 0;
5310         kfree(rbd_dev->spec->image_id);
5311         rbd_dev->spec->image_id = NULL;
5312
5313         rbd_dev_destroy(rbd_dev);
5314 }
5315
5316 /*
5317  * Probe for the existence of the header object for the given rbd
5318  * device.  If this image is the one being mapped (i.e., not a
5319  * parent), initiate a watch on its header object before using that
5320  * object to get detailed information about the rbd image.
5321  */
5322 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5323 {
5324         int ret;
5325
5326         /*
5327          * Get the id from the image id object.  Unless there's an
5328          * error, rbd_dev->spec->image_id will be filled in with
5329          * a dynamically-allocated string, and rbd_dev->image_format
5330          * will be set to either 1 or 2.
5331          */
5332         ret = rbd_dev_image_id(rbd_dev);
5333         if (ret)
5334                 return ret;
5335
5336         ret = rbd_dev_header_name(rbd_dev);
5337         if (ret)
5338                 goto err_out_format;
5339
5340         if (mapping) {
5341                 ret = rbd_dev_header_watch_sync(rbd_dev);
5342                 if (ret)
5343                         goto out_header_name;
5344         }
5345
5346         ret = rbd_dev_header_info(rbd_dev);
5347         if (ret)
5348                 goto err_out_watch;
5349
5350         /*
5351          * If this image is the one being mapped, we have pool name and
5352          * id, image name and id, and snap name - need to fill snap id.
5353          * Otherwise this is a parent image, identified by pool, image
5354          * and snap ids - need to fill in names for those ids.
5355          */
5356         if (mapping)
5357                 ret = rbd_spec_fill_snap_id(rbd_dev);
5358         else
5359                 ret = rbd_spec_fill_names(rbd_dev);
5360         if (ret)
5361                 goto err_out_probe;
5362
5363         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5364                 ret = rbd_dev_v2_parent_info(rbd_dev);
5365                 if (ret)
5366                         goto err_out_probe;
5367
5368                 /*
5369                  * Need to warn users if this image is the one being
5370                  * mapped and has a parent.
5371                  */
5372                 if (mapping && rbd_dev->parent_spec)
5373                         rbd_warn(rbd_dev,
5374                                  "WARNING: kernel layering is EXPERIMENTAL!");
5375         }
5376
5377         ret = rbd_dev_probe_parent(rbd_dev);
5378         if (ret)
5379                 goto err_out_probe;
5380
5381         dout("discovered format %u image, header name is %s\n",
5382                 rbd_dev->image_format, rbd_dev->header_name);
5383         return 0;
5384
5385 err_out_probe:
5386         rbd_dev_unprobe(rbd_dev);
5387 err_out_watch:
5388         if (mapping)
5389                 rbd_dev_header_unwatch_sync(rbd_dev);
5390 out_header_name:
5391         kfree(rbd_dev->header_name);
5392         rbd_dev->header_name = NULL;
5393 err_out_format:
5394         rbd_dev->image_format = 0;
5395         kfree(rbd_dev->spec->image_id);
5396         rbd_dev->spec->image_id = NULL;
5397         return ret;
5398 }
5399
5400 static ssize_t do_rbd_add(struct bus_type *bus,
5401                           const char *buf,
5402                           size_t count)
5403 {
5404         struct rbd_device *rbd_dev = NULL;
5405         struct ceph_options *ceph_opts = NULL;
5406         struct rbd_options *rbd_opts = NULL;
5407         struct rbd_spec *spec = NULL;
5408         struct rbd_client *rbdc;
5409         bool read_only;
5410         int rc = -ENOMEM;
5411
5412         if (!try_module_get(THIS_MODULE))
5413                 return -ENODEV;
5414
5415         /* parse add command */
5416         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5417         if (rc < 0)
5418                 goto err_out_module;
5419         read_only = rbd_opts->read_only;
5420         kfree(rbd_opts);
5421         rbd_opts = NULL;        /* done with this */
5422
5423         rbdc = rbd_get_client(ceph_opts);
5424         if (IS_ERR(rbdc)) {
5425                 rc = PTR_ERR(rbdc);
5426                 goto err_out_args;
5427         }
5428
5429         /* pick the pool */
5430         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
5431         if (rc < 0)
5432                 goto err_out_client;
5433         spec->pool_id = (u64)rc;
5434
5435         /* The ceph file layout needs to fit pool id in 32 bits */
5436
5437         if (spec->pool_id > (u64)U32_MAX) {
5438                 rbd_warn(NULL, "pool id too large (%llu > %u)",
5439                                 (unsigned long long)spec->pool_id, U32_MAX);
5440                 rc = -EIO;
5441                 goto err_out_client;
5442         }
5443
5444         rbd_dev = rbd_dev_create(rbdc, spec);
5445         if (!rbd_dev)
5446                 goto err_out_client;
5447         rbdc = NULL;            /* rbd_dev now owns this */
5448         spec = NULL;            /* rbd_dev now owns this */
5449
5450         rc = rbd_dev_image_probe(rbd_dev, true);
5451         if (rc < 0)
5452                 goto err_out_rbd_dev;
5453
5454         /* If we are mapping a snapshot it must be marked read-only */
5455
5456         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5457                 read_only = true;
5458         rbd_dev->mapping.read_only = read_only;
5459
5460         rc = rbd_dev_device_setup(rbd_dev);
5461         if (rc) {
5462                 /*
5463                  * rbd_dev_header_unwatch_sync() can't be moved into
5464                  * rbd_dev_image_release() without refactoring, see
5465                  * commit 1f3ef78861ac.
5466                  */
5467                 rbd_dev_header_unwatch_sync(rbd_dev);
5468                 rbd_dev_image_release(rbd_dev);
5469                 goto err_out_module;
5470         }
5471
5472         return count;
5473
5474 err_out_rbd_dev:
5475         rbd_dev_destroy(rbd_dev);
5476 err_out_client:
5477         rbd_put_client(rbdc);
5478 err_out_args:
5479         rbd_spec_put(spec);
5480 err_out_module:
5481         module_put(THIS_MODULE);
5482
5483         dout("Error adding device %s\n", buf);
5484
5485         return (ssize_t)rc;
5486 }
5487
5488 static ssize_t rbd_add(struct bus_type *bus,
5489                        const char *buf,
5490                        size_t count)
5491 {
5492         if (single_major)
5493                 return -EINVAL;
5494
5495         return do_rbd_add(bus, buf, count);
5496 }
5497
5498 static ssize_t rbd_add_single_major(struct bus_type *bus,
5499                                     const char *buf,
5500                                     size_t count)
5501 {
5502         return do_rbd_add(bus, buf, count);
5503 }
5504
5505 static void rbd_dev_device_release(struct device *dev)
5506 {
5507         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5508
5509         rbd_free_disk(rbd_dev);
5510         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5511         rbd_dev_mapping_clear(rbd_dev);
5512         if (!single_major)
5513                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5514         rbd_dev_id_put(rbd_dev);
5515         rbd_dev_mapping_clear(rbd_dev);
5516 }
5517
5518 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5519 {
5520         while (rbd_dev->parent) {
5521                 struct rbd_device *first = rbd_dev;
5522                 struct rbd_device *second = first->parent;
5523                 struct rbd_device *third;
5524
5525                 /*
5526                  * Follow to the parent with no grandparent and
5527                  * remove it.
5528                  */
5529                 while (second && (third = second->parent)) {
5530                         first = second;
5531                         second = third;
5532                 }
5533                 rbd_assert(second);
5534                 rbd_dev_image_release(second);
5535                 first->parent = NULL;
5536                 first->parent_overlap = 0;
5537
5538                 rbd_assert(first->parent_spec);
5539                 rbd_spec_put(first->parent_spec);
5540                 first->parent_spec = NULL;
5541         }
5542 }
5543
5544 static ssize_t do_rbd_remove(struct bus_type *bus,
5545                              const char *buf,
5546                              size_t count)
5547 {
5548         struct rbd_device *rbd_dev = NULL;
5549         struct list_head *tmp;
5550         int dev_id;
5551         unsigned long ul;
5552         bool already = false;
5553         int ret;
5554
5555         ret = kstrtoul(buf, 10, &ul);
5556         if (ret)
5557                 return ret;
5558
5559         /* convert to int; abort if we lost anything in the conversion */
5560         dev_id = (int)ul;
5561         if (dev_id != ul)
5562                 return -EINVAL;
5563
5564         ret = -ENOENT;
5565         spin_lock(&rbd_dev_list_lock);
5566         list_for_each(tmp, &rbd_dev_list) {
5567                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5568                 if (rbd_dev->dev_id == dev_id) {
5569                         ret = 0;
5570                         break;
5571                 }
5572         }
5573         if (!ret) {
5574                 spin_lock_irq(&rbd_dev->lock);
5575                 if (rbd_dev->open_count)
5576                         ret = -EBUSY;
5577                 else
5578                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5579                                                         &rbd_dev->flags);
5580                 spin_unlock_irq(&rbd_dev->lock);
5581         }
5582         spin_unlock(&rbd_dev_list_lock);
5583         if (ret < 0 || already)
5584                 return ret;
5585
5586         rbd_dev_header_unwatch_sync(rbd_dev);
5587         /*
5588          * flush remaining watch callbacks - these must be complete
5589          * before the osd_client is shutdown
5590          */
5591         dout("%s: flushing notifies", __func__);
5592         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5593
5594         /*
5595          * Don't free anything from rbd_dev->disk until after all
5596          * notifies are completely processed. Otherwise
5597          * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5598          * in a potential use after free of rbd_dev->disk or rbd_dev.
5599          */
5600         rbd_bus_del_dev(rbd_dev);
5601         rbd_dev_image_release(rbd_dev);
5602         module_put(THIS_MODULE);
5603
5604         return count;
5605 }
5606
5607 static ssize_t rbd_remove(struct bus_type *bus,
5608                           const char *buf,
5609                           size_t count)
5610 {
5611         if (single_major)
5612                 return -EINVAL;
5613
5614         return do_rbd_remove(bus, buf, count);
5615 }
5616
5617 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5618                                        const char *buf,
5619                                        size_t count)
5620 {
5621         return do_rbd_remove(bus, buf, count);
5622 }
5623
5624 /*
5625  * create control files in sysfs
5626  * /sys/bus/rbd/...
5627  */
5628 static int rbd_sysfs_init(void)
5629 {
5630         int ret;
5631
5632         ret = device_register(&rbd_root_dev);
5633         if (ret < 0)
5634                 return ret;
5635
5636         ret = bus_register(&rbd_bus_type);
5637         if (ret < 0)
5638                 device_unregister(&rbd_root_dev);
5639
5640         return ret;
5641 }
5642
5643 static void rbd_sysfs_cleanup(void)
5644 {
5645         bus_unregister(&rbd_bus_type);
5646         device_unregister(&rbd_root_dev);
5647 }
5648
5649 static int rbd_slab_init(void)
5650 {
5651         rbd_assert(!rbd_img_request_cache);
5652         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5653                                         sizeof (struct rbd_img_request),
5654                                         __alignof__(struct rbd_img_request),
5655                                         0, NULL);
5656         if (!rbd_img_request_cache)
5657                 return -ENOMEM;
5658
5659         rbd_assert(!rbd_obj_request_cache);
5660         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5661                                         sizeof (struct rbd_obj_request),
5662                                         __alignof__(struct rbd_obj_request),
5663                                         0, NULL);
5664         if (!rbd_obj_request_cache)
5665                 goto out_err;
5666
5667         rbd_assert(!rbd_segment_name_cache);
5668         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5669                                         CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5670         if (rbd_segment_name_cache)
5671                 return 0;
5672 out_err:
5673         if (rbd_obj_request_cache) {
5674                 kmem_cache_destroy(rbd_obj_request_cache);
5675                 rbd_obj_request_cache = NULL;
5676         }
5677
5678         kmem_cache_destroy(rbd_img_request_cache);
5679         rbd_img_request_cache = NULL;
5680
5681         return -ENOMEM;
5682 }
5683
5684 static void rbd_slab_exit(void)
5685 {
5686         rbd_assert(rbd_segment_name_cache);
5687         kmem_cache_destroy(rbd_segment_name_cache);
5688         rbd_segment_name_cache = NULL;
5689
5690         rbd_assert(rbd_obj_request_cache);
5691         kmem_cache_destroy(rbd_obj_request_cache);
5692         rbd_obj_request_cache = NULL;
5693
5694         rbd_assert(rbd_img_request_cache);
5695         kmem_cache_destroy(rbd_img_request_cache);
5696         rbd_img_request_cache = NULL;
5697 }
5698
5699 static int __init rbd_init(void)
5700 {
5701         int rc;
5702
5703         if (!libceph_compatible(NULL)) {
5704                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5705                 return -EINVAL;
5706         }
5707
5708         rc = rbd_slab_init();
5709         if (rc)
5710                 return rc;
5711
5712         /*
5713          * The number of active work items is limited by the number of
5714          * rbd devices, so leave @max_active at default.
5715          */
5716         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5717         if (!rbd_wq) {
5718                 rc = -ENOMEM;
5719                 goto err_out_slab;
5720         }
5721
5722         if (single_major) {
5723                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5724                 if (rbd_major < 0) {
5725                         rc = rbd_major;
5726                         goto err_out_wq;
5727                 }
5728         }
5729
5730         rc = rbd_sysfs_init();
5731         if (rc)
5732                 goto err_out_blkdev;
5733
5734         if (single_major)
5735                 pr_info("loaded (major %d)\n", rbd_major);
5736         else
5737                 pr_info("loaded\n");
5738
5739         return 0;
5740
5741 err_out_blkdev:
5742         if (single_major)
5743                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5744 err_out_wq:
5745         destroy_workqueue(rbd_wq);
5746 err_out_slab:
5747         rbd_slab_exit();
5748         return rc;
5749 }
5750
5751 static void __exit rbd_exit(void)
5752 {
5753         ida_destroy(&rbd_dev_id_ida);
5754         rbd_sysfs_cleanup();
5755         if (single_major)
5756                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5757         destroy_workqueue(rbd_wq);
5758         rbd_slab_exit();
5759 }
5760
5761 module_init(rbd_init);
5762 module_exit(rbd_exit);
5763
5764 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5765 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5766 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5767 /* following authorship retained from original osdblk.c */
5768 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5769
5770 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5771 MODULE_LICENSE("GPL");