Merge tag 'at91-fixes' of git://github.com/at91linux/linux-at91 into fixes
[cascardo/linux.git] / drivers / block / rbd.c
index 4c95b50..b2c98c1 100644 (file)
@@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
                return -ENOENT;
 
        (void) get_device(&rbd_dev->dev);
-       set_device_ro(bdev, rbd_dev->mapping.read_only);
 
        return 0;
 }
@@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
        put_device(&rbd_dev->dev);
 }
 
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+       int ret = 0;
+       int val;
+       bool ro;
+       bool ro_changed = false;
+
+       /* get_user() may sleep, so call it before taking rbd_dev->lock */
+       if (get_user(val, (int __user *)(arg)))
+               return -EFAULT;
+
+       ro = val ? true : false;
+       /* Snapshot doesn't allow to write*/
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+               return -EROFS;
+
+       spin_lock_irq(&rbd_dev->lock);
+       /* prevent others open this device */
+       if (rbd_dev->open_count > 1) {
+               ret = -EBUSY;
+               goto out;
+       }
+
+       if (rbd_dev->mapping.read_only != ro) {
+               rbd_dev->mapping.read_only = ro;
+               ro_changed = true;
+       }
+
+out:
+       spin_unlock_irq(&rbd_dev->lock);
+       /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+       if (ret == 0 && ro_changed)
+               set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+       return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+                       unsigned int cmd, unsigned long arg)
+{
+       struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+       int ret = 0;
+
+       switch (cmd) {
+       case BLKROSET:
+               ret = rbd_ioctl_set_ro(rbd_dev, arg);
+               break;
+       default:
+               ret = -ENOTTY;
+       }
+
+       return ret;
+}
+
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+                               unsigned int cmd, unsigned long arg)
+{
+       return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
+
 static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
+       .ioctl                  = rbd_ioctl,
+#ifdef CONFIG_COMPAT
+       .compat_ioctl           = rbd_compat_ioctl,
+#endif
 };
 
 /*
@@ -1366,6 +1431,14 @@ static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
 }
 
+static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
+{
+       struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
+
+       return obj_request->img_offset <
+           round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
+}
+
 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
@@ -1382,6 +1455,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+            atomic_read(&img_request->kref.refcount));
+       kref_get(&img_request->kref);
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request);
 static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
@@ -2142,6 +2222,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
        img_request->next_completion = which;
 out:
        spin_unlock_irq(&img_request->completion_lock);
+       rbd_img_request_put(img_request);
 
        if (!more)
                rbd_img_request_complete(img_request);
@@ -2242,6 +2323,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                        goto out_unwind;
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;
+               rbd_img_request_get(img_request);
 
                if (write_request) {
                        osd_req_op_alloc_hint_init(osd_req, which,
@@ -2674,7 +2756,7 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
         */
        if (!img_request_write_test(img_request) ||
                !img_request_layered_test(img_request) ||
-               rbd_dev->parent_overlap <= obj_request->img_offset ||
+               !obj_request_overlaps_parent(obj_request) ||
                ((known = obj_request_known_test(obj_request)) &&
                        obj_request_exists_test(obj_request))) {
 
@@ -2872,56 +2954,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 }
 
 /*
- * Request sync osd watch/unwatch.  The value of "start" determines
- * whether a watch request is being initiated or torn down.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        int ret;
 
-       rbd_assert(start ^ !!rbd_dev->watch_event);
-       rbd_assert(start ^ !!rbd_dev->watch_request);
+       rbd_assert(!rbd_dev->watch_event);
+       rbd_assert(!rbd_dev->watch_request);
 
-       if (start) {
-               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-                                               &rbd_dev->watch_event);
-               if (ret < 0)
-                       return ret;
-               rbd_assert(rbd_dev->watch_event != NULL);
-       }
+       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+                                    &rbd_dev->watch_event);
+       if (ret < 0)
+               return ret;
+
+       rbd_assert(rbd_dev->watch_event);
 
-       ret = -ENOMEM;
        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-                                                       OBJ_REQUEST_NODATA);
-       if (!obj_request)
+                                            OBJ_REQUEST_NODATA);
+       if (!obj_request) {
+               ret = -ENOMEM;
                goto out_cancel;
+       }
 
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
                                                  obj_request);
-       if (!obj_request->osd_req)
-               goto out_cancel;
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out_put;
+       }
 
-       if (start)
-               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-       else
-               ceph_osdc_unregister_linger_request(osdc,
-                                       rbd_dev->watch_request->osd_req);
+       ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
+                             rbd_dev->watch_event->cookie, 0, 1);
        rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
-               goto out_cancel;
+               goto out_linger;
+
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
-               goto out_cancel;
+               goto out_linger;
+
        ret = obj_request->result;
        if (ret)
-               goto out_cancel;
+               goto out_linger;
 
        /*
         * A watch request is set to linger, so the underlying osd
@@ -2931,36 +3012,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
         * it.  We'll drop that reference (below) after we've
         * unregistered it.
         */
-       if (start) {
-               rbd_dev->watch_request = obj_request;
+       rbd_dev->watch_request = obj_request;
 
-               return 0;
+       return 0;
+
+out_linger:
+       ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
+out_put:
+       rbd_obj_request_put(obj_request);
+out_cancel:
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
+
+       return ret;
+}
+
+/*
+ * Tear down a watch request, synchronously.
+ */
+static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
+       int ret;
+
+       rbd_assert(rbd_dev->watch_event);
+       rbd_assert(rbd_dev->watch_request);
+
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                            OBJ_REQUEST_NODATA);
+       if (!obj_request) {
+               ret = -ENOMEM;
+               goto out_cancel;
+       }
+
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+                                                 obj_request);
+       if (!obj_request->osd_req) {
+               ret = -ENOMEM;
+               goto out_put;
        }
 
+       osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+                             rbd_dev->watch_event->cookie, 0, 0);
+       rbd_osd_req_format_write(obj_request);
+
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out_put;
+
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out_put;
+
+       ret = obj_request->result;
+       if (ret)
+               goto out_put;
+
        /* We have successfully torn down the watch request */
 
+       ceph_osdc_unregister_linger_request(osdc,
+                                           rbd_dev->watch_request->osd_req);
        rbd_obj_request_put(rbd_dev->watch_request);
        rbd_dev->watch_request = NULL;
+
+out_put:
+       rbd_obj_request_put(obj_request);
 out_cancel:
-       /* Cancel the event if we're tearing down, or on error */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
-       if (obj_request)
-               rbd_obj_request_put(obj_request);
 
        return ret;
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
-{
-       return __rbd_dev_header_watch_sync(rbd_dev, true);
-}
-
 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       ret = __rbd_dev_header_watch_sync(rbd_dev, false);
+       ret = __rbd_dev_header_unwatch_sync(rbd_dev);
        if (ret) {
                rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
                         ret);
@@ -3058,7 +3187,6 @@ static void rbd_request_fn(struct request_queue *q)
                __releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
        int result;
 
@@ -3094,7 +3222,7 @@ static void rbd_request_fn(struct request_queue *q)
 
                if (write_request) {
                        result = -EROFS;
-                       if (read_only)
+                       if (rbd_dev->mapping.read_only)
                                goto end_request;
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }
@@ -4682,6 +4810,38 @@ out_err:
        return ret;
 }
 
+/*
+ * Return pool id (>= 0) or a negative error code.
+ */
+static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
+{
+       u64 newest_epoch;
+       unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
+       int tries = 0;
+       int ret;
+
+again:
+       ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
+       if (ret == -ENOENT && tries++ < 1) {
+               ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
+                                              &newest_epoch);
+               if (ret < 0)
+                       return ret;
+
+               if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
+                       ceph_monc_request_next_osdmap(&rbdc->client->monc);
+                       (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
+                                                    newest_epoch, timeout);
+                       goto again;
+               } else {
+                       /* the osdmap we have is new enough */
+                       return -ENOENT;
+               }
+       }
+
+       return ret;
+}
+
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
@@ -4752,7 +4912,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 
                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
-               ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+               ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
        } else {
@@ -4907,6 +5067,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
@@ -5053,7 +5214,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
-       struct ceph_osd_client *osdc;
        bool read_only;
        int rc = -ENOMEM;
 
@@ -5075,8 +5235,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        }
 
        /* pick the pool */
-       osdc = &rbdc->client->osdc;
-       rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+       rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64)rc;
@@ -5387,6 +5546,7 @@ err_out_slab:
 
 static void __exit rbd_exit(void)
 {
+       ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);