rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
struct rconn_packet_counter *counter, int queue_limit)
{
- if (counter->n_packets < queue_limit) {
+ if (rconn_packet_counter_n_packets(counter) < queue_limit) {
return rconn_send(rc, b, counter);
} else {
COVERAGE_INC(rconn_overflow);
rconn_packet_counter_create(void)
{
struct rconn_packet_counter *c = xzalloc(sizeof *c);
+ ovs_mutex_init(&c->mutex);
+ ovs_mutex_lock(&c->mutex);
c->ref_cnt = 1;
+ ovs_mutex_unlock(&c->mutex);
return c;
}
rconn_packet_counter_destroy(struct rconn_packet_counter *c)
{
if (c) {
+ bool dead;
+
+ ovs_mutex_lock(&c->mutex);
ovs_assert(c->ref_cnt > 0);
- if (!--c->ref_cnt && !c->n_packets) {
+ dead = !--c->ref_cnt && !c->n_packets;
+ ovs_mutex_unlock(&c->mutex);
+
+ if (dead) {
+ ovs_mutex_destroy(&c->mutex);
free(c);
}
}
void
rconn_packet_counter_inc(struct rconn_packet_counter *c, unsigned int n_bytes)
{
+ ovs_mutex_lock(&c->mutex);
c->n_packets++;
c->n_bytes += n_bytes;
+ ovs_mutex_unlock(&c->mutex);
}
void
rconn_packet_counter_dec(struct rconn_packet_counter *c, unsigned int n_bytes)
{
- ovs_assert(c->n_packets > 0);
- ovs_assert(c->n_bytes >= n_bytes);
+ bool dead = false;
- c->n_bytes -= n_bytes;
+ ovs_mutex_lock(&c->mutex);
+ ovs_assert(c->n_packets > 0);
+ ovs_assert(c->n_packets == 1
+ ? c->n_bytes == n_bytes
+ : c->n_bytes > n_bytes);
c->n_packets--;
- if (!c->n_packets) {
- ovs_assert(!c->n_bytes);
- if (!c->ref_cnt) {
- free(c);
- }
+ c->n_bytes -= n_bytes;
+ dead = !c->n_packets && !c->ref_cnt;
+ ovs_mutex_unlock(&c->mutex);
+
+ if (dead) {
+ ovs_mutex_destroy(&c->mutex);
+ free(c);
}
}
+
+unsigned int
+rconn_packet_counter_n_packets(const struct rconn_packet_counter *c)
+{
+ unsigned int n;
+
+ ovs_mutex_lock(&c->mutex);
+ n = c->n_packets;
+ ovs_mutex_unlock(&c->mutex);
+
+ return n;
+}
+
+unsigned int
+rconn_packet_counter_n_bytes(const struct rconn_packet_counter *c)
+{
+ unsigned int n;
+
+ ovs_mutex_lock(&c->mutex);
+ n = c->n_bytes;
+ ovs_mutex_unlock(&c->mutex);
+
+ return n;
+}
\f
/* Set rc->target and rc->name to 'target' and 'name', respectively. If 'name'
* is null, 'target' is used.
/*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <stdint.h>
#include <time.h>
#include "openvswitch/types.h"
+#include "ovs-thread.h"
/* A wrapper around vconn that provides queuing and optionally reliability.
*
/* Counts packets and bytes queued into an rconn by a given source. */
struct rconn_packet_counter {
- unsigned int n_packets; /* Number of packets queued. */
- unsigned int n_bytes; /* Number of bytes queued. */
- int ref_cnt; /* Number of owners. */
+ struct ovs_mutex mutex;
+ unsigned int n_packets OVS_GUARDED; /* Number of packets queued. */
+ unsigned int n_bytes OVS_GUARDED; /* Number of bytes queued. */
+ int ref_cnt OVS_GUARDED; /* Number of owners. */
};
struct rconn_packet_counter *rconn_packet_counter_create(void);
void rconn_packet_counter_inc(struct rconn_packet_counter *, unsigned n_bytes);
void rconn_packet_counter_dec(struct rconn_packet_counter *, unsigned n_bytes);
+unsigned int rconn_packet_counter_n_packets(
+ const struct rconn_packet_counter *);
+unsigned int rconn_packet_counter_n_bytes(const struct rconn_packet_counter *);
+
#endif /* rconn.h */
static bool
ofconn_may_recv(const struct ofconn *ofconn)
{
- int count = ofconn->reply_counter->n_packets;
+ int count = rconn_packet_counter_n_packets(ofconn->reply_counter);
return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX;
}
struct ofpbuf *msg, *next;
LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+ unsigned int n_bytes;
+
list_remove(&msg->list_node);
ofconn_send(ofconn, msg, ofconn->monitor_counter);
- if (!ofconn->monitor_paused
- && ofconn->monitor_counter->n_bytes > 128 * 1024) {
+ n_bytes = rconn_packet_counter_n_bytes(ofconn->monitor_counter);
+ if (!ofconn->monitor_paused && n_bytes > 128 * 1024) {
struct ofpbuf *pause;
COVERAGE_INC(ofmonitor_pause);
struct ofconn *ofconn;
LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
- if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+ if (ofconn->monitor_paused
+ && !rconn_packet_counter_n_packets(ofconn->monitor_counter)) {
COVERAGE_INC(ofmonitor_resume);
ofmonitor_resume(ofconn);
}
struct ofconn *ofconn;
LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
- if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+ if (ofconn->monitor_paused
+ && !rconn_packet_counter_n_packets(ofconn->monitor_counter)) {
poll_immediate_wake();
}
}