ceph: cleanup async writeback, truncation, invalidate helpers
[cascardo/linux.git] / fs / ceph / caps.c
index 775e6f6..68ee781 100644 (file)
@@ -610,7 +610,6 @@ retry:
        cap->issue_seq = seq;
        cap->mseq = mseq;
        cap->cap_gen = session->s_cap_gen;
-       cap->recon_gen = session->s_recon_gen;
 
        if (fmode >= 0)
                __ceph_get_fmode(ci, fmode);
@@ -627,21 +626,13 @@ retry:
 static int __cap_is_valid(struct ceph_cap *cap)
 {
        unsigned long ttl;
-       u32 gen, recon_gen;
+       u32 gen;
 
        spin_lock(&cap->session->s_cap_lock);
        gen = cap->session->s_cap_gen;
-       recon_gen = cap->session->s_recon_gen;
        ttl = cap->session->s_cap_ttl;
        spin_unlock(&cap->session->s_cap_lock);
 
-       if (cap->recon_gen != recon_gen) {
-               dout("__cap_is_valid %p cap %p issued %s "
-                    "but DEAD (recon_gen %u vs %u)\n", &cap->ci->vfs_inode,
-                    cap, ceph_cap_string(cap->issued), cap->recon_gen,
-                    recon_gen);
-               return 0;
-       }
        if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
                dout("__cap_is_valid %p cap %p issued %s "
                     "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode,
@@ -706,10 +697,15 @@ static void __touch_cap(struct ceph_cap *cap)
 {
        struct ceph_mds_session *s = cap->session;
 
-       dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
-            s->s_mds);
        spin_lock(&s->s_cap_lock);
-       list_move_tail(&cap->session_caps, &s->s_caps);
+       if (!s->s_iterating_caps) {
+               dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
+                    s->s_mds);
+               list_move_tail(&cap->session_caps, &s->s_caps);
+       } else {
+               dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
+                    &cap->ci->vfs_inode, cap, s->s_mds);
+       }
        spin_unlock(&s->s_cap_lock);
 }
 
@@ -926,14 +922,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
        if (IS_ERR(msg))
                return PTR_ERR(msg);
 
-       fc = msg->front.iov_base;
+       msg->hdr.tid = cpu_to_le64(flush_tid);
 
+       fc = msg->front.iov_base;
        memset(fc, 0, sizeof(*fc));
 
        fc->cap_id = cpu_to_le64(cid);
        fc->op = cpu_to_le32(op);
        fc->seq = cpu_to_le32(seq);
-       fc->client_tid = cpu_to_le64(flush_tid);
        fc->issue_seq = cpu_to_le32(issue_seq);
        fc->migrate_seq = cpu_to_le32(mseq);
        fc->caps = cpu_to_le32(caps);
@@ -1046,10 +1042,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        struct ceph_inode_info *ci = cap->ci;
        struct inode *inode = &ci->vfs_inode;
        u64 cap_id = cap->cap_id;
-       int held = cap->issued | cap->implemented;
-       int revoking = cap->implemented & ~cap->issued;
-       int dropping = cap->issued & ~retain;
-       int keep;
+       int held, revoking, dropping, keep;
        u64 seq, issue_seq, mseq, time_warp_seq, follows;
        u64 size, max_size;
        struct timespec mtime, atime;
@@ -1064,6 +1057,11 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        int i;
        int ret;
 
+       held = cap->issued | cap->implemented;
+       revoking = cap->implemented & ~cap->issued;
+       retain &= ~revoking;
+       dropping = cap->issued & ~retain;
+
        dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
             inode, cap, cap->session,
             ceph_cap_string(held), ceph_cap_string(held & retain),
@@ -1325,7 +1323,7 @@ static int __mark_caps_flushing(struct inode *inode,
        struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int flushing;
-       
+
        BUG_ON(ci->i_dirty_caps == 0);
        BUG_ON(list_empty(&ci->i_dirty_item));
 
@@ -1378,12 +1376,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        int file_wanted, used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
        int drop_session_lock = session ? 0 : 1;
-       int want, retain, revoking, flushing = 0;
+       int issued, implemented, want, retain, revoking, flushing = 0;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
        struct rb_node *p;
        int tried_invalidate = 0;
        int delayed = 0, sent = 0, force_requeue = 0, num;
+       int queue_invalidate = 0;
        int is_delayed = flags & CHECK_CAPS_NODELAY;
 
        /* if we are unmounting, flush any unused caps immediately. */
@@ -1405,6 +1404,8 @@ retry_locked:
        file_wanted = __ceph_caps_file_wanted(ci);
        used = __ceph_caps_used(ci);
        want = file_wanted | used;
+       issued = __ceph_caps_issued(ci, &implemented);
+       revoking = implemented & ~issued;
 
        retain = want | CEPH_CAP_PIN;
        if (!mdsc->stopping && inode->i_nlink > 0) {
@@ -1423,11 +1424,11 @@ retry_locked:
        }
 
        dout("check_caps %p file_want %s used %s dirty %s flushing %s"
-            " issued %s retain %s %s%s%s\n", inode,
+            " issued %s revoking %s retain %s %s%s%s\n", inode,
             ceph_cap_string(file_wanted),
             ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
             ceph_cap_string(ci->i_flushing_caps),
-            ceph_cap_string(__ceph_caps_issued(ci, NULL)),
+            ceph_cap_string(issued), ceph_cap_string(revoking),
             ceph_cap_string(retain),
             (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
             (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
@@ -1441,7 +1442,8 @@ retry_locked:
        if ((!is_delayed || mdsc->stopping) &&
            ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
            ci->i_rdcache_gen &&                     /* may have cached pages */
-           file_wanted == 0 &&                      /* no open files */
+           (file_wanted == 0 ||                     /* no open files */
+            (revoking & CEPH_CAP_FILE_CACHE)) &&     /*  or revoking cache */
            !ci->i_truncate_pending &&
            !tried_invalidate) {
                u32 invalidating_gen = ci->i_rdcache_gen;
@@ -1449,12 +1451,16 @@ retry_locked:
 
                dout("check_caps trying to invalidate on %p\n", inode);
                spin_unlock(&inode->i_lock);
-               ret = invalidate_inode_pages2(&inode->i_data);
+               ret = invalidate_mapping_pages(&inode->i_data, 0, -1);
                spin_lock(&inode->i_lock);
                if (ret == 0 && invalidating_gen == ci->i_rdcache_gen) {
                        /* success. */
                        ci->i_rdcache_gen = 0;
                        ci->i_rdcache_revoking = 0;
+               } else if (revoking & CEPH_CAP_FILE_CACHE) {
+                       dout("check_caps queuing invalidate\n");
+                       queue_invalidate = 1;
+                       ci->i_rdcache_revoking = ci->i_rdcache_gen;
                } else {
                        dout("check_caps failed to invalidate pages\n");
                        /* we failed to invalidate pages.  check these
@@ -1480,7 +1486,7 @@ retry_locked:
 
                revoking = cap->implemented & ~cap->issued;
                if (revoking)
-                       dout("mds%d revoking %s\n", cap->mds,
+                       dout(" mds%d revoking %s\n", cap->mds,
                             ceph_cap_string(revoking));
 
                if (cap == ci->i_auth_cap &&
@@ -1595,6 +1601,9 @@ ack:
 
        spin_unlock(&inode->i_lock);
 
+       if (queue_invalidate)
+               ceph_queue_invalidate(inode);
+
        if (session && drop_session_lock)
                mutex_unlock(&session->s_mutex);
        if (took_snap_rwsem)
@@ -2168,7 +2177,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        int wake = 0;
        int writeback = 0;
        int revoked_rdcache = 0;
-       int invalidate_async = 0;
+       int queue_invalidate = 0;
        int tried_invalidate = 0;
        int ret;
 
@@ -2189,13 +2198,13 @@ restart:
                spin_unlock(&inode->i_lock);
                tried_invalidate = 1;
 
-               ret = invalidate_inode_pages2(&inode->i_data);
+               ret = invalidate_mapping_pages(&inode->i_data, 0, -1);
                spin_lock(&inode->i_lock);
                if (ret < 0) {
                        /* there were locked pages.. invalidate later
                           in a separate thread. */
                        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-                               invalidate_async = 1;
+                               queue_invalidate = 1;
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
                        }
                } else {
@@ -2213,7 +2222,6 @@ restart:
        issued |= implemented | __ceph_caps_dirty(ci);
 
        cap->cap_gen = session->s_cap_gen;
-       cap->recon_gen = session->s_recon_gen;
 
        __check_cap_issue(ci, cap, newcaps);
 
@@ -2310,21 +2318,15 @@ restart:
        }
 
        spin_unlock(&inode->i_lock);
-       if (writeback) {
+       if (writeback)
                /*
                 * queue inode for writeback: we can't actually call
                 * filemap_write_and_wait, etc. from message handler
                 * context.
                 */
-               dout("queueing %p for writeback\n", inode);
-               if (ceph_queue_writeback(inode))
-                       igrab(inode);
-       }
-       if (invalidate_async) {
-               dout("queueing %p for page invalidation\n", inode);
-               if (ceph_queue_page_invalidation(inode))
-                       igrab(inode);
-       }
+               ceph_queue_writeback(inode);
+       if (queue_invalidate)
+               ceph_queue_invalidate(inode);
        if (wake)
                wake_up(&ci->i_cap_wq);
        return reply;
@@ -2334,7 +2336,7 @@ restart:
  * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
  * MDS has been safely committed.
  */
-static void handle_cap_flush_ack(struct inode *inode,
+static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                                 struct ceph_mds_caps *m,
                                 struct ceph_mds_session *session,
                                 struct ceph_cap *cap)
@@ -2345,7 +2347,6 @@ static void handle_cap_flush_ack(struct inode *inode,
        unsigned seq = le32_to_cpu(m->seq);
        int dirty = le32_to_cpu(m->dirty);
        int cleaned = 0;
-       u64 flush_tid = le64_to_cpu(m->client_tid);
        int drop = 0;
        int i;
 
@@ -2401,13 +2402,12 @@ out:
  *
  * Caller hold s_mutex.
  */
-static void handle_cap_flushsnap_ack(struct inode *inode,
+static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
                                     struct ceph_mds_caps *m,
                                     struct ceph_mds_session *session)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 follows = le64_to_cpu(m->snap_follows);
-       u64 flush_tid = le64_to_cpu(m->client_tid);
        struct ceph_cap_snap *capsnap;
        int drop = 0;
 
@@ -2472,9 +2472,7 @@ static void handle_cap_trunc(struct inode *inode,
        spin_unlock(&inode->i_lock);
 
        if (queue_trunc)
-               if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
-                              &ci->i_vmtruncate_work))
-                       igrab(inode);
+               ceph_queue_vmtruncate(inode);
 }
 
 /*
@@ -2592,12 +2590,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct ceph_vino vino;
        u64 cap_id;
        u64 size, max_size;
+       u64 tid;
        int check_caps = 0;
        int r;
 
        dout("handle_caps from mds%d\n", mds);
 
        /* decode */
+       tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
@@ -2626,7 +2626,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        /* these will work even if we don't have a cap yet */
        switch (op) {
        case CEPH_CAP_OP_FLUSHSNAP_ACK:
-               handle_cap_flushsnap_ack(inode, h, session);
+               handle_cap_flushsnap_ack(inode, tid, h, session);
                goto done;
 
        case CEPH_CAP_OP_EXPORT:
@@ -2667,7 +2667,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                break;
 
        case CEPH_CAP_OP_FLUSH_ACK:
-               handle_cap_flush_ack(inode, h, session, cap);
+               handle_cap_flush_ack(inode, tid, h, session, cap);
                break;
 
        case CEPH_CAP_OP_TRUNC:
@@ -2691,6 +2691,7 @@ done:
 
 bad:
        pr_err("ceph_handle_caps: corrupt message\n");
+       ceph_msg_dump(msg);
        return;
 }