Merge branch 'for_linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jack/linux-fs
[cascardo/linux.git] / net / rds / send.c
index e9430f5..4df61a5 100644 (file)
@@ -282,26 +282,34 @@ restart:
                /* The transport either sends the whole rdma or none of it */
                if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
                        rm->m_final_op = &rm->rdma;
+                       /* The transport owns the mapped memory for now.
+                        * You can't unmap it while it's on the send queue
+                        */
+                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
-                       if (ret)
+                       if (ret) {
+                               clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+                               wake_up_interruptible(&rm->m_flush_wait);
                                break;
+                       }
                        conn->c_xmit_rdma_sent = 1;
 
-                       /* The transport owns the mapped memory for now.
-                        * You can't unmap it while it's on the send queue */
-                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                }
 
                if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
                        rm->m_final_op = &rm->atomic;
+                       /* The transport owns the mapped memory for now.
+                        * You can't unmap it while it's on the send queue
+                        */
+                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
-                       if (ret)
+                       if (ret) {
+                               clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+                               wake_up_interruptible(&rm->m_flush_wait);
                                break;
+                       }
                        conn->c_xmit_atomic_sent = 1;
 
-                       /* The transport owns the mapped memory for now.
-                        * You can't unmap it while it's on the send queue */
-                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                }
 
                /*
@@ -411,7 +419,8 @@ over_batch:
         */
        if (ret == 0) {
                smp_mb();
-               if (!list_empty(&conn->c_send_queue) &&
+               if ((test_bit(0, &conn->c_map_queued) ||
+                    !list_empty(&conn->c_send_queue)) &&
                    send_gen == conn->c_send_gen) {
                        rds_stats_inc(s_send_lock_queue_raced);
                        goto restart;
@@ -769,8 +778,22 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
        while (!list_empty(&list)) {
                rm = list_entry(list.next, struct rds_message, m_sock_item);
                list_del_init(&rm->m_sock_item);
-
                rds_message_wait(rm);
+
+               /* just in case the code above skipped this message
+                * because RDS_MSG_ON_CONN wasn't set, run it again here
+                * taking m_rs_lock is the only thing that keeps us
+                * from racing with ack processing.
+                */
+               spin_lock_irqsave(&rm->m_rs_lock, flags);
+
+               spin_lock(&rs->rs_lock);
+               __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
+               spin_unlock(&rs->rs_lock);
+
+               rm->m_rs = NULL;
+               spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
                rds_message_put(rm);
        }
 }
@@ -992,6 +1015,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                goto out;
        }
 
+       if (payload_len > rds_sk_sndbuf(rs)) {
+               ret = -EMSGSIZE;
+               goto out;
+       }
+
        /* size of rm including all sgs */
        ret = rds_rm_size(msg, payload_len);
        if (ret < 0)
@@ -1023,7 +1051,8 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
                conn = rs->rs_conn;
        else {
-               conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr,
+               conn = rds_conn_create_outgoing(sock_net(sock->sk),
+                                               rs->rs_bound_addr, daddr,
                                        rs->rs_transport,
                                        sock->sk->sk_allocation);
                if (IS_ERR(conn)) {
@@ -1063,11 +1092,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                                  dport, &queued)) {
                rds_stats_inc(s_send_queue_full);
-               /* XXX make sure this is reasonable */
-               if (payload_len > rds_sk_sndbuf(rs)) {
-                       ret = -EMSGSIZE;
-                       goto out;
-               }
+
                if (nonblock) {
                        ret = -EAGAIN;
                        goto out;