rbd: always pass ops array to rbd_req_sync_op()
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
/* Short and long forms of the driver name. */
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
/*
 * Snapshot name that selects the image head: rbd_header_set_snap()
 * maps this name to CEPH_NOSNAP instead of looking up a snapshot.
 */
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
/* Initial value of rbd_options.notify_timeout (see rbd_get_client()). */
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;         /* from the on-disk image_size field */
79         char *object_prefix;    /* NUL-terminated copy of on-disk block_name */
80         __u8 obj_order;         /* object size is 1 << obj_order bytes */
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;        /* snapshot ids and seq */
84         size_t snap_names_len;  /* size in bytes of the snap_names buffer */
85         u32 total_snaps;        /* number of snapshots described below */
86
87         char *snap_names;       /* consecutive NUL-terminated snapshot names */
88         u64 *snap_sizes;        /* image size at each snapshot, by index */
89
90         u64 obj_version;
91 };
92
/* rbd-specific options, filled in by parse_rbd_opts_token(). */
93 struct rbd_options {
94         int     notify_timeout; /* "notify_timeout=%d"; defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
95 };
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;        /* from ceph_create_client() */
102         struct rbd_options      *rbd_opts;      /* freed in rbd_client_release() */
103         struct kref             kref;           /* last put runs rbd_client_release() */
104         struct list_head        node;           /* entry in rbd_client_list */
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;       /* set to 1 by rbd_coll_end_req_index() */
112         int rc;         /* result code handed to __blk_end_request() */
113         u64 bytes;      /* byte count handed to __blk_end_request() */
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;            /* length passed to rbd_do_request() */
134         int                     coll_index;     /* this request's slot in coll->status[] */
135         struct rbd_req_coll     *coll;          /* owning collection, or NULL */
136 };
137
/*
 * a single snapshot
 * NOTE(review): presumably linked through 'node' into rbd_device.snaps;
 * the code that does so is outside this part of the file.
 */
138 struct rbd_snap {
139         struct  device          dev;
140         const char              *name;
141         u64                     size;
142         struct list_head        node;
143         u64                     id;
144 };
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;    /* reference dropped by rbd_put_client() */
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;         /* in-memory image metadata */
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;        /* used as fl_pg_pool in rbd_do_request() */
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;        /* current snapshot id */
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;      /* 1 when mapped to a snapshot, see rbd_header_set_snap() */
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;            /* refcounted via rbd_get_dev()/rbd_put_dev() */
189 };
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);   /* held while walking or changing rbd_client_list */
198
/* Forward declarations of functions defined later in the file. */
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
/*
 * Bus attributes "add" and "remove": both are owner-writable only
 * (S_IWUSR), have no show method, and store through rbd_add() and
 * rbd_remove() respectively.
 */
212 static struct bus_attribute rbd_bus_attrs[] = {
213         __ATTR(add, S_IWUSR, NULL, rbd_add),
214         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215         __ATTR_NULL
216 };
217
/* The "rbd" bus, carrying the attributes above. */
218 static struct bus_type rbd_bus_type = {
219         .name           = "rbd",
220         .bus_attrs      = rbd_bus_attrs,
221 };
222
/* Release method for rbd_root_dev; intentionally does nothing. */
223 static void rbd_root_dev_release(struct device *dev)
224 {
225 }
226
/* Statically allocated device named "rbd"; its release method is a no-op. */
227 static struct device rbd_root_dev = {
228         .init_name =    "rbd",
229         .release =      rbd_root_dev_release,
230 };
231
232
/*
 * Take a reference on the struct device embedded in @rbd_dev.
 * Returns whatever get_device() returns.
 */
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
/* Drop a reference on the struct device embedded in @rbd_dev. */
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
/*
 * Block device release method: drops one rbd device reference, the
 * counterpart of the rbd_get_dev() done in rbd_open().  Always returns 0.
 */
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261         struct rbd_device *rbd_dev = disk->private_data;
262
263         rbd_put_dev(rbd_dev);
264
265         return 0;
266 }
267
/* Block device operations: only open and release are implemented. */
268 static const struct block_device_operations rbd_bd_ops = {
269         .owner                  = THIS_MODULE,
270         .open                   = rbd_open,
271         .release                = rbd_release,
272 };
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         list_for_each_entry(client_node, &rbd_client_list, node)
336                 if (!ceph_compare_options(ceph_opts, client_node->client))
337                         return client_node;
338         return NULL;
339 }
340
341 /*
342  * mount options
343  */
/*
 * Option tokens.  parse_rbd_opts_token() treats tokens below
 * Opt_last_int as integer-valued and tokens between Opt_last_int and
 * Opt_last_string as string-valued.
 */
344 enum {
345         Opt_notify_timeout,
346         Opt_last_int,
347         /* int args above */
348         Opt_last_string,
349         /* string args above */
350 };
351
/* Pattern table handed to match_token(); terminated by the {-1, NULL} entry. */
352 static match_table_t rbd_opts_tokens = {
353         {Opt_notify_timeout, "notify_timeout=%d"},
354         /* int args above */
355         /* string args above */
356         {-1, NULL}
357 };
358
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361         struct rbd_options *rbd_opts = private;
362         substring_t argstr[MAX_OPT_ARGS];
363         int token, intval, ret;
364
365         token = match_token(c, rbd_opts_tokens, argstr);
366         if (token < 0)
367                 return -EINVAL;
368
369         if (token < Opt_last_int) {
370                 ret = match_int(&argstr[0], &intval);
371                 if (ret < 0) {
372                         pr_err("bad mount option arg (not int) "
373                                "at '%s'\n", c);
374                         return ret;
375                 }
376                 dout("got int token %d val %d\n", token, intval);
377         } else if (token > Opt_last_int && token < Opt_last_string) {
378                 dout("got string token %d val %s\n", token,
379                      argstr[0].from);
380         } else {
381                 dout("got token %d\n", token);
382         }
383
384         switch (token) {
385         case Opt_notify_timeout:
386                 rbd_opts->notify_timeout = intval;
387                 break;
388         default:
389                 BUG_ON(token);
390         }
391         return 0;
392 }
393
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399                                          size_t mon_addr_len,
400                                          char *options)
401 {
402         struct rbd_client *rbdc;
403         struct ceph_options *ceph_opts;
404         struct rbd_options *rbd_opts;
405
406         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407         if (!rbd_opts)
408                 return ERR_PTR(-ENOMEM);
409
410         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411
412         ceph_opts = ceph_parse_options(options, mon_addr,
413                                         mon_addr + mon_addr_len,
414                                         parse_rbd_opts_token, rbd_opts);
415         if (IS_ERR(ceph_opts)) {
416                 kfree(rbd_opts);
417                 return ERR_CAST(ceph_opts);
418         }
419
420         spin_lock(&rbd_client_list_lock);
421         rbdc = __rbd_client_find(ceph_opts);
422         if (rbdc) {
423                 /* using an existing client */
424                 kref_get(&rbdc->kref);
425                 spin_unlock(&rbd_client_list_lock);
426
427                 ceph_destroy_options(ceph_opts);
428                 kfree(rbd_opts);
429
430                 return rbdc;
431         }
432         spin_unlock(&rbd_client_list_lock);
433
434         rbdc = rbd_client_create(ceph_opts, rbd_opts);
435
436         if (IS_ERR(rbdc))
437                 kfree(rbd_opts);
438
439         return rbdc;
440 }
441
442 /*
443  * Destroy ceph client
444  *
445  * kref release callback for struct rbd_client (see rbd_put_client()).
 * Takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT hold that lock.  Afterwards the ceph client is
 * destroyed and the rbd options and the rbd_client are freed.
446  */
447 static void rbd_client_release(struct kref *kref)
448 {
449         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450
451         dout("rbd_release_client %p\n", rbdc);
452         spin_lock(&rbd_client_list_lock);
453         list_del(&rbdc->node);
454         spin_unlock(&rbd_client_list_lock);
455
456         ceph_destroy_client(rbdc->client);
457         kfree(rbdc->rbd_opts);
458         kfree(rbdc);
459 }
460
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
 *
 * The device's rbd_client pointer is cleared in either case.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         rbd_dev->rbd_client = NULL;
469 }
470
471 /*
472  * Destroy requests collection
 *
 * kref release callback for struct rbd_req_coll; frees the collection.
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
489 /*
490  * Create a new header structure, translate header format from the on-disk
491  * header.
492  */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494                                  struct rbd_image_header_ondisk *ondisk,
495                                  u32 allocated_snaps)
496 {
497         u32 i, snap_count;
498
499         if (!rbd_dev_ondisk_valid(ondisk))
500                 return -ENXIO;
501
502         snap_count = le32_to_cpu(ondisk->snap_count);
503         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
504                          / sizeof (*ondisk))
505                 return -EINVAL;
506         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507                                 snap_count * sizeof(u64),
508                                 GFP_KERNEL);
509         if (!header->snapc)
510                 return -ENOMEM;
511
512         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
513         if (snap_count) {
514                 header->snap_names = kmalloc(header->snap_names_len,
515                                              GFP_KERNEL);
516                 if (!header->snap_names)
517                         goto err_snapc;
518                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519                                              GFP_KERNEL);
520                 if (!header->snap_sizes)
521                         goto err_names;
522         } else {
523                 header->snap_names = NULL;
524                 header->snap_sizes = NULL;
525         }
526
527         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
528                                         GFP_KERNEL);
529         if (!header->object_prefix)
530                 goto err_sizes;
531
532         memcpy(header->object_prefix, ondisk->block_name,
533                sizeof(ondisk->block_name));
534         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
535
536         header->image_size = le64_to_cpu(ondisk->image_size);
537         header->obj_order = ondisk->options.order;
538         header->crypt_type = ondisk->options.crypt_type;
539         header->comp_type = ondisk->options.comp_type;
540
541         atomic_set(&header->snapc->nref, 1);
542         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
543         header->snapc->num_snaps = snap_count;
544         header->total_snaps = snap_count;
545
546         if (snap_count && allocated_snaps == snap_count) {
547                 for (i = 0; i < snap_count; i++) {
548                         header->snapc->snaps[i] =
549                                 le64_to_cpu(ondisk->snaps[i].id);
550                         header->snap_sizes[i] =
551                                 le64_to_cpu(ondisk->snaps[i].image_size);
552                 }
553
554                 /* copy snapshot names */
555                 memcpy(header->snap_names, &ondisk->snaps[i],
556                         header->snap_names_len);
557         }
558
559         return 0;
560
561 err_sizes:
562         kfree(header->snap_sizes);
563 err_names:
564         kfree(header->snap_names);
565 err_snapc:
566         kfree(header->snapc);
567         return -ENOMEM;
568 }
569
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571                         u64 *seq, u64 *size)
572 {
573         int i;
574         char *p = header->snap_names;
575
576         for (i = 0; i < header->total_snaps; i++) {
577                 if (!strcmp(snap_name, p)) {
578
579                         /* Found it.  Pass back its id and/or size */
580
581                         if (seq)
582                                 *seq = header->snapc->snaps[i];
583                         if (size)
584                                 *size = header->snap_sizes[i];
585                         return i;
586                 }
587                 p += strlen(p) + 1;     /* Skip ahead to the next name */
588         }
589         return -ENOENT;
590 }
591
592 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
593 {
594         int ret;
595
596         down_write(&rbd_dev->header_rwsem);
597
598         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
599                     sizeof (RBD_SNAP_HEAD_NAME))) {
600                 rbd_dev->snap_id = CEPH_NOSNAP;
601                 rbd_dev->snap_exists = false;
602                 rbd_dev->read_only = 0;
603                 if (size)
604                         *size = rbd_dev->header.image_size;
605         } else {
606                 u64 snap_id = 0;
607
608                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
609                                         &snap_id, size);
610                 if (ret < 0)
611                         goto done;
612                 rbd_dev->snap_id = snap_id;
613                 rbd_dev->snap_exists = true;
614                 rbd_dev->read_only = 1;
615         }
616
617         ret = 0;
618 done:
619         up_write(&rbd_dev->header_rwsem);
620         return ret;
621 }
622
623 static void rbd_header_free(struct rbd_image_header *header)
624 {
625         kfree(header->object_prefix);
626         kfree(header->snap_sizes);
627         kfree(header->snap_names);
628         ceph_put_snap_context(header->snapc);
629 }
630
631 /*
632  * get the actual striped segment name, offset and length
633  */
634 static u64 rbd_get_segment(struct rbd_image_header *header,
635                            const char *object_prefix,
636                            u64 ofs, u64 len,
637                            char *seg_name, u64 *segofs)
638 {
639         u64 seg = ofs >> header->obj_order;
640
641         if (seg_name)
642                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
643                          "%s.%012llx", object_prefix, seg);
644
645         ofs = ofs & ((1 << header->obj_order) - 1);
646         len = min_t(u64, len, (1 << header->obj_order) - ofs);
647
648         if (segofs)
649                 *segofs = ofs;
650
651         return len;
652 }
653
654 static int rbd_get_num_segments(struct rbd_image_header *header,
655                                 u64 ofs, u64 len)
656 {
657         u64 start_seg = ofs >> header->obj_order;
658         u64 end_seg = (ofs + len - 1) >> header->obj_order;
659         return end_seg - start_seg + 1;
660 }
661
662 /*
663  * returns the size of an object in the image
664  */
665 static u64 rbd_obj_bytes(struct rbd_image_header *header)
666 {
667         return 1 << header->obj_order;
668 }
669
670 /*
671  * bio helpers
672  */
673
674 static void bio_chain_put(struct bio *chain)
675 {
676         struct bio *tmp;
677
678         while (chain) {
679                 tmp = chain;
680                 chain = chain->bi_next;
681                 bio_put(tmp);
682         }
683 }
684
685 /*
686  * zeros a bio chain, starting at specific offset
687  */
688 static void zero_bio_chain(struct bio *chain, int start_ofs)
689 {
690         struct bio_vec *bv;
691         unsigned long flags;
692         void *buf;
693         int i;
694         int pos = 0;
695
696         while (chain) {
697                 bio_for_each_segment(bv, chain, i) {
698                         if (pos + bv->bv_len > start_ofs) {
699                                 int remainder = max(start_ofs - pos, 0);
700                                 buf = bvec_kmap_irq(bv, &flags);
701                                 memset(buf + remainder, 0,
702                                        bv->bv_len - remainder);
703                                 bvec_kunmap_irq(buf, &flags);
704                         }
705                         pos += bv->bv_len;
706                 }
707
708                 chain = chain->bi_next;
709         }
710 }
711
712 /*
713  * bio_chain_clone - clone a chain of bios up to a certain length.
714  * might return a bio_pair that will need to be released.
715  */
716 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
717                                    struct bio_pair **bp,
718                                    int len, gfp_t gfpmask)
719 {
720         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
721         int total = 0;
722
723         if (*bp) {
724                 bio_pair_release(*bp);
725                 *bp = NULL;
726         }
727
728         while (old_chain && (total < len)) {
729                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
730                 if (!tmp)
731                         goto err_out;
732
733                 if (total + old_chain->bi_size > len) {
734                         struct bio_pair *bp;
735
736                         /*
737                          * this split can only happen with a single paged bio,
738                          * split_bio will BUG_ON if this is not the case
739                          */
740                         dout("bio_chain_clone split! total=%d remaining=%d"
741                              "bi_size=%u\n",
742                              total, len - total, old_chain->bi_size);
743
744                         /* split the bio. We'll release it either in the next
745                            call, or it will have to be released outside */
746                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
747                         if (!bp)
748                                 goto err_out;
749
750                         __bio_clone(tmp, &bp->bio1);
751
752                         *next = &bp->bio2;
753                 } else {
754                         __bio_clone(tmp, old_chain);
755                         *next = old_chain->bi_next;
756                 }
757
758                 tmp->bi_bdev = NULL;
759                 gfpmask &= ~__GFP_WAIT;
760                 tmp->bi_next = NULL;
761
762                 if (!new_chain) {
763                         new_chain = tail = tmp;
764                 } else {
765                         tail->bi_next = tmp;
766                         tail = tmp;
767                 }
768                 old_chain = old_chain->bi_next;
769
770                 total += tmp->bi_size;
771         }
772
773         BUG_ON(total < len);
774
775         if (tail)
776                 tail->bi_next = NULL;
777
778         *old = old_chain;
779
780         return new_chain;
781
782 err_out:
783         dout("bio_chain_clone with err\n");
784         bio_chain_put(new_chain);
785         return NULL;
786 }
787
788 /*
789  * helpers for osd request op vectors.
790  */
791 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
792                                         int opcode, u32 payload_len)
793 {
794         struct ceph_osd_req_op *ops;
795
796         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
797         if (!ops)
798                 return NULL;
799
800         ops[0].op = opcode;
801
802         /*
803          * op extent offset and length will be set later on
804          * in calc_raw_layout()
805          */
806         ops[0].payload_len = payload_len;
807
808         return ops;
809 }
810
/* Free an op vector allocated by rbd_create_rw_ops(). */
811 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
812 {
813         kfree(ops);
814 }
815
816 static void rbd_coll_end_req_index(struct request *rq,
817                                    struct rbd_req_coll *coll,
818                                    int index,
819                                    int ret, u64 len)
820 {
821         struct request_queue *q;
822         int min, max, i;
823
824         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
825              coll, index, ret, (unsigned long long) len);
826
827         if (!rq)
828                 return;
829
830         if (!coll) {
831                 blk_end_request(rq, ret, len);
832                 return;
833         }
834
835         q = rq->q;
836
837         spin_lock_irq(q->queue_lock);
838         coll->status[index].done = 1;
839         coll->status[index].rc = ret;
840         coll->status[index].bytes = len;
841         max = min = coll->num_done;
842         while (max < coll->total && coll->status[max].done)
843                 max++;
844
845         for (i = min; i<max; i++) {
846                 __blk_end_request(rq, coll->status[i].rc,
847                                   coll->status[i].bytes);
848                 coll->num_done++;
849                 kref_put(&coll->kref, rbd_coll_release);
850         }
851         spin_unlock_irq(q->queue_lock);
852 }
853
/*
 * Convenience wrapper: complete @req using the block request, collection
 * and collection index stored in it.
 */
854 static void rbd_coll_end_req(struct rbd_request *req,
855                              int ret, u64 len)
856 {
857         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
858 }
859
860 /*
861  * Send ceph osd request
862  */
863 static int rbd_do_request(struct request *rq,
864                           struct rbd_device *rbd_dev,
865                           struct ceph_snap_context *snapc,
866                           u64 snapid,
867                           const char *object_name, u64 ofs, u64 len,
868                           struct bio *bio,
869                           struct page **pages,
870                           int num_pages,
871                           int flags,
872                           struct ceph_osd_req_op *ops,
873                           struct rbd_req_coll *coll,
874                           int coll_index,
875                           void (*rbd_cb)(struct ceph_osd_request *req,
876                                          struct ceph_msg *msg),
877                           struct ceph_osd_request **linger_req,
878                           u64 *ver)
879 {
880         struct ceph_osd_request *req;
881         struct ceph_file_layout *layout;
882         int ret;
883         u64 bno;
884         struct timespec mtime = CURRENT_TIME;
885         struct rbd_request *req_data;
886         struct ceph_osd_request_head *reqhead;
887         struct ceph_osd_client *osdc;
888
889         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
890         if (!req_data) {
891                 if (coll)
892                         rbd_coll_end_req_index(rq, coll, coll_index,
893                                                -ENOMEM, len);
894                 return -ENOMEM;
895         }
896
897         if (coll) {
898                 req_data->coll = coll;
899                 req_data->coll_index = coll_index;
900         }
901
902         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
903                 (unsigned long long) ofs, (unsigned long long) len);
904
905         osdc = &rbd_dev->rbd_client->client->osdc;
906         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
907                                         false, GFP_NOIO, pages, bio);
908         if (!req) {
909                 ret = -ENOMEM;
910                 goto done_pages;
911         }
912
913         req->r_callback = rbd_cb;
914
915         req_data->rq = rq;
916         req_data->bio = bio;
917         req_data->pages = pages;
918         req_data->len = len;
919
920         req->r_priv = req_data;
921
922         reqhead = req->r_request->front.iov_base;
923         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
924
925         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
926         req->r_oid_len = strlen(req->r_oid);
927
928         layout = &req->r_file_layout;
929         memset(layout, 0, sizeof(*layout));
930         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931         layout->fl_stripe_count = cpu_to_le32(1);
932         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
934         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
935                                 req, ops);
936
937         ceph_osdc_build_request(req, ofs, &len,
938                                 ops,
939                                 snapc,
940                                 &mtime,
941                                 req->r_oid, req->r_oid_len);
942
943         if (linger_req) {
944                 ceph_osdc_set_request_linger(osdc, req);
945                 *linger_req = req;
946         }
947
948         ret = ceph_osdc_start_request(osdc, req, false);
949         if (ret < 0)
950                 goto done_err;
951
952         if (!rbd_cb) {
953                 ret = ceph_osdc_wait_request(osdc, req);
954                 if (ver)
955                         *ver = le64_to_cpu(req->r_reassert_version.version);
956                 dout("reassert_ver=%llu\n",
957                         (unsigned long long)
958                                 le64_to_cpu(req->r_reassert_version.version));
959                 ceph_osdc_put_request(req);
960         }
961         return ret;
962
963 done_err:
964         bio_chain_put(req_data->bio);
965         ceph_osdc_put_request(req);
966 done_pages:
967         rbd_coll_end_req(req_data, ret, len);
968         kfree(req_data);
969         return ret;
970 }
971
972 /*
973  * Ceph osd op callback
974  */
975 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
976 {
977         struct rbd_request *req_data = req->r_priv;
978         struct ceph_osd_reply_head *replyhead;
979         struct ceph_osd_op *op;
980         __s32 rc;
981         u64 bytes;
982         int read_op;
983
984         /* parse reply */
985         replyhead = msg->front.iov_base;
986         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
987         op = (void *)(replyhead + 1);
988         rc = le32_to_cpu(replyhead->result);
989         bytes = le64_to_cpu(op->extent.length);
990         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
991
992         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
993                 (unsigned long long) bytes, read_op, (int) rc);
994
995         if (rc == -ENOENT && read_op) {
996                 zero_bio_chain(req_data->bio, 0);
997                 rc = 0;
998         } else if (rc == 0 && read_op && bytes < req_data->len) {
999                 zero_bio_chain(req_data->bio, bytes);
1000                 bytes = req_data->len;
1001         }
1002
1003         rbd_coll_end_req(req_data, rc, bytes);
1004
1005         if (req_data->bio)
1006                 bio_chain_put(req_data->bio);
1007
1008         ceph_osdc_put_request(req);
1009         kfree(req_data);
1010 }
1011
/*
 * Completion callback for requests that need no result processing:
 * it only drops a reference on the finished OSD request.  The reply
 * message is not examined.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1016
1017 /*
1018  * Do a synchronous ceph osd operation
1019  */
1020 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1021                            struct ceph_snap_context *snapc,
1022                            u64 snapid,
1023                            int flags,
1024                            struct ceph_osd_req_op *ops,
1025                            const char *object_name,
1026                            u64 ofs, u64 len,
1027                            char *buf,
1028                            struct ceph_osd_request **linger_req,
1029                            u64 *ver)
1030 {
1031         int ret;
1032         struct page **pages;
1033         int num_pages;
1034
1035         BUG_ON(ops == NULL);
1036
1037         num_pages = calc_pages_for(ofs , len);
1038         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1039         if (IS_ERR(pages))
1040                 return PTR_ERR(pages);
1041
1042         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1043                           object_name, ofs, len, NULL,
1044                           pages, num_pages,
1045                           flags,
1046                           ops,
1047                           NULL, 0,
1048                           NULL,
1049                           linger_req, ver);
1050         if (ret < 0)
1051                 goto done;
1052
1053         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1054                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1055
1056 done:
1057         ceph_release_page_vector(pages, num_pages);
1058         return ret;
1059 }
1060
1061 /*
1062  * Do an asynchronous ceph osd operation
1063  */
1064 static int rbd_do_op(struct request *rq,
1065                      struct rbd_device *rbd_dev,
1066                      struct ceph_snap_context *snapc,
1067                      u64 snapid,
1068                      int opcode, int flags,
1069                      u64 ofs, u64 len,
1070                      struct bio *bio,
1071                      struct rbd_req_coll *coll,
1072                      int coll_index)
1073 {
1074         char *seg_name;
1075         u64 seg_ofs;
1076         u64 seg_len;
1077         int ret;
1078         struct ceph_osd_req_op *ops;
1079         u32 payload_len;
1080
1081         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1082         if (!seg_name)
1083                 return -ENOMEM;
1084
1085         seg_len = rbd_get_segment(&rbd_dev->header,
1086                                   rbd_dev->header.object_prefix,
1087                                   ofs, len,
1088                                   seg_name, &seg_ofs);
1089
1090         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1091
1092         ret = -ENOMEM;
1093         ops = rbd_create_rw_ops(1, opcode, payload_len);
1094         if (!ops)
1095                 goto done;
1096
1097         /* we've taken care of segment sizes earlier when we
1098            cloned the bios. We should never have a segment
1099            truncated at this point */
1100         BUG_ON(seg_len < len);
1101
1102         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1103                              seg_name, seg_ofs, seg_len,
1104                              bio,
1105                              NULL, 0,
1106                              flags,
1107                              ops,
1108                              coll, coll_index,
1109                              rbd_req_cb, 0, NULL);
1110
1111         rbd_destroy_ops(ops);
1112 done:
1113         kfree(seg_name);
1114         return ret;
1115 }
1116
1117 /*
1118  * Request async osd write
1119  */
1120 static int rbd_req_write(struct request *rq,
1121                          struct rbd_device *rbd_dev,
1122                          struct ceph_snap_context *snapc,
1123                          u64 ofs, u64 len,
1124                          struct bio *bio,
1125                          struct rbd_req_coll *coll,
1126                          int coll_index)
1127 {
1128         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1129                          CEPH_OSD_OP_WRITE,
1130                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1131                          ofs, len, bio, coll, coll_index);
1132 }
1133
1134 /*
1135  * Request async osd read
1136  */
1137 static int rbd_req_read(struct request *rq,
1138                          struct rbd_device *rbd_dev,
1139                          u64 snapid,
1140                          u64 ofs, u64 len,
1141                          struct bio *bio,
1142                          struct rbd_req_coll *coll,
1143                          int coll_index)
1144 {
1145         return rbd_do_op(rq, rbd_dev, NULL,
1146                          snapid,
1147                          CEPH_OSD_OP_READ,
1148                          CEPH_OSD_FLAG_READ,
1149                          ofs, len, bio, coll, coll_index);
1150 }
1151
1152 /*
1153  * Request sync osd read
1154  */
1155 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1156                           u64 snapid,
1157                           const char *object_name,
1158                           u64 ofs, u64 len,
1159                           char *buf,
1160                           u64 *ver)
1161 {
1162         struct ceph_osd_req_op *ops;
1163         int ret;
1164
1165         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1166         if (!ops)
1167                 return -ENOMEM;
1168
1169         ret = rbd_req_sync_op(rbd_dev, NULL,
1170                                snapid,
1171                                CEPH_OSD_FLAG_READ,
1172                                ops, object_name, ofs, len, buf, NULL, ver);
1173         rbd_destroy_ops(ops);
1174
1175         return ret;
1176 }
1177
1178 /*
1179  * Request sync osd watch
1180  */
1181 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1182                                    u64 ver,
1183                                    u64 notify_id)
1184 {
1185         struct ceph_osd_req_op *ops;
1186         int ret;
1187
1188         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1189         if (!ops)
1190                 return -ENOMEM;
1191
1192         ops[0].watch.ver = cpu_to_le64(ver);
1193         ops[0].watch.cookie = notify_id;
1194         ops[0].watch.flag = 0;
1195
1196         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1197                           rbd_dev->header_name, 0, 0, NULL,
1198                           NULL, 0,
1199                           CEPH_OSD_FLAG_READ,
1200                           ops,
1201                           NULL, 0,
1202                           rbd_simple_req_cb, 0, NULL);
1203
1204         rbd_destroy_ops(ops);
1205         return ret;
1206 }
1207
1208 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1209 {
1210         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1211         u64 hver;
1212         int rc;
1213
1214         if (!rbd_dev)
1215                 return;
1216
1217         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1218                 rbd_dev->header_name, (unsigned long long) notify_id,
1219                 (unsigned int) opcode);
1220         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1221         rc = __rbd_refresh_header(rbd_dev);
1222         hver = rbd_dev->header.obj_version;
1223         mutex_unlock(&ctl_mutex);
1224         if (rc)
1225                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1226                            " update snaps: %d\n", rbd_dev->major, rc);
1227
1228         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1229 }
1230
1231 /*
1232  * Request sync osd watch
1233  */
1234 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1235 {
1236         struct ceph_osd_req_op *ops;
1237         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1238         int ret;
1239
1240         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1241         if (!ops)
1242                 return -ENOMEM;
1243
1244         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1245                                      (void *)rbd_dev, &rbd_dev->watch_event);
1246         if (ret < 0)
1247                 goto fail;
1248
1249         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1250         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1251         ops[0].watch.flag = 1;
1252
1253         ret = rbd_req_sync_op(rbd_dev, NULL,
1254                               CEPH_NOSNAP,
1255                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1256                               ops,
1257                               rbd_dev->header_name,
1258                               0, 0, NULL,
1259                               &rbd_dev->watch_request, NULL);
1260
1261         if (ret < 0)
1262                 goto fail_event;
1263
1264         rbd_destroy_ops(ops);
1265         return 0;
1266
1267 fail_event:
1268         ceph_osdc_cancel_event(rbd_dev->watch_event);
1269         rbd_dev->watch_event = NULL;
1270 fail:
1271         rbd_destroy_ops(ops);
1272         return ret;
1273 }
1274
1275 /*
1276  * Request sync osd unwatch
1277  */
1278 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1279 {
1280         struct ceph_osd_req_op *ops;
1281         int ret;
1282
1283         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1284         if (!ops)
1285                 return -ENOMEM;
1286
1287         ops[0].watch.ver = 0;
1288         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1289         ops[0].watch.flag = 0;
1290
1291         ret = rbd_req_sync_op(rbd_dev, NULL,
1292                               CEPH_NOSNAP,
1293                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1294                               ops,
1295                               rbd_dev->header_name,
1296                               0, 0, NULL, NULL, NULL);
1297
1298
1299         rbd_destroy_ops(ops);
1300         ceph_osdc_cancel_event(rbd_dev->watch_event);
1301         rbd_dev->watch_event = NULL;
1302         return ret;
1303 }
1304
1305 struct rbd_notify_info {
1306         struct rbd_device *rbd_dev;
1307 };
1308
1309 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1310 {
1311         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1312         if (!rbd_dev)
1313                 return;
1314
1315         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1316                         rbd_dev->header_name, (unsigned long long) notify_id,
1317                         (unsigned int) opcode);
1318 }
1319
1320 /*
1321  * Request sync osd notify
1322  */
1323 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1324 {
1325         struct ceph_osd_req_op *ops;
1326         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1327         struct ceph_osd_event *event;
1328         struct rbd_notify_info info;
1329         int payload_len = sizeof(u32) + sizeof(u32);
1330         int ret;
1331
1332         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1333         if (!ops)
1334                 return -ENOMEM;
1335
1336         info.rbd_dev = rbd_dev;
1337
1338         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1339                                      (void *)&info, &event);
1340         if (ret < 0)
1341                 goto fail;
1342
1343         ops[0].watch.ver = 1;
1344         ops[0].watch.flag = 1;
1345         ops[0].watch.cookie = event->cookie;
1346         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1347         ops[0].watch.timeout = 12;
1348
1349         ret = rbd_req_sync_op(rbd_dev, NULL,
1350                                CEPH_NOSNAP,
1351                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1352                                ops,
1353                                rbd_dev->header_name,
1354                                0, 0, NULL, NULL, NULL);
1355         if (ret < 0)
1356                 goto fail_event;
1357
1358         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1359         dout("ceph_osdc_wait_event returned %d\n", ret);
1360         rbd_destroy_ops(ops);
1361         return 0;
1362
1363 fail_event:
1364         ceph_osdc_cancel_event(event);
1365 fail:
1366         rbd_destroy_ops(ops);
1367         return ret;
1368 }
1369
1370 /*
1371  * Request sync osd read
1372  */
1373 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1374                              const char *object_name,
1375                              const char *class_name,
1376                              const char *method_name,
1377                              const char *data,
1378                              int len,
1379                              u64 *ver)
1380 {
1381         struct ceph_osd_req_op *ops;
1382         int class_name_len = strlen(class_name);
1383         int method_name_len = strlen(method_name);
1384         int ret;
1385
1386         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1387                                     class_name_len + method_name_len + len);
1388         if (!ops)
1389                 return -ENOMEM;
1390
1391         ops[0].cls.class_name = class_name;
1392         ops[0].cls.class_len = (__u8) class_name_len;
1393         ops[0].cls.method_name = method_name;
1394         ops[0].cls.method_len = (__u8) method_name_len;
1395         ops[0].cls.argc = 0;
1396         ops[0].cls.indata = data;
1397         ops[0].cls.indata_len = len;
1398
1399         ret = rbd_req_sync_op(rbd_dev, NULL,
1400                                CEPH_NOSNAP,
1401                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1402                                ops,
1403                                object_name, 0, 0, NULL, NULL, ver);
1404
1405         rbd_destroy_ops(ops);
1406
1407         dout("cls_exec returned %d\n", ret);
1408         return ret;
1409 }
1410
1411 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1412 {
1413         struct rbd_req_coll *coll =
1414                         kzalloc(sizeof(struct rbd_req_coll) +
1415                                 sizeof(struct rbd_req_status) * num_reqs,
1416                                 GFP_ATOMIC);
1417
1418         if (!coll)
1419                 return NULL;
1420         coll->total = num_reqs;
1421         kref_init(&coll->kref);
1422         return coll;
1423 }
1424
1425 /*
1426  * block device queue callback
1427  */
1428 static void rbd_rq_fn(struct request_queue *q)
1429 {
1430         struct rbd_device *rbd_dev = q->queuedata;
1431         struct request *rq;
1432         struct bio_pair *bp = NULL;
1433
1434         while ((rq = blk_fetch_request(q))) {
1435                 struct bio *bio;
1436                 struct bio *rq_bio, *next_bio = NULL;
1437                 bool do_write;
1438                 unsigned int size;
1439                 u64 op_size = 0;
1440                 u64 ofs;
1441                 int num_segs, cur_seg = 0;
1442                 struct rbd_req_coll *coll;
1443                 struct ceph_snap_context *snapc;
1444
1445                 /* peek at request from block layer */
1446                 if (!rq)
1447                         break;
1448
1449                 dout("fetched request\n");
1450
1451                 /* filter out block requests we don't understand */
1452                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1453                         __blk_end_request_all(rq, 0);
1454                         continue;
1455                 }
1456
1457                 /* deduce our operation (read, write) */
1458                 do_write = (rq_data_dir(rq) == WRITE);
1459
1460                 size = blk_rq_bytes(rq);
1461                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1462                 rq_bio = rq->bio;
1463                 if (do_write && rbd_dev->read_only) {
1464                         __blk_end_request_all(rq, -EROFS);
1465                         continue;
1466                 }
1467
1468                 spin_unlock_irq(q->queue_lock);
1469
1470                 down_read(&rbd_dev->header_rwsem);
1471
1472                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1473                         up_read(&rbd_dev->header_rwsem);
1474                         dout("request for non-existent snapshot");
1475                         spin_lock_irq(q->queue_lock);
1476                         __blk_end_request_all(rq, -ENXIO);
1477                         continue;
1478                 }
1479
1480                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1481
1482                 up_read(&rbd_dev->header_rwsem);
1483
1484                 dout("%s 0x%x bytes at 0x%llx\n",
1485                      do_write ? "write" : "read",
1486                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1487
1488                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1489                 coll = rbd_alloc_coll(num_segs);
1490                 if (!coll) {
1491                         spin_lock_irq(q->queue_lock);
1492                         __blk_end_request_all(rq, -ENOMEM);
1493                         ceph_put_snap_context(snapc);
1494                         continue;
1495                 }
1496
1497                 do {
1498                         /* a bio clone to be passed down to OSD req */
1499                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1500                         op_size = rbd_get_segment(&rbd_dev->header,
1501                                                   rbd_dev->header.object_prefix,
1502                                                   ofs, size,
1503                                                   NULL, NULL);
1504                         kref_get(&coll->kref);
1505                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1506                                               op_size, GFP_ATOMIC);
1507                         if (!bio) {
1508                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1509                                                        -ENOMEM, op_size);
1510                                 goto next_seg;
1511                         }
1512
1513
1514                         /* init OSD command: write or read */
1515                         if (do_write)
1516                                 rbd_req_write(rq, rbd_dev,
1517                                               snapc,
1518                                               ofs,
1519                                               op_size, bio,
1520                                               coll, cur_seg);
1521                         else
1522                                 rbd_req_read(rq, rbd_dev,
1523                                              rbd_dev->snap_id,
1524                                              ofs,
1525                                              op_size, bio,
1526                                              coll, cur_seg);
1527
1528 next_seg:
1529                         size -= op_size;
1530                         ofs += op_size;
1531
1532                         cur_seg++;
1533                         rq_bio = next_bio;
1534                 } while (size > 0);
1535                 kref_put(&coll->kref, rbd_coll_release);
1536
1537                 if (bp)
1538                         bio_pair_release(bp);
1539                 spin_lock_irq(q->queue_lock);
1540
1541                 ceph_put_snap_context(snapc);
1542         }
1543 }
1544
1545 /*
1546  * a queue callback. Makes sure that we don't create a bio that spans across
1547  * multiple osd objects. One exception would be with a single page bios,
1548  * which we handle later at bio_chain_clone
1549  */
1550 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1551                           struct bio_vec *bvec)
1552 {
1553         struct rbd_device *rbd_dev = q->queuedata;
1554         unsigned int chunk_sectors;
1555         sector_t sector;
1556         unsigned int bio_sectors;
1557         int max;
1558
1559         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1560         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1561         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1562
1563         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1564                                  + bio_sectors)) << SECTOR_SHIFT;
1565         if (max < 0)
1566                 max = 0; /* bio_add cannot handle a negative return */
1567         if (max <= bvec->bv_len && bio_sectors == 0)
1568                 return bvec->bv_len;
1569         return max;
1570 }
1571
1572 static void rbd_free_disk(struct rbd_device *rbd_dev)
1573 {
1574         struct gendisk *disk = rbd_dev->disk;
1575
1576         if (!disk)
1577                 return;
1578
1579         rbd_header_free(&rbd_dev->header);
1580
1581         if (disk->flags & GENHD_FL_UP)
1582                 del_gendisk(disk);
1583         if (disk->queue)
1584                 blk_cleanup_queue(disk->queue);
1585         put_disk(disk);
1586 }
1587
1588 /*
1589  * reload the ondisk the header 
1590  */
1591 static int rbd_read_header(struct rbd_device *rbd_dev,
1592                            struct rbd_image_header *header)
1593 {
1594         ssize_t rc;
1595         struct rbd_image_header_ondisk *dh;
1596         u32 snap_count = 0;
1597         u64 ver;
1598         size_t len;
1599
1600         /*
1601          * First reads the fixed-size header to determine the number
1602          * of snapshots, then re-reads it, along with all snapshot
1603          * records as well as their stored names.
1604          */
1605         len = sizeof (*dh);
1606         while (1) {
1607                 dh = kmalloc(len, GFP_KERNEL);
1608                 if (!dh)
1609                         return -ENOMEM;
1610
1611                 rc = rbd_req_sync_read(rbd_dev,
1612                                        CEPH_NOSNAP,
1613                                        rbd_dev->header_name,
1614                                        0, len,
1615                                        (char *)dh, &ver);
1616                 if (rc < 0)
1617                         goto out_dh;
1618
1619                 rc = rbd_header_from_disk(header, dh, snap_count);
1620                 if (rc < 0) {
1621                         if (rc == -ENXIO)
1622                                 pr_warning("unrecognized header format"
1623                                            " for image %s\n",
1624                                            rbd_dev->image_name);
1625                         goto out_dh;
1626                 }
1627
1628                 if (snap_count == header->total_snaps)
1629                         break;
1630
1631                 snap_count = header->total_snaps;
1632                 len = sizeof (*dh) +
1633                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1634                         header->snap_names_len;
1635
1636                 rbd_header_free(header);
1637                 kfree(dh);
1638         }
1639         header->obj_version = ver;
1640
1641 out_dh:
1642         kfree(dh);
1643         return rc;
1644 }
1645
1646 /*
1647  * create a snapshot
1648  */
1649 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1650                                const char *snap_name,
1651                                gfp_t gfp_flags)
1652 {
1653         int name_len = strlen(snap_name);
1654         u64 new_snapid;
1655         int ret;
1656         void *data, *p, *e;
1657         struct ceph_mon_client *monc;
1658
1659         /* we should create a snapshot only if we're pointing at the head */
1660         if (rbd_dev->snap_id != CEPH_NOSNAP)
1661                 return -EINVAL;
1662
1663         monc = &rbd_dev->rbd_client->client->monc;
1664         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1665         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1666         if (ret < 0)
1667                 return ret;
1668
1669         data = kmalloc(name_len + 16, gfp_flags);
1670         if (!data)
1671                 return -ENOMEM;
1672
1673         p = data;
1674         e = data + name_len + 16;
1675
1676         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1677         ceph_encode_64_safe(&p, e, new_snapid, bad);
1678
1679         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1680                                 "rbd", "snap_add",
1681                                 data, p - data, NULL);
1682
1683         kfree(data);
1684
1685         return ret < 0 ? ret : 0;
1686 bad:
1687         return -ERANGE;
1688 }
1689
1690 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1691 {
1692         struct rbd_snap *snap;
1693         struct rbd_snap *next;
1694
1695         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1696                 __rbd_remove_snap_dev(snap);
1697 }
1698
1699 /*
1700  * only read the first part of the ondisk header, without the snaps info
1701  */
1702 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1703 {
1704         int ret;
1705         struct rbd_image_header h;
1706
1707         ret = rbd_read_header(rbd_dev, &h);
1708         if (ret < 0)
1709                 return ret;
1710
1711         down_write(&rbd_dev->header_rwsem);
1712
1713         /* resized? */
1714         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1715                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1716
1717                 dout("setting size to %llu sectors", (unsigned long long) size);
1718                 set_capacity(rbd_dev->disk, size);
1719         }
1720
1721         /* rbd_dev->header.object_prefix shouldn't change */
1722         kfree(rbd_dev->header.snap_sizes);
1723         kfree(rbd_dev->header.snap_names);
1724         /* osd requests may still refer to snapc */
1725         ceph_put_snap_context(rbd_dev->header.snapc);
1726
1727         rbd_dev->header.obj_version = h.obj_version;
1728         rbd_dev->header.image_size = h.image_size;
1729         rbd_dev->header.total_snaps = h.total_snaps;
1730         rbd_dev->header.snapc = h.snapc;
1731         rbd_dev->header.snap_names = h.snap_names;
1732         rbd_dev->header.snap_names_len = h.snap_names_len;
1733         rbd_dev->header.snap_sizes = h.snap_sizes;
1734         /* Free the extra copy of the object prefix */
1735         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1736         kfree(h.object_prefix);
1737
1738         ret = __rbd_init_snaps_header(rbd_dev);
1739
1740         up_write(&rbd_dev->header_rwsem);
1741
1742         return ret;
1743 }
1744
1745 static int rbd_init_disk(struct rbd_device *rbd_dev)
1746 {
1747         struct gendisk *disk;
1748         struct request_queue *q;
1749         int rc;
1750         u64 segment_size;
1751         u64 total_size = 0;
1752
1753         /* contact OSD, request size info about the object being mapped */
1754         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1755         if (rc)
1756                 return rc;
1757
1758         /* no need to lock here, as rbd_dev is not registered yet */
1759         rc = __rbd_init_snaps_header(rbd_dev);
1760         if (rc)
1761                 return rc;
1762
1763         rc = rbd_header_set_snap(rbd_dev, &total_size);
1764         if (rc)
1765                 return rc;
1766
1767         /* create gendisk info */
1768         rc = -ENOMEM;
1769         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1770         if (!disk)
1771                 goto out;
1772
1773         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1774                  rbd_dev->dev_id);
1775         disk->major = rbd_dev->major;
1776         disk->first_minor = 0;
1777         disk->fops = &rbd_bd_ops;
1778         disk->private_data = rbd_dev;
1779
1780         /* init rq */
1781         rc = -ENOMEM;
1782         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1783         if (!q)
1784                 goto out_disk;
1785
1786         /* We use the default size, but let's be explicit about it. */
1787         blk_queue_physical_block_size(q, SECTOR_SIZE);
1788
1789         /* set io sizes to object size */
1790         segment_size = rbd_obj_bytes(&rbd_dev->header);
1791         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1792         blk_queue_max_segment_size(q, segment_size);
1793         blk_queue_io_min(q, segment_size);
1794         blk_queue_io_opt(q, segment_size);
1795
1796         blk_queue_merge_bvec(q, rbd_merge_bvec);
1797         disk->queue = q;
1798
1799         q->queuedata = rbd_dev;
1800
1801         rbd_dev->disk = disk;
1802         rbd_dev->q = q;
1803
1804         /* finally, announce the disk to the world */
1805         set_capacity(disk, total_size / SECTOR_SIZE);
1806         add_disk(disk);
1807
1808         pr_info("%s: added with size 0x%llx\n",
1809                 disk->disk_name, (unsigned long long)total_size);
1810         return 0;
1811
1812 out_disk:
1813         put_disk(disk);
1814 out:
1815         return rc;
1816 }
1817
1818 /*
1819   sysfs
1820 */
1821
1822 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1823 {
1824         return container_of(dev, struct rbd_device, dev);
1825 }
1826
1827 static ssize_t rbd_size_show(struct device *dev,
1828                              struct device_attribute *attr, char *buf)
1829 {
1830         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1831         sector_t size;
1832
1833         down_read(&rbd_dev->header_rwsem);
1834         size = get_capacity(rbd_dev->disk);
1835         up_read(&rbd_dev->header_rwsem);
1836
1837         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1838 }
1839
1840 static ssize_t rbd_major_show(struct device *dev,
1841                               struct device_attribute *attr, char *buf)
1842 {
1843         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1844
1845         return sprintf(buf, "%d\n", rbd_dev->major);
1846 }
1847
1848 static ssize_t rbd_client_id_show(struct device *dev,
1849                                   struct device_attribute *attr, char *buf)
1850 {
1851         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1852
1853         return sprintf(buf, "client%lld\n",
1854                         ceph_client_id(rbd_dev->rbd_client->client));
1855 }
1856
1857 static ssize_t rbd_pool_show(struct device *dev,
1858                              struct device_attribute *attr, char *buf)
1859 {
1860         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1863 }
1864
1865 static ssize_t rbd_pool_id_show(struct device *dev,
1866                              struct device_attribute *attr, char *buf)
1867 {
1868         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1871 }
1872
1873 static ssize_t rbd_name_show(struct device *dev,
1874                              struct device_attribute *attr, char *buf)
1875 {
1876         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1877
1878         return sprintf(buf, "%s\n", rbd_dev->image_name);
1879 }
1880
1881 static ssize_t rbd_snap_show(struct device *dev,
1882                              struct device_attribute *attr,
1883                              char *buf)
1884 {
1885         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886
1887         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1888 }
1889
1890 static ssize_t rbd_image_refresh(struct device *dev,
1891                                  struct device_attribute *attr,
1892                                  const char *buf,
1893                                  size_t size)
1894 {
1895         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1896         int rc;
1897         int ret = size;
1898
1899         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1900
1901         rc = __rbd_refresh_header(rbd_dev);
1902         if (rc < 0)
1903                 ret = rc;
1904
1905         mutex_unlock(&ctl_mutex);
1906         return ret;
1907 }
1908
1909 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1910 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1911 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1912 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1913 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1914 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1915 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1916 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1917 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1918
1919 static struct attribute *rbd_attrs[] = {
1920         &dev_attr_size.attr,
1921         &dev_attr_major.attr,
1922         &dev_attr_client_id.attr,
1923         &dev_attr_pool.attr,
1924         &dev_attr_pool_id.attr,
1925         &dev_attr_name.attr,
1926         &dev_attr_current_snap.attr,
1927         &dev_attr_refresh.attr,
1928         &dev_attr_create_snap.attr,
1929         NULL
1930 };
1931
1932 static struct attribute_group rbd_attr_group = {
1933         .attrs = rbd_attrs,
1934 };
1935
1936 static const struct attribute_group *rbd_attr_groups[] = {
1937         &rbd_attr_group,
1938         NULL
1939 };
1940
/*
 * Release callback stored in rbd_device_type.  It does nothing:
 * rbd_bus_add_dev() sets dev->release to rbd_dev_release(), which is
 * where an rbd device is actually torn down.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* nothing to do */
}
1944
1945 static struct device_type rbd_device_type = {
1946         .name           = "rbd",
1947         .groups         = rbd_attr_groups,
1948         .release        = rbd_sysfs_dev_release,
1949 };
1950
1951
1952 /*
1953   sysfs - snapshots
1954 */
1955
1956 static ssize_t rbd_snap_size_show(struct device *dev,
1957                                   struct device_attribute *attr,
1958                                   char *buf)
1959 {
1960         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1961
1962         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1963 }
1964
1965 static ssize_t rbd_snap_id_show(struct device *dev,
1966                                 struct device_attribute *attr,
1967                                 char *buf)
1968 {
1969         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1970
1971         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1972 }
1973
1974 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1975 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1976
1977 static struct attribute *rbd_snap_attrs[] = {
1978         &dev_attr_snap_size.attr,
1979         &dev_attr_snap_id.attr,
1980         NULL,
1981 };
1982
1983 static struct attribute_group rbd_snap_attr_group = {
1984         .attrs = rbd_snap_attrs,
1985 };
1986
1987 static void rbd_snap_dev_release(struct device *dev)
1988 {
1989         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1990         kfree(snap->name);
1991         kfree(snap);
1992 }
1993
1994 static const struct attribute_group *rbd_snap_attr_groups[] = {
1995         &rbd_snap_attr_group,
1996         NULL
1997 };
1998
1999 static struct device_type rbd_snap_device_type = {
2000         .groups         = rbd_snap_attr_groups,
2001         .release        = rbd_snap_dev_release,
2002 };
2003
2004 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2005 {
2006         list_del(&snap->node);
2007         device_unregister(&snap->dev);
2008 }
2009
2010 static int rbd_register_snap_dev(struct rbd_snap *snap,
2011                                   struct device *parent)
2012 {
2013         struct device *dev = &snap->dev;
2014         int ret;
2015
2016         dev->type = &rbd_snap_device_type;
2017         dev->parent = parent;
2018         dev->release = rbd_snap_dev_release;
2019         dev_set_name(dev, "snap_%s", snap->name);
2020         ret = device_register(dev);
2021
2022         return ret;
2023 }
2024
2025 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2026                                               int i, const char *name)
2027 {
2028         struct rbd_snap *snap;
2029         int ret;
2030
2031         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2032         if (!snap)
2033                 return ERR_PTR(-ENOMEM);
2034
2035         ret = -ENOMEM;
2036         snap->name = kstrdup(name, GFP_KERNEL);
2037         if (!snap->name)
2038                 goto err;
2039
2040         snap->size = rbd_dev->header.snap_sizes[i];
2041         snap->id = rbd_dev->header.snapc->snaps[i];
2042         if (device_is_registered(&rbd_dev->dev)) {
2043                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2044                 if (ret < 0)
2045                         goto err;
2046         }
2047
2048         return snap;
2049
2050 err:
2051         kfree(snap->name);
2052         kfree(snap);
2053
2054         return ERR_PTR(ret);
2055 }
2056
/*
 * Step backward through a list of '\0'-terminated names stored back to
 * back starting at @start.  @name points at the beginning of a name in
 * that list (or just past the end of the list); the return value is
 * the beginning of the name that precedes it, or NULL when @name is
 * less than two bytes past @start and so has no predecessor.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/*
	 * name[-1] is the previous name's terminator, so begin at its
	 * last character and walk left until an earlier terminator (or
	 * the start of the list) is reached.
	 */
	for (cursor = name - 2; *cursor != '\0'; cursor--)
		if (cursor == start)
			return start;

	return cursor + 1;
}
2073
2074 /*
2075  * Compare the old list of snapshots that we have to what's in the header
2076  * and update it accordingly.  Note that the header holds the snapshots
2077  * in reverse order (from newest to oldest) and we need to go from
2078  * oldest to newest so that we don't get a duplicate snap name while
2079  * processing (e.g., a snapshot was removed and a new one was created
2080  * with the same name).
2081  */
2082 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2083 {
2084         const char *name, *first_name;
        /* Number of header snapshots not yet consumed; entry i - 1 is next. */
2085         int i = rbd_dev->header.total_snaps;
2086         struct rbd_snap *snap, *old_snap = NULL;
2087         struct list_head *p, *n;
2088
        /*
         * Start just past the end of the packed name buffer;
         * rbd_prev_snap_name() moves "name" back one name at a time.
         */
2089         first_name = rbd_dev->header.snap_names;
2090         name = first_name + rbd_dev->header.snap_names_len;
2091
        /*
         * Walk the existing snapshot list from its tail toward its head.
         * The _safe variant keeps the following entry in "n", so the
         * current entry may be removed inside the loop.
         */
2092         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2093                 u64 cur_id;
2094
2095                 old_snap = list_entry(p, struct rbd_snap, node);
2096
                /* cur_id is only set (and only read below) while i != 0. */
2097                 if (i)
2098                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2099
2100                 if (!i || old_snap->id < cur_id) {
2101                         /*
2102                          * old_snap->id was skipped, thus was
2103                          * removed.  If this rbd_dev is mapped to
2104                          * the removed snapshot, record that it no
2105                          * longer exists, to prevent further I/O.
2106                          */
2107                         if (rbd_dev->snap_id == old_snap->id)
2108                                 rbd_dev->snap_exists = false;
2109                         __rbd_remove_snap_dev(old_snap);
2110                         continue;
2111                 }
2112                 if (old_snap->id == cur_id) {
2113                         /* we have this snapshot already */
2114                         i--;
2115                         name = rbd_prev_snap_name(name, first_name);
2116                         continue;
2117                 }
                /*
                 * Reached only when old_snap->id > cur_id: create entries
                 * for header snapshots until one with an id at or above
                 * old_snap->id is seen.
                 *
                 * NOTE(review): this loop reads snaps[i], while every
                 * other access in this function (and the index passed to
                 * __rbd_add_snap_dev() below) uses i - 1.  With
                 * i == total_snaps that looks like a read one past the
                 * last entry -- confirm the intended index.
                 *
                 * NOTE(review): "name" is used on the first iteration
                 * without first being stepped back, unlike the trailing
                 * loop after the list walk -- verify that the name and
                 * the index stay paired.
                 */
2118                 for (; i > 0;
2119                      i--, name = rbd_prev_snap_name(name, first_name)) {
2120                         if (!name) {
2121                                 WARN_ON(1);
2122                                 return -EINVAL;
2123                         }
2124                         cur_id = rbd_dev->header.snapc->snaps[i];
2125                         /* snapshot removal? handle it above */
2126                         if (cur_id >= old_snap->id)
2127                                 break;
2128                         /* a new snapshot */
2129                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2130                         if (IS_ERR(snap))
2131                                 return PTR_ERR(snap);
2132
2133                         /* note that we add it backward so using n and not p */
2134                         list_add(&snap->node, n);
2135                         p = &snap->node;
2136                 }
2137         }
2138         /* we're done going over the old snap list, just add what's left */
2139         for (; i > 0; i--) {
2140                 name = rbd_prev_snap_name(name, first_name);
                /* Fewer names in the buffer than total_snaps claims. */
2141                 if (!name) {
2142                         WARN_ON(1);
2143                         return -EINVAL;
2144                 }
2145                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2146                 if (IS_ERR(snap))
2147                         return PTR_ERR(snap);
                /* list_add() inserts right after the list head. */
2148                 list_add(&snap->node, &rbd_dev->snaps);
2149         }
2150
2151         return 0;
2152 }
2153
2154 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2155 {
2156         int ret;
2157         struct device *dev;
2158         struct rbd_snap *snap;
2159
2160         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2161         dev = &rbd_dev->dev;
2162
2163         dev->bus = &rbd_bus_type;
2164         dev->type = &rbd_device_type;
2165         dev->parent = &rbd_root_dev;
2166         dev->release = rbd_dev_release;
2167         dev_set_name(dev, "%d", rbd_dev->dev_id);
2168         ret = device_register(dev);
2169         if (ret < 0)
2170                 goto out;
2171
2172         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2173                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2174                 if (ret < 0)
2175                         break;
2176         }
2177 out:
2178         mutex_unlock(&ctl_mutex);
2179         return ret;
2180 }
2181
2182 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2183 {
2184         device_unregister(&rbd_dev->dev);
2185 }
2186
2187 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2188 {
2189         int ret, rc;
2190
2191         do {
2192                 ret = rbd_req_sync_watch(rbd_dev);
2193                 if (ret == -ERANGE) {
2194                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2195                         rc = __rbd_refresh_header(rbd_dev);
2196                         mutex_unlock(&ctl_mutex);
2197                         if (rc < 0)
2198                                 return rc;
2199                 }
2200         } while (ret == -ERANGE);
2201
2202         return ret;
2203 }
2204
2205 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2206
2207 /*
2208  * Get a unique rbd identifier for the given new rbd_dev, and add
2209  * the rbd_dev to the global list.  The minimum rbd id is 1.
2210  */
2211 static void rbd_id_get(struct rbd_device *rbd_dev)
2212 {
2213         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2214
2215         spin_lock(&rbd_dev_list_lock);
2216         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2217         spin_unlock(&rbd_dev_list_lock);
2218 }
2219
2220 /*
2221  * Remove an rbd_dev from the global list, and record that its
2222  * identifier is no longer in use.
2223  */
2224 static void rbd_id_put(struct rbd_device *rbd_dev)
2225 {
2226         struct list_head *tmp;
2227         int rbd_id = rbd_dev->dev_id;
2228         int max_id;
2229
2230         BUG_ON(rbd_id < 1);
2231
2232         spin_lock(&rbd_dev_list_lock);
2233         list_del_init(&rbd_dev->node);
2234
2235         /*
2236          * If the id being "put" is not the current maximum, there
2237          * is nothing special we need to do.
2238          */
2239         if (rbd_id != atomic64_read(&rbd_id_max)) {
2240                 spin_unlock(&rbd_dev_list_lock);
2241                 return;
2242         }
2243
2244         /*
2245          * We need to update the current maximum id.  Search the
2246          * list to find out what it is.  We're more likely to find
2247          * the maximum at the end, so search the list backward.
2248          */
2249         max_id = 0;
2250         list_for_each_prev(tmp, &rbd_dev_list) {
2251                 struct rbd_device *rbd_dev;
2252
2253                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2254                 if (rbd_id > max_id)
2255                         max_id = rbd_id;
2256         }
2257         spin_unlock(&rbd_dev_list_lock);
2258
2259         /*
2260          * The max id could have been updated by rbd_id_get(), in
2261          * which case it now accurately reflects the new maximum.
2262          * Be careful not to overwrite the maximum value in that
2263          * case.
2264          */
2265         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2266 }
2267
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token -- the run of non-space
 * characters -- that starts there.  *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *token_start = *buf + strspn(*buf, whitespace);

	*buf = token_start;

	return strcspn(token_start, whitespace);
}
2286
/*
 * Locate the next token in *buf and, if it fits (with its '\0'
 * terminator) in the @token_size bytes at @token, copy it there and
 * terminate it.  *buf must be '\0'-terminated on entry.
 *
 * Returns the token length, not counting the '\0'.  A return of 0
 * means no token was found; a return >= @token_size means the token
 * did not fit and @token was left untouched.
 *
 * In every case *buf is advanced past the token that was found.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *src = *buf;

	*buf = src + len;

	if (len >= token_size)
		return len;

	memcpy(token, src, len);
	token[len] = '\0';

	return len;
}
2316
2317 /*
2318  * Finds the next token in *buf, dynamically allocates a buffer big
2319  * enough to hold a copy of it, and copies the token into the new
2320  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2321  * that a duplicate buffer is created even for a zero-length token.
2322  *
2323  * Returns a pointer to the newly-allocated duplicate, or a null
2324  * pointer if memory for the duplicate was not available.  If
2325  * the lenp argument is a non-null pointer, the length of the token
2326  * (not including the '\0') is returned in *lenp.
2327  *
2328  * If successful, the *buf pointer will be updated to point beyond
2329  * the end of the found token.
2330  *
2331  * Note: uses GFP_KERNEL for allocation.
2332  */
2333 static inline char *dup_token(const char **buf, size_t *lenp)
2334 {
2335         char *dup;
2336         size_t len;
2337
2338         len = next_token(buf);
2339         dup = kmalloc(len + 1, GFP_KERNEL);
2340         if (!dup)
2341                 return NULL;
2342
2343         memcpy(dup, *buf, len);
2344         *(dup + len) = '\0';
2345         *buf += len;
2346
2347         if (lenp)
2348                 *lenp = len;
2349
2350         return dup;
2351 }
2352
2353 /*
2354  * This fills in the pool_name, image_name, image_name_len, snap_name,
2355  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2356  * on the list of monitor addresses and other options provided via
2357  * /sys/bus/rbd/add.
2358  *
2359  * Note: rbd_dev is assumed to have been initially zero-filled.
2360  */
2361 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2362                               const char *buf,
2363                               const char **mon_addrs,
2364                               size_t *mon_addrs_size,
2365                               char *options,
2366                              size_t options_size)
2367 {
2368         size_t len;
2369         int ret;
2370
2371         /* The first four tokens are required */
2372
2373         len = next_token(&buf);
2374         if (!len)
2375                 return -EINVAL;
2376         *mon_addrs_size = len + 1;
2377         *mon_addrs = buf;
2378
2379         buf += len;
2380
2381         len = copy_token(&buf, options, options_size);
2382         if (!len || len >= options_size)
2383                 return -EINVAL;
2384
2385         ret = -ENOMEM;
2386         rbd_dev->pool_name = dup_token(&buf, NULL);
2387         if (!rbd_dev->pool_name)
2388                 goto out_err;
2389
2390         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2391         if (!rbd_dev->image_name)
2392                 goto out_err;
2393
2394         /* Create the name of the header object */
2395
2396         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2397                                                 + sizeof (RBD_SUFFIX),
2398                                         GFP_KERNEL);
2399         if (!rbd_dev->header_name)
2400                 goto out_err;
2401         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2402
2403         /*
2404          * The snapshot name is optional.  If none is is supplied,
2405          * we use the default value.
2406          */
2407         rbd_dev->snap_name = dup_token(&buf, &len);
2408         if (!rbd_dev->snap_name)
2409                 goto out_err;
2410         if (!len) {
2411                 /* Replace the empty name with the default */
2412                 kfree(rbd_dev->snap_name);
2413                 rbd_dev->snap_name
2414                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2415                 if (!rbd_dev->snap_name)
2416                         goto out_err;
2417
2418                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2419                         sizeof (RBD_SNAP_HEAD_NAME));
2420         }
2421
2422         return 0;
2423
2424 out_err:
2425         kfree(rbd_dev->header_name);
2426         kfree(rbd_dev->image_name);
2427         kfree(rbd_dev->pool_name);
2428         rbd_dev->pool_name = NULL;
2429
2430         return ret;
2431 }
2432
2433 static ssize_t rbd_add(struct bus_type *bus,
2434                        const char *buf,
2435                        size_t count)
2436 {
2437         char *options;
2438         struct rbd_device *rbd_dev = NULL;
2439         const char *mon_addrs = NULL;
2440         size_t mon_addrs_size = 0;
2441         struct ceph_osd_client *osdc;
2442         int rc = -ENOMEM;
2443
2444         if (!try_module_get(THIS_MODULE))
2445                 return -ENODEV;
2446
2447         options = kmalloc(count, GFP_KERNEL);
2448         if (!options)
2449                 goto err_nomem;
2450         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2451         if (!rbd_dev)
2452                 goto err_nomem;
2453
2454         /* static rbd_device initialization */
2455         spin_lock_init(&rbd_dev->lock);
2456         INIT_LIST_HEAD(&rbd_dev->node);
2457         INIT_LIST_HEAD(&rbd_dev->snaps);
2458         init_rwsem(&rbd_dev->header_rwsem);
2459
2460         /* generate unique id: find highest unique id, add one */
2461         rbd_id_get(rbd_dev);
2462
2463         /* Fill in the device name, now that we have its id. */
2464         BUILD_BUG_ON(DEV_NAME_LEN
2465                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2466         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2467
2468         /* parse add command */
2469         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2470                                 options, count);
2471         if (rc)
2472                 goto err_put_id;
2473
2474         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2475                                                 options);
2476         if (IS_ERR(rbd_dev->rbd_client)) {
2477                 rc = PTR_ERR(rbd_dev->rbd_client);
2478                 goto err_put_id;
2479         }
2480
2481         /* pick the pool */
2482         osdc = &rbd_dev->rbd_client->client->osdc;
2483         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2484         if (rc < 0)
2485                 goto err_out_client;
2486         rbd_dev->pool_id = rc;
2487
2488         /* register our block device */
2489         rc = register_blkdev(0, rbd_dev->name);
2490         if (rc < 0)
2491                 goto err_out_client;
2492         rbd_dev->major = rc;
2493
2494         rc = rbd_bus_add_dev(rbd_dev);
2495         if (rc)
2496                 goto err_out_blkdev;
2497
2498         /*
2499          * At this point cleanup in the event of an error is the job
2500          * of the sysfs code (initiated by rbd_bus_del_dev()).
2501          *
2502          * Set up and announce blkdev mapping.
2503          */
2504         rc = rbd_init_disk(rbd_dev);
2505         if (rc)
2506                 goto err_out_bus;
2507
2508         rc = rbd_init_watch_dev(rbd_dev);
2509         if (rc)
2510                 goto err_out_bus;
2511
2512         return count;
2513
2514 err_out_bus:
2515         /* this will also clean up rest of rbd_dev stuff */
2516
2517         rbd_bus_del_dev(rbd_dev);
2518         kfree(options);
2519         return rc;
2520
2521 err_out_blkdev:
2522         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2523 err_out_client:
2524         rbd_put_client(rbd_dev);
2525 err_put_id:
2526         if (rbd_dev->pool_name) {
2527                 kfree(rbd_dev->snap_name);
2528                 kfree(rbd_dev->header_name);
2529                 kfree(rbd_dev->image_name);
2530                 kfree(rbd_dev->pool_name);
2531         }
2532         rbd_id_put(rbd_dev);
2533 err_nomem:
2534         kfree(rbd_dev);
2535         kfree(options);
2536
2537         dout("Error adding device %s\n", buf);
2538         module_put(THIS_MODULE);
2539
2540         return (ssize_t) rc;
2541 }
2542
2543 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2544 {
2545         struct list_head *tmp;
2546         struct rbd_device *rbd_dev;
2547
2548         spin_lock(&rbd_dev_list_lock);
2549         list_for_each(tmp, &rbd_dev_list) {
2550                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2551                 if (rbd_dev->dev_id == dev_id) {
2552                         spin_unlock(&rbd_dev_list_lock);
2553                         return rbd_dev;
2554                 }
2555         }
2556         spin_unlock(&rbd_dev_list_lock);
2557         return NULL;
2558 }
2559
2560 static void rbd_dev_release(struct device *dev)
2561 {
2562         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2563
2564         if (rbd_dev->watch_request) {
2565                 struct ceph_client *client = rbd_dev->rbd_client->client;
2566
2567                 ceph_osdc_unregister_linger_request(&client->osdc,
2568                                                     rbd_dev->watch_request);
2569         }
2570         if (rbd_dev->watch_event)
2571                 rbd_req_sync_unwatch(rbd_dev);
2572
2573         rbd_put_client(rbd_dev);
2574
2575         /* clean up and free blkdev */
2576         rbd_free_disk(rbd_dev);
2577         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2578
2579         /* done with the id, and with the rbd_dev */
2580         kfree(rbd_dev->snap_name);
2581         kfree(rbd_dev->header_name);
2582         kfree(rbd_dev->pool_name);
2583         kfree(rbd_dev->image_name);
2584         rbd_id_put(rbd_dev);
2585         kfree(rbd_dev);
2586
2587         /* release module ref */
2588         module_put(THIS_MODULE);
2589 }
2590
2591 static ssize_t rbd_remove(struct bus_type *bus,
2592                           const char *buf,
2593                           size_t count)
2594 {
2595         struct rbd_device *rbd_dev = NULL;
2596         int target_id, rc;
2597         unsigned long ul;
2598         int ret = count;
2599
2600         rc = strict_strtoul(buf, 10, &ul);
2601         if (rc)
2602                 return rc;
2603
2604         /* convert to int; abort if we lost anything in the conversion */
2605         target_id = (int) ul;
2606         if (target_id != ul)
2607                 return -EINVAL;
2608
2609         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2610
2611         rbd_dev = __rbd_get_dev(target_id);
2612         if (!rbd_dev) {
2613                 ret = -ENOENT;
2614                 goto done;
2615         }
2616
2617         __rbd_remove_all_snaps(rbd_dev);
2618         rbd_bus_del_dev(rbd_dev);
2619
2620 done:
2621         mutex_unlock(&ctl_mutex);
2622         return ret;
2623 }
2624
2625 static ssize_t rbd_snap_add(struct device *dev,
2626                             struct device_attribute *attr,
2627                             const char *buf,
2628                             size_t count)
2629 {
2630         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2631         int ret;
2632         char *name = kmalloc(count + 1, GFP_KERNEL);
2633         if (!name)
2634                 return -ENOMEM;
2635
2636         snprintf(name, count, "%s", buf);
2637
2638         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2639
2640         ret = rbd_header_add_snap(rbd_dev,
2641                                   name, GFP_KERNEL);
2642         if (ret < 0)
2643                 goto err_unlock;
2644
2645         ret = __rbd_refresh_header(rbd_dev);
2646         if (ret < 0)
2647                 goto err_unlock;
2648
2649         /* shouldn't hold ctl_mutex when notifying.. notify might
2650            trigger a watch callback that would need to get that mutex */
2651         mutex_unlock(&ctl_mutex);
2652
2653         /* make a best effort, don't error if failed */
2654         rbd_req_sync_notify(rbd_dev);
2655
2656         ret = count;
2657         kfree(name);
2658         return ret;
2659
2660 err_unlock:
2661         mutex_unlock(&ctl_mutex);
2662         kfree(name);
2663         return ret;
2664 }
2665
2666 /*
2667  * create control files in sysfs
2668  * /sys/bus/rbd/...
2669  */
2670 static int rbd_sysfs_init(void)
2671 {
2672         int ret;
2673
2674         ret = device_register(&rbd_root_dev);
2675         if (ret < 0)
2676                 return ret;
2677
2678         ret = bus_register(&rbd_bus_type);
2679         if (ret < 0)
2680                 device_unregister(&rbd_root_dev);
2681
2682         return ret;
2683 }
2684
2685 static void rbd_sysfs_cleanup(void)
2686 {
2687         bus_unregister(&rbd_bus_type);
2688         device_unregister(&rbd_root_dev);
2689 }
2690
2691 int __init rbd_init(void)
2692 {
2693         int rc;
2694
2695         rc = rbd_sysfs_init();
2696         if (rc)
2697                 return rc;
2698         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2699         return 0;
2700 }
2701
2702 void __exit rbd_exit(void)
2703 {
2704         rbd_sysfs_cleanup();
2705 }
2706
/* Register rbd_init() and rbd_exit() as the module's entry and exit points. */
2707 module_init(rbd_init);
2708 module_exit(rbd_exit);
2709
/* Module metadata: authors, description and license. */
2710 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2711 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2712 MODULE_DESCRIPTION("rados block device");
2713
2714 /* following authorship retained from original osdblk.c */
2715 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2716
2717 MODULE_LICENSE("GPL");
2716
2717 MODULE_LICENSE("GPL");