X-Git-Url: http://git.cascardo.info/?a=blobdiff_plain;f=net%2Frds%2Fsend.c;h=35b9c2e9caf1bc79235b9b43732d398a325d42f4;hb=d139ff0907dac9ef72fb2cf301e345bac3aec42f;hp=19dfd025498eadbd5560fddaf9b721b5de30bd19;hpb=e779137aa76d38d5c33a98ed887092ae4e4f016f;p=cascardo%2Flinux.git diff --git a/net/rds/send.c b/net/rds/send.c index 19dfd025498e..35b9c2e9caf1 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -37,7 +37,6 @@ #include #include "rds.h" -#include "rdma.h" /* When transmitting messages in rds_send_xmit, we need to emerge from * time to time and briefly release the CPU. Otherwise the softlock watchdog @@ -53,8 +52,11 @@ static int send_batch_count = 64; module_param(send_batch_count, int, 0444); MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); +static void rds_send_remove_from_sock(struct list_head *messages, int status); + /* - * Reset the send state. Caller must hold c_send_lock when calling here. + * Reset the send state. Callers must ensure that this doesn't race with + * rds_send_xmit(). */ void rds_send_reset(struct rds_connection *conn) { @@ -62,18 +64,22 @@ void rds_send_reset(struct rds_connection *conn) unsigned long flags; if (conn->c_xmit_rm) { + rm = conn->c_xmit_rm; + conn->c_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's * no ongoing RDMA to/from that memory */ - rds_message_unmapped(conn->c_xmit_rm); - rds_message_put(conn->c_xmit_rm); - conn->c_xmit_rm = NULL; + rds_message_unmapped(rm); + rds_message_put(rm); } + conn->c_xmit_sg = 0; conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; + conn->c_xmit_atomic_sent = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_data_sent = 0; conn->c_map_queued = 0; @@ -90,6 +96,25 @@ void rds_send_reset(struct rds_connection *conn) spin_unlock_irqrestore(&conn->c_lock, flags); } +static int acquire_in_xmit(struct rds_connection *conn) +{ + return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; +} + +static void release_in_xmit(struct rds_connection *conn) +{ + clear_bit(RDS_IN_XMIT, &conn->c_flags); + smp_mb__after_clear_bit(); + /* + * We don't use wait_on_bit()/wake_up_bit() because our waking is in a + * hot path and finding waiters is very rare. We don't want to walk + * the system-wide hashed waitqueue buckets in the fast path only to + * almost never find waiters. + */ + if (waitqueue_active(&conn->c_waitq)) + wake_up_all(&conn->c_waitq); +} + /* * We're making the concious trade-off here to only send one message * down the connection at a time. @@ -109,96 +134,63 @@ int rds_send_xmit(struct rds_connection *conn) struct rds_message *rm; unsigned long flags; unsigned int tmp; - unsigned int send_quota = send_batch_count; struct scatterlist *sg; int ret = 0; - int was_empty = 0; LIST_HEAD(to_be_dropped); +restart: + /* * sendmsg calls here after having queued its message on the send * queue. We only have one task feeding the connection at a time. If * another thread is already feeding the queue then we back off. This * avoids blocking the caller and trading per-connection data between * caches per message. - * - * The sem holder will issue a retry if they notice that someone queued - * a message after they stopped walking the send queue but before they - * dropped the sem. */ - if (!mutex_trylock(&conn->c_send_lock)) { - rds_stats_inc(s_send_sem_contention); + if (!acquire_in_xmit(conn)) { + rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; } + /* + * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, + * we do the opposite to avoid races. + */ + if (!rds_conn_up(conn)) { + release_in_xmit(conn); + ret = 0; + goto out; + } + if (conn->c_trans->xmit_prepare) conn->c_trans->xmit_prepare(conn); /* * spin trying to push headers and data down the connection until - * the connection doens't make forward progress. + * the connection doesn't make forward progress. */ - while (--send_quota) { - /* - * See if need to send a congestion map update if we're - * between sending messages. The send_sem protects our sole - * use of c_map_offset and _bytes. - * Note this is used only by transports that define a special - * xmit_cong_map function. For all others, we create allocate - * a cong_map message and treat it just like any other send. - */ - if (conn->c_map_bytes) { - ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, - conn->c_map_offset); - if (ret <= 0) - break; + while (1) { - conn->c_map_offset += ret; - conn->c_map_bytes -= ret; - if (conn->c_map_bytes) - continue; - } - - /* If we're done sending the current message, clear the - * offset and S/G temporaries. - */ rm = conn->c_xmit_rm; - if (rm && - conn->c_xmit_hdr_off == sizeof(struct rds_header) && - conn->c_xmit_sg == rm->data.m_nents) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - - /* Release the reference to the previous message. */ - rds_message_put(rm); - rm = NULL; - } - /* If we're asked to send a cong map update, do so. + /* + * If between sending messages, we can send a pending congestion + * map update. */ if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { - if (conn->c_trans->xmit_cong_map) { - conn->c_map_offset = 0; - conn->c_map_bytes = sizeof(struct rds_header) + - RDS_CONG_MAP_BYTES; - continue; - } - rm = rds_cong_update_alloc(conn); if (IS_ERR(rm)) { ret = PTR_ERR(rm); break; } + rm->data.op_active = 1; conn->c_xmit_rm = rm; } /* - * Grab the next message from the send queue, if there is one. + * If not already working on one, grab the next message. * * c_xmit_rm holds a ref while we're sending this message down * the connction. We can use this ref while holding the @@ -224,10 +216,8 @@ int rds_send_xmit(struct rds_connection *conn) spin_unlock_irqrestore(&conn->c_lock, flags); - if (!rm) { - was_empty = 1; + if (!rm) break; - } /* Unfortunately, the way Infiniband deals with * RDMA to a bad MR key is by moving the entire @@ -236,13 +226,12 @@ int rds_send_xmit(struct rds_connection *conn) * connection. * Therefore, we never retransmit messages with RDMA ops. */ - if (rm->rdma.m_rdma_op && + if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { spin_lock_irqsave(&conn->c_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); spin_unlock_irqrestore(&conn->c_lock, flags); - rds_message_put(rm); continue; } @@ -263,23 +252,55 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_rm = rm; } - /* - * Try and send an rdma message. Let's see if we can - * keep this simple and require that the transport either - * send the whole rdma or none of it. - */ - if (rm->rdma.m_rdma_op && !conn->c_xmit_rdma_sent) { - ret = conn->c_trans->xmit_rdma(conn, rm->rdma.m_rdma_op); + /* The transport either sends the whole rdma or none of it */ + if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + rm->m_final_op = &rm->rdma; + ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); if (ret) break; conn->c_xmit_rdma_sent = 1; + /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); } - if (conn->c_xmit_hdr_off < sizeof(struct rds_header) || - conn->c_xmit_sg < rm->data.m_nents) { + if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + rm->m_final_op = &rm->atomic; + ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); + if (ret) + break; + conn->c_xmit_atomic_sent = 1; + + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); + } + + /* + * A number of cases require an RDS header to be sent + * even if there is no data. + * We permit 0-byte sends; rds-ping depends on this. + * However, if there are exclusively attached silent ops, + * we skip the hdr/data send, to enable silent operation. + */ + if (rm->data.op_nents == 0) { + int ops_present; + int all_ops_are_silent = 1; + + ops_present = (rm->atomic.op_active || rm->rdma.op_active); + if (rm->atomic.op_active && !rm->atomic.op_silent) + all_ops_are_silent = 0; + if (rm->rdma.op_active && !rm->rdma.op_silent) + all_ops_are_silent = 0; + + if (ops_present && all_ops_are_silent + && !rm->m_rdma_cookie) + rm->data.op_active = 0; + } + + if (rm->data.op_active && !conn->c_xmit_data_sent) { + rm->m_final_op = &rm->data; ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, @@ -295,7 +316,7 @@ int rds_send_xmit(struct rds_connection *conn) ret -= tmp; } - sg = &rm->data.m_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[conn->c_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - conn->c_xmit_data_off); @@ -306,49 +327,63 @@ int rds_send_xmit(struct rds_connection *conn) sg++; conn->c_xmit_sg++; BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.m_nents); + conn->c_xmit_sg == rm->data.op_nents); } } + + if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && + (conn->c_xmit_sg == rm->data.op_nents)) + conn->c_xmit_data_sent = 1; } - } - /* Nuke any messages we decided not to retransmit. */ - if (!list_empty(&to_be_dropped)) - rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); + /* + * A rm will only take multiple times through this loop + * if there is a data op. Thus, if the data is sent (or there was + * none), then we're done with the rm. + */ + if (!rm->data.op_active || conn->c_xmit_data_sent) { + conn->c_xmit_rm = NULL; + conn->c_xmit_sg = 0; + conn->c_xmit_hdr_off = 0; + conn->c_xmit_data_off = 0; + conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; + conn->c_xmit_data_sent = 0; + + rds_message_put(rm); + } + } if (conn->c_trans->xmit_complete) conn->c_trans->xmit_complete(conn); - /* - * We might be racing with another sender who queued a message but - * backed off on noticing that we held the c_send_lock. If we check - * for queued messages after dropping the sem then either we'll - * see the queued message or the queuer will get the sem. If we - * notice the queued message then we trigger an immediate retry. - * - * We need to be careful only to do this when we stopped processing - * the send queue because it was empty. It's the only way we - * stop processing the loop when the transport hasn't taken - * responsibility for forward progress. - */ - mutex_unlock(&conn->c_send_lock); + release_in_xmit(conn); - if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) { - /* We exhausted the send quota, but there's work left to - * do. Return and (re-)schedule the send worker. - */ - ret = -EAGAIN; + /* Nuke any messages we decided not to retransmit. */ + if (!list_empty(&to_be_dropped)) { + /* irqs on here, so we can put(), unlike above */ + list_for_each_entry(rm, &to_be_dropped, m_conn_item) + rds_message_put(rm); + rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); } - if (ret == 0 && was_empty) { - /* A simple bit test would be way faster than taking the - * spin lock */ - spin_lock_irqsave(&conn->c_lock, flags); + /* + * Other senders can queue a message after we last test the send queue + * but before we clear RDS_IN_XMIT. In that case they'd back off and + * not try and send their newly queued message. We need to check the + * send queue after having cleared RDS_IN_XMIT so that their message + * doesn't get stuck on the send queue. + * + * If the transport cannot continue (i.e ret != 0), then it must + * call us when more room is available, such as from the tx + * completion handler. + */ + if (ret == 0) { + smp_mb(); if (!list_empty(&conn->c_send_queue)) { - rds_stats_inc(s_send_sem_queue_raced); - ret = -EAGAIN; + rds_stats_inc(s_send_lock_queue_raced); + goto restart; } - spin_unlock_irqrestore(&conn->c_lock, flags); } out: return ret; @@ -376,53 +411,60 @@ static inline int rds_send_is_acked(struct rds_message *rm, u64 ack, } /* - * Returns true if there are no messages on the send and retransmit queues - * which have a sequence number greater than or equal to the given sequence - * number. + * This is pretty similar to what happens below in the ACK + * handling code - except that we call here as soon as we get + * the IB send completion on the RDMA op and the accompanying + * message. */ -int rds_send_acked_before(struct rds_connection *conn, u64 seq) +void rds_rdma_send_complete(struct rds_message *rm, int status) { - struct rds_message *rm, *tmp; - int ret = 1; + struct rds_sock *rs = NULL; + struct rm_rdma_op *ro; + struct rds_notifier *notifier; + unsigned long flags; - spin_lock(&conn->c_lock); + spin_lock_irqsave(&rm->m_rs_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq) - ret = 0; - break; - } + ro = &rm->rdma; + if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && + ro->op_active && ro->op_notify && ro->op_notifier) { + notifier = ro->op_notifier; + rs = rm->m_rs; + sock_hold(rds_rs_to_sk(rs)); - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq) - ret = 0; - break; + notifier->n_status = status; + spin_lock(&rs->rs_lock); + list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); + spin_unlock(&rs->rs_lock); + + ro->op_notifier = NULL; } - spin_unlock(&conn->c_lock); + spin_unlock_irqrestore(&rm->m_rs_lock, flags); - return ret; + if (rs) { + rds_wake_sk_sleep(rs); + sock_put(rds_rs_to_sk(rs)); + } } +EXPORT_SYMBOL_GPL(rds_rdma_send_complete); /* - * This is pretty similar to what happens below in the ACK - * handling code - except that we call here as soon as we get - * the IB send completion on the RDMA op and the accompanying - * message. + * Just like above, except looks at atomic op */ -void rds_rdma_send_complete(struct rds_message *rm, int status) +void rds_atomic_send_complete(struct rds_message *rm, int status) { struct rds_sock *rs = NULL; - struct rds_rdma_op *ro; + struct rm_atomic_op *ao; struct rds_notifier *notifier; unsigned long flags; spin_lock_irqsave(&rm->m_rs_lock, flags); - ro = rm->rdma.m_rdma_op; - if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && - ro && ro->r_notify && ro->r_notifier) { - notifier = ro->r_notifier; + ao = &rm->atomic; + if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) + && ao->op_active && ao->op_notify && ao->op_notifier) { + notifier = ao->op_notifier; rs = rm->m_rs; sock_hold(rds_rs_to_sk(rs)); @@ -431,7 +473,7 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); spin_unlock(&rs->rs_lock); - ro->r_notifier = NULL; + ao->op_notifier = NULL; } spin_unlock_irqrestore(&rm->m_rs_lock, flags); @@ -441,7 +483,7 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) sock_put(rds_rs_to_sk(rs)); } } -EXPORT_SYMBOL_GPL(rds_rdma_send_complete); +EXPORT_SYMBOL_GPL(rds_atomic_send_complete); /* * This is the same as rds_rdma_send_complete except we @@ -449,15 +491,23 @@ EXPORT_SYMBOL_GPL(rds_rdma_send_complete); * socket, socket lock) and can just move the notifier. */ static inline void -__rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) +__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) { - struct rds_rdma_op *ro; + struct rm_rdma_op *ro; + struct rm_atomic_op *ao; + + ro = &rm->rdma; + if (ro->op_active && ro->op_notify && ro->op_notifier) { + ro->op_notifier->n_status = status; + list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue); + ro->op_notifier = NULL; + } - ro = rm->rdma.m_rdma_op; - if (ro && ro->r_notify && ro->r_notifier) { - ro->r_notifier->n_status = status; - list_add_tail(&ro->r_notifier->n_list, &rs->rs_notify_queue); - ro->r_notifier = NULL; + ao = &rm->atomic; + if (ao->op_active && ao->op_notify && ao->op_notifier) { + ao->op_notifier->n_status = status; + list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue); + ao->op_notifier = NULL; } /* No need to wake the app - caller does this */ @@ -469,7 +519,7 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status * So speed is not an issue here. */ struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rds_rdma_op *op) + struct rm_rdma_op *op) { struct rds_message *rm, *tmp, *found = NULL; unsigned long flags; @@ -477,7 +527,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, spin_lock_irqsave(&conn->c_lock, flags); list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (rm->rdma.m_rdma_op == op) { + if (&rm->rdma == op) { atomic_inc(&rm->m_refcount); found = rm; goto out; @@ -485,7 +535,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, } list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (rm->rdma.m_rdma_op == op) { + if (&rm->rdma == op) { atomic_inc(&rm->m_refcount); found = rm; break; @@ -507,7 +557,7 @@ EXPORT_SYMBOL_GPL(rds_send_get_message); * removing the messages from the 'messages' list regardless of if it found * the messages on the socket list or not. */ -void rds_send_remove_from_sock(struct list_head *messages, int status) +static void rds_send_remove_from_sock(struct list_head *messages, int status) { unsigned long flags; struct rds_sock *rs = NULL; @@ -545,19 +595,20 @@ void rds_send_remove_from_sock(struct list_head *messages, int status) spin_lock(&rs->rs_lock); if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) { - struct rds_rdma_op *ro = rm->rdma.m_rdma_op; + struct rm_rdma_op *ro = &rm->rdma; struct rds_notifier *notifier; list_del_init(&rm->m_sock_item); rds_send_sndbuf_remove(rs, rm); - if (ro && ro->r_notifier && (status || ro->r_notify)) { - notifier = ro->r_notifier; + if (ro->op_active && ro->op_notifier && + (ro->op_notify || (ro->op_recverr && status))) { + notifier = ro->op_notifier; list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); if (!notifier->n_status) notifier->n_status = status; - rm->rdma.m_rdma_op->r_notifier = NULL; + rm->rdma.op_notifier = NULL; } was_on_sock = 1; rm->m_rs = NULL; @@ -669,7 +720,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) spin_lock_irqsave(&rm->m_rs_lock, flags); spin_lock(&rs->rs_lock); - __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); + __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); spin_unlock(&rs->rs_lock); rm->m_rs = NULL; @@ -758,6 +809,63 @@ out: return *queued; } +/* + * rds_message is getting to be quite complicated, and we'd like to allocate + * it all in one go. This figures out how big it needs to be up front. + */ +static int rds_rm_size(struct msghdr *msg, int data_len) +{ + struct cmsghdr *cmsg; + int size = 0; + int cmsg_groups = 0; + int retval; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + if (!CMSG_OK(msg, cmsg)) + return -EINVAL; + + if (cmsg->cmsg_level != SOL_RDS) + continue; + + switch (cmsg->cmsg_type) { + case RDS_CMSG_RDMA_ARGS: + cmsg_groups |= 1; + retval = rds_rdma_extra_size(CMSG_DATA(cmsg)); + if (retval < 0) + return retval; + size += retval; + + break; + + case RDS_CMSG_RDMA_DEST: + case RDS_CMSG_RDMA_MAP: + cmsg_groups |= 2; + /* these are valid but do no add any size */ + break; + + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + case RDS_CMSG_MASKED_ATOMIC_CSWP: + case RDS_CMSG_MASKED_ATOMIC_FADD: + cmsg_groups |= 1; + size += sizeof(struct scatterlist); + break; + + default: + return -EINVAL; + } + + } + + size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); + + /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */ + if (cmsg_groups == 3) + return -EINVAL; + + return size; +} + static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, struct msghdr *msg, int *allocated_mr) { @@ -772,7 +880,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, continue; /* As a side effect, RDMA_DEST and RDMA_MAP will set - * rm->m_rdma_cookie and rm->m_rdma_mr. + * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr. */ switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: @@ -788,6 +896,12 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, if (!ret) *allocated_mr = 1; break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + case RDS_CMSG_MASKED_ATOMIC_CSWP: + case RDS_CMSG_MASKED_ATOMIC_FADD: + ret = rds_cmsg_atomic(rs, rm, cmsg); + break; default: return -EINVAL; @@ -845,13 +959,30 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; } - rm = rds_message_copy_from_user(msg->msg_iov, payload_len); - if (IS_ERR(rm)) { - ret = PTR_ERR(rm); - rm = NULL; + /* size of rm including all sgs */ + ret = rds_rm_size(msg, payload_len); + if (ret < 0) + goto out; + + rm = rds_message_alloc(ret, GFP_KERNEL); + if (!rm) { + ret = -ENOMEM; goto out; } + /* Attach data to the rm */ + if (payload_len) { + rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); + if (!rm->data.op_sg) { + ret = -ENOMEM; + goto out; + } + ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len); + if (ret) + goto out; + } + rm->data.op_active = 1; + rm->m_daddr = daddr; /* rds_conn_create has a spinlock that runs with IRQ off. @@ -874,22 +1005,23 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, if (ret) goto out; - if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op) && - !conn->c_trans->xmit_rdma) { + if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + &rm->rdma, conn->c_trans->xmit_rdma); ret = -EOPNOTSUPP; goto out; } - /* If the connection is down, trigger a connect. We may - * have scheduled a delayed reconnect however - in this case - * we should not interfere. - */ - if (rds_conn_state(conn) == RDS_CONN_DOWN && - !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) { + if (printk_ratelimit()) + printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n", + &rm->atomic, conn->c_trans->xmit_atomic); + ret = -EOPNOTSUPP; + goto out; + } + + rds_conn_connect_if_down(conn); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { @@ -933,7 +1065,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, rds_stats_inc(s_send_queued); if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - rds_send_worker(&conn->c_send_w.work); + rds_send_xmit(conn); rds_message_put(rm); return payload_len; @@ -967,14 +1099,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) } rm->m_daddr = conn->c_faddr; + rm->data.op_active = 1; - /* If the connection is down, trigger a connect. We may - * have scheduled a delayed reconnect however - in this case - * we should not interfere. - */ - if (rds_conn_state(conn) == RDS_CONN_DOWN && - !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_conn_w, 0); + rds_conn_connect_if_down(conn); ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); if (ret) @@ -994,7 +1121,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) + rds_send_xmit(conn); + rds_message_put(rm); return 0;