dpif-netdev: Allow different numbers of rx queues for different ports.
[cascardo/ovs.git] / lib / dpif-netdev.c
index 9d399a0..1b9793b 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -221,9 +221,7 @@ struct dp_netdev {
      * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
     ovsthread_key_t per_pmd_key;
 
-    /* Number of rx queues for each dpdk interface and the cpu mask
-     * for pin of pmd threads. */
-    size_t n_dpdk_rxqs;
+    /* Cpu mask for pin of pmd threads. */
     char *pmd_cmask;
     uint64_t last_tnl_conf_seq;
 };
@@ -254,6 +252,8 @@ struct dp_netdev_port {
     struct netdev_rxq **rxq;
     struct ovs_refcount ref_cnt;
     char *type;                 /* Port type as requested by user. */
+    int latest_requested_n_rxq; /* Latest requested from netdev number
+                                   of rx queues. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -478,7 +478,9 @@ static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       const struct nlattr *actions,
                                       size_t actions_len);
 static void dp_netdev_input(struct dp_netdev_pmd_thread *,
-                            struct dp_packet **, int cnt);
+                            struct dp_packet **, int cnt, odp_port_t port_no);
+static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
+                                  struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
@@ -864,7 +866,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovsthread_key_create(&dp->per_pmd_key, NULL);
 
     dp_netdev_set_nonpmd(dp);
-    dp->n_dpdk_rxqs = NR_QUEUE;
 
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -927,15 +928,16 @@ dp_netdev_free(struct dp_netdev *dp)
     shash_find_and_delete(&dp_netdevs, dp->name);
 
     dp_netdev_destroy_all_pmds(dp);
-    cmap_destroy(&dp->poll_threads);
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
     ovs_mutex_lock(&dp->port_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
+        /* PMD threads are destroyed here. do_del_port() cannot quiesce */
         do_del_port(dp, port);
     }
     ovs_mutex_unlock(&dp->port_mutex);
+    cmap_destroy(&dp->poll_threads);
 
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
@@ -1091,7 +1093,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         /* There can only be ovs_numa_get_n_cores() pmd threads,
          * so creates a txq for each, and one extra for the non
          * pmd threads. */
-        error = netdev_set_multiq(netdev, n_cores + 1, dp->n_dpdk_rxqs);
+        error = netdev_set_multiq(netdev, n_cores + 1,
+                                  netdev_requested_n_rxq(netdev));
         if (error && (error != EOPNOTSUPP)) {
             VLOG_ERR("%s, cannot set multiq", devname);
             return errno;
@@ -1102,6 +1105,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     port->netdev = netdev;
     port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
     port->type = xstrdup(type);
+    port->latest_requested_n_rxq = netdev_requested_n_rxq(netdev);
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
         error = netdev_rxq_open(netdev, &port->rxq[i], i);
         if (error
@@ -2405,32 +2409,42 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
 /* Returns true if the configuration for rx queues or cpu mask
  * is changed. */
 static bool
-pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask)
+pmd_config_changed(const struct dp_netdev *dp, const char *cmask)
 {
-    if (dp->n_dpdk_rxqs != rxqs) {
-        return true;
-    } else {
-        if (dp->pmd_cmask != NULL && cmask != NULL) {
-            return strcmp(dp->pmd_cmask, cmask);
-        } else {
-            return (dp->pmd_cmask != NULL || cmask != NULL);
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        struct netdev *netdev = port->netdev;
+        int requested_n_rxq = netdev_requested_n_rxq(netdev);
+        if (netdev_is_pmd(netdev)
+            && port->latest_requested_n_rxq != requested_n_rxq) {
+            return true;
         }
     }
+
+    if (dp->pmd_cmask != NULL && cmask != NULL) {
+        return strcmp(dp->pmd_cmask, cmask);
+    } else {
+        return (dp->pmd_cmask != NULL || cmask != NULL);
+    }
 }
 
 /* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */
 static int
-dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
+dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
-    if (pmd_config_changed(dp, n_rxqs, cmask)) {
+    if (pmd_config_changed(dp, cmask)) {
         struct dp_netdev_port *port;
 
         dp_netdev_destroy_all_pmds(dp);
 
         CMAP_FOR_EACH (port, node, &dp->ports) {
-            if (netdev_is_pmd(port->netdev)) {
+            struct netdev *netdev = port->netdev;
+            int requested_n_rxq = netdev_requested_n_rxq(netdev);
+            if (netdev_is_pmd(port->netdev)
+                && port->latest_requested_n_rxq != requested_n_rxq) {
                 int i, err;
 
                 /* Closes the existing 'rxq's. */
@@ -2442,14 +2456,14 @@ dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
                 /* Sets the new rx queue config.  */
                 err = netdev_set_multiq(port->netdev,
                                         ovs_numa_get_n_cores() + 1,
-                                        n_rxqs);
+                                        requested_n_rxq);
                 if (err && (err != EOPNOTSUPP)) {
                     VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
                              " %u", netdev_get_name(port->netdev),
-                             n_rxqs);
+                             requested_n_rxq);
                     return err;
                 }
-
+                port->latest_requested_n_rxq = requested_n_rxq;
                 /* If the set_multiq() above succeeds, reopens the 'rxq's. */
                 port->rxq = xrealloc(port->rxq, sizeof *port->rxq
                                      * netdev_n_rxq(port->netdev));
@@ -2458,8 +2472,6 @@ dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
                 }
             }
         }
-        dp->n_dpdk_rxqs = n_rxqs;
-
         /* Reconfigures the cpu mask. */
         ovs_numa_set_cpu_mask(cmask);
         free(dp->pmd_cmask);
@@ -2555,16 +2567,10 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     error = netdev_rxq_recv(rxq, packets, &cnt);
     cycles_count_end(pmd, PMD_CYCLES_POLLING);
     if (!error) {
-        int i;
-
         *recirc_depth_get() = 0;
 
-        /* XXX: initialize md in netdev implementation. */
-        for (i = 0; i < cnt; i++) {
-            pkt_metadata_init(&packets[i]->md, port->port_no);
-        }
         cycles_count_start(pmd);
-        dp_netdev_input(pmd, packets, cnt);
+        dp_netdev_input(pmd, packets, cnt, port->port_no);
         cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -2914,10 +2920,24 @@ static void
 dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
 {
     struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_pmd_thread **pmd_list;
+    size_t k = 0, n_pmds;
+
+    n_pmds = cmap_count(&dp->poll_threads);
+    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_del_pmd(dp, pmd);
+        /* We cannot call dp_netdev_del_pmd(), since it alters
+         * 'dp->poll_threads' (while we're iterating it) and it
+         * might quiesce. */
+        ovs_assert(k < n_pmds);
+        pmd_list[k++] = pmd;
+    }
+
+    for (size_t i = 0; i < k; i++) {
+        dp_netdev_del_pmd(dp, pmd_list[i]);
     }
+    free(pmd_list);
 }
 
 /* Deletes all pmd threads on numa node 'numa_id' and
@@ -2928,18 +2948,28 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     struct dp_netdev_pmd_thread *pmd;
     int n_pmds_on_numa, n_pmds;
     int *free_idx, k = 0;
+    struct dp_netdev_pmd_thread **pmd_list;
 
     n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
-    free_idx = xmalloc(n_pmds_on_numa * sizeof *free_idx);
+    free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
+    pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        /* We cannot call dp_netdev_del_pmd(), since it alters
+         * 'dp->poll_threads' (while we're iterating it) and it
+         * might quiesce. */
         if (pmd->numa_id == numa_id) {
             atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
+            pmd_list[k] = pmd;
+            ovs_assert(k < n_pmds_on_numa);
             k++;
-            dp_netdev_del_pmd(dp, pmd);
         }
     }
 
+    for (int i = 0; i < k; i++) {
+        dp_netdev_del_pmd(dp, pmd_list[i]);
+    }
+
     n_pmds = get_n_pmd_threads(dp);
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         int old_tx_qid;
@@ -2953,6 +2983,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
         }
     }
 
+    free(pmd_list);
     free(free_idx);
 }
 
@@ -3275,69 +3306,70 @@ dp_netdev_queue_batches(struct dp_packet *pkt,
 {
     struct packet_batch *batch = flow->batch;
 
-    if (OVS_LIKELY(batch)) {
-        packet_batch_update(batch, pkt, mf);
-        return;
+    if (OVS_UNLIKELY(!batch)) {
+        batch = &batches[(*n_batches)++];
+        packet_batch_init(batch, flow);
     }
 
-    batch = &batches[(*n_batches)++];
-    packet_batch_init(batch, flow);
     packet_batch_update(batch, pkt, mf);
 }
 
-static inline void
-dp_packet_swap(struct dp_packet **a, struct dp_packet **b)
-{
-    struct dp_packet *tmp = *a;
-    *a = *b;
-    *b = tmp;
-}
-
 /* Try to process all ('cnt') the 'packets' using only the exact match cache
- * 'flow_cache'. If a flow is not found for a packet 'packets[i]', the
+ * 'pmd->flow_cache'. If a flow is not found for a packet 'packets[i]', the
  * miniflow is copied into 'keys' and the packet pointer is moved at the
  * beginning of the 'packets' array.
  *
  * The function returns the number of packets that needs to be processed in the
  * 'packets' array (they have been moved to the beginning of the vector).
+ *
+ * If 'md_is_valid' is false, the metadata in 'packets' is not valid and must be
+ * initialized by this function using 'port_no'.
  */
 static inline size_t
 emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet **packets,
                size_t cnt, struct netdev_flow_key *keys,
-               struct packet_batch batches[], size_t *n_batches)
+               struct packet_batch batches[], size_t *n_batches,
+               bool md_is_valid, odp_port_t port_no)
 {
     struct emc_cache *flow_cache = &pmd->flow_cache;
-    struct netdev_flow_key key;
+    struct netdev_flow_key *key = &keys[0];
     size_t i, n_missed = 0, n_dropped = 0;
 
     for (i = 0; i < cnt; i++) {
         struct dp_netdev_flow *flow;
+        struct dp_packet *packet = packets[i];
 
-        if (OVS_UNLIKELY(dp_packet_size(packets[i]) < ETH_HEADER_LEN)) {
-            dp_packet_delete(packets[i]);
+        if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
+            dp_packet_delete(packet);
             n_dropped++;
             continue;
         }
 
         if (i != cnt - 1) {
-            /* Prefetch next packet data */
+            /* Prefetch next packet data and metadata. */
             OVS_PREFETCH(dp_packet_data(packets[i+1]));
+            pkt_metadata_prefetch_init(&packets[i+1]->md);
         }
 
-        miniflow_extract(packets[i], &key.mf);
-        key.len = 0; /* Not computed yet. */
-        key.hash = dpif_netdev_packet_get_rss_hash(packets[i], &key.mf);
+        if (!md_is_valid) {
+            pkt_metadata_init(&packet->md, port_no);
+        }
+        miniflow_extract(packet, &key->mf);
+        key->len = 0; /* Not computed yet. */
+        key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
 
-        flow = emc_lookup(flow_cache, &key);
+        flow = emc_lookup(flow_cache, key);
         if (OVS_LIKELY(flow)) {
-            dp_netdev_queue_batches(packets[i], flow, &key.mf, batches,
+            dp_netdev_queue_batches(packet, flow, &key->mf, batches,
                                     n_batches);
         } else {
-            if (i != n_missed) {
-                dp_packet_swap(&packets[i], &packets[n_missed]);
-            }
-
-            keys[n_missed++] = key;
+            /* Exact match cache missed. Group missed packets together at
+             * the beginning of the 'packets' array.  */
+            packets[n_missed] = packet;
+            /* 'key[n_missed]' contains the key of the current packet and it
+             * must be returned to the caller. The next key should be extracted
+             * to 'keys[n_missed + 1]'. */
+            key = &keys[++n_missed];
         }
     }
 
@@ -3485,9 +3517,16 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
     dp_netdev_count_packet(pmd, DP_STAT_LOST, lost_cnt);
 }
 
+/* Packets enter the datapath from a port (or from recirculation) here.
+ *
+ * For performance reasons a caller may choose not to initialize the metadata
+ * in 'packets': in this case 'mdinit' is false and this function needs to
+ * initialize it using 'port_no'.  If the metadata in 'packets' is already
+ * valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
 static void
-dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
-                struct dp_packet **packets, int cnt)
+dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
+                  struct dp_packet **packets, int cnt,
+                  bool md_is_valid, odp_port_t port_no)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
     const size_t PKT_ARRAY_SIZE = cnt;
@@ -3501,7 +3540,8 @@ dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
     size_t newcnt, n_batches, i;
 
     n_batches = 0;
-    newcnt = emc_processing(pmd, packets, cnt, keys, batches, &n_batches);
+    newcnt = emc_processing(pmd, packets, cnt, keys, batches, &n_batches,
+                            md_is_valid, port_no);
     if (OVS_UNLIKELY(newcnt)) {
         fast_path_processing(pmd, packets, newcnt, keys, batches, &n_batches);
     }
@@ -3515,6 +3555,21 @@ dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
     }
 }
 
+static void
+dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
+                struct dp_packet **packets, int cnt,
+                odp_port_t port_no)
+{
+     dp_netdev_input__(pmd, packets, cnt, false, port_no);
+}
+
+static void
+dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_packet **packets, int cnt)
+{
+     dp_netdev_input__(pmd, packets, cnt, true, 0);
+}
+
 struct dp_netdev_execute_aux {
     struct dp_netdev_pmd_thread *pmd;
 };
@@ -3618,7 +3673,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
             err = push_tnl_action(dp, a, packets, cnt);
             if (!err) {
                 (*depth)++;
-                dp_netdev_input(pmd, packets, cnt);
+                dp_netdev_recirculate(pmd, packets, cnt);
                 (*depth)--;
             } else {
                 dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal);
@@ -3649,7 +3704,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
                     }
 
                     (*depth)++;
-                    dp_netdev_input(pmd, packets, cnt);
+                    dp_netdev_recirculate(pmd, packets, cnt);
                     (*depth)--;
                 } else {
                     dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal);
@@ -3707,7 +3762,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
             }
 
             (*depth)++;
-            dp_netdev_input(pmd, packets, cnt);
+            dp_netdev_recirculate(pmd, packets, cnt);
             (*depth)--;
 
             return;