RDS/IB: track signaled sends
[cascardo/linux.git] / net / rds / ib_send.c
index e88cb4a..15f7569 100644 (file)
@@ -219,6 +219,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
        }
 }
 
+/*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+       if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+           waitqueue_active(&rds_ib_ring_empty_wait))
+               wake_up(&rds_ib_ring_empty_wait);
+       BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
 /*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
        u32 oldest;
        u32 i = 0;
        int ret;
+       int nr_sig = 0;
 
        rdsdebug("cq %p conn %p\n", cq, conn);
        rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 
                for (i = 0; i < completed; i++) {
                        send = &ic->i_sends[oldest];
+                       if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                               nr_sig++;
 
                        rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
                }
 
                rds_ib_ring_free(&ic->i_send_ring, completed);
+               rds_ib_sub_signaled(ic, nr_sig);
+               nr_sig = 0;
 
                if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
                    test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-                                             struct rds_ib_send_work *send,
-                                             bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+                                            struct rds_ib_send_work *send,
+                                            bool notify)
 {
        /*
         * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
        if (ic->i_unsignaled_wrs-- == 0 || notify) {
                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                send->s_wr.send_flags |= IB_SEND_SIGNALED;
+               return 1;
        }
+       return 0;
 }
 
 /*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        int bytes_sent = 0;
        int ret;
        int flow_controlled = 0;
+       int nr_sig = 0;
 
        BUG_ON(off % RDS_FRAG_SIZE);
        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
+
                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        if (ic->i_flowctl && i < credit_alloc)
                rds_ib_send_add_credits(conn, credit_alloc - i);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        /* XXX need to worry about failed_wr and partial sends. */
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                if (prev->s_op) {
                        ic->i_data_op = prev->s_op;
                        prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
        u32 pos;
        u32 work_alloc;
        int ret;
+       int nr_sig = 0;
 
        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                send->s_wr.wr.atomic.compare_add = op->op_swap_add;
                send->s_wr.wr.atomic.swap = 0;
        }
-       rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+       nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
        send->s_wr.num_sge = 1;
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
                 send->s_sge[0].addr, send->s_sge[0].length);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &send->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }
 
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
        int sent;
        int ret;
        int num_sge;
+       int nr_sig = 0;
 
        /* map the op the first time we see it */
        if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                send->s_queued = jiffies;
                send->s_op = NULL;
 
-               rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+               nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                work_alloc = i;
        }
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }