libceph: handle_one_map()
authorIlya Dryomov <idryomov@gmail.com>
Thu, 28 Apr 2016 14:07:25 +0000 (16:07 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:12:27 +0000 (01:12 +0200)
Separate osdmap handling from decoding and iterating over a bag of maps
in a fresh MOSDMap message.  This sets up the scene for the updated OSD
client.

Of particular importance here is the addition of pi->was_full, which
can be used to answer "did this pool go full -> not-full in this map?".
This is the key bit for supporting pool quotas.

We won't be able to downgrade map_sem for much longer, so drop
downgrade_write().

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/mon_client.h
include/linux/ceph/osdmap.h
net/ceph/mon_client.c
net/ceph/osd_client.c

index 330d045..c14e9d8 100644 (file)
@@ -115,6 +115,7 @@ extern const char *ceph_sub_str[];
 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
                        bool continuous);
 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
+void ceph_monc_renew_subs(struct ceph_mon_client *monc);
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
 extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
index 8468c73..821e16f 100644 (file)
@@ -45,6 +45,8 @@ struct ceph_pg_pool_info {
        s64 write_tier; /* wins for read+write ops */
        u64 flags; /* CEPH_POOL_FLAG_* */
        char *name;
+
+       bool was_full;  /* for handle_one_map() */
 };
 
 static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
index a426a4b..98bfbe1 100644 (file)
@@ -376,6 +376,14 @@ void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
 }
 EXPORT_SYMBOL(ceph_monc_got_map);
 
+void ceph_monc_renew_subs(struct ceph_mon_client *monc)
+{
+       mutex_lock(&monc->mutex);
+       __send_subscribe(monc);
+       mutex_unlock(&monc->mutex);
+}
+EXPORT_SYMBOL(ceph_monc_renew_subs);
+
 /*
  * Register interest in the next osdmap
  */
index 9c35fd8..4227c55 100644 (file)
@@ -1245,6 +1245,21 @@ static bool __pool_full(struct ceph_pg_pool_info *pi)
        return pi->flags & CEPH_POOL_FLAG_FULL;
 }
 
+static bool have_pool_full(struct ceph_osd_client *osdc)
+{
+       struct rb_node *n;
+
+       for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
+               struct ceph_pg_pool_info *pi =
+                   rb_entry(n, struct ceph_pg_pool_info, node);
+
+               if (__pool_full(pi))
+                       return true;
+       }
+
+       return false;
+}
+
 /*
  * Returns whether a request should be blocked from being sent
  * based on the current osdmap and osd_client settings.
@@ -1639,6 +1654,26 @@ static void __send_queued(struct ceph_osd_client *osdc)
        }
 }
 
+static void maybe_request_map(struct ceph_osd_client *osdc)
+{
+       bool continuous = false;
+
+       WARN_ON(!osdc->osdmap->epoch);
+
+       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+           ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+           ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+               dout("%s osdc %p continuous\n", __func__, osdc);
+               continuous = true;
+       } else {
+               dout("%s osdc %p onetime\n", __func__, osdc);
+       }
+
+       if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+                              osdc->osdmap->epoch + 1, continuous))
+               ceph_monc_renew_subs(&osdc->client->monc);
+}
+
 /*
  * Caller should hold map_sem for read and request_mutex.
  */
@@ -2119,6 +2154,18 @@ out_unlock:
        up_read(&osdc->map_sem);
 }
 
+static void set_pool_was_full(struct ceph_osd_client *osdc)
+{
+       struct rb_node *n;
+
+       for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
+               struct ceph_pg_pool_info *pi =
+                   rb_entry(n, struct ceph_pg_pool_info, node);
+
+               pi->was_full = __pool_full(pi);
+       }
+}
+
 static void reset_changed_osds(struct ceph_osd_client *osdc)
 {
        struct rb_node *p, *n;
@@ -2237,6 +2284,57 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
        }
 }
 
+static int handle_one_map(struct ceph_osd_client *osdc,
+                         void *p, void *end, bool incremental)
+{
+       struct ceph_osdmap *newmap;
+       struct rb_node *n;
+       bool skipped_map = false;
+       bool was_full;
+
+       was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+       set_pool_was_full(osdc);
+
+       if (incremental)
+               newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
+       else
+               newmap = ceph_osdmap_decode(&p, end);
+       if (IS_ERR(newmap))
+               return PTR_ERR(newmap);
+
+       if (newmap != osdc->osdmap) {
+               /*
+                * Preserve ->was_full before destroying the old map.
+                * For pools that weren't in the old map, ->was_full
+                * should be false.
+                */
+               for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
+                       struct ceph_pg_pool_info *pi =
+                           rb_entry(n, struct ceph_pg_pool_info, node);
+                       struct ceph_pg_pool_info *old_pi;
+
+                       old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
+                       if (old_pi)
+                               pi->was_full = old_pi->was_full;
+                       else
+                               WARN_ON(pi->was_full);
+               }
+
+               if (osdc->osdmap->epoch &&
+                   osdc->osdmap->epoch + 1 < newmap->epoch) {
+                       WARN_ON(incremental);
+                       skipped_map = true;
+               }
+
+               ceph_osdmap_destroy(osdc->osdmap);
+               osdc->osdmap = newmap;
+       }
+
+       was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+       kick_requests(osdc, skipped_map, was_full);
+
+       return 0;
+}
 
 /*
  * Process updated osd map.
@@ -2247,27 +2345,29 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
  */
 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
-       void *p, *end, *next;
+       void *p = msg->front.iov_base;
+       void *const end = p + msg->front.iov_len;
        u32 nr_maps, maplen;
        u32 epoch;
-       struct ceph_osdmap *newmap = NULL, *oldmap;
-       int err;
        struct ceph_fsid fsid;
-       bool was_full;
+       bool handled_incremental = false;
+       bool was_pauserd, was_pausewr;
+       bool pauserd, pausewr;
+       int err;
 
-       dout("handle_map have %u\n", osdc->osdmap->epoch);
-       p = msg->front.iov_base;
-       end = p + msg->front.iov_len;
+       dout("%s have %u\n", __func__, osdc->osdmap->epoch);
+       down_write(&osdc->map_sem);
 
        /* verify fsid */
        ceph_decode_need(&p, end, sizeof(fsid), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(osdc->client, &fsid) < 0)
-               return;
-
-       down_write(&osdc->map_sem);
+               goto bad;
 
-       was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+       was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+       was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+                     ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+                     have_pool_full(osdc);
 
        /* incremental maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
@@ -2277,33 +2377,22 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                epoch = ceph_decode_32(&p);
                maplen = ceph_decode_32(&p);
                ceph_decode_need(&p, end, maplen, bad);
-               next = p + maplen;
-               if (osdc->osdmap->epoch+1 == epoch) {
+               if (osdc->osdmap->epoch &&
+                   osdc->osdmap->epoch + 1 == epoch) {
                        dout("applying incremental map %u len %d\n",
                             epoch, maplen);
-                       newmap = osdmap_apply_incremental(&p, next,
-                                                         osdc->osdmap);
-                       if (IS_ERR(newmap)) {
-                               err = PTR_ERR(newmap);
+                       err = handle_one_map(osdc, p, p + maplen, true);
+                       if (err)
                                goto bad;
-                       }
-                       BUG_ON(!newmap);
-                       if (newmap != osdc->osdmap) {
-                               ceph_osdmap_destroy(osdc->osdmap);
-                               osdc->osdmap = newmap;
-                       }
-                       was_full = was_full ||
-                               ceph_osdmap_flag(osdc->osdmap,
-                                                CEPH_OSDMAP_FULL);
-                       kick_requests(osdc, 0, was_full);
+                       handled_incremental = true;
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
                }
-               p = next;
+               p += maplen;
                nr_maps--;
        }
-       if (newmap)
+       if (handled_incremental)
                goto done;
 
        /* full maps */
@@ -2322,50 +2411,35 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                             "older than our %u\n", epoch, maplen,
                             osdc->osdmap->epoch);
                } else {
-                       int skipped_map = 0;
-
                        dout("taking full map %u len %d\n", epoch, maplen);
-                       newmap = ceph_osdmap_decode(&p, p+maplen);
-                       if (IS_ERR(newmap)) {
-                               err = PTR_ERR(newmap);
+                       err = handle_one_map(osdc, p, p + maplen, false);
+                       if (err)
                                goto bad;
-                       }
-                       BUG_ON(!newmap);
-                       oldmap = osdc->osdmap;
-                       osdc->osdmap = newmap;
-                       if (oldmap) {
-                               if (oldmap->epoch + 1 < newmap->epoch)
-                                       skipped_map = 1;
-                               ceph_osdmap_destroy(oldmap);
-                       }
-                       was_full = was_full ||
-                               ceph_osdmap_flag(osdc->osdmap,
-                                                CEPH_OSDMAP_FULL);
-                       kick_requests(osdc, skipped_map, was_full);
                }
                p += maplen;
                nr_maps--;
        }
 
 done:
-       downgrade_write(&osdc->map_sem);
-       ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
-                         osdc->osdmap->epoch);
-
        /*
         * subscribe to subsequent osdmap updates if full to ensure
         * we find out when we are no longer full and stop returning
         * ENOSPC.
         */
-       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
-               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
-               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
+       pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+       pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+                 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+                 have_pool_full(osdc);
+       if (was_pauserd || was_pausewr || pauserd || pausewr)
+               maybe_request_map(osdc);
 
        mutex_lock(&osdc->request_mutex);
        __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
-       up_read(&osdc->map_sem);
+
+       ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+                         osdc->osdmap->epoch);
+       up_write(&osdc->map_sem);
        wake_up_all(&osdc->client->auth_wq);
        return;