Merge branch 'xfs-buf-iosubmit' into for-next
[cascardo/linux.git] / fs / xfs / xfs_buf.c
index 409a8a0..017b6af 100644 (file)
@@ -623,10 +623,11 @@ _xfs_buf_read(
        bp->b_flags &= ~(XBF_WRITE | XBF_ASYNC | XBF_READ_AHEAD);
        bp->b_flags |= flags & (XBF_READ | XBF_ASYNC | XBF_READ_AHEAD);
 
-       xfs_buf_iorequest(bp);
-       if (flags & XBF_ASYNC)
+       if (flags & XBF_ASYNC) {
+               xfs_buf_submit(bp);
                return 0;
-       return xfs_buf_iowait(bp);
+       }
+       return xfs_buf_submit_wait(bp);
 }
 
 xfs_buf_t *
@@ -687,34 +688,39 @@ xfs_buf_readahead_map(
  * Read an uncached buffer from disk. Allocates and returns a locked
  * buffer containing the disk contents or nothing.
  */
-struct xfs_buf *
+int
 xfs_buf_read_uncached(
        struct xfs_buftarg      *target,
        xfs_daddr_t             daddr,
        size_t                  numblks,
        int                     flags,
+       struct xfs_buf          **bpp,
        const struct xfs_buf_ops *ops)
 {
        struct xfs_buf          *bp;
 
+       *bpp = NULL;
+
        bp = xfs_buf_get_uncached(target, numblks, flags);
        if (!bp)
-               return NULL;
+               return -ENOMEM;
 
        /* set up the buffer for a read IO */
        ASSERT(bp->b_map_count == 1);
-       bp->b_bn = daddr;
+       bp->b_bn = XFS_BUF_DADDR_NULL;  /* always null for uncached buffers */
        bp->b_maps[0].bm_bn = daddr;
        bp->b_flags |= XBF_READ;
        bp->b_ops = ops;
 
-       if (XFS_FORCED_SHUTDOWN(target->bt_mount)) {
+       xfs_buf_submit_wait(bp);
+       if (bp->b_error) {
+               int     error = bp->b_error;
                xfs_buf_relse(bp);
-               return NULL;
+               return error;
        }
-       xfs_buf_iorequest(bp);
-       xfs_buf_iowait(bp);
-       return bp;
+
+       *bpp = bp;
+       return 0;
 }
 
 /*
@@ -1028,12 +1034,8 @@ xfs_buf_ioend(
                (*(bp->b_iodone))(bp);
        else if (bp->b_flags & XBF_ASYNC)
                xfs_buf_relse(bp);
-       else {
+       else
                complete(&bp->b_iowait);
-
-               /* release the !XBF_ASYNC ref now we are done. */
-               xfs_buf_rele(bp);
-       }
 }
 
 static void
@@ -1074,45 +1076,6 @@ xfs_buf_ioerror_alert(
                (__uint64_t)XFS_BUF_ADDR(bp), func, -bp->b_error, bp->b_length);
 }
 
-/*
- * Same as xfs_bioerror, except that we are releasing the buffer
- * here ourselves, and avoiding the xfs_buf_ioend call.
- * This is meant for userdata errors; metadata bufs come with
- * iodone functions attached, so that we can track down errors.
- */
-int
-xfs_bioerror_relse(
-       struct xfs_buf  *bp)
-{
-       int64_t         fl = bp->b_flags;
-       /*
-        * No need to wait until the buffer is unpinned.
-        * We aren't flushing it.
-        *
-        * chunkhold expects B_DONE to be set, whether
-        * we actually finish the I/O or not. We don't want to
-        * change that interface.
-        */
-       XFS_BUF_UNREAD(bp);
-       XFS_BUF_DONE(bp);
-       xfs_buf_stale(bp);
-       bp->b_iodone = NULL;
-       if (!(fl & XBF_ASYNC)) {
-               /*
-                * Mark b_error and B_ERROR _both_.
-                * Lot's of chunkcache code assumes that.
-                * There's no reason to mark error for
-                * ASYNC buffers.
-                */
-               xfs_buf_ioerror(bp, -EIO);
-               complete(&bp->b_iowait);
-       } else {
-               xfs_buf_relse(bp);
-       }
-
-       return -EIO;
-}
-
 int
 xfs_bwrite(
        struct xfs_buf          *bp)
@@ -1125,21 +1088,7 @@ xfs_bwrite(
        bp->b_flags &= ~(XBF_ASYNC | XBF_READ | _XBF_DELWRI_Q |
                         XBF_WRITE_FAIL | XBF_DONE);
 
-       if (XFS_FORCED_SHUTDOWN(bp->b_target->bt_mount)) {
-               trace_xfs_bdstrat_shut(bp, _RET_IP_);
-
-               xfs_buf_ioerror(bp, -EIO);
-               xfs_buf_stale(bp);
-
-               /* sync IO, xfs_buf_ioend is going to remove a ref here */
-               xfs_buf_hold(bp);
-               xfs_buf_ioend(bp);
-               return -EIO;
-       }
-
-       xfs_buf_iorequest(bp);
-
-       error = xfs_buf_iowait(bp);
+       error = xfs_buf_submit_wait(bp);
        if (error) {
                xfs_force_shutdown(bp->b_target->bt_mount,
                                   SHUTDOWN_META_IO_ERROR);
@@ -1248,7 +1197,7 @@ next_chunk:
        } else {
                /*
                 * This is guaranteed not to be the last io reference count
-                * because the caller (xfs_buf_iorequest) holds a count itself.
+                * because the caller (xfs_buf_submit) holds a count itself.
                 */
                atomic_dec(&bp->b_io_remaining);
                xfs_buf_ioerror(bp, -EIO);
@@ -1338,13 +1287,29 @@ _xfs_buf_ioapply(
        blk_finish_plug(&plug);
 }
 
+/*
+ * Asynchronous IO submission path. This transfers the buffer lock ownership and
+ * the current reference to the IO. It is not safe to reference the buffer after
+ * a call to this function unless the caller holds an additional reference
+ * itself.
+ */
 void
-xfs_buf_iorequest(
-       xfs_buf_t               *bp)
+xfs_buf_submit(
+       struct xfs_buf  *bp)
 {
-       trace_xfs_buf_iorequest(bp, _RET_IP_);
+       trace_xfs_buf_submit(bp, _RET_IP_);
 
        ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+       ASSERT(bp->b_flags & XBF_ASYNC);
+
+       /* on shutdown we stale and complete the buffer immediately */
+       if (XFS_FORCED_SHUTDOWN(bp->b_target->bt_mount)) {
+               xfs_buf_ioerror(bp, -EIO);
+               bp->b_flags &= ~XBF_DONE;
+               xfs_buf_stale(bp);
+               xfs_buf_ioend(bp);
+               return;
+       }
 
        if (bp->b_flags & XBF_WRITE)
                xfs_buf_wait_unpin(bp);
@@ -1353,25 +1318,14 @@ xfs_buf_iorequest(
        bp->b_io_error = 0;
 
        /*
-        * Take references to the buffer. For XBF_ASYNC buffers, holding a
-        * reference for as long as submission takes is all that is necessary
-        * here. The IO inherits the lock and hold count from the submitter,
-        * and these are release during IO completion processing. Taking a hold
-        * over submission ensures that the buffer is not freed until we have
-        * completed all processing, regardless of when IO errors occur or are
-        * reported.
-        *
-        * However, for synchronous IO, the IO does not inherit the submitters
-        * reference count, nor the buffer lock. Hence we need to take an extra
-        * reference to the buffer for the for the IO context so that we can
-        * guarantee the buffer is not freed until all IO completion processing
-        * is done. Otherwise the caller can drop their reference while the IO
-        * is still in progress and hence trigger a use-after-free situation.
+        * The caller's reference is released during I/O completion.
+        * This occurs some time after the last b_io_remaining reference is
+        * released, so after we drop our Io reference we have to have some
+        * other reference to ensure the buffer doesn't go away from underneath
+        * us. Take a direct reference to ensure we have safe access to the
+        * buffer until we are finished with it.
         */
        xfs_buf_hold(bp);
-       if (!(bp->b_flags & XBF_ASYNC))
-               xfs_buf_hold(bp);
-
 
        /*
         * Set the count to 1 initially, this will stop an I/O completion
@@ -1382,40 +1336,82 @@ xfs_buf_iorequest(
        _xfs_buf_ioapply(bp);
 
        /*
-        * If _xfs_buf_ioapply failed or we are doing synchronous IO that
-        * completes extremely quickly, we can get back here with only the IO
-        * reference we took above. If we drop it to zero, run completion
-        * processing synchronously so that we don't return to the caller with
-        * completion still pending. This avoids unnecessary context switches
-        * associated with the end_io workqueue.
+        * If _xfs_buf_ioapply failed, we can get back here with only the IO
+        * reference we took above. If we drop it to zero, run completion so
+        * that we don't return to the caller with completion still pending.
         */
        if (atomic_dec_and_test(&bp->b_io_remaining) == 1) {
-               if (bp->b_error || !(bp->b_flags & XBF_ASYNC))
+               if (bp->b_error)
                        xfs_buf_ioend(bp);
                else
                        xfs_buf_ioend_async(bp);
        }
 
        xfs_buf_rele(bp);
+       /* Note: it is not safe to reference bp now we've dropped our ref */
 }
 
 /*
- * Waits for I/O to complete on the buffer supplied.  It returns immediately if
- * no I/O is pending or there is already a pending error on the buffer, in which
- * case nothing will ever complete.  It returns the I/O error code, if any, or
- * 0 if there was no error.
+ * Synchronous buffer IO submission path, read or write.
  */
 int
-xfs_buf_iowait(
-       xfs_buf_t               *bp)
+xfs_buf_submit_wait(
+       struct xfs_buf  *bp)
 {
-       trace_xfs_buf_iowait(bp, _RET_IP_);
+       int             error;
 
-       if (!bp->b_error)
-               wait_for_completion(&bp->b_iowait);
+       trace_xfs_buf_submit_wait(bp, _RET_IP_);
+
+       ASSERT(!(bp->b_flags & (_XBF_DELWRI_Q | XBF_ASYNC)));
+
+       if (XFS_FORCED_SHUTDOWN(bp->b_target->bt_mount)) {
+               xfs_buf_ioerror(bp, -EIO);
+               xfs_buf_stale(bp);
+               bp->b_flags &= ~XBF_DONE;
+               return -EIO;
+       }
+
+       if (bp->b_flags & XBF_WRITE)
+               xfs_buf_wait_unpin(bp);
+
+       /* clear the internal error state to avoid spurious errors */
+       bp->b_io_error = 0;
+
+       /*
+        * For synchronous IO, the IO does not inherit the submitters reference
+        * count, nor the buffer lock. Hence we cannot release the reference we
+        * are about to take until we've waited for all IO completion to occur,
+        * including any xfs_buf_ioend_async() work that may be pending.
+        */
+       xfs_buf_hold(bp);
 
+       /*
+        * Set the count to 1 initially, this will stop an I/O completion
+        * callout which happens before we have started all the I/O from calling
+        * xfs_buf_ioend too early.
+        */
+       atomic_set(&bp->b_io_remaining, 1);
+       _xfs_buf_ioapply(bp);
+
+       /*
+        * make sure we run completion synchronously if it raced with us and is
+        * already complete.
+        */
+       if (atomic_dec_and_test(&bp->b_io_remaining) == 1)
+               xfs_buf_ioend(bp);
+
+       /* wait for completion before gathering the error from the buffer */
+       trace_xfs_buf_iowait(bp, _RET_IP_);
+       wait_for_completion(&bp->b_iowait);
        trace_xfs_buf_iowait_done(bp, _RET_IP_);
-       return bp->b_error;
+       error = bp->b_error;
+
+       /*
+        * all done now, we can release the hold that keeps the buffer
+        * referenced for the entire IO.
+        */
+       xfs_buf_rele(bp);
+       return error;
 }
 
 xfs_caddr_t
@@ -1821,15 +1817,7 @@ __xfs_buf_delwri_submit(
                else
                        list_del_init(&bp->b_list);
 
-               if (XFS_FORCED_SHUTDOWN(bp->b_target->bt_mount)) {
-                       trace_xfs_bdstrat_shut(bp, _RET_IP_);
-
-                       xfs_buf_ioerror(bp, -EIO);
-                       xfs_buf_stale(bp);
-                       xfs_buf_ioend(bp);
-                       continue;
-               }
-               xfs_buf_iorequest(bp);
+               xfs_buf_submit(bp);
        }
        blk_finish_plug(&plug);
 
@@ -1897,7 +1885,7 @@ xfs_buf_init(void)
                goto out;
 
        xfslogd_workqueue = alloc_workqueue("xfslogd",
-                                       WQ_MEM_RECLAIM | WQ_HIGHPRI, 1);
+                               WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_FREEZABLE, 1);
        if (!xfslogd_workqueue)
                goto out_free_buf_zone;