X-Git-Url: http://git.cascardo.info/?a=blobdiff_plain;ds=sidebyside;f=net%2Frds%2Fsend.c;h=0bc9db17a87dd216bcc5ae914dbae20d0953ac89;hb=27afe58fe60fbf71a25f1f592472c0e7b72b3502;hp=de5693cdcefb10fdd2cef374d2f5983ac392cccc;hpb=acfcd4d4ec4ed8cb504f96d4fabb7a94029b362b;p=cascardo%2Flinux.git diff --git a/net/rds/send.c b/net/rds/send.c index de5693cdcefb..0bc9db17a87d 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -52,8 +52,11 @@ static int send_batch_count = 64; module_param(send_batch_count, int, 0444); MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); +static void rds_send_remove_from_sock(struct list_head *messages, int status); + /* - * Reset the send state. Caller must hold c_send_lock when calling here. + * Reset the send state. Callers must ensure that this doesn't race with + * rds_send_xmit(). */ void rds_send_reset(struct rds_connection *conn) { @@ -61,14 +64,16 @@ void rds_send_reset(struct rds_connection *conn) unsigned long flags; if (conn->c_xmit_rm) { + rm = conn->c_xmit_rm; + conn->c_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's * no ongoing RDMA to/from that memory */ - rds_message_unmapped(conn->c_xmit_rm); - rds_message_put(conn->c_xmit_rm); - conn->c_xmit_rm = NULL; + rds_message_unmapped(rm); + rds_message_put(rm); } + conn->c_xmit_sg = 0; conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; @@ -91,6 +96,25 @@ void rds_send_reset(struct rds_connection *conn) spin_unlock_irqrestore(&conn->c_lock, flags); } +static int acquire_in_xmit(struct rds_connection *conn) +{ + return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; +} + +static void release_in_xmit(struct rds_connection *conn) +{ + clear_bit(RDS_IN_XMIT, &conn->c_flags); + smp_mb__after_clear_bit(); + /* + * We don't use wait_on_bit()/wake_up_bit() because our waking is in a + * hot path and finding waiters is very rare. We don't want to walk + * the system-wide hashed waitqueue buckets in the fast path only to + * almost never find waiters. + */ + if (waitqueue_active(&conn->c_waitq)) + wake_up_all(&conn->c_waitq); +} + /* * We're making the concious trade-off here to only send one message * down the connection at a time. @@ -115,8 +139,6 @@ int rds_send_xmit(struct rds_connection *conn) LIST_HEAD(to_be_dropped); restart: - if (!rds_conn_up(conn)) - goto out; /* * sendmsg calls here after having queued its message on the send @@ -125,12 +147,22 @@ restart: * avoids blocking the caller and trading per-connection data between * caches per message. */ - if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) { + if (!acquire_in_xmit(conn)) { rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; } + /* + * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, + * we do the opposite to avoid races. + */ + if (!rds_conn_up(conn)) { + release_in_xmit(conn); + ret = 0; + goto out; + } + if (conn->c_trans->xmit_prepare) conn->c_trans->xmit_prepare(conn); @@ -167,7 +199,7 @@ restart: if (!rm) { unsigned int len; - spin_lock(&conn->c_lock); + spin_lock_irqsave(&conn->c_lock, flags); if (!list_empty(&conn->c_send_queue)) { rm = list_entry(conn->c_send_queue.next, @@ -182,7 +214,7 @@ restart: list_move_tail(&rm->m_conn_item, &conn->c_retrans); } - spin_unlock(&conn->c_lock); + spin_unlock_irqrestore(&conn->c_lock, flags); if (!rm) break; @@ -196,10 +228,10 @@ restart: */ if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { - spin_lock(&conn->c_lock); + spin_lock_irqsave(&conn->c_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); - spin_unlock(&conn->c_lock); + spin_unlock_irqrestore(&conn->c_lock, flags); continue; } @@ -325,19 +357,7 @@ restart: if (conn->c_trans->xmit_complete) conn->c_trans->xmit_complete(conn); - /* - * We might be racing with another sender who queued a message but - * backed off on noticing that we held the c_send_lock. If we check - * for queued messages after dropping the sem then either we'll - * see the queued message or the queuer will get the sem. If we - * notice the queued message then we trigger an immediate retry. - * - * We need to be careful only to do this when we stopped processing - * the send queue because it was empty. It's the only way we - * stop processing the loop when the transport hasn't taken - * responsibility for forward progress. - */ - spin_unlock_irqrestore(&conn->c_send_lock, flags); + release_in_xmit(conn); /* Nuke any messages we decided not to retransmit. */ if (!list_empty(&to_be_dropped)) { @@ -348,24 +368,22 @@ restart: } /* - * Other senders will see we have c_send_lock and exit. We - * need to recheck the send queue and race again for c_send_lock - * to make sure messages don't just sit on the send queue. + * Other senders can queue a message after we last test the send queue + * but before we clear RDS_IN_XMIT. In that case they'd back off and + * not try and send their newly queued message. We need to check the + * send queue after having cleared RDS_IN_XMIT so that their message + * doesn't get stuck on the send queue. * * If the transport cannot continue (i.e ret != 0), then it must * call us when more room is available, such as from the tx * completion handler. */ if (ret == 0) { - /* A simple bit test would be way faster than taking the - * spin lock */ - spin_lock_irqsave(&conn->c_lock, flags); + smp_mb(); if (!list_empty(&conn->c_send_queue)) { rds_stats_inc(s_send_lock_queue_raced); - spin_unlock_irqrestore(&conn->c_lock, flags); goto restart; } - spin_unlock_irqrestore(&conn->c_lock, flags); } out: return ret; @@ -392,35 +410,6 @@ static inline int rds_send_is_acked(struct rds_message *rm, u64 ack, return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack; } -/* - * Returns true if there are no messages on the send and retransmit queues - * which have a sequence number greater than or equal to the given sequence - * number. - */ -int rds_send_acked_before(struct rds_connection *conn, u64 seq) -{ - struct rds_message *rm, *tmp; - int ret = 1; - - spin_lock(&conn->c_lock); - - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq) - ret = 0; - break; - } - - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq) - ret = 0; - break; - } - - spin_unlock(&conn->c_lock); - - return ret; -} - /* * This is pretty similar to what happens below in the ACK * handling code - except that we call here as soon as we get @@ -568,7 +557,7 @@ EXPORT_SYMBOL_GPL(rds_send_get_message); * removing the messages from the 'messages' list regardless of if it found * the messages on the socket list or not. */ -void rds_send_remove_from_sock(struct list_head *messages, int status) +static void rds_send_remove_from_sock(struct list_head *messages, int status) { unsigned long flags; struct rds_sock *rs = NULL; @@ -856,6 +845,8 @@ static int rds_rm_size(struct msghdr *msg, int data_len) case RDS_CMSG_ATOMIC_CSWP: case RDS_CMSG_ATOMIC_FADD: + case RDS_CMSG_MASKED_ATOMIC_CSWP: + case RDS_CMSG_MASKED_ATOMIC_FADD: cmsg_groups |= 1; size += sizeof(struct scatterlist); break; @@ -907,6 +898,8 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, break; case RDS_CMSG_ATOMIC_CSWP: case RDS_CMSG_ATOMIC_FADD: + case RDS_CMSG_MASKED_ATOMIC_CSWP: + case RDS_CMSG_MASKED_ATOMIC_FADD: ret = rds_cmsg_atomic(rs, rm, cmsg); break; @@ -1024,13 +1017,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; } - /* If the connection is down, trigger a connect. We may - * have scheduled a delayed reconnect however - in this case - * we should not interfere. - */ - if (rds_conn_state(conn) == RDS_CONN_DOWN && - !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + rds_conn_connect_if_down(conn); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { @@ -1110,13 +1097,7 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) rm->m_daddr = conn->c_faddr; rm->data.op_active = 1; - /* If the connection is down, trigger a connect. We may - * have scheduled a delayed reconnect however - in this case - * we should not interfere. - */ - if (rds_conn_state(conn) == RDS_CONN_DOWN && - !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + rds_conn_connect_if_down(conn); ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); if (ret)