libceph: protect osdc->osd_lru list with a spinlock
authorIlya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:26 +0000 (16:07 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:12:30 +0000 (01:12 +0200)
OSD client is getting moved from the big per-client lock to a set of
per-session locks.  The big rwlock would only be held for read most of
the time, so a global osdc->osd_lru needs additional protection.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index 2415dc0..486d681 100644 (file)
@@ -224,6 +224,7 @@ struct ceph_osd_client {
        struct mutex           request_mutex;
        struct rb_root         osds;          /* osds */
        struct list_head       osd_lru;       /* idle osds */
+       spinlock_t             osd_lru_lock;
        u64                    last_tid;      /* tid of last request */
        struct rb_root         requests;      /* pending requests */
        struct list_head       req_lru;       /* in-flight lru */
index b6950c2..d1c8e06 100644 (file)
@@ -1101,31 +1101,37 @@ static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        }
 }
 
-static void __move_osd_to_lru(struct ceph_osd_client *osdc,
-                             struct ceph_osd *osd)
+static void __move_osd_to_lru(struct ceph_osd *osd)
 {
-       dout("%s %p\n", __func__, osd);
+       struct ceph_osd_client *osdc = osd->o_osdc;
+
+       dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));
 
+       spin_lock(&osdc->osd_lru_lock);
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
+       spin_unlock(&osdc->osd_lru_lock);
+
        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
 }
 
-static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
-                                 struct ceph_osd *osd)
+static void maybe_move_osd_to_lru(struct ceph_osd *osd)
 {
-       dout("%s %p\n", __func__, osd);
-
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests))
-               __move_osd_to_lru(osdc, osd);
+               __move_osd_to_lru(osd);
 }
 
 static void __remove_osd_from_lru(struct ceph_osd *osd)
 {
-       dout("__remove_osd_from_lru %p\n", osd);
+       struct ceph_osd_client *osdc = osd->o_osdc;
+
+       dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+       spin_lock(&osdc->osd_lru_lock);
        if (!list_empty(&osd->o_osd_lru))
                list_del_init(&osd->o_osd_lru);
+       spin_unlock(&osdc->osd_lru_lock);
 }
 
 /*
@@ -1199,7 +1205,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                ceph_msg_revoke(req->r_request);
 
                list_del_init(&req->r_osd_item);
-               maybe_move_osd_to_lru(osdc, req->r_osd);
+               maybe_move_osd_to_lru(req->r_osd);
                if (list_empty(&req->r_linger_osd_item))
                        req->r_osd = NULL;
        }
@@ -1248,7 +1254,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 
        if (req->r_osd) {
                list_del_init(&req->r_linger_osd_item);
-               maybe_move_osd_to_lru(osdc, req->r_osd);
+               maybe_move_osd_to_lru(req->r_osd);
                if (list_empty(&req->r_osd_item))
                        req->r_osd = NULL;
        }
@@ -2792,6 +2798,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
+       spin_lock_init(&osdc->osd_lru_lock);
        osdc->requests = RB_ROOT;
        INIT_LIST_HEAD(&osdc->req_lru);
        INIT_LIST_HEAD(&osdc->req_unsent);