xfs: rework log recovery to submit buffers on LSN boundaries
author Brian Foster <bfoster@redhat.com>
Sun, 25 Sep 2016 22:22:16 +0000 (08:22 +1000)
committer Dave Chinner <david@fromorbit.com>
Sun, 25 Sep 2016 22:22:16 +0000 (08:22 +1000)
The fix to log recovery to update the metadata LSN in recovered buffers
introduces the requirement that a buffer is submitted only once per
current LSN. Log recovery currently submits buffers on transaction
boundaries. This is not sufficient as the abstraction between log
records and transactions allows for various scenarios where multiple
transactions can share the same current LSN. If independent transactions
share an LSN and both modify the same buffer, log recovery can
incorrectly skip updates and leave the filesystem in an inconsistent
state.
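
A standalone sketch of the failure mode may help (a user-space model for
illustration only; the struct and function names below are hypothetical and
not the kernel's). Two independent transactions share LSN 100 and modify the
same buffer; once the first submission stamps the buffer's metadata LSN, the
second item is incorrectly skipped:

	#include <stdint.h>
	#include <stdio.h>

	struct toy_buf {
		uint64_t meta_lsn;	/* LSN stamped into the buffer on write */
		int	 contents;	/* stand-in for the buffer data */
	};

	/* Recovery rule: skip items whose LSN is <= the buffer's metadata LSN. */
	static void replay_item(struct toy_buf *bp, uint64_t item_lsn, int val)
	{
		if (bp->meta_lsn >= item_lsn) {
			printf("skipping item at lsn %llu\n",
			       (unsigned long long)item_lsn);
			return;
		}
		bp->contents = val;
	}

	/* Write submission updates the buffer's metadata LSN. */
	static void submit_buf(struct toy_buf *bp, uint64_t current_lsn)
	{
		bp->meta_lsn = current_lsn;
	}

	int main(void)
	{
		struct toy_buf bp = { 0, 0 };

		replay_item(&bp, 100, 1);	/* transaction 1: applied */
		submit_buf(&bp, 100);		/* submit on transaction boundary */
		replay_item(&bp, 100, 2);	/* transaction 2: incorrectly skipped */
		submit_buf(&bp, 100);

		printf("contents = %d, want 2\n", bp.contents);
		return 0;
	}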

In preparation for proper metadata LSN updates during log recovery,
update log recovery to submit buffers for write on LSN change boundaries
rather than transaction boundaries. Explicitly track the current LSN in
a new struct xlog field to handle the various corner cases of when the
current LSN may or may not change.
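
Continuing the toy model above (again, the names are illustrative only and do
not correspond to the kernel helpers, apart from the real
xfs_buf_delwri_submit() that the sketch stands in for), the reworked behavior
amounts to draining the queued buffers only when a commit record's LSN moves
past the tracked recovery LSN, plus a final drain after the last record:

	#include <stdint.h>
	#include <stdio.h>

	struct toy_log {
		uint64_t recovery_lsn;	/* models the new l_recovery_lsn field */
		int	 pending;	/* buffers queued but not yet submitted */
	};

	/* Stand-in for xfs_buf_delwri_submit() on the recovery buffer list. */
	static void submit_pending(struct toy_log *log)
	{
		printf("submitting %d buffer(s) queued at lsn %llu\n",
		       log->pending, (unsigned long long)log->recovery_lsn);
		log->pending = 0;
	}

	static void commit_record(struct toy_log *log, uint64_t record_lsn)
	{
		/*
		 * Drain on LSN change boundaries: everything queued so far
		 * belongs to the previous LSN and is now safe to write.
		 */
		if (log->recovery_lsn != record_lsn) {
			submit_pending(log);
			log->recovery_lsn = record_lsn;
		}
		log->pending++;		/* pretend this record queued a buffer */
	}

	int main(void)
	{
		struct toy_log log = { 0, 0 };

		commit_record(&log, 100);	/* same LSN: keep queueing */
		commit_record(&log, 100);
		commit_record(&log, 200);	/* LSN changed: drain lsn 100 queue */
		submit_pending(&log);		/* final drain after the last record */
		return 0;
	}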

Signed-off-by: Brian Foster <bfoster@redhat.com>
Reviewed-by: Dave Chinner <dchinner@redhat.com>
Signed-off-by: Dave Chinner <david@fromorbit.com>
fs/xfs/xfs_log_priv.h
fs/xfs/xfs_log_recover.c

index 765f084..2b6eec5 100644 (file)
@@ -413,7 +413,8 @@ struct xlog {
        /* log record crc error injection factor */
        uint32_t                l_badcrc_factor;
 #endif
-
+       /* log recovery lsn tracking (for buffer submission) */
+       xfs_lsn_t               l_recovery_lsn;
 };
 
 #define XLOG_BUF_CANCEL_BUCKET(log, blkno) \
index e8638fd..e24fb7b 100644 (file)
@@ -3846,14 +3846,13 @@ STATIC int
 xlog_recover_commit_trans(
        struct xlog             *log,
        struct xlog_recover     *trans,
-       int                     pass)
+       int                     pass,
+       struct list_head        *buffer_list)
 {
        int                             error = 0;
-       int                             error2;
        int                             items_queued = 0;
        struct xlog_recover_item        *item;
        struct xlog_recover_item        *next;
-       LIST_HEAD                       (buffer_list);
        LIST_HEAD                       (ra_list);
        LIST_HEAD                       (done_list);
 
@@ -3876,7 +3875,7 @@ xlog_recover_commit_trans(
                        items_queued++;
                        if (items_queued >= XLOG_RECOVER_COMMIT_QUEUE_MAX) {
                                error = xlog_recover_items_pass2(log, trans,
-                                               &buffer_list, &ra_list);
+                                               buffer_list, &ra_list);
                                list_splice_tail_init(&ra_list, &done_list);
                                items_queued = 0;
                        }
@@ -3894,15 +3893,14 @@ out:
        if (!list_empty(&ra_list)) {
                if (!error)
                        error = xlog_recover_items_pass2(log, trans,
-                                       &buffer_list, &ra_list);
+                                       buffer_list, &ra_list);
                list_splice_tail_init(&ra_list, &done_list);
        }
 
        if (!list_empty(&done_list))
                list_splice_init(&done_list, &trans->r_itemq);
 
-       error2 = xfs_buf_delwri_submit(&buffer_list);
-       return error ? error : error2;
+       return error;
 }
 
 STATIC void
@@ -4085,7 +4083,8 @@ xlog_recovery_process_trans(
        char                    *dp,
        unsigned int            len,
        unsigned int            flags,
-       int                     pass)
+       int                     pass,
+       struct list_head        *buffer_list)
 {
        int                     error = 0;
        bool                    freeit = false;
@@ -4109,7 +4108,8 @@ xlog_recovery_process_trans(
                error = xlog_recover_add_to_cont_trans(log, trans, dp, len);
                break;
        case XLOG_COMMIT_TRANS:
-               error = xlog_recover_commit_trans(log, trans, pass);
+               error = xlog_recover_commit_trans(log, trans, pass,
+                                                 buffer_list);
                /* success or fail, we are now done with this transaction. */
                freeit = true;
                break;
@@ -4191,10 +4191,12 @@ xlog_recover_process_ophdr(
        struct xlog_op_header   *ohead,
        char                    *dp,
        char                    *end,
-       int                     pass)
+       int                     pass,
+       struct list_head        *buffer_list)
 {
        struct xlog_recover     *trans;
        unsigned int            len;
+       int                     error;
 
        /* Do we understand who wrote this op? */
        if (ohead->oh_clientid != XFS_TRANSACTION &&
@@ -4221,8 +4223,39 @@ xlog_recover_process_ophdr(
                return 0;
        }
 
+       /*
+        * The recovered buffer queue is drained only once we know that all
+        * recovery items for the current LSN have been processed. This is
+        * required because:
+        *
+        * - Buffer write submission updates the metadata LSN of the buffer.
+        * - Log recovery skips items with a metadata LSN >= the current LSN of
+        *   the recovery item.
+        * - Separate recovery items against the same metadata buffer can share
+        *   a current LSN. I.e., consider that the LSN of a recovery item is
+        *   defined as the starting LSN of the first record in which its
+        *   transaction appears, that a record can hold multiple transactions,
+        *   and/or that a transaction can span multiple records.
+        *
+        * In other words, we are allowed to submit a buffer from log recovery
+        * once per current LSN. Otherwise, we may incorrectly skip recovery
+        * items and cause corruption.
+        *
+        * We don't know up front whether buffers are updated multiple times per
+        * LSN. Therefore, track the current LSN of each commit log record as it
+        * is processed and drain the queue when it changes. Use commit records
+        * because they are ordered correctly by the logging code.
+        */
+       if (log->l_recovery_lsn != trans->r_lsn &&
+           ohead->oh_flags & XLOG_COMMIT_TRANS) {
+               error = xfs_buf_delwri_submit(buffer_list);
+               if (error)
+                       return error;
+               log->l_recovery_lsn = trans->r_lsn;
+       }
+
        return xlog_recovery_process_trans(log, trans, dp, len,
-                                          ohead->oh_flags, pass);
+                                          ohead->oh_flags, pass, buffer_list);
 }
 
 /*
@@ -4240,7 +4273,8 @@ xlog_recover_process_data(
        struct hlist_head       rhash[],
        struct xlog_rec_header  *rhead,
        char                    *dp,
-       int                     pass)
+       int                     pass,
+       struct list_head        *buffer_list)
 {
        struct xlog_op_header   *ohead;
        char                    *end;
@@ -4262,7 +4296,7 @@ xlog_recover_process_data(
 
                /* errors will abort recovery */
                error = xlog_recover_process_ophdr(log, rhash, rhead, ohead,
-                                                   dp, end, pass);
+                                                  dp, end, pass, buffer_list);
                if (error)
                        return error;
 
@@ -4685,7 +4719,8 @@ xlog_recover_process(
        struct hlist_head       rhash[],
        struct xlog_rec_header  *rhead,
        char                    *dp,
-       int                     pass)
+       int                     pass,
+       struct list_head        *buffer_list)
 {
        int                     error;
        __le32                  crc;
@@ -4732,7 +4767,8 @@ xlog_recover_process(
        if (error)
                return error;
 
-       return xlog_recover_process_data(log, rhash, rhead, dp, pass);
+       return xlog_recover_process_data(log, rhash, rhead, dp, pass,
+                                        buffer_list);
 }
 
 STATIC int
@@ -4793,9 +4829,11 @@ xlog_do_recovery_pass(
        char                    *offset;
        xfs_buf_t               *hbp, *dbp;
        int                     error = 0, h_size, h_len;
+       int                     error2 = 0;
        int                     bblks, split_bblks;
        int                     hblks, split_hblks, wrapped_hblks;
        struct hlist_head       rhash[XLOG_RHASH_SIZE];
+       LIST_HEAD               (buffer_list);
 
        ASSERT(head_blk != tail_blk);
        rhead_blk = 0;
@@ -4981,7 +5019,7 @@ xlog_do_recovery_pass(
                        }
 
                        error = xlog_recover_process(log, rhash, rhead, offset,
-                                                    pass);
+                                                    pass, &buffer_list);
                        if (error)
                                goto bread_err2;
 
@@ -5012,7 +5050,8 @@ xlog_do_recovery_pass(
                if (error)
                        goto bread_err2;
 
-               error = xlog_recover_process(log, rhash, rhead, offset, pass);
+               error = xlog_recover_process(log, rhash, rhead, offset, pass,
+                                            &buffer_list);
                if (error)
                        goto bread_err2;
 
@@ -5025,10 +5064,17 @@ xlog_do_recovery_pass(
  bread_err1:
        xlog_put_bp(hbp);
 
+       /*
+        * Submit buffers that have been added from the last record processed,
+        * regardless of error status.
+        */
+       if (!list_empty(&buffer_list))
+               error2 = xfs_buf_delwri_submit(&buffer_list);
+
        if (error && first_bad)
                *first_bad = rhead_blk;
 
-       return error;
+       return error ? error : error2;
 }
 
 /*