dpif_linux_recv,
dpif_linux_recv_wait,
dpif_linux_recv_purge,
+ NULL, /* register_upcall_cb */
+ NULL, /* enable_upcall */
+ NULL, /* disable_upcall */
};
\f
static int
/* Configuration parameters. */
enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */
-/* Queues. */
-enum { MAX_QUEUE_LEN = 128 }; /* Maximum number of packets per queue. */
-enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
-BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
-
/* Protects against changes to 'dp_netdevs'. */
static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
= SHASH_INITIALIZER(&dp_netdevs);
-struct dp_netdev_upcall {
- struct dpif_upcall upcall; /* Queued upcall information. */
- struct ofpbuf buf; /* ofpbuf instance for upcall.packet. */
-};
-
-/* A queue passing packets from a struct dp_netdev to its clients (handlers).
- *
- *
- * Thread-safety
- * =============
- *
- * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
- * its own mutex. */
struct dp_netdev_queue {
- struct ovs_mutex mutex;
- struct seq *seq; /* Incremented whenever a packet is queued. */
- struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
- unsigned int head OVS_GUARDED;
- unsigned int tail OVS_GUARDED;
+ unsigned int packet_count;
+
+ struct dpif_upcall upcalls[NETDEV_MAX_RX_BATCH];
+ struct ofpbuf bufs[NETDEV_MAX_RX_BATCH];
};
+#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
+
/* Datapath based on the network device interface from netdev.h.
*
*
* dp_netdev_mutex (global)
* port_mutex
* flow_mutex
- * queue_rwlock
*/
struct dp_netdev {
const struct dpif_class *const class;
const char *const name;
+ struct dpif *dpif;
struct ovs_refcount ref_cnt;
atomic_flag destroyed;
struct classifier cls;
struct cmap flow_table OVS_GUARDED; /* Flow table. */
- /* Queues.
- *
- * 'queue_rwlock' protects the modification of 'handler_queues' and
- * 'n_handlers'. The queue elements are protected by its
- * 'handler_queues''s mutex. */
- struct fat_rwlock queue_rwlock;
- struct dp_netdev_queue *handler_queues;
- uint32_t n_handlers;
-
/* Statistics.
*
* ovsthread_stats is internally synchronized. */
struct cmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */
+ /* Protects access to ofproto-dpif-upcall interface during revalidator
+ * thread synchronization. */
+ struct fat_rwlock upcall_rwlock;
+ exec_upcall_cb *upcall_cb; /* Callback function for executing upcalls. */
+
/* Forwarding threads. */
struct latch exit_latch;
struct pmd_thread *pmd_threads;
OVS_REQUIRES(dp->port_mutex);
static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
OVS_REQUIRES(dp->port_mutex);
-static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
- OVS_REQ_WRLOCK(dp->queue_rwlock);
static int dpif_netdev_open(const struct dpif_class *, const char *name,
bool create, struct dpif **);
-static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
- int queue_no, int type,
- const struct miniflow *,
- const struct nlattr *userdata);
+static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
+ struct ofpbuf *, int type,
+ const struct miniflow *,
+ const struct nlattr *);
+static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
+ struct dp_netdev *);
static void dp_netdev_execute_actions(struct dp_netdev *dp,
struct dpif_packet **, int c,
bool may_steal, struct pkt_metadata *,
odp_port_t port_no);
static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
+static void dp_netdev_disable_upcall(struct dp_netdev *);
static struct dpif_netdev *
dpif_netdev_cast(const struct dpif *dpif)
classifier_init(&dp->cls, NULL);
cmap_init(&dp->flow_table);
- fat_rwlock_init(&dp->queue_rwlock);
-
ovsthread_stats_init(&dp->stats);
ovs_mutex_init(&dp->port_mutex);
cmap_init(&dp->ports);
dp->port_seq = seq_create();
latch_init(&dp->exit_latch);
+ fat_rwlock_init(&dp->upcall_rwlock);
+
+ /* Disable upcalls by default. */
+ dp_netdev_disable_upcall(dp);
+ dp->upcall_cb = NULL;
ovs_mutex_lock(&dp->port_mutex);
error = do_add_port(dp, name, "internal", ODPP_LOCAL);
}
if (!error) {
*dpifp = create_dpif_netdev(dp);
+ dp->dpif = *dpifp;
}
ovs_mutex_unlock(&dp_netdev_mutex);
return error;
}
-static void
-dp_netdev_purge_queues(struct dp_netdev *dp)
- OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
- int i;
-
- for (i = 0; i < dp->n_handlers; i++) {
- struct dp_netdev_queue *q = &dp->handler_queues[i];
-
- ovs_mutex_lock(&q->mutex);
- while (q->tail != q->head) {
- struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
- ofpbuf_uninit(&u->upcall.packet);
- ofpbuf_uninit(&u->buf);
- }
- ovs_mutex_unlock(&q->mutex);
- }
-}
-
/* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
* through the 'dp_netdevs' shash while freeing 'dp'. */
static void
}
ovsthread_stats_destroy(&dp->stats);
- fat_rwlock_wrlock(&dp->queue_rwlock);
- dp_netdev_destroy_all_queues(dp);
- fat_rwlock_unlock(&dp->queue_rwlock);
-
- fat_rwlock_destroy(&dp->queue_rwlock);
-
classifier_destroy(&dp->cls);
cmap_destroy(&dp->flow_table);
ovs_mutex_destroy(&dp->flow_mutex);
seq_destroy(dp->port_seq);
cmap_destroy(&dp->ports);
+ fat_rwlock_destroy(&dp->upcall_rwlock);
latch_destroy(&dp->exit_latch);
free(CONST_CAST(char *, dp->name));
free(dp);
return 0;
}
-static void
-dp_netdev_destroy_all_queues(struct dp_netdev *dp)
- OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
- size_t i;
-
- dp_netdev_purge_queues(dp);
-
- for (i = 0; i < dp->n_handlers; i++) {
- struct dp_netdev_queue *q = &dp->handler_queues[i];
-
- ovs_mutex_destroy(&q->mutex);
- seq_destroy(q->seq);
- }
- free(dp->handler_queues);
- dp->handler_queues = NULL;
- dp->n_handlers = 0;
-}
-
-static void
-dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
- OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
- if (dp->n_handlers != n_handlers) {
- size_t i;
-
- dp_netdev_destroy_all_queues(dp);
-
- dp->n_handlers = n_handlers;
- dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
-
- for (i = 0; i < n_handlers; i++) {
- struct dp_netdev_queue *q = &dp->handler_queues[i];
-
- ovs_mutex_init(&q->mutex);
- q->seq = seq_create();
- }
- }
-}
-
-static int
-dpif_netdev_recv_set(struct dpif *dpif, bool enable)
-{
- struct dp_netdev *dp = get_dp_netdev(dpif);
-
- if ((dp->handler_queues != NULL) == enable) {
- return 0;
- }
-
- fat_rwlock_wrlock(&dp->queue_rwlock);
- if (!enable) {
- dp_netdev_destroy_all_queues(dp);
- } else {
- dp_netdev_refresh_queues(dp, 1);
- }
- fat_rwlock_unlock(&dp->queue_rwlock);
-
- return 0;
-}
-
-static int
-dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
-{
- struct dp_netdev *dp = get_dp_netdev(dpif);
-
- fat_rwlock_wrlock(&dp->queue_rwlock);
- if (dp->handler_queues) {
- dp_netdev_refresh_queues(dp, n_handlers);
- }
- fat_rwlock_unlock(&dp->queue_rwlock);
-
- return 0;
-}
-
static int
dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
uint32_t queue_id, uint32_t *priority)
return 0;
}
-static bool
-dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
- OVS_REQ_RDLOCK(dp->queue_rwlock)
-{
- static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
- if (!dp->handler_queues) {
- VLOG_WARN_RL(&rl, "receiving upcall disabled");
- return false;
- }
-
- if (handler_id >= dp->n_handlers) {
- VLOG_WARN_RL(&rl, "handler index out of bound");
- return false;
- }
-
- return true;
-}
-
-static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
- struct dpif_upcall *upcall, struct ofpbuf *buf)
-{
- struct dp_netdev *dp = get_dp_netdev(dpif);
- struct dp_netdev_queue *q;
- int error = 0;
-
- fat_rwlock_rdlock(&dp->queue_rwlock);
-
- if (!dp_netdev_recv_check(dp, handler_id)) {
- error = EAGAIN;
- goto out;
- }
-
- q = &dp->handler_queues[handler_id];
- ovs_mutex_lock(&q->mutex);
- if (q->head != q->tail) {
- struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-
- *upcall = u->upcall;
-
- ofpbuf_uninit(buf);
- *buf = u->buf;
- } else {
- error = EAGAIN;
- }
- ovs_mutex_unlock(&q->mutex);
-
-out:
- fat_rwlock_unlock(&dp->queue_rwlock);
-
- return error;
-}
-
-static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
-{
- struct dp_netdev *dp = get_dp_netdev(dpif);
- struct dp_netdev_queue *q;
- uint64_t seq;
-
- fat_rwlock_rdlock(&dp->queue_rwlock);
-
- if (!dp_netdev_recv_check(dp, handler_id)) {
- goto out;
- }
-
- q = &dp->handler_queues[handler_id];
- ovs_mutex_lock(&q->mutex);
- seq = seq_read(q->seq);
- if (q->head != q->tail) {
- poll_immediate_wake();
- } else {
- seq_wait(q->seq, seq);
- }
-
- ovs_mutex_unlock(&q->mutex);
-
-out:
- fat_rwlock_unlock(&dp->queue_rwlock);
-}
-
-static void
-dpif_netdev_recv_purge(struct dpif *dpif)
-{
- struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-
- fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
- dp_netdev_purge_queues(dpif_netdev->dp);
- fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
-}
\f
/* Creates and returns a new 'struct dp_netdev_actions', with a reference count
* of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
return NULL;
}
+static void
+dp_netdev_disable_upcall(struct dp_netdev *dp)
+ OVS_ACQUIRES(dp->upcall_rwlock)
+{
+ fat_rwlock_wrlock(&dp->upcall_rwlock);
+}
+
+static void
+dpif_netdev_disable_upcall(struct dpif *dpif)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ dp_netdev_disable_upcall(dp);
+}
+
+static void
+dp_netdev_enable_upcall(struct dp_netdev *dp)
+ OVS_RELEASES(dp->upcall_rwlock)
+{
+ fat_rwlock_unlock(&dp->upcall_rwlock);
+}
+
+static void
+dpif_netdev_enable_upcall(struct dpif *dpif)
+ OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ dp_netdev_enable_upcall(dp);
+}
+
static void
dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
{
dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
struct pkt_metadata *md)
{
+ struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
struct packet_batch batches[NETDEV_MAX_RX_BATCH];
struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
}
if (OVS_UNLIKELY(!rules[i])) {
+ struct ofpbuf *buf = &packets[i]->ofpbuf;
dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
-
- if (OVS_LIKELY(dp->handler_queues)) {
- uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
- struct ofpbuf *buf = &packets[i]->ofpbuf;
-
- dp_netdev_output_userspace(dp, buf, hash % dp->n_handlers,
- DPIF_UC_MISS, mfs[i], NULL);
- }
-
+ dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
+ mfs[i], NULL);
dpif_packet_delete(packets[i]);
continue;
}
for (i = 0; i < n_batches; i++) {
packet_batch_execute(&batches[i], dp);
}
+
+ if (q.packet_count) {
+ dp_netdev_execute_userspace_queue(&q, dp);
+ }
}
static void
struct ofpbuf *packet, int type,
const struct miniflow *key,
const struct nlattr *userdata)
-OVS_REQUIRES(q->mutex)
{
- if (q->head - q->tail < MAX_QUEUE_LEN) {
- struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
- struct dpif_upcall *upcall = &u->upcall;
- struct ofpbuf *buf = &u->buf;
+ if (q->packet_count < NETDEV_MAX_RX_BATCH) {
+ int cnt = q->packet_count;
+ struct dpif_upcall *upcall = &q->upcalls[cnt];
+ struct ofpbuf *buf = &q->bufs[cnt];
size_t buf_size;
struct flow flow;
void *data;
/* Put userdata. */
if (userdata) {
upcall->userdata = ofpbuf_put(buf, userdata,
- NLA_ALIGN(userdata->nla_len));
+ NLA_ALIGN(userdata->nla_len));
}
/* We have to perform a copy of the packet, because we cannot send DPDK
ofpbuf_use_stub(&upcall->packet, data, ofpbuf_size(packet));
ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
- seq_change(q->seq);
-
+ q->packet_count++;
return 0;
} else {
return ENOBUFS;
}
}
-static int
-dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
- int queue_no, int type,
- const struct miniflow *key,
- const struct nlattr *userdata)
+static void
+dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
+ struct dp_netdev *dp)
{
- struct dp_netdev_queue *q;
- int error;
+ struct dpif_upcall *upcalls = q->upcalls;
+ struct ofpbuf *bufs = q->bufs;
+ int cnt = q->packet_count;
- fat_rwlock_rdlock(&dp->queue_rwlock);
- q = &dp->handler_queues[queue_no];
- ovs_mutex_lock(&q->mutex);
- error = dp_netdev_queue_userspace_packet(q, packet, type, key,
- userdata);
- if (error == ENOBUFS) {
- dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
- }
- ovs_mutex_unlock(&q->mutex);
- fat_rwlock_unlock(&dp->queue_rwlock);
+ if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+ ovs_assert(dp->upcall_cb);
+ dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
+ fat_rwlock_unlock(&dp->upcall_rwlock);
+ } else {
+ int i;
- return error;
+ for (i = 0; i < cnt; i++) {
+ ofpbuf_uninit(&bufs[i]);
+ ofpbuf_uninit(&upcalls[i].packet);
+ }
+ }
}
struct dp_netdev_execute_aux {
struct dp_netdev *dp;
};
+static void
+dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ dp->upcall_cb = cb;
+}
+
static void
dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
struct pkt_metadata *md,
case OVS_ACTION_ATTR_USERSPACE: {
const struct nlattr *userdata;
struct netdev_flow_key key;
+ struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
miniflow_extract(packet, md, &key.flow);
- dp_netdev_output_userspace(aux->dp, packet,
- miniflow_hash_5tuple(&key.flow, 0)
- % aux->dp->n_handlers,
- DPIF_UC_ACTION, &key.flow,
- userdata);
+ dp_netdev_queue_userspace_packet(&q, packet,
+ DPIF_UC_ACTION, &key.flow,
+ userdata);
if (may_steal) {
dpif_packet_delete(packets[i]);
}
}
+
+ if (q.packet_count) {
+ dp_netdev_execute_userspace_queue(&q, aux->dp);
+ }
break;
}
dpif_netdev_flow_dump_next,
dpif_netdev_execute,
NULL, /* operate */
- dpif_netdev_recv_set,
- dpif_netdev_handlers_set,
+ NULL, /* recv_set */
+ NULL, /* handlers_set */
dpif_netdev_queue_to_priority,
- dpif_netdev_recv,
- dpif_netdev_recv_wait,
- dpif_netdev_recv_purge,
+ NULL, /* recv */
+ NULL, /* recv_wait */
+ NULL, /* recv_purge */
+ dpif_netdev_register_upcall_cb,
+ dpif_netdev_enable_upcall,
+ dpif_netdev_disable_upcall,
};
static void
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
+#include "dpif.h"
#include "openvswitch/types.h"
#include "ofpbuf.h"
#include "packets.h"
/* Throws away any queued upcalls that 'dpif' currently has ready to
* return. */
void (*recv_purge)(struct dpif *dpif);
+
+ /* For datapaths that run in userspace (i.e. dpif-netdev), threads polling
+ * for incoming packets can directly call upcall functions instead of
+ * offloading packet processing to separate handler threads. Datapaths
+ * that directly call upcall functions should use the functions below
+ * to register an upcall function and enable / disable upcalls.
+ *
+ * Registers an upcall callback function with 'dpif'. This is only used
+ * if 'dpif' directly executes upcall functions. */
+ void (*register_upcall_cb)(struct dpif *, exec_upcall_cb *);
+
+ /* Enables upcalls if 'dpif' directly executes upcall functions. */
+ void (*enable_upcall)(struct dpif *);
+
+ /* Disables upcalls if 'dpif' directly executes upcall functions. */
+ void (*disable_upcall)(struct dpif *);
};
extern const struct dpif_class dpif_linux_class;
int
dpif_recv_set(struct dpif *dpif, bool enable)
{
- int error = dpif->dpif_class->recv_set(dpif, enable);
- log_operation(dpif, "recv_set", error);
+ int error = 0;
+
+ if (dpif->dpif_class->recv_set) {
+ error = dpif->dpif_class->recv_set(dpif, enable);
+ log_operation(dpif, "recv_set", error);
+ }
return error;
}
int
dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)
{
- int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
- log_operation(dpif, "handlers_set", error);
+ int error = 0;
+
+ if (dpif->dpif_class->handlers_set) {
+ error = dpif->dpif_class->handlers_set(dpif, n_handlers);
+ log_operation(dpif, "handlers_set", error);
+ }
return error;
}
+void
+dpif_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+{
+ if (dpif->dpif_class->register_upcall_cb) {
+ dpif->dpif_class->register_upcall_cb(dpif, cb);
+ }
+}
+
+void
+dpif_enable_upcall(struct dpif *dpif)
+{
+ if (dpif->dpif_class->enable_upcall) {
+ dpif->dpif_class->enable_upcall(dpif);
+ }
+}
+
+void
+dpif_disable_upcall(struct dpif *dpif)
+{
+ if (dpif->dpif_class->disable_upcall) {
+ dpif->dpif_class->disable_upcall(dpif);
+ }
+}
+
+void
+dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
+{
+ if (!VLOG_DROP_DBG(&dpmsg_rl)) {
+ struct ds flow;
+ char *packet;
+
+ packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
+ ofpbuf_size(&upcall->packet));
+
+ ds_init(&flow);
+ odp_flow_key_format(upcall->key, upcall->key_len, &flow);
+
+ VLOG_DBG("%s: %s upcall:\n%s\n%s",
+ dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
+ ds_cstr(&flow), packet);
+
+ ds_destroy(&flow);
+ free(packet);
+ }
+}
+
/* Polls for an upcall from 'dpif' for an upcall handler. Since there
* there can be multiple poll loops, 'handler_id' is needed as index to
* identify the corresponding poll loop. If successful, stores the upcall
dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,
struct ofpbuf *buf)
{
- int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
- if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
- struct ds flow;
- char *packet;
+ int error = EAGAIN;
- packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
- ofpbuf_size(&upcall->packet));
-
- ds_init(&flow);
- odp_flow_key_format(upcall->key, upcall->key_len, &flow);
-
- VLOG_DBG("%s: %s upcall:\n%s\n%s",
- dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
- ds_cstr(&flow), packet);
-
- ds_destroy(&flow);
- free(packet);
- } else if (error && error != EAGAIN) {
- log_operation(dpif, "recv", error);
+ if (dpif->dpif_class->recv) {
+ error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
+ if (!error) {
+ dpif_print_packet(dpif, upcall);
+ } else if (error != EAGAIN) {
+ log_operation(dpif, "recv", error);
+ }
}
return error;
}
void
dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
{
- dpif->dpif_class->recv_wait(dpif, handler_id);
+ if (dpif->dpif_class->recv_wait) {
+ dpif->dpif_class->recv_wait(dpif, handler_id);
+ }
}
/* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
struct nlattr *userdata; /* Argument to OVS_ACTION_ATTR_USERSPACE. */
};
+typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *,
+ struct ofpbuf *, int cnt);
+
int dpif_recv_set(struct dpif *, bool enable);
int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
struct ofpbuf *);
void dpif_recv_purge(struct dpif *);
void dpif_recv_wait(struct dpif *, uint32_t handler_id);
+void dpif_register_upcall_cb(struct dpif *, exec_upcall_cb *);
+void dpif_enable_upcall(struct dpif *);
+void dpif_disable_upcall(struct dpif *);
+
+void dpif_print_packet(struct dpif *, struct dpif_upcall *);
\f
/* Miscellaneous. */
#include "vlog.h"
#define MAX_QUEUE_LENGTH 512
-#define UPCALL_MAX_BATCH 50
+#define UPCALL_MAX_BATCH 64
#define REVALIDATE_MAX_BATCH 50
VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
static size_t read_upcalls(struct handler *,
struct upcall upcalls[UPCALL_MAX_BATCH]);
-static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
+static void free_upcall(struct upcall *);
+static int convert_upcall(struct udpif *, struct upcall *);
+static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
static void udpif_stop_threads(struct udpif *);
static void udpif_start_threads(struct udpif *, size_t n_handlers,
size_t n_revalidators);
atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
ovs_mutex_init(&udpif->n_flows_mutex);
+ dpif_register_upcall_cb(dpif, exec_upcalls);
+
return udpif;
}
xpthread_join(udpif->revalidators[i].thread, NULL);
}
+ dpif_disable_upcall(udpif->dpif);
+
for (i = 0; i < udpif->n_revalidators; i++) {
struct revalidator *revalidator = &udpif->revalidators[i];
"handler", udpif_upcall_handler, handler);
}
+ dpif_enable_upcall(udpif->dpif);
+
ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
udpif->reval_exit = false;
udpif->revalidators = xzalloc(udpif->n_revalidators
latch_wait(&udpif->exit_latch);
poll_block();
} else {
- handle_upcalls(handler, upcalls, n_upcalls);
+ handle_upcalls(handler->udpif, upcalls, n_upcalls);
for (i = 0; i < n_upcalls; i++) {
- xlate_out_uninit(&upcalls[i].xout);
- ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
- ofpbuf_uninit(&upcalls[i].upcall_buf);
+ free_upcall(&upcalls[i]);
}
}
coverage_clear();
xlate_actions(&xin, &upcall->xout);
}
+void
+free_upcall(struct upcall *upcall)
+{
+ xlate_out_uninit(&upcall->xout);
+ ofpbuf_uninit(&upcall->dpif_upcall.packet);
+ ofpbuf_uninit(&upcall->upcall_buf);
+}
+
+static struct udpif *
+find_udpif(struct dpif *dpif)
+{
+ struct udpif *udpif;
+
+ LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+ if (udpif->dpif == dpif) {
+ return udpif;
+ }
+ }
+ return NULL;
+}
+
+void
+exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls,
+ struct ofpbuf *bufs, int cnt)
+{
+ struct upcall upcalls[UPCALL_MAX_BATCH];
+ struct udpif *udpif;
+ int i, j;
+
+ udpif = find_udpif(dpif);
+ ovs_assert(udpif);
+
+ for (i = 0; i < cnt; i += UPCALL_MAX_BATCH) {
+ size_t n_upcalls = 0;
+ for (j = i; j < MIN(i + UPCALL_MAX_BATCH, cnt); j++) {
+ struct upcall *upcall = &upcalls[n_upcalls];
+ struct dpif_upcall *dupcall = &dupcalls[j];
+ struct ofpbuf *buf = &bufs[j];
+
+ upcall->dpif_upcall = *dupcall;
+ upcall->upcall_buf = *buf;
+
+ dpif_print_packet(dpif, dupcall);
+ if (!convert_upcall(udpif, upcall)) {
+ n_upcalls += 1;
+ }
+ }
+
+ if (n_upcalls) {
+ handle_upcalls(udpif, upcalls, n_upcalls);
+ for (j = 0; j < n_upcalls; j++) {
+ free_upcall(&upcalls[j]);
+ }
+ }
+ }
+}
+
/* Reads and classifies upcalls. Returns the number of upcalls successfully
* read. */
static size_t
/* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
for (i = 0; i < UPCALL_MAX_BATCH; i++) {
struct upcall *upcall = &upcalls[n_upcalls];
- struct dpif_upcall *dupcall;
- struct ofpbuf *packet;
- struct ofproto_dpif *ofproto;
- struct dpif_sflow *sflow;
- struct dpif_ipfix *ipfix;
- struct flow flow;
- enum upcall_type type;
- odp_port_t odp_in_port;
int error;
ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
break;
}
- dupcall = &upcall->dpif_upcall;
- packet = &dupcall->packet;
- error = xlate_receive(udpif->backer, packet, dupcall->key,
- dupcall->key_len, &flow,
- &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
- if (error) {
- if (error == ENODEV) {
- /* Received packet on datapath port for which we couldn't
- * associate an ofproto. This can happen if a port is removed
- * while traffic is being received. Print a rate-limited
- * message in case it happens frequently. Install a drop flow
- * so that future packets of the flow are inexpensively dropped
- * in the kernel. */
- VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
- "port %"PRIu32, odp_in_port);
- dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
- dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
- NULL);
- }
- goto destroy_upcall;
+ if (!convert_upcall(udpif, upcall)) {
+ n_upcalls += 1;
}
+ }
+ return n_upcalls;
+}
- type = classify_upcall(upcall);
- if (type == MISS_UPCALL) {
- upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
- n_upcalls++;
- continue;
- }
+int
+convert_upcall(struct udpif *udpif, struct upcall *upcall)
+{
+ struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+ struct ofpbuf *packet = &dupcall->packet;
+ struct ofproto_dpif *ofproto;
+ struct dpif_sflow *sflow;
+ struct dpif_ipfix *ipfix;
+ struct flow flow;
+ enum upcall_type type;
+ odp_port_t odp_in_port;
+ int error;
- switch (type) {
- case SFLOW_UPCALL:
- if (sflow) {
- union user_action_cookie cookie;
+ error = xlate_receive(udpif->backer, packet, dupcall->key,
+ dupcall->key_len, &flow,
+ &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
- memset(&cookie, 0, sizeof cookie);
- memcpy(&cookie, nl_attr_get(dupcall->userdata),
- sizeof cookie.sflow);
- dpif_sflow_received(sflow, packet, &flow, odp_in_port,
- &cookie);
- }
- break;
- case IPFIX_UPCALL:
- if (ipfix) {
- dpif_ipfix_bridge_sample(ipfix, packet, &flow);
- }
- break;
- case FLOW_SAMPLE_UPCALL:
- if (ipfix) {
- union user_action_cookie cookie;
-
- memset(&cookie, 0, sizeof cookie);
- memcpy(&cookie, nl_attr_get(dupcall->userdata),
- sizeof cookie.flow_sample);
-
- /* The flow reflects exactly the contents of the packet.
- * Sample the packet using it. */
- dpif_ipfix_flow_sample(ipfix, packet, &flow,
- cookie.flow_sample.collector_set_id,
- cookie.flow_sample.probability,
- cookie.flow_sample.obs_domain_id,
- cookie.flow_sample.obs_point_id);
- }
- break;
- case BAD_UPCALL:
- break;
- case MISS_UPCALL:
- OVS_NOT_REACHED();
+ if (error) {
+ if (error == ENODEV) {
+ /* Received packet on datapath port for which we couldn't
+ * associate an ofproto. This can happen if a port is removed
+ * while traffic is being received. Print a rate-limited
+ * message in case it happens frequently. Install a drop flow
+ * so that future packets of the flow are inexpensively dropped
+ * in the kernel. */
+ VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
+ "port %"PRIu32, odp_in_port);
+ dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
+ dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
+ NULL);
}
+ goto destroy_upcall;
+ }
- dpif_ipfix_unref(ipfix);
- dpif_sflow_unref(sflow);
+ type = classify_upcall(upcall);
+ if (type == MISS_UPCALL) {
+ upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
+ return error;
+ }
-destroy_upcall:
- ofpbuf_uninit(&upcall->dpif_upcall.packet);
- ofpbuf_uninit(&upcall->upcall_buf);
+ switch (type) {
+ case SFLOW_UPCALL:
+ if (sflow) {
+ union user_action_cookie cookie;
+
+ memset(&cookie, 0, sizeof cookie);
+ memcpy(&cookie, nl_attr_get(dupcall->userdata),
+ sizeof cookie.sflow);
+ dpif_sflow_received(sflow, packet, &flow, odp_in_port,
+ &cookie);
+ }
+ break;
+ case IPFIX_UPCALL:
+ if (ipfix) {
+ dpif_ipfix_bridge_sample(ipfix, packet, &flow);
+ }
+ break;
+ case FLOW_SAMPLE_UPCALL:
+ if (ipfix) {
+ union user_action_cookie cookie;
+
+ memset(&cookie, 0, sizeof cookie);
+ memcpy(&cookie, nl_attr_get(dupcall->userdata),
+ sizeof cookie.flow_sample);
+
+ /* The flow reflects exactly the contents of the packet.
+ * Sample the packet using it. */
+ dpif_ipfix_flow_sample(ipfix, packet, &flow,
+ cookie.flow_sample.collector_set_id,
+ cookie.flow_sample.probability,
+ cookie.flow_sample.obs_domain_id,
+ cookie.flow_sample.obs_point_id);
+ }
+ break;
+ case BAD_UPCALL:
+ break;
+ case MISS_UPCALL:
+ OVS_NOT_REACHED();
}
- return n_upcalls;
+ dpif_ipfix_unref(ipfix);
+ dpif_sflow_unref(sflow);
+ error = EAGAIN;
+
+destroy_upcall:
+ ofpbuf_uninit(&upcall->dpif_upcall.packet);
+ ofpbuf_uninit(&upcall->upcall_buf);
+ return error;
}
static void
-handle_upcalls(struct handler *handler, struct upcall *upcalls,
+handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
size_t n_upcalls)
{
- struct udpif *udpif = handler->udpif;
struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
struct dpif_op ops[UPCALL_MAX_BATCH * 2];
size_t n_ops, i;
struct dpif;
struct dpif_backer;
+struct dpif_upcall;
+struct ofpbuf;
struct seq;
struct simap;
* them. Additionally, it's responsible for maintaining the datapath flow
* table. */
+void exec_upcalls(struct dpif *, struct dpif_upcall *, struct ofpbuf *,
+ int cnt);
+
struct udpif *udpif_create(struct dpif_backer *, struct dpif *);
void udpif_run(struct udpif *udpif);
void udpif_set_threads(struct udpif *, size_t n_handlers,