packet: rollover huge flows before small flows
[cascardo/linux.git] / net / packet / af_packet.c
index 5102c3c..8f0156b 100644 (file)
@@ -1234,27 +1234,86 @@ static void packet_free_pending(struct packet_sock *po)
        free_percpu(po->tx_ring.pending_refcnt);
 }
 
-static bool packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
+#define ROOM_POW_OFF   2
+#define ROOM_NONE      0x0
+#define ROOM_LOW       0x1
+#define ROOM_NORMAL    0x2
+
+static bool __tpacket_has_room(struct packet_sock *po, int pow_off) /* V1/V2 ring: is the frame len>>pow_off slots ahead of head still kernel-owned? */
+{
+       int idx, len;
+
+       len = po->rx_ring.frame_max + 1;        /* frame_max is the last valid index, so +1 gives ring length */
+       idx = po->rx_ring.head;
+       if (pow_off)
+               idx += len >> pow_off;          /* pow_off == 0 probes head itself; otherwise look len/2^pow_off frames ahead */
+       if (idx >= len)
+               idx -= len;                     /* wrap around the ring; idx < 2*len here, so one subtraction suffices */
+       return packet_lookup_frame(po, &po->rx_ring, idx, TP_STATUS_KERNEL);
+}
+
+static bool __tpacket_v3_has_room(struct packet_sock *po, int pow_off) /* V3 ring: same probe as __tpacket_has_room, but over blocks, not frames */
+{
+       int idx, len;
+
+       len = po->rx_ring.prb_bdqc.knum_blocks;
+       idx = po->rx_ring.prb_bdqc.kactive_blk_num;
+       if (pow_off)
+               idx += len >> pow_off;          /* probe len/2^pow_off blocks past the active block */
+       if (idx >= len)
+               idx -= len;                     /* wrap around the block ring */
+       return prb_lookup_block(po, &po->rx_ring, idx, TP_STATUS_KERNEL);
+}
+
+static int __packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb) /* returns ROOM_{NONE,LOW,NORMAL}; skb may be NULL (no truesize charged) */
 {
        struct sock *sk = &po->sk;
+       int ret = ROOM_NONE;
+
+       if (po->prot_hook.func != tpacket_rcv) {        /* non-ring socket: measure room in sk_rcvbuf instead of ring slots */
+               int avail = sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc)
+                                         - (skb ? skb->truesize : 0);
+               if (avail > (sk->sk_rcvbuf >> ROOM_POW_OFF))    /* NORMAL: more than 1/4 of rcvbuf would remain free */
+                       return ROOM_NORMAL;
+               else if (avail > 0)
+                       return ROOM_LOW;
+               else
+                       return ROOM_NONE;
+       }
+
+       if (po->tp_version == TPACKET_V3) {     /* ring sockets: probe a slot ahead to distinguish NORMAL from LOW */
+               if (__tpacket_v3_has_room(po, ROOM_POW_OFF))
+                       ret = ROOM_NORMAL;
+               else if (__tpacket_v3_has_room(po, 0))
+                       ret = ROOM_LOW;
+       } else {
+               if (__tpacket_has_room(po, ROOM_POW_OFF))
+                       ret = ROOM_NORMAL;
+               else if (__tpacket_has_room(po, 0))
+                       ret = ROOM_LOW;
+       }
+
+       return ret;
+}
+
+static int packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb) /* locked wrapper; also maintains the po->pressure hint */
+{
+       int ret;
        bool has_room;
 
-       if (po->prot_hook.func != tpacket_rcv)
-               return (atomic_read(&sk->sk_rmem_alloc) + skb->truesize)
-                       <= sk->sk_rcvbuf;
+       if (po->prot_hook.func == tpacket_rcv) {
+               spin_lock(&po->sk.sk_receive_queue.lock);       /* ring probe must be serialized against concurrent receive */
+               ret = __packet_rcv_has_room(po, skb);
+               spin_unlock(&po->sk.sk_receive_queue.lock);
+       } else {
+               ret = __packet_rcv_has_room(po, skb);           /* rmem-based check needs no queue lock */
+       }
 
-       spin_lock(&sk->sk_receive_queue.lock);
-       if (po->tp_version == TPACKET_V3)
-               has_room = prb_lookup_block(po, &po->rx_ring,
-                                           po->rx_ring.prb_bdqc.kactive_blk_num,
-                                           TP_STATUS_KERNEL);
-       else
-               has_room = packet_lookup_frame(po, &po->rx_ring,
-                                              po->rx_ring.head,
-                                              TP_STATUS_KERNEL);
-       spin_unlock(&sk->sk_receive_queue.lock);
+       has_room = ret == ROOM_NORMAL;
+       if (po->pressure == has_room)                   /* write (xchg) only when the pressure state actually flips */
+               xchg(&po->pressure, !has_room);
 
-       return has_room;
+       return ret;
 }
 
 static void packet_sock_destruct(struct sock *sk)
@@ -1282,6 +1341,20 @@ static int fanout_rr_next(struct packet_fanout *f, unsigned int num)
        return x;
 }
 
+static bool fanout_flow_is_huge(struct packet_sock *po, struct sk_buff *skb) /* heuristic: does this skb's flow dominate recent traffic on po? */
+{
+       u32 rxhash;
+       int i, count = 0;
+
+       rxhash = skb_get_hash(skb);
+       for (i = 0; i < ROLLOVER_HLEN; i++)             /* count history slots already holding this flow's hash */
+               if (po->rollover->history[i] == rxhash)
+                       count++;
+
+       po->rollover->history[prandom_u32() % ROLLOVER_HLEN] = rxhash;  /* record in a random slot: cheap, lossy sampling */
+       return count > (ROLLOVER_HLEN >> 1);            /* "huge" = flow occupies more than half the sampled history */
+}
+
 static unsigned int fanout_demux_hash(struct packet_fanout *f,
                                      struct sk_buff *skb,
                                      unsigned int num)
@@ -1318,18 +1391,31 @@ static unsigned int fanout_demux_rnd(struct packet_fanout *f,
 
 static unsigned int fanout_demux_rollover(struct packet_fanout *f,
                                          struct sk_buff *skb,
-                                         unsigned int idx, unsigned int skip,
+                                         unsigned int idx, bool try_self,
                                          unsigned int num)
 {
-       unsigned int i, j;
+       struct packet_sock *po, *po_next;
+       unsigned int i, j, room;
 
-       i = j = min_t(int, f->next[idx], num - 1);
+       po = pkt_sk(f->arr[idx]);
+
+       if (try_self) {
+               room = packet_rcv_has_room(po, skb);
+               if (room == ROOM_NORMAL ||
+                   (room == ROOM_LOW && !fanout_flow_is_huge(po, skb)))
+                       return idx;
+       }
+
+       i = j = min_t(int, po->rollover->sock, num - 1);
        do {
-               if (i != skip && packet_rcv_has_room(pkt_sk(f->arr[i]), skb)) {
+               po_next = pkt_sk(f->arr[i]);
+               if (po_next != po && !po_next->pressure &&
+                   packet_rcv_has_room(po_next, skb) == ROOM_NORMAL) {
                        if (i != j)
-                               f->next[idx] = i;
+                               po->rollover->sock = i;
                        return i;
                }
+
                if (++i == num)
                        i = 0;
        } while (i != j);
@@ -1386,17 +1472,14 @@ static int packet_rcv_fanout(struct sk_buff *skb, struct net_device *dev,
                idx = fanout_demux_qm(f, skb, num);
                break;
        case PACKET_FANOUT_ROLLOVER:
-               idx = fanout_demux_rollover(f, skb, 0, (unsigned int) -1, num);
+               idx = fanout_demux_rollover(f, skb, 0, false, num);
                break;
        }
 
-       po = pkt_sk(f->arr[idx]);
-       if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER) &&
-           unlikely(!packet_rcv_has_room(po, skb))) {
-               idx = fanout_demux_rollover(f, skb, idx, idx, num);
-               po = pkt_sk(f->arr[idx]);
-       }
+       if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER))
+               idx = fanout_demux_rollover(f, skb, idx, true, num);
 
+       po = pkt_sk(f->arr[idx]);
        return po->prot_hook.func(skb, dev, &po->prot_hook, orig_dev);
 }
 
@@ -1467,6 +1550,12 @@ static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
        if (po->fanout)
                return -EALREADY;
 
+       if (type_flags & PACKET_FANOUT_FLAG_ROLLOVER) {
+               po->rollover = kzalloc(sizeof(*po->rollover), GFP_KERNEL);
+               if (!po->rollover)
+                       return -ENOMEM;
+       }
+
        mutex_lock(&fanout_mutex);
        match = NULL;
        list_for_each_entry(f, &fanout_list, list) {
@@ -1515,6 +1604,10 @@ static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
        }
 out:
        mutex_unlock(&fanout_mutex);
+       if (err) {
+               kfree(po->rollover);
+               po->rollover = NULL;
+       }
        return err;
 }
 
@@ -1536,6 +1629,8 @@ static void fanout_release(struct sock *sk)
                kfree(f);
        }
        mutex_unlock(&fanout_mutex);
+
+       kfree(po->rollover);
 }
 
 static const struct proto_ops packet_ops;
@@ -2311,11 +2406,14 @@ static int tpacket_snd(struct packet_sock *po, struct msghdr *msg)
                tlen = dev->needed_tailroom;
                skb = sock_alloc_send_skb(&po->sk,
                                hlen + tlen + sizeof(struct sockaddr_ll),
-                               0, &err);
+                               !need_wait, &err);
 
-               if (unlikely(skb == NULL))
+               if (unlikely(skb == NULL)) {
+                       /* we assume the socket was initially writeable ... */
+                       if (likely(len_sum > 0))
+                               err = len_sum;
                        goto out_status;
-
+               }
                tp_len = tpacket_fill_skb(po, skb, ph, dev, size_max, proto,
                                          addr, hlen);
                if (tp_len > dev->mtu + dev->hard_header_len) {
@@ -2832,7 +2930,7 @@ static int packet_create(struct net *net, struct socket *sock, int protocol,
        sock->state = SS_UNCONNECTED;
 
        err = -ENOBUFS;
-       sk = sk_alloc(net, PF_PACKET, GFP_KERNEL, &packet_proto);
+       sk = sk_alloc(net, PF_PACKET, GFP_KERNEL, &packet_proto, kern);
        if (sk == NULL)
                goto out;
 
@@ -2862,6 +2960,7 @@ static int packet_create(struct net *net, struct socket *sock, int protocol,
 
        spin_lock_init(&po->bind_lock);
        mutex_init(&po->pg_vec_lock);
+       po->rollover = NULL;
        po->prot_hook.func = packet_rcv;
 
        if (sock->type == SOCK_PACKET)
@@ -2939,6 +3038,9 @@ static int packet_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
        if (skb == NULL)
                goto out;
 
+       if (pkt_sk(sk)->pressure)
+               packet_rcv_has_room(pkt_sk(sk), NULL);
+
        if (pkt_sk(sk)->has_vnet_hdr) {
                struct virtio_net_hdr vnet_hdr = { 0 };
 
@@ -3694,6 +3796,8 @@ static unsigned int packet_poll(struct file *file, struct socket *sock,
                        TP_STATUS_KERNEL))
                        mask |= POLLIN | POLLRDNORM;
        }
+       if (po->pressure && __packet_rcv_has_room(po, NULL) == ROOM_NORMAL)
+               xchg(&po->pressure, 0);
        spin_unlock_bh(&sk->sk_receive_queue.lock);
        spin_lock_bh(&sk->sk_write_queue.lock);
        if (po->tx_ring.pg_vec) {