libceph: a major OSD client update
authorIlya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:26 +0000 (16:07 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:14:03 +0000 (01:14 +0200)
This is a major sync up, up to ~Jewel.  The highlights are:

- per-session request trees (vs a global per-client tree)
- per-session locking (vs a global per-client rwlock)
- homeless OSD session
- no ad-hoc global per-client lists
- support for pool quotas
- foundation for watch/notify v2 support
- foundation for map check (pool deletion detection) support

The switchover is incomplete: lingering requests can be setup and
teared down but aren't ever reestablished.  This functionality is
restored with the introduction of the new lingering infrastructure
(ceph_osd_linger_request, linger_work, etc) in a later commit.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/ioctl.c
fs/ceph/xattr.c
include/linux/ceph/osd_client.h
net/ceph/debugfs.c
net/ceph/osd_client.c

index 1831ad6..be6b165 100644 (file)
@@ -193,12 +193,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
        if (copy_from_user(&dl, arg, sizeof(dl)))
                return -EFAULT;
 
-       down_read(&osdc->map_sem);
+       down_read(&osdc->lock);
        r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
                                          &dl.object_no, &dl.object_offset,
                                          &olen);
        if (r < 0) {
-               up_read(&osdc->map_sem);
+               up_read(&osdc->lock);
                return -EIO;
        }
        dl.file_offset -= dl.object_offset;
@@ -217,7 +217,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
 
        r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
        if (r < 0) {
-               up_read(&osdc->map_sem);
+               up_read(&osdc->lock);
                return r;
        }
 
@@ -230,7 +230,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
        } else {
                memset(&dl.osd_addr, 0, sizeof(dl.osd_addr));
        }
-       up_read(&osdc->map_sem);
+       up_read(&osdc->lock);
 
        /* send result back to user */
        if (copy_to_user(arg, &dl, sizeof(dl)))
index 9410abd..5afabc4 100644 (file)
@@ -75,7 +75,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
        char buf[128];
 
        dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
-       down_read(&osdc->map_sem);
+       down_read(&osdc->lock);
        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
        if (pool_name) {
                size_t len = strlen(pool_name);
@@ -107,7 +107,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
                                ret = -ERANGE;
                }
        }
-       up_read(&osdc->map_sem);
+       up_read(&osdc->lock);
        return ret;
 }
 
@@ -141,13 +141,13 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
        s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
        const char *pool_name;
 
-       down_read(&osdc->map_sem);
+       down_read(&osdc->lock);
        pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
        if (pool_name)
                ret = snprintf(val, size, "%s", pool_name);
        else
                ret = snprintf(val, size, "%lld", (unsigned long long)pool);
-       up_read(&osdc->map_sem);
+       up_read(&osdc->lock);
        return ret;
 }
 
index 486d681..342f22f 100644 (file)
@@ -33,12 +33,13 @@ struct ceph_osd {
        int o_incarnation;
        struct rb_node o_node;
        struct ceph_connection o_con;
-       struct list_head o_requests;
+       struct rb_root o_requests;
        struct list_head o_linger_requests;
        struct list_head o_osd_lru;
        struct ceph_auth_handshake o_auth;
        unsigned long lru_ttl;
        struct list_head o_keepalive_item;
+       struct mutex lock;
 };
 
 #define CEPH_OSD_SLAB_OPS      2
@@ -144,8 +145,6 @@ struct ceph_osd_request_target {
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
        struct rb_node  r_node;
-       struct list_head r_req_lru_item;
-       struct list_head r_osd_item;
        struct list_head r_linger_item;
        struct list_head r_linger_osd_item;
        struct ceph_osd *r_osd;
@@ -219,19 +218,16 @@ struct ceph_osd_client {
        struct ceph_client     *client;
 
        struct ceph_osdmap     *osdmap;       /* current map */
-       struct rw_semaphore    map_sem;
+       struct rw_semaphore    lock;
 
-       struct mutex           request_mutex;
        struct rb_root         osds;          /* osds */
        struct list_head       osd_lru;       /* idle osds */
        spinlock_t             osd_lru_lock;
-       u64                    last_tid;      /* tid of last request */
-       struct rb_root         requests;      /* pending requests */
-       struct list_head       req_lru;       /* in-flight lru */
-       struct list_head       req_unsent;    /* unsent/need-resend queue */
-       struct list_head       req_notarget;  /* map to no osd */
        struct list_head       req_linger;    /* lingering requests */
-       int                    num_requests;
+       struct ceph_osd        homeless_osd;
+       atomic64_t             last_tid;      /* tid of last request */
+       atomic_t               num_requests;
+       atomic_t               num_homeless;
        struct delayed_work    timeout_work;
        struct delayed_work    osds_timeout_work;
 #ifdef CONFIG_DEBUG_FS
index 6d3ff71..61dbd9d 100644 (file)
@@ -182,21 +182,39 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
        seq_putc(s, '\n');
 }
 
+static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
+{
+       struct rb_node *n;
+
+       mutex_lock(&osd->lock);
+       for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+               struct ceph_osd_request *req =
+                   rb_entry(n, struct ceph_osd_request, r_node);
+
+               dump_request(s, req);
+       }
+
+       mutex_unlock(&osd->lock);
+}
+
 static int osdc_show(struct seq_file *s, void *pp)
 {
        struct ceph_client *client = s->private;
        struct ceph_osd_client *osdc = &client->osdc;
-       struct rb_node *p;
-
-       mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-               struct ceph_osd_request *req;
+       struct rb_node *n;
 
-               req = rb_entry(p, struct ceph_osd_request, r_node);
+       down_read(&osdc->lock);
+       seq_printf(s, "REQUESTS %d homeless %d\n",
+                  atomic_read(&osdc->num_requests),
+                  atomic_read(&osdc->num_homeless));
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
 
-               dump_request(s, req);
+               dump_requests(s, osd);
        }
-       mutex_unlock(&osdc->request_mutex);
+       dump_requests(s, &osdc->homeless_osd);
+
+       up_read(&osdc->lock);
        return 0;
 }
 
index d1c8e06..4c856c8 100644 (file)
@@ -25,16 +25,6 @@ static struct kmem_cache     *ceph_osd_request_cache;
 
 static const struct ceph_connection_operations osd_con_ops;
 
-static void __send_queued(struct ceph_osd_client *osdc);
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
-static void __register_request(struct ceph_osd_client *osdc,
-                              struct ceph_osd_request *req);
-static void __unregister_request(struct ceph_osd_client *osdc,
-                                struct ceph_osd_request *req);
-static void __unregister_linger_request(struct ceph_osd_client *osdc,
-                                       struct ceph_osd_request *req);
-static void __enqueue_request(struct ceph_osd_request *req);
-
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -53,6 +43,43 @@ static void __enqueue_request(struct ceph_osd_request *req);
  * channel with an OSD is reset.
  */
 
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+
+#if 1
+static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
+{
+       bool wrlocked = true;
+
+       if (unlikely(down_read_trylock(sem))) {
+               wrlocked = false;
+               up_read(sem);
+       }
+
+       return wrlocked;
+}
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
+{
+       WARN_ON(!rwsem_is_locked(&osdc->lock));
+}
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
+{
+       WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
+}
+static inline void verify_osd_locked(struct ceph_osd *osd)
+{
+       struct ceph_osd_client *osdc = osd->o_osdc;
+
+       WARN_ON(!(mutex_is_locked(&osd->lock) &&
+                 rwsem_is_locked(&osdc->lock)) &&
+               !rwsem_is_wrlocked(&osdc->lock));
+}
+#else
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
+static inline void verify_osd_locked(struct ceph_osd *osd) { }
+#endif
+
 /*
  * calculate the mapping of a file extent onto an object, and fill out the
  * request accordingly.  shorten extent as necessary if it crosses an
@@ -336,18 +363,14 @@ static void ceph_osdc_release_request(struct kref *kref)
        dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
             req->r_request, req->r_reply);
        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
-       WARN_ON(!list_empty(&req->r_req_lru_item));
-       WARN_ON(!list_empty(&req->r_osd_item));
        WARN_ON(!list_empty(&req->r_linger_item));
        WARN_ON(!list_empty(&req->r_linger_osd_item));
        WARN_ON(req->r_osd);
 
        if (req->r_request)
                ceph_msg_put(req->r_request);
-       if (req->r_reply) {
-               ceph_msg_revoke_incoming(req->r_reply);
+       if (req->r_reply)
                ceph_msg_put(req->r_reply);
-       }
 
        for (which = 0; which < req->r_num_ops; which++)
                osd_req_op_data_release(req, which);
@@ -418,8 +441,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd_item);
-       INIT_LIST_HEAD(&req->r_req_lru_item);
-       INIT_LIST_HEAD(&req->r_osd_item);
 
        target_init(&req->r_t);
 
@@ -869,141 +890,11 @@ static bool osd_homeless(struct ceph_osd *osd)
        return osd->o_osd == CEPH_HOMELESS_OSD;
 }
 
-static struct ceph_osd_request *
-__lookup_request_ge(struct ceph_osd_client *osdc,
-                   u64 tid)
-{
-       struct ceph_osd_request *req;
-       struct rb_node *n = osdc->requests.rb_node;
-
-       while (n) {
-               req = rb_entry(n, struct ceph_osd_request, r_node);
-               if (tid < req->r_tid) {
-                       if (!n->rb_left)
-                               return req;
-                       n = n->rb_left;
-               } else if (tid > req->r_tid) {
-                       n = n->rb_right;
-               } else {
-                       return req;
-               }
-       }
-       return NULL;
-}
-
-static void __kick_linger_request(struct ceph_osd_request *req)
-{
-       struct ceph_osd_client *osdc = req->r_osdc;
-       struct ceph_osd *osd = req->r_osd;
-
-       /*
-        * Linger requests need to be resent with a new tid to avoid
-        * the dup op detection logic on the OSDs.  Achieve this with
-        * a re-register dance instead of open-coding.
-        */
-       ceph_osdc_get_request(req);
-       if (!list_empty(&req->r_linger_item))
-               __unregister_linger_request(osdc, req);
-       else
-               __unregister_request(osdc, req);
-       __register_request(osdc, req);
-       ceph_osdc_put_request(req);
-
-       /*
-        * Unless request has been registered as both normal and
-        * lingering, __unregister{,_linger}_request clears r_osd.
-        * However, here we need to preserve r_osd to make sure we
-        * requeue on the same OSD.
-        */
-       WARN_ON(req->r_osd || !osd);
-       req->r_osd = osd;
-
-       dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
-       __enqueue_request(req);
-}
-
-/*
- * Resubmit requests pending on the given osd.
- */
-static void __kick_osd_requests(struct ceph_osd_client *osdc,
-                               struct ceph_osd *osd)
-{
-       struct ceph_osd_request *req, *nreq;
-       LIST_HEAD(resend);
-       LIST_HEAD(resend_linger);
-       int err;
-
-       dout("%s osd%d\n", __func__, osd->o_osd);
-       err = __reset_osd(osdc, osd);
-       if (err)
-               return;
-
-       /*
-        * Build up a list of requests to resend by traversing the
-        * osd's list of requests.  Requests for a given object are
-        * sent in tid order, and that is also the order they're
-        * kept on this list.  Therefore all requests that are in
-        * flight will be found first, followed by all requests that
-        * have not yet been sent.  And to resend requests while
-        * preserving this order we will want to put any sent
-        * requests back on the front of the osd client's unsent
-        * list.
-        *
-        * So we build a separate ordered list of already-sent
-        * requests for the affected osd and splice it onto the
-        * front of the osd client's unsent list.  Once we've seen a
-        * request that has not yet been sent we're done.  Those
-        * requests are already sitting right where they belong.
-        */
-       list_for_each_entry(req, &osd->o_requests, r_osd_item) {
-               if (!req->r_sent)
-                       break;
-
-               if (!req->r_linger) {
-                       dout("%s requeueing %p tid %llu\n", __func__, req,
-                            req->r_tid);
-                       list_move_tail(&req->r_req_lru_item, &resend);
-                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
-               } else {
-                       list_move_tail(&req->r_req_lru_item, &resend_linger);
-               }
-       }
-       list_splice(&resend, &osdc->req_unsent);
-
-       /*
-        * Both registered and not yet registered linger requests are
-        * enqueued with a new tid on the same OSD.  We add/move them
-        * to req_unsent/o_requests at the end to keep things in tid
-        * order.
-        */
-       list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
-                                r_linger_osd_item) {
-               WARN_ON(!list_empty(&req->r_req_lru_item));
-               __kick_linger_request(req);
-       }
-
-       list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
-               __kick_linger_request(req);
-}
-
-/*
- * If the osd connection drops, we need to resubmit all requests.
- */
-static void osd_reset(struct ceph_connection *con)
+static bool osd_registered(struct ceph_osd *osd)
 {
-       struct ceph_osd *osd = con->private;
-       struct ceph_osd_client *osdc;
+       verify_osdc_locked(osd->o_osdc);
 
-       if (!osd)
-               return;
-       dout("osd_reset osd%d\n", osd->o_osd);
-       osdc = osd->o_osdc;
-       down_read(&osdc->map_sem);
-       mutex_lock(&osdc->request_mutex);
-       __kick_osd_requests(osdc, osd);
-       __send_queued(osdc);
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
+       return !RB_EMPTY_NODE(&osd->o_node);
 }
 
 /*
@@ -1013,17 +904,18 @@ static void osd_init(struct ceph_osd *osd)
 {
        atomic_set(&osd->o_ref, 1);
        RB_CLEAR_NODE(&osd->o_node);
-       INIT_LIST_HEAD(&osd->o_requests);
+       osd->o_requests = RB_ROOT;
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        INIT_LIST_HEAD(&osd->o_keepalive_item);
        osd->o_incarnation = 1;
+       mutex_init(&osd->lock);
 }
 
 static void osd_cleanup(struct ceph_osd *osd)
 {
        WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
-       WARN_ON(!list_empty(&osd->o_requests));
+       WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
        WARN_ON(!list_empty(&osd->o_linger_requests));
        WARN_ON(!list_empty(&osd->o_osd_lru));
        WARN_ON(!list_empty(&osd->o_keepalive_item));
@@ -1077,30 +969,6 @@ static void put_osd(struct ceph_osd *osd)
 
 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
 
-/*
- * remove an osd from our map
- */
-static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
-       dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
-       WARN_ON(!list_empty(&osd->o_requests));
-       WARN_ON(!list_empty(&osd->o_linger_requests));
-
-       list_del_init(&osd->o_osd_lru);
-       erase_osd(&osdc->osds, osd);
-}
-
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
-       dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
-
-       if (!RB_EMPTY_NODE(&osd->o_node)) {
-               ceph_con_close(&osd->o_con);
-               __remove_osd(osdc, osd);
-               put_osd(osd);
-       }
-}
-
 static void __move_osd_to_lru(struct ceph_osd *osd)
 {
        struct ceph_osd_client *osdc = osd->o_osdc;
@@ -1117,7 +985,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd)
 
 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
 {
-       if (list_empty(&osd->o_requests) &&
+       if (RB_EMPTY_ROOT(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests))
                __move_osd_to_lru(osd);
 }
@@ -1134,30 +1002,64 @@ static void __remove_osd_from_lru(struct ceph_osd *osd)
        spin_unlock(&osdc->osd_lru_lock);
 }
 
+/*
+ * Close the connection and assign any leftover requests to the
+ * homeless session.
+ */
+static void close_osd(struct ceph_osd *osd)
+{
+       struct ceph_osd_client *osdc = osd->o_osdc;
+       struct rb_node *n;
+
+       verify_osdc_wrlocked(osdc);
+       dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+       ceph_con_close(&osd->o_con);
+
+       for (n = rb_first(&osd->o_requests); n; ) {
+               struct ceph_osd_request *req =
+                   rb_entry(n, struct ceph_osd_request, r_node);
+
+               n = rb_next(n); /* unlink_request() */
+
+               dout(" reassigning req %p tid %llu\n", req, req->r_tid);
+               unlink_request(osd, req);
+               link_request(&osdc->homeless_osd, req);
+       }
+
+       __remove_osd_from_lru(osd);
+       erase_osd(&osdc->osds, osd);
+       put_osd(osd);
+}
+
 /*
  * reset osd connect
  */
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int reopen_osd(struct ceph_osd *osd)
 {
        struct ceph_entity_addr *peer_addr;
 
-       dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
-       if (list_empty(&osd->o_requests) &&
+       dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+       if (RB_EMPTY_ROOT(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
-               remove_osd(osdc, osd);
+               close_osd(osd);
                return -ENODEV;
        }
 
-       peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+       peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
                        !ceph_con_opened(&osd->o_con)) {
-               struct ceph_osd_request *req;
+               struct rb_node *n;
 
                dout("osd addr hasn't changed and connection never opened, "
                     "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benfit */
-               list_for_each_entry(req, &osd->o_requests, r_osd_item)
+               for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+                       struct ceph_osd_request *req =
+                           rb_entry(n, struct ceph_osd_request, r_node);
                        req->r_stamp = jiffies;
+               }
 
                return -EAGAIN;
        }
@@ -1169,73 +1071,84 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        return 0;
 }
 
-/*
- * Register request, assign tid.  If this is the first request, set up
- * the timeout event.
- */
-static void __register_request(struct ceph_osd_client *osdc,
-                              struct ceph_osd_request *req)
-{
-       req->r_tid = ++osdc->last_tid;
-       req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
-       dout("__register_request %p tid %lld\n", req, req->r_tid);
-       insert_request(&osdc->requests, req);
-       ceph_osdc_get_request(req);
-       osdc->num_requests++;
-}
-
-/*
- * called under osdc->request_mutex
- */
-static void __unregister_request(struct ceph_osd_client *osdc,
-                                struct ceph_osd_request *req)
+static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
+                                         bool wrlocked)
 {
-       if (RB_EMPTY_NODE(&req->r_node)) {
-               dout("__unregister_request %p tid %lld not registered\n",
-                       req, req->r_tid);
-               return;
-       }
+       struct ceph_osd *osd;
 
-       dout("__unregister_request %p tid %lld\n", req, req->r_tid);
-       erase_request(&osdc->requests, req);
-       osdc->num_requests--;
+       if (wrlocked)
+               verify_osdc_wrlocked(osdc);
+       else
+               verify_osdc_locked(osdc);
 
-       if (req->r_osd) {
-               /* make sure the original request isn't in flight. */
-               ceph_msg_revoke(req->r_request);
+       if (o != CEPH_HOMELESS_OSD)
+               osd = lookup_osd(&osdc->osds, o);
+       else
+               osd = &osdc->homeless_osd;
+       if (!osd) {
+               if (!wrlocked)
+                       return ERR_PTR(-EAGAIN);
 
-               list_del_init(&req->r_osd_item);
-               maybe_move_osd_to_lru(req->r_osd);
-               if (list_empty(&req->r_linger_osd_item))
-                       req->r_osd = NULL;
+               osd = create_osd(osdc, o);
+               insert_osd(&osdc->osds, osd);
+               ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+                             &osdc->osdmap->osd_addr[osd->o_osd]);
        }
 
-       list_del_init(&req->r_req_lru_item);
-       ceph_osdc_put_request(req);
+       dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
+       return osd;
 }
 
 /*
- * Cancel a previously queued request message
+ * Create request <-> OSD session relation.
+ *
+ * @req has to be assigned a tid, @osd may be homeless.
  */
-static void __cancel_request(struct ceph_osd_request *req)
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
 {
-       if (req->r_sent && req->r_osd) {
-               ceph_msg_revoke(req->r_request);
-               req->r_sent = 0;
-       }
+       verify_osd_locked(osd);
+       WARN_ON(!req->r_tid || req->r_osd);
+       dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+            req, req->r_tid);
+
+       if (!osd_homeless(osd))
+               __remove_osd_from_lru(osd);
+       else
+               atomic_inc(&osd->o_osdc->num_homeless);
+
+       get_osd(osd);
+       insert_request(&osd->o_requests, req);
+       req->r_osd = osd;
 }
 
-static void __register_linger_request(struct ceph_osd_client *osdc,
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
+{
+       verify_osd_locked(osd);
+       WARN_ON(req->r_osd != osd);
+       dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+            req, req->r_tid);
+
+       req->r_osd = NULL;
+       erase_request(&osd->o_requests, req);
+       put_osd(osd);
+
+       if (!osd_homeless(osd))
+               maybe_move_osd_to_lru(osd);
+       else
+               atomic_dec(&osd->o_osdc->num_homeless);
+}
+
+static void __register_linger_request(struct ceph_osd *osd,
                                    struct ceph_osd_request *req)
 {
        dout("%s %p tid %llu\n", __func__, req, req->r_tid);
        WARN_ON(!req->r_linger);
 
        ceph_osdc_get_request(req);
-       list_add_tail(&req->r_linger_item, &osdc->req_linger);
-       if (req->r_osd)
-               list_add_tail(&req->r_linger_osd_item,
-                             &req->r_osd->o_linger_requests);
+       list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
+       list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
+       __remove_osd_from_lru(osd);
+       req->r_osd = osd;
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1255,7 +1168,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
        if (req->r_osd) {
                list_del_init(&req->r_linger_osd_item);
                maybe_move_osd_to_lru(req->r_osd);
-               if (list_empty(&req->r_osd_item))
+               if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
                        req->r_osd = NULL;
        }
        ceph_osdc_put_request(req);
@@ -1291,11 +1204,20 @@ static bool have_pool_full(struct ceph_osd_client *osdc)
        return false;
 }
 
+static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
+{
+       struct ceph_pg_pool_info *pi;
+
+       pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+       if (!pi)
+               return false;
+
+       return __pool_full(pi);
+}
+
 /*
  * Returns whether a request should be blocked from being sent
  * based on the current osdmap and osd_client settings.
- *
- * Caller should hold map_sem for read.
  */
 static bool target_should_be_paused(struct ceph_osd_client *osdc,
                                    const struct ceph_osd_request_target *t,
@@ -1421,87 +1343,6 @@ out:
        return ct_res;
 }
 
-static void __enqueue_request(struct ceph_osd_request *req)
-{
-       struct ceph_osd_client *osdc = req->r_osdc;
-
-       dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
-            req->r_osd ? req->r_osd->o_osd : -1);
-
-       if (req->r_osd) {
-               __remove_osd_from_lru(req->r_osd);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
-       } else {
-               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
-       }
-}
-
-/*
- * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
- * (as needed), and set the request r_osd appropriately.  If there is
- * no up osd, set r_osd to NULL.  Move the request to the appropriate list
- * (unsent, homeless) or leave on in-flight lru.
- *
- * Return 0 if unchanged, 1 if changed, or negative on error.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __map_request(struct ceph_osd_client *osdc,
-                        struct ceph_osd_request *req, int force_resend)
-{
-       enum calc_target_result ct_res;
-       int err;
-
-       dout("map_request %p tid %lld\n", req, req->r_tid);
-
-       ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
-       switch (ct_res) {
-       case CALC_TARGET_POOL_DNE:
-               list_move(&req->r_req_lru_item, &osdc->req_notarget);
-               return -EIO;
-       case CALC_TARGET_NO_ACTION:
-               return 0;  /* no change */
-       default:
-               BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
-       }
-
-       dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
-            req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
-            req->r_osd ? req->r_osd->o_osd : -1);
-
-       if (req->r_osd) {
-               __cancel_request(req);
-               list_del_init(&req->r_osd_item);
-               list_del_init(&req->r_linger_osd_item);
-               req->r_osd = NULL;
-       }
-
-       req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
-       if (!req->r_osd && req->r_t.osd >= 0) {
-               err = -ENOMEM;
-               req->r_osd = create_osd(osdc, req->r_t.osd);
-               if (!req->r_osd) {
-                       list_move(&req->r_req_lru_item, &osdc->req_notarget);
-                       goto out;
-               }
-
-               dout("map_request osd %p is osd%d\n", req->r_osd,
-                    req->r_osd->o_osd);
-               insert_osd(&osdc->osds, req->r_osd);
-
-               ceph_con_open(&req->r_osd->o_con,
-                             CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
-                             &osdc->osdmap->osd_addr[req->r_osd->o_osd]);
-       }
-
-       __enqueue_request(req);
-       err = 1;   /* osd or pg changed */
-
-out:
-       return err;
-}
-
 static void setup_request_data(struct ceph_osd_request *req,
                               struct ceph_msg *msg)
 {
@@ -1648,8 +1489,16 @@ static void send_request(struct ceph_osd_request *req)
 {
        struct ceph_osd *osd = req->r_osd;
 
+       verify_osd_locked(osd);
        WARN_ON(osd->o_osd != req->r_t.osd);
 
+       /*
+        * We may have a previously queued request message hanging
+        * around.  Cancel it to avoid corrupting the msgr.
+        */
+       if (req->r_sent)
+               ceph_msg_revoke(req->r_request);
+
        req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
        if (req->r_attempts)
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
@@ -1671,24 +1520,11 @@ static void send_request(struct ceph_osd_request *req)
        ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
 }
 
-/*
- * Send any requests in the queue (req_unsent).
- */
-static void __send_queued(struct ceph_osd_client *osdc)
-{
-       struct ceph_osd_request *req, *tmp;
-
-       dout("__send_queued\n");
-       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
-               list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
-               send_request(req);
-       }
-}
-
 static void maybe_request_map(struct ceph_osd_client *osdc)
 {
        bool continuous = false;
 
+       verify_osdc_locked(osdc);
        WARN_ON(!osdc->osdmap->epoch);
 
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
@@ -1705,38 +1541,121 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
                ceph_monc_renew_subs(&osdc->client->monc);
 }
 
-/*
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
-                                    struct ceph_osd_request *req,
-                                    bool nofail)
+static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 {
-       int rc;
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd *osd;
+       bool need_send = false;
+       bool promoted = false;
 
-       __register_request(osdc, req);
-       req->r_sent = 0;
-       req->r_got_reply = 0;
-       rc = __map_request(osdc, req, 0);
-       if (rc < 0) {
-               if (nofail) {
-                       dout("osdc_start_request failed map, "
-                               " will retry %lld\n", req->r_tid);
-                       rc = 0;
-               } else {
-                       __unregister_request(osdc, req);
-               }
-               return rc;
+       WARN_ON(req->r_tid || req->r_got_reply);
+       dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
+
+again:
+       calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+       osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
+       if (IS_ERR(osd)) {
+               WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
+               goto promote;
        }
 
-       if (req->r_osd == NULL) {
-               dout("send_request %p no up osds in pg\n", req);
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
+       if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+           ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+               dout("req %p pausewr\n", req);
+               req->r_t.paused = true;
+               maybe_request_map(osdc);
+       } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+                  ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
+               dout("req %p pauserd\n", req);
+               req->r_t.paused = true;
+               maybe_request_map(osdc);
+       } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+                  !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
+                                    CEPH_OSD_FLAG_FULL_FORCE)) &&
+                  (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+                   pool_full(osdc, req->r_t.base_oloc.pool))) {
+               dout("req %p full/pool_full\n", req);
+               pr_warn_ratelimited("FULL or reached pool quota\n");
+               req->r_t.paused = true;
+               maybe_request_map(osdc);
+       } else if (!osd_homeless(osd)) {
+               need_send = true;
        } else {
-               __send_queued(osdc);
+               maybe_request_map(osdc);
        }
 
-       return 0;
+       mutex_lock(&osd->lock);
+       /*
+        * Assign the tid atomically with send_request() to protect
+        * multiple writes to the same object from racing with each
+        * other, resulting in out of order ops on the OSDs.
+        */
+       req->r_tid = atomic64_inc_return(&osdc->last_tid);
+       link_request(osd, req);
+       if (need_send)
+               send_request(req);
+       mutex_unlock(&osd->lock);
+
+       if (promoted)
+               downgrade_write(&osdc->lock);
+       return;
+
+promote:
+       up_read(&osdc->lock);
+       down_write(&osdc->lock);
+       wrlocked = true;
+       promoted = true;
+       goto again;
+}
+
+static void account_request(struct ceph_osd_request *req)
+{
+       unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+
+       if (req->r_flags & CEPH_OSD_FLAG_READ) {
+               WARN_ON(req->r_flags & mask);
+               req->r_flags |= CEPH_OSD_FLAG_ACK;
+       } else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+               WARN_ON(!(req->r_flags & mask));
+       else
+               WARN_ON(1);
+
+       WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
+       atomic_inc(&req->r_osdc->num_requests);
+}
+
+static void submit_request(struct ceph_osd_request *req, bool wrlocked)
+{
+       ceph_osdc_get_request(req);
+       account_request(req);
+       __submit_request(req, wrlocked);
+}
+
+static void __finish_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd *osd = req->r_osd;
+
+       verify_osd_locked(osd);
+       dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+       unlink_request(osd, req);
+       atomic_dec(&osdc->num_requests);
+
+       /*
+        * If an OSD has failed or returned and a request has been sent
+        * twice, it's possible to get a reply and end up here while the
+        * request message is queued for delivery.  We will ignore the
+        * reply, so not a big deal, but better to try and catch it.
+        */
+       ceph_msg_revoke(req->r_request);
+       ceph_msg_revoke_incoming(req->r_reply);
+}
+
+static void finish_request(struct ceph_osd_request *req)
+{
+       __finish_request(req);
+       ceph_osdc_put_request(req);
 }
 
 static void __complete_request(struct ceph_osd_request *req)
@@ -1747,6 +1666,13 @@ static void __complete_request(struct ceph_osd_request *req)
                complete_all(&req->r_completion);
 }
 
+static void cancel_request(struct ceph_osd_request *req)
+{
+       dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+       finish_request(req);
+}
+
 /*
  * Timeout callback, called every N seconds.  When 1 or more OSD
  * requests has been active for more than N seconds, we send a keepalive
@@ -1758,44 +1684,49 @@ static void handle_timeout(struct work_struct *work)
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_options *opts = osdc->client->options;
-       struct ceph_osd_request *req;
-       struct ceph_osd *osd;
-       struct list_head slow_osds;
-       dout("timeout\n");
-       down_read(&osdc->map_sem);
-
-       ceph_monc_request_next_osdmap(&osdc->client->monc);
+       unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
+       LIST_HEAD(slow_osds);
+       struct rb_node *n, *p;
 
-       mutex_lock(&osdc->request_mutex);
+       dout("%s osdc %p\n", __func__, osdc);
+       down_write(&osdc->lock);
 
        /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
         * a connection with that osd (from the fault callback).
         */
-       INIT_LIST_HEAD(&slow_osds);
-       list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
-               if (time_before(jiffies,
-                               req->r_stamp + opts->osd_keepalive_timeout))
-                       break;
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+               bool found = false;
+
+               for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+                       struct ceph_osd_request *req =
+                           rb_entry(p, struct ceph_osd_request, r_node);
+
+                       if (time_before(req->r_stamp, cutoff)) {
+                               dout(" req %p tid %llu on osd%d is laggy\n",
+                                    req, req->r_tid, osd->o_osd);
+                               found = true;
+                       }
+               }
 
-               osd = req->r_osd;
-               BUG_ON(!osd);
-               dout(" tid %llu is slow, will send keepalive on osd%d\n",
-                    req->r_tid, osd->o_osd);
-               list_move_tail(&osd->o_keepalive_item, &slow_osds);
+               if (found)
+                       list_move_tail(&osd->o_keepalive_item, &slow_osds);
        }
+
+       if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
+               maybe_request_map(osdc);
+
        while (!list_empty(&slow_osds)) {
-               osd = list_entry(slow_osds.next, struct ceph_osd,
-                                o_keepalive_item);
+               struct ceph_osd *osd = list_first_entry(&slow_osds,
+                                                       struct ceph_osd,
+                                                       o_keepalive_item);
                list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }
 
-       __send_queued(osdc);
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
-
+       up_write(&osdc->lock);
        schedule_delayed_work(&osdc->timeout_work,
                              osdc->client->options->osd_keepalive_timeout);
 }
@@ -1809,18 +1740,17 @@ static void handle_osds_timeout(struct work_struct *work)
        struct ceph_osd *osd, *nosd;
 
        dout("%s osdc %p\n", __func__, osdc);
-       down_read(&osdc->map_sem);
-       mutex_lock(&osdc->request_mutex);
-
+       down_write(&osdc->lock);
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                if (time_before(jiffies, osd->lru_ttl))
                        break;
 
-               remove_osd(osdc, osd);
+               WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
+               WARN_ON(!list_empty(&osd->o_linger_requests));
+               close_osd(osd);
        }
 
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
+       up_write(&osdc->lock);
        schedule_delayed_work(&osdc->osds_timeout_work,
                              round_jiffies_relative(delay));
 }
@@ -2045,8 +1975,9 @@ static bool done_request(const struct ceph_osd_request *req,
  * when we get the safe reply  r_unsafe_cb(false),     r_cb/r_completion,
  *                             r_safe_completion       r_safe_completion
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 {
+       struct ceph_osd_client *osdc = osd->o_osdc;
        struct ceph_osd_request *req;
        struct MOSDOpReply m;
        u64 tid = le64_to_cpu(msg->hdr.tid);
@@ -2057,14 +1988,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
        dout("%s msg %p tid %llu\n", __func__, msg, tid);
 
-       down_read(&osdc->map_sem);
-       mutex_lock(&osdc->request_mutex);
-       req = lookup_request(&osdc->requests, tid);
+       down_read(&osdc->lock);
+       if (!osd_registered(osd)) {
+               dout("%s osd%d unknown\n", __func__, osd->o_osd);
+               goto out_unlock_osdc;
+       }
+       WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+       mutex_lock(&osd->lock);
+       req = lookup_request(&osd->o_requests, tid);
        if (!req) {
-               dout("%s no tid %llu\n", __func__, tid);
-               goto out_unlock;
+               dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
+               goto out_unlock_session;
        }
-       ceph_osdc_get_request(req);
 
        ret = decode_MOSDOpReply(msg, &m);
        if (ret) {
@@ -2083,7 +2019,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                        dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
                             req, req->r_tid, m.retry_attempt,
                             req->r_attempts - 1);
-                       goto out_put;
+                       goto out_unlock_session;
                }
        } else {
                WARN_ON(1); /* MOSDOpReply v4 is assumed */
@@ -2092,22 +2028,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        if (!ceph_oloc_empty(&m.redirect.oloc)) {
                dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
                     m.redirect.oloc.pool);
-               __unregister_request(osdc, req);
+               unlink_request(osd, req);
+               mutex_unlock(&osd->lock);
 
                ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
-
-               /*
-                * Start redirect requests with nofail=true.  If
-                * mapping fails, request will end up on the notarget
-                * list, waiting for the new osdmap (which can take
-                * a while), even though the original request mapped
-                * successfully.  In the future we might want to follow
-                * original request's nofail setting here.
-                */
-               ret = __ceph_osdc_start_request(osdc, req, true);
-               BUG_ON(ret);
-
-               goto out_put;
+               req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
+               req->r_tid = 0;
+               __submit_request(req, false);
+               goto out_unlock_osdc;
        }
 
        if (m.num_ops != req->r_num_ops) {
@@ -2137,19 +2065,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                req->r_got_reply = true;
        } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
                dout("req %p tid %llu dup ack\n", req, req->r_tid);
-               goto out_put;
+               goto out_unlock_session;
        }
 
        if (done_request(req, &m)) {
-               __unregister_request(osdc, req);
+               __finish_request(req);
                if (req->r_linger) {
                        WARN_ON(req->r_unsafe_callback);
-                       __register_linger_request(osdc, req);
+                       __register_linger_request(osd, req);
                }
        }
 
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
+       mutex_unlock(&osd->lock);
+       up_read(&osdc->lock);
 
        if (done_request(req, &m)) {
                if (already_acked && req->r_unsafe_callback) {
@@ -2175,14 +2103,13 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
 fail_request:
        req->r_result = -EIO;
-       __unregister_request(osdc, req);
+       __finish_request(req);
        __complete_request(req);
        complete_all(&req->r_safe_completion);
-out_put:
-       ceph_osdc_put_request(req);
-out_unlock:
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
+out_unlock_session:
+       mutex_unlock(&osd->lock);
+out_unlock_osdc:
+       up_read(&osdc->lock);
 }
 
 static void set_pool_was_full(struct ceph_osd_client *osdc)
@@ -2197,126 +2124,66 @@ static void set_pool_was_full(struct ceph_osd_client *osdc)
        }
 }
 
-static void reset_changed_osds(struct ceph_osd_client *osdc)
+static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
 {
-       struct rb_node *p, *n;
+       struct ceph_pg_pool_info *pi;
 
-       dout("%s %p\n", __func__, osdc);
-       for (p = rb_first(&osdc->osds); p; p = n) {
-               struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
+       pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+       if (!pi)
+               return false;
 
-               n = rb_next(p);
-               if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-                   memcmp(&osd->o_con.peer_addr,
-                          ceph_osd_addr(osdc->osdmap,
-                                        osd->o_osd),
-                          sizeof(struct ceph_entity_addr)) != 0)
-                       __reset_osd(osdc, osd);
-       }
+       return pi->was_full && !__pool_full(pi);
 }
 
 /*
- * Requeue requests whose mapping to an OSD has changed.  If requests map to
- * no osd, request a new map.
- *
- * Caller should hold map_sem for read.
+ * Requeue requests whose mapping to an OSD has changed.
  */
-static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
-                         bool force_resend_writes)
+static void scan_requests(struct ceph_osd *osd,
+                         bool force_resend,
+                         bool cleared_full,
+                         bool check_pool_cleared_full,
+                         struct rb_root *need_resend,
+                         struct list_head *need_resend_linger)
 {
-       struct ceph_osd_request *req, *nreq;
-       struct rb_node *p;
-       int needmap = 0;
-       int err;
-       bool force_resend_req;
-
-       dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
-               force_resend_writes ? " (force resend writes)" : "");
-       mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; ) {
-               req = rb_entry(p, struct ceph_osd_request, r_node);
-               p = rb_next(p);
-
-               /*
-                * For linger requests that have not yet been
-                * registered, move them to the linger list; they'll
-                * be sent to the osd in the loop below.  Unregister
-                * the request before re-registering it as a linger
-                * request to ensure the __map_request() below
-                * will decide it needs to be sent.
-                */
-               if (req->r_linger && list_empty(&req->r_linger_item)) {
-                       dout("%p tid %llu restart on osd%d\n",
-                            req, req->r_tid,
-                            req->r_osd ? req->r_osd->o_osd : -1);
-                       ceph_osdc_get_request(req);
-                       __unregister_request(osdc, req);
-                       __register_linger_request(osdc, req);
-                       ceph_osdc_put_request(req);
-                       continue;
-               }
-
-               force_resend_req = force_resend ||
-                       (force_resend_writes &&
-                               req->r_flags & CEPH_OSD_FLAG_WRITE);
-               err = __map_request(osdc, req, force_resend_req);
-               if (err < 0)
-                       continue;  /* error */
-               if (req->r_osd == NULL) {
-                       dout("%p tid %llu maps to no osd\n", req, req->r_tid);
-                       needmap++;  /* request a newer map */
-               } else if (err > 0) {
-                       if (!req->r_linger) {
-                               dout("%p tid %llu requeued on osd%d\n", req,
-                                    req->r_tid,
-                                    req->r_osd ? req->r_osd->o_osd : -1);
-                               req->r_flags |= CEPH_OSD_FLAG_RETRY;
-                       }
-               }
-       }
-
-       list_for_each_entry_safe(req, nreq, &osdc->req_linger,
-                                r_linger_item) {
-               dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
-
-               err = __map_request(osdc, req,
-                                   force_resend || force_resend_writes);
-               dout("__map_request returned %d\n", err);
-               if (err < 0)
-                       continue;  /* hrm! */
-               if (req->r_osd == NULL || err > 0) {
-                       if (req->r_osd == NULL) {
-                               dout("lingering %p tid %llu maps to no osd\n",
-                                    req, req->r_tid);
-                               /*
-                                * A homeless lingering request makes
-                                * no sense, as it's job is to keep
-                                * a particular OSD connection open.
-                                * Request a newer map and kick the
-                                * request, knowing that it won't be
-                                * resent until we actually get a map
-                                * that can tell us where to send it.
-                                */
-                               needmap++;
-                       }
-
-                       dout("kicking lingering %p tid %llu osd%d\n", req,
-                            req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
-                       __register_request(osdc, req);
-                       __unregister_linger_request(osdc, req);
+       struct ceph_osd_client *osdc = osd->o_osdc;
+       struct rb_node *n;
+       bool force_resend_writes;
+
+       for (n = rb_first(&osd->o_requests); n; ) {
+               struct ceph_osd_request *req =
+                   rb_entry(n, struct ceph_osd_request, r_node);
+               enum calc_target_result ct_res;
+
+               n = rb_next(n); /* unlink_request() */
+
+               dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+               ct_res = calc_target(osdc, &req->r_t,
+                                    &req->r_last_force_resend, false);
+               switch (ct_res) {
+               case CALC_TARGET_NO_ACTION:
+                       force_resend_writes = cleared_full ||
+                           (check_pool_cleared_full &&
+                            pool_cleared_full(osdc, req->r_t.base_oloc.pool));
+                       if (!force_resend &&
+                           (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
+                            !force_resend_writes))
+                               break;
+
+                       /* fall through */
+               case CALC_TARGET_NEED_RESEND:
+                       unlink_request(osd, req);
+                       insert_request(need_resend, req);
+                       break;
+               case CALC_TARGET_POOL_DNE:
+                       break;
                }
        }
-       reset_changed_osds(osdc);
-       mutex_unlock(&osdc->request_mutex);
-
-       if (needmap) {
-               dout("%d requests for down osds, need new map\n", needmap);
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
-       }
 }
 
 static int handle_one_map(struct ceph_osd_client *osdc,
-                         void *p, void *end, bool incremental)
+                         void *p, void *end, bool incremental,
+                         struct rb_root *need_resend,
+                         struct list_head *need_resend_linger)
 {
        struct ceph_osdmap *newmap;
        struct rb_node *n;
@@ -2362,11 +2229,51 @@ static int handle_one_map(struct ceph_osd_client *osdc,
        }
 
        was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
-       kick_requests(osdc, skipped_map, was_full);
+       scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
+                     need_resend, need_resend_linger);
+
+       for (n = rb_first(&osdc->osds); n; ) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+               n = rb_next(n); /* close_osd() */
+
+               scan_requests(osd, skipped_map, was_full, true, need_resend,
+                             need_resend_linger);
+               if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+                   memcmp(&osd->o_con.peer_addr,
+                          ceph_osd_addr(osdc->osdmap, osd->o_osd),
+                          sizeof(struct ceph_entity_addr)))
+                       close_osd(osd);
+       }
 
        return 0;
 }
 
+static void kick_requests(struct ceph_osd_client *osdc,
+                         struct rb_root *need_resend,
+                         struct list_head *need_resend_linger)
+{
+       struct rb_node *n;
+
+       for (n = rb_first(need_resend); n; ) {
+               struct ceph_osd_request *req =
+                   rb_entry(n, struct ceph_osd_request, r_node);
+               struct ceph_osd *osd;
+
+               n = rb_next(n);
+               erase_request(need_resend, req); /* before link_request() */
+
+               WARN_ON(req->r_osd);
+               calc_target(osdc, &req->r_t, NULL, false);
+               osd = lookup_create_osd(osdc, req->r_t.osd, true);
+               link_request(osd, req);
+               if (!req->r_linger) {
+                       if (!osd_homeless(osd) && !req->r_t.paused)
+                               send_request(req);
+               }
+       }
+}
+
 /*
  * Process updated osd map.
  *
@@ -2381,13 +2288,15 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        u32 nr_maps, maplen;
        u32 epoch;
        struct ceph_fsid fsid;
+       struct rb_root need_resend = RB_ROOT;
+       LIST_HEAD(need_resend_linger);
        bool handled_incremental = false;
        bool was_pauserd, was_pausewr;
        bool pauserd, pausewr;
        int err;
 
        dout("%s have %u\n", __func__, osdc->osdmap->epoch);
-       down_write(&osdc->map_sem);
+       down_write(&osdc->lock);
 
        /* verify fsid */
        ceph_decode_need(&p, end, sizeof(fsid), bad);
@@ -2412,7 +2321,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                    osdc->osdmap->epoch + 1 == epoch) {
                        dout("applying incremental map %u len %d\n",
                             epoch, maplen);
-                       err = handle_one_map(osdc, p, p + maplen, true);
+                       err = handle_one_map(osdc, p, p + maplen, true,
+                                            &need_resend, &need_resend_linger);
                        if (err)
                                goto bad;
                        handled_incremental = true;
@@ -2443,7 +2353,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                             osdc->osdmap->epoch);
                } else {
                        dout("taking full map %u len %d\n", epoch, maplen);
-                       err = handle_one_map(osdc, p, p + maplen, false);
+                       err = handle_one_map(osdc, p, p + maplen, false,
+                                            &need_resend, &need_resend_linger);
                        if (err)
                                goto bad;
                }
@@ -2464,20 +2375,60 @@ done:
        if (was_pauserd || was_pausewr || pauserd || pausewr)
                maybe_request_map(osdc);
 
-       mutex_lock(&osdc->request_mutex);
-       __send_queued(osdc);
-       mutex_unlock(&osdc->request_mutex);
+       kick_requests(osdc, &need_resend, &need_resend_linger);
 
        ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
                          osdc->osdmap->epoch);
-       up_write(&osdc->map_sem);
+       up_write(&osdc->lock);
        wake_up_all(&osdc->client->auth_wq);
        return;
 
 bad:
        pr_err("osdc handle_map corrupt msg\n");
        ceph_msg_dump(msg);
-       up_write(&osdc->map_sem);
+       up_write(&osdc->lock);
+}
+
+/*
+ * Resubmit requests pending on the given osd.
+ */
+static void kick_osd_requests(struct ceph_osd *osd)
+{
+       struct rb_node *n;
+
+       for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+               struct ceph_osd_request *req =
+                   rb_entry(n, struct ceph_osd_request, r_node);
+
+               if (!req->r_linger) {
+                       if (!req->r_t.paused)
+                               send_request(req);
+               }
+       }
+}
+
+/*
+ * If the osd connection drops, we need to resubmit all requests.
+ */
+static void osd_fault(struct ceph_connection *con)
+{
+       struct ceph_osd *osd = con->private;
+       struct ceph_osd_client *osdc = osd->o_osdc;
+
+       dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+       down_write(&osdc->lock);
+       if (!osd_registered(osd)) {
+               dout("%s osd%d unknown\n", __func__, osd->o_osd);
+               goto out_unlock;
+       }
+
+       if (!reopen_osd(osd))
+               kick_osd_requests(osd);
+       maybe_request_map(osdc);
+
+out_unlock:
+       up_write(&osdc->lock);
 }
 
 /*
@@ -2680,17 +2631,11 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
 {
-       int rc;
-
-       down_read(&osdc->map_sem);
-       mutex_lock(&osdc->request_mutex);
-
-       rc = __ceph_osdc_start_request(osdc, req, nofail);
+       down_read(&osdc->lock);
+       submit_request(req, false);
+       up_read(&osdc->lock);
 
-       mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
-
-       return rc;
+       return 0;
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
 
@@ -2703,13 +2648,12 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
 
-       mutex_lock(&osdc->request_mutex);
+       down_write(&osdc->lock);
        if (req->r_linger)
                __unregister_linger_request(osdc, req);
-       __unregister_request(osdc, req);
-       mutex_unlock(&osdc->request_mutex);
-
-       dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
+       if (req->r_osd)
+               cancel_request(req);
+       up_write(&osdc->lock);
 }
 EXPORT_SYMBOL(ceph_osdc_cancel_request);
 
@@ -2744,32 +2688,40 @@ EXPORT_SYMBOL(ceph_osdc_wait_request);
  */
 void ceph_osdc_sync(struct ceph_osd_client *osdc)
 {
-       struct ceph_osd_request *req;
-       u64 last_tid, next_tid = 0;
+       struct rb_node *n, *p;
+       u64 last_tid = atomic64_read(&osdc->last_tid);
 
-       mutex_lock(&osdc->request_mutex);
-       last_tid = osdc->last_tid;
-       while (1) {
-               req = __lookup_request_ge(osdc, next_tid);
-               if (!req)
-                       break;
-               if (req->r_tid > last_tid)
-                       break;
+again:
+       down_read(&osdc->lock);
+       for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+               struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+               mutex_lock(&osd->lock);
+               for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+                       struct ceph_osd_request *req =
+                           rb_entry(p, struct ceph_osd_request, r_node);
+
+                       if (req->r_tid > last_tid)
+                               break;
+
+                       if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
+                               continue;
 
-               next_tid = req->r_tid + 1;
-               if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
-                       continue;
+                       ceph_osdc_get_request(req);
+                       mutex_unlock(&osd->lock);
+                       up_read(&osdc->lock);
+                       dout("%s waiting on req %p tid %llu last_tid %llu\n",
+                            __func__, req, req->r_tid, last_tid);
+                       wait_for_completion(&req->r_safe_completion);
+                       ceph_osdc_put_request(req);
+                       goto again;
+               }
 
-               ceph_osdc_get_request(req);
-               mutex_unlock(&osdc->request_mutex);
-               dout("sync waiting on tid %llu (last is %llu)\n",
-                    req->r_tid, last_tid);
-               wait_for_completion(&req->r_safe_completion);
-               mutex_lock(&osdc->request_mutex);
-               ceph_osdc_put_request(req);
+               mutex_unlock(&osd->lock);
        }
-       mutex_unlock(&osdc->request_mutex);
-       dout("sync done (thru tid %llu)\n", last_tid);
+
+       up_read(&osdc->lock);
+       dout("%s done last_tid %llu\n", __func__, last_tid);
 }
 EXPORT_SYMBOL(ceph_osdc_sync);
 
@@ -2793,18 +2745,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 
        dout("init\n");
        osdc->client = client;
-       init_rwsem(&osdc->map_sem);
-       mutex_init(&osdc->request_mutex);
-       osdc->last_tid = 0;
+       init_rwsem(&osdc->lock);
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
        spin_lock_init(&osdc->osd_lru_lock);
-       osdc->requests = RB_ROOT;
-       INIT_LIST_HEAD(&osdc->req_lru);
-       INIT_LIST_HEAD(&osdc->req_unsent);
-       INIT_LIST_HEAD(&osdc->req_notarget);
        INIT_LIST_HEAD(&osdc->req_linger);
-       osdc->num_requests = 0;
+       osd_init(&osdc->homeless_osd);
+       osdc->homeless_osd.o_osdc = osdc;
+       osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
        spin_lock_init(&osdc->event_lock);
@@ -2861,13 +2809,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
 
-       mutex_lock(&osdc->request_mutex);
+       down_write(&osdc->lock);
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
                                                struct ceph_osd, o_node);
-               remove_osd(osdc, osd);
+               close_osd(osd);
        }
-       mutex_unlock(&osdc->request_mutex);
+       up_write(&osdc->lock);
+       WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+       osd_cleanup(&osdc->homeless_osd);
+
+       WARN_ON(!list_empty(&osdc->osd_lru));
+       WARN_ON(atomic_read(&osdc->num_requests));
+       WARN_ON(atomic_read(&osdc->num_homeless));
 
        ceph_osdmap_destroy(osdc->osdmap);
        mempool_destroy(osdc->req_mempool);
@@ -2982,19 +2936,15 @@ EXPORT_SYMBOL(ceph_osdc_cleanup);
 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 {
        struct ceph_osd *osd = con->private;
-       struct ceph_osd_client *osdc;
+       struct ceph_osd_client *osdc = osd->o_osdc;
        int type = le16_to_cpu(msg->hdr.type);
 
-       if (!osd)
-               goto out;
-       osdc = osd->o_osdc;
-
        switch (type) {
        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
-               handle_reply(osdc, msg);
+               handle_reply(osd, msg);
                break;
        case CEPH_MSG_WATCH_NOTIFY:
                handle_watch_notify(osdc, msg);
@@ -3004,7 +2954,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
-out:
+
        ceph_msg_put(msg);
 }
 
@@ -3019,21 +2969,27 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 {
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
-       struct ceph_msg *m;
+       struct ceph_msg *m = NULL;
        struct ceph_osd_request *req;
        int front_len = le32_to_cpu(hdr->front_len);
        int data_len = le32_to_cpu(hdr->data_len);
-       u64 tid;
+       u64 tid = le64_to_cpu(hdr->tid);
 
-       tid = le64_to_cpu(hdr->tid);
-       mutex_lock(&osdc->request_mutex);
-       req = lookup_request(&osdc->requests, tid);
+       down_read(&osdc->lock);
+       if (!osd_registered(osd)) {
+               dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
+               *skip = 1;
+               goto out_unlock_osdc;
+       }
+       WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
+
+       mutex_lock(&osd->lock);
+       req = lookup_request(&osd->o_requests, tid);
        if (!req) {
                dout("%s osd%d tid %llu unknown, skipping\n", __func__,
                     osd->o_osd, tid);
-               m = NULL;
                *skip = 1;
-               goto out;
+               goto out_unlock_session;
        }
 
        ceph_msg_revoke_incoming(req->r_reply);
@@ -3045,7 +3001,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
                                 false);
                if (!m)
-                       goto out;
+                       goto out_unlock_session;
                ceph_msg_put(req->r_reply);
                req->r_reply = m;
        }
@@ -3056,14 +3012,16 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                        req->r_reply->data_length);
                m = NULL;
                *skip = 1;
-               goto out;
+               goto out_unlock_session;
        }
 
        m = ceph_msg_get(req->r_reply);
        dout("get_reply tid %lld %p\n", tid, m);
 
-out:
-       mutex_unlock(&osdc->request_mutex);
+out_unlock_session:
+       mutex_unlock(&osd->lock);
+out_unlock_osdc:
+       up_read(&osdc->lock);
        return m;
 }
 
@@ -3083,8 +3041,8 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);
        default:
-               pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
-                       osd->o_osd);
+               pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
+                       osd->o_osd, type);
                *skip = 1;
                return NULL;
        }
@@ -3188,5 +3146,5 @@ static const struct ceph_connection_operations osd_con_ops = {
        .alloc_msg = alloc_msg,
        .sign_message = osd_sign_message,
        .check_message_signature = osd_check_message_signature,
-       .fault = osd_reset,
+       .fault = osd_fault,
 };