module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
+static void rds_send_remove_from_sock(struct list_head *messages, int status);
+
/*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
*/
void rds_send_reset(struct rds_connection *conn)
{
unsigned long flags;
if (conn->c_xmit_rm) {
+ rm = conn->c_xmit_rm;
+ conn->c_xmit_rm = NULL;
/* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
- rds_message_unmapped(conn->c_xmit_rm);
- rds_message_put(conn->c_xmit_rm);
- conn->c_xmit_rm = NULL;
+ rds_message_unmapped(rm);
+ rds_message_put(rm);
}
+
conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0;
spin_unlock_irqrestore(&conn->c_lock, flags);
}
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+ return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+ clear_bit(RDS_IN_XMIT, &conn->c_flags);
+ smp_mb__after_clear_bit();
+ /*
+ * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+ * hot path and finding waiters is very rare. We don't want to walk
+ * the system-wide hashed waitqueue buckets in the fast path only to
+ * almost never find waiters.
+ */
+ if (waitqueue_active(&conn->c_waitq))
+ wake_up_all(&conn->c_waitq);
+}
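
For context, a minimal sketch of the waiting side (the helper name is hypothetical, not part of this patch): a path that must not overlap rds_send_xmit() can sleep on c_waitq until RDS_IN_XMIT is clear. The smp_mb__after_clear_bit() above orders the bit clear before the waitqueue_active() check, so a waiter that re-tests the bit after queueing itself is either woken here or already sees the bit clear.

	/* Hypothetical waiter: sleep until no rds_send_xmit() is in flight.
	 * wait_event() re-checks the condition after each wakeup, so the
	 * waitqueue_active() fast path in release_in_xmit() cannot strand
	 * a waiter. */
	static void rds_wait_for_xmit_done(struct rds_connection *conn)
	{
		wait_event(conn->c_waitq,
			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
	}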
+
/*
 * We're making the conscious trade-off here to only send one message
* down the connection at a time.
struct rds_message *rm;
unsigned long flags;
unsigned int tmp;
- unsigned int send_quota = send_batch_count;
struct scatterlist *sg;
int ret = 0;
- int was_empty = 0;
LIST_HEAD(to_be_dropped);
+restart:
+
/*
* sendmsg calls here after having queued its message on the send
* queue. We only have one task feeding the connection at a time. If
* another thread is already feeding the queue then we back off. This
* avoids blocking the caller and trading per-connection data between
* caches per message.
- *
- * The sem holder will issue a retry if they notice that someone queued
- * a message after they stopped walking the send queue but before they
- * dropped the sem.
*/
- if (!mutex_trylock(&conn->c_send_lock)) {
- rds_stats_inc(s_send_sem_contention);
+ if (!acquire_in_xmit(conn)) {
+ rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
}
+ /*
+ * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+ * we do the opposite to avoid races.
+ */
+ if (!rds_conn_up(conn)) {
+ release_in_xmit(conn);
+ ret = 0;
+ goto out;
+ }
+
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
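
A worked interleaving for the ordering comment above, with the shutdown side reconstructed from that comment rather than quoted from this patch:

	/*
	 *   rds_send_xmit()                  rds_conn_shutdown()
	 *   ---------------                  -------------------
	 *   acquire_in_xmit()                mark the connection down
	 *   if (!rds_conn_up()) -> bail      wait for RDS_IN_XMIT to clear
	 *
	 * Whichever path runs second observes the other's store: either the
	 * sender finds the connection down and releases RDS_IN_XMIT, or
	 * shutdown finds RDS_IN_XMIT set and sleeps on c_waitq until
	 * release_in_xmit() wakes it.  If rds_send_xmit() instead tested the
	 * connection state before setting RDS_IN_XMIT, shutdown could slip
	 * in between the two steps and tear the connection down underneath
	 * an active sender.
	 */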
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
*/
- while (--send_quota) {
+ while (1) {
rm = conn->c_xmit_rm;
/*
* If between sending messages, we can send a pending congestion
* map update.
- *
- * Transports either define a special xmit_cong_map function,
- * or we allocate a cong_map message and treat it just like any
- * other send.
*/
if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
- if (conn->c_trans->xmit_cong_map) {
- unsigned long map_offset = 0;
- unsigned long map_bytes = sizeof(struct rds_header) +
- RDS_CONG_MAP_BYTES;
-
- while (map_bytes) {
- ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
- map_offset);
- if (ret <= 0) {
- /* too far down the rabbithole! */
- mutex_unlock(&conn->c_send_lock);
- rds_conn_error(conn, "Cong map xmit failed\n");
- goto out;
- }
-
- map_offset += ret;
- map_bytes -= ret;
- }
- } else {
- /* send cong update like a normal rm */
- rm = rds_cong_update_alloc(conn);
- if (IS_ERR(rm)) {
- ret = PTR_ERR(rm);
- break;
- }
- rm->data.op_active = 1;
-
- conn->c_xmit_rm = rm;
+ rm = rds_cong_update_alloc(conn);
+ if (IS_ERR(rm)) {
+ ret = PTR_ERR(rm);
+ break;
}
+ rm->data.op_active = 1;
+
+ conn->c_xmit_rm = rm;
}
/*
spin_unlock_irqrestore(&conn->c_lock, flags);
- if (!rm) {
- was_empty = 1;
+ if (!rm)
break;
- }
/* Unfortunately, the way Infiniband deals with
* RDMA to a bad MR key is by moving the entire
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
spin_unlock_irqrestore(&conn->c_lock, flags);
- rds_message_put(rm);
continue;
}
/* The transport either sends the whole rdma or none of it */
if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+ rm->m_final_op = &rm->rdma;
ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
if (ret)
break;
}
if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
- ret = conn->c_trans->xmit_atomic(conn, rm);
+ rm->m_final_op = &rm->atomic;
+ ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
if (ret)
break;
conn->c_xmit_atomic_sent = 1;
+
/* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue */
set_bit(RDS_MSG_MAPPED, &rm->m_flags);
}
if (rm->data.op_active && !conn->c_xmit_data_sent) {
+ rm->m_final_op = &rm->data;
ret = conn->c_trans->xmit(conn, rm,
conn->c_xmit_hdr_off,
conn->c_xmit_sg,
}
}
- /* Nuke any messages we decided not to retransmit. */
- if (!list_empty(&to_be_dropped))
- rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
-
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
- /*
- * We might be racing with another sender who queued a message but
- * backed off on noticing that we held the c_send_lock. If we check
- * for queued messages after dropping the sem then either we'll
- * see the queued message or the queuer will get the sem. If we
- * notice the queued message then we trigger an immediate retry.
- *
- * We need to be careful only to do this when we stopped processing
- * the send queue because it was empty. It's the only way we
- * stop processing the loop when the transport hasn't taken
- * responsibility for forward progress.
- */
- mutex_unlock(&conn->c_send_lock);
+ release_in_xmit(conn);
- if (send_quota == 0 && !was_empty) {
- /* We exhausted the send quota, but there's work left to
- * do. Return and (re-)schedule the send worker.
- */
- ret = -EAGAIN;
+ /* Nuke any messages we decided not to retransmit. */
+ if (!list_empty(&to_be_dropped)) {
+ /* irqs on here, so we can put(), unlike above */
+ list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+ rds_message_put(rm);
+ rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}
- if (ret == 0 && was_empty) {
- /* A simple bit test would be way faster than taking the
- * spin lock */
- spin_lock_irqsave(&conn->c_lock, flags);
+ /*
+ * Other senders can queue a message after we last test the send queue
+ * but before we clear RDS_IN_XMIT. In that case they'd back off and
+ * not try to send their newly queued message. We need to check the
+ * send queue after having cleared RDS_IN_XMIT so that their message
+ * doesn't get stuck on the send queue.
+ *
+ * If the transport cannot continue (i.e. ret != 0), then it must
+ * call us when more room is available, such as from the tx
+ * completion handler.
+ */
+ if (ret == 0) {
+ smp_mb();
if (!list_empty(&conn->c_send_queue)) {
- rds_stats_inc(s_send_sem_queue_raced);
- ret = -EAGAIN;
+ rds_stats_inc(s_send_lock_queue_raced);
+ goto restart;
}
- spin_unlock_irqrestore(&conn->c_lock, flags);
}
out:
return ret;
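
When the transport pushes back (ret != 0), rds_send_xmit() returns without rescheduling itself; per the comment above, the transport must restart transmission once it has room again, typically from its tx completion path. A hypothetical sketch of that hook, using only names that already appear in this code (the function name itself is made up):

	/* Hypothetical transport completion hook: send ring space has freed
	 * up, so clear the flag that short-circuits direct sends from
	 * rds_sendmsg() and kick the send worker, which calls
	 * rds_send_xmit() again. */
	static void example_transport_tx_complete(struct rds_connection *conn)
	{
		clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
	}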
return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}
-/*
- * Returns true if there are no messages on the send and retransmit queues
- * which have a sequence number greater than or equal to the given sequence
- * number.
- */
-int rds_send_acked_before(struct rds_connection *conn, u64 seq)
-{
- struct rds_message *rm, *tmp;
- int ret = 1;
-
- spin_lock(&conn->c_lock);
-
- list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
- if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
- ret = 0;
- break;
- }
-
- list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
- if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
- ret = 0;
- break;
- }
-
- spin_unlock(&conn->c_lock);
-
- return ret;
-}
-
/*
* This is pretty similar to what happens below in the ACK
* handling code - except that we call here as soon as we get
struct rds_sock *rs = NULL;
struct rm_atomic_op *ao;
struct rds_notifier *notifier;
+ unsigned long flags;
- spin_lock(&rm->m_rs_lock);
+ spin_lock_irqsave(&rm->m_rs_lock, flags);
ao = &rm->atomic;
if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
ao->op_notifier = NULL;
}
- spin_unlock(&rm->m_rs_lock);
+ spin_unlock_irqrestore(&rm->m_rs_lock, flags);
if (rs) {
rds_wake_sk_sleep(rs);
 * removing the messages from the 'messages' list regardless of whether it found
* the messages on the socket list or not.
*/
-void rds_send_remove_from_sock(struct list_head *messages, int status)
+static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
unsigned long flags;
struct rds_sock *rs = NULL;
{
struct cmsghdr *cmsg;
int size = 0;
+ int cmsg_groups = 0;
int retval;
for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
switch (cmsg->cmsg_type) {
case RDS_CMSG_RDMA_ARGS:
+ cmsg_groups |= 1;
retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
if (retval < 0)
return retval;
size += retval;
+
break;
case RDS_CMSG_RDMA_DEST:
case RDS_CMSG_RDMA_MAP:
+ cmsg_groups |= 2;
/* these are valid but do not add any size */
break;
case RDS_CMSG_ATOMIC_CSWP:
case RDS_CMSG_ATOMIC_FADD:
+ case RDS_CMSG_MASKED_ATOMIC_CSWP:
+ case RDS_CMSG_MASKED_ATOMIC_FADD:
+ cmsg_groups |= 1;
size += sizeof(struct scatterlist);
break;
size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+ /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
+ if (cmsg_groups == 3)
+ return -EINVAL;
+
return size;
}
break;
case RDS_CMSG_ATOMIC_CSWP:
case RDS_CMSG_ATOMIC_FADD:
+ case RDS_CMSG_MASKED_ATOMIC_CSWP:
+ case RDS_CMSG_MASKED_ATOMIC_FADD:
ret = rds_cmsg_atomic(rs, rm, cmsg);
break;
goto out;
}
- /* If the connection is down, trigger a connect. We may
- * have scheduled a delayed reconnect however - in this case
- * we should not interfere.
- */
- if (rds_conn_state(conn) == RDS_CONN_DOWN &&
- !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
- queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+ rds_conn_connect_if_down(conn);
ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
if (ret) {
rds_stats_inc(s_send_queued);
if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
- rds_send_worker(&conn->c_send_w.work);
+ rds_send_xmit(conn);
rds_message_put(rm);
return payload_len;
}
rm->m_daddr = conn->c_faddr;
+ rm->data.op_active = 1;
- /* If the connection is down, trigger a connect. We may
- * have scheduled a delayed reconnect however - in this case
- * we should not interfere.
- */
- if (rds_conn_state(conn) == RDS_CONN_DOWN &&
- !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
- queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+ rds_conn_connect_if_down(conn);
ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
if (ret)
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+ rds_send_xmit(conn);
+
rds_message_put(rm);
return 0;
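
Both rds_sendmsg() and rds_send_pong() now call rds_conn_connect_if_down() instead of open-coding the "kick off a connect unless one is pending" check. The helper itself is not part of this hunk; presumably it just wraps the logic removed above, roughly:

	/* Presumed body of the new helper, reconstructed from the removed
	 * open-coded check: start a connection attempt only if the
	 * connection is down and no reconnect is already pending. */
	void rds_conn_connect_if_down(struct rds_connection *conn)
	{
		if (rds_conn_state(conn) == RDS_CONN_DOWN &&
		    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
			queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
	}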