2 * Copyright (c) 2009 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include "dynamic-string.h"
28 #include "poll-loop.h"
30 #include "reconnect.h"
34 #define THIS_MODULE VLM_jsonrpc
38 struct stream *stream;
44 struct json_parser *parser;
45 struct jsonrpc_msg *received;
48 struct ovs_queue output;
52 /* Rate limit for error messages. */
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
55 static void jsonrpc_received(struct jsonrpc *);
56 static void jsonrpc_cleanup(struct jsonrpc *);
59 jsonrpc_open(struct stream *stream)
63 assert(stream != NULL);
65 rpc = xzalloc(sizeof *rpc);
66 rpc->name = xstrdup(stream_get_name(stream));
68 byteq_init(&rpc->input);
69 queue_init(&rpc->output);
75 jsonrpc_close(struct jsonrpc *rpc)
85 jsonrpc_run(struct jsonrpc *rpc)
91 while (!queue_is_empty(&rpc->output)) {
92 struct ofpbuf *buf = rpc->output.head;
95 retval = stream_send(rpc->stream, buf->data, buf->size);
97 rpc->backlog -= retval;
98 ofpbuf_pull(buf, retval);
100 ofpbuf_delete(queue_pop_head(&rpc->output));
103 if (retval != -EAGAIN) {
104 VLOG_WARN_RL(&rl, "%s: send error: %s",
105 rpc->name, strerror(-retval));
106 jsonrpc_error(rpc, -retval);
114 jsonrpc_wait(struct jsonrpc *rpc)
116 if (!rpc->status && !queue_is_empty(&rpc->output)) {
117 stream_send_wait(rpc->stream);
122 jsonrpc_get_status(const struct jsonrpc *rpc)
128 jsonrpc_get_backlog(const struct jsonrpc *rpc)
130 return rpc->status ? 0 : rpc->backlog;
134 jsonrpc_get_name(const struct jsonrpc *rpc)
140 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
141 const struct jsonrpc_msg *msg)
143 if (VLOG_IS_DBG_ENABLED()) {
144 struct ds s = DS_EMPTY_INITIALIZER;
146 ds_put_format(&s, ", method=\"%s\"", msg->method);
149 ds_put_cstr(&s, ", params=");
150 ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
153 ds_put_cstr(&s, ", result=");
154 ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
157 ds_put_cstr(&s, ", error=");
158 ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
161 ds_put_cstr(&s, ", id=");
162 ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
164 VLOG_DBG("%s: %s %s%s", rpc->name, title,
165 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
171 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
179 jsonrpc_msg_destroy(msg);
183 jsonrpc_log_msg(rpc, "send", msg);
185 json = jsonrpc_msg_to_json(msg);
186 s = json_to_string(json, 0);
190 buf = xmalloc(sizeof *buf);
191 ofpbuf_use(buf, s, length);
193 queue_push_tail(&rpc->output, buf);
194 rpc->backlog += length;
196 if (rpc->output.n == 1) {
203 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
210 while (!rpc->received) {
211 if (byteq_is_empty(&rpc->input)) {
215 chunk = byteq_headroom(&rpc->input);
216 retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
218 if (retval == -EAGAIN) {
221 VLOG_WARN_RL(&rl, "%s: receive error: %s",
222 rpc->name, strerror(-retval));
223 jsonrpc_error(rpc, -retval);
226 } else if (retval == 0) {
227 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
228 jsonrpc_error(rpc, EOF);
231 byteq_advance_head(&rpc->input, retval);
236 rpc->parser = json_parser_create(0);
238 n = byteq_tailroom(&rpc->input);
239 used = json_parser_feed(rpc->parser,
240 (char *) byteq_tail(&rpc->input), n);
241 byteq_advance_tail(&rpc->input, used);
242 if (json_parser_is_done(rpc->parser)) {
243 jsonrpc_received(rpc);
251 *msgp = rpc->received;
252 rpc->received = NULL;
257 jsonrpc_recv_wait(struct jsonrpc *rpc)
259 if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
260 poll_immediate_wake();
262 stream_recv_wait(rpc->stream);
267 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
271 error = jsonrpc_send(rpc, msg);
276 while (!queue_is_empty(&rpc->output) && !rpc->status) {
285 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
288 int error = jsonrpc_recv(rpc, msgp);
289 if (error != EAGAIN) {
295 jsonrpc_recv_wait(rpc);
301 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
302 struct jsonrpc_msg **replyp)
304 struct jsonrpc_msg *reply = NULL;
308 id = json_clone(request->id);
309 error = jsonrpc_send_block(rpc, request);
312 error = jsonrpc_recv_block(rpc, &reply);
314 || (reply->type == JSONRPC_REPLY
315 && json_equal(id, reply->id))) {
318 jsonrpc_msg_destroy(reply);
321 *replyp = error ? NULL : reply;
327 jsonrpc_received(struct jsonrpc *rpc)
329 struct jsonrpc_msg *msg;
333 json = json_parser_finish(rpc->parser);
335 if (json->type == JSON_STRING) {
336 VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
337 rpc->name, json_string(json));
338 jsonrpc_error(rpc, EPROTO);
343 error = jsonrpc_msg_from_json(json, &msg);
345 VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
348 jsonrpc_error(rpc, EPROTO);
352 jsonrpc_log_msg(rpc, "received", msg);
357 jsonrpc_error(struct jsonrpc *rpc, int error)
362 jsonrpc_cleanup(rpc);
367 jsonrpc_cleanup(struct jsonrpc *rpc)
369 stream_close(rpc->stream);
372 json_parser_abort(rpc->parser);
375 jsonrpc_msg_destroy(rpc->received);
376 rpc->received = NULL;
378 queue_clear(&rpc->output);
382 static struct jsonrpc_msg *
383 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
384 struct json *params, struct json *result, struct json *error,
387 struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
389 msg->method = method ? xstrdup(method) : NULL;
390 msg->params = params;
391 msg->result = result;
398 jsonrpc_create_id(void)
400 static unsigned int id;
401 return json_integer_create(id++);
405 jsonrpc_create_request(const char *method, struct json *params)
407 return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL,
408 jsonrpc_create_id());
412 jsonrpc_create_notify(const char *method, struct json *params)
414 return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
418 jsonrpc_create_reply(struct json *result, const struct json *id)
420 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
425 jsonrpc_create_error(struct json *error, const struct json *id)
427 return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
432 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
435 case JSONRPC_REQUEST:
439 return "notification";
451 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
453 const char *type_name;
454 unsigned int pattern;
456 if (m->params && m->params->type != JSON_ARRAY) {
457 return xstrdup("\"params\" must be JSON array");
461 case JSONRPC_REQUEST:
478 return xasprintf("invalid JSON-RPC message type %d", m->type);
481 type_name = jsonrpc_msg_type_to_string(m->type);
482 if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
483 return xasprintf("%s must%s have \"method\"",
484 type_name, (pattern & 0x10000) ? "" : " not");
487 if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
488 return xasprintf("%s must%s have \"params\"",
489 type_name, (pattern & 0x1000) ? "" : " not");
492 if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
493 return xasprintf("%s must%s have \"result\"",
494 type_name, (pattern & 0x100) ? "" : " not");
497 if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
498 return xasprintf("%s must%s have \"error\"",
499 type_name, (pattern & 0x10) ? "" : " not");
502 if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
503 return xasprintf("%s must%s have \"id\"",
504 type_name, (pattern & 0x1) ? "" : " not");
511 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
515 json_destroy(m->params);
516 json_destroy(m->result);
517 json_destroy(m->error);
524 null_from_json_null(struct json *json)
526 if (json && json->type == JSON_NULL) {
534 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
536 struct json *method = NULL;
537 struct jsonrpc_msg *msg = NULL;
538 struct shash *object;
541 if (json->type != JSON_OBJECT) {
542 error = xstrdup("message is not a JSON object");
545 object = json_object(json);
547 method = shash_find_and_delete(object, "method");
548 if (method && method->type != JSON_STRING) {
549 error = xstrdup("method is not a JSON string");
553 msg = xzalloc(sizeof *msg);
554 msg->method = method ? xstrdup(method->u.string) : NULL;
555 msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
556 msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
557 msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
558 msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
559 msg->type = (msg->result ? JSONRPC_REPLY
560 : msg->error ? JSONRPC_ERROR
561 : msg->id ? JSONRPC_REQUEST
563 if (!shash_is_empty(object)) {
564 error = xasprintf("message has unexpected member \"%s\"",
565 shash_first(object)->name);
568 error = jsonrpc_msg_is_valid(msg);
574 json_destroy(method);
577 jsonrpc_msg_destroy(msg);
585 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
587 struct json *json = json_object_create();
590 json_object_put(json, "method", json_string_create_nocopy(m->method));
594 json_object_put(json, "params", m->params);
598 json_object_put(json, "result", m->result);
599 } else if (m->type == JSONRPC_ERROR) {
600 json_object_put(json, "result", json_null_create());
604 json_object_put(json, "error", m->error);
605 } else if (m->type == JSONRPC_REPLY) {
606 json_object_put(json, "error", json_null_create());
610 json_object_put(json, "id", m->id);
611 } else if (m->type == JSONRPC_NOTIFY) {
612 json_object_put(json, "id", json_null_create());
620 /* A JSON-RPC session with reconnection. */
622 struct jsonrpc_session {
623 struct reconnect *reconnect;
625 struct stream *stream;
629 struct jsonrpc_session *
630 jsonrpc_session_open(const char *name)
632 struct jsonrpc_session *s;
634 s = xmalloc(sizeof *s);
635 s->reconnect = reconnect_create(time_msec());
636 reconnect_set_name(s->reconnect, name);
637 reconnect_enable(s->reconnect, time_msec());
646 jsonrpc_session_close(struct jsonrpc_session *s)
649 jsonrpc_close(s->rpc);
650 reconnect_destroy(s->reconnect);
656 jsonrpc_session_disconnect(struct jsonrpc_session *s)
658 reconnect_disconnected(s->reconnect, time_msec(), 0);
660 jsonrpc_error(s->rpc, EOF);
661 jsonrpc_close(s->rpc);
664 } else if (s->stream) {
665 stream_close(s->stream);
672 jsonrpc_session_connect(struct jsonrpc_session *s)
676 jsonrpc_session_disconnect(s);
677 error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
679 reconnect_connect_failed(s->reconnect, time_msec(), error);
681 reconnect_connecting(s->reconnect, time_msec());
687 jsonrpc_session_run(struct jsonrpc_session *s)
693 error = jsonrpc_get_status(s->rpc);
695 jsonrpc_session_disconnect(s);
697 } else if (s->stream) {
698 int error = stream_connect(s->stream);
700 reconnect_connected(s->reconnect, time_msec());
701 s->rpc = jsonrpc_open(s->stream);
703 } else if (error != EAGAIN) {
704 reconnect_connect_failed(s->reconnect, time_msec(), error);
705 stream_close(s->stream);
710 switch (reconnect_run(s->reconnect, time_msec())) {
711 case RECONNECT_CONNECT:
712 jsonrpc_session_connect(s);
715 case RECONNECT_DISCONNECT:
716 jsonrpc_session_disconnect(s);
719 case RECONNECT_PROBE:
722 struct jsonrpc_msg *request;
724 params = json_array_create_empty();
725 request = jsonrpc_create_request("echo", params);
726 json_destroy(request->id);
727 request->id = json_string_create("echo");
728 jsonrpc_send(s->rpc, request);
735 jsonrpc_session_wait(struct jsonrpc_session *s)
738 jsonrpc_wait(s->rpc);
739 } else if (s->stream) {
740 stream_connect_wait(s->stream);
742 reconnect_wait(s->reconnect, time_msec());
746 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
748 return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
752 jsonrpc_session_get_name(const struct jsonrpc_session *s)
754 return reconnect_get_name(s->reconnect);
758 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
760 return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
764 jsonrpc_session_recv(struct jsonrpc_session *s)
766 struct jsonrpc_msg *msg = NULL;
768 jsonrpc_recv(s->rpc, &msg);
770 reconnect_received(s->reconnect, time_msec());
777 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
780 jsonrpc_recv_wait(s->rpc);
785 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
787 return s->rpc != NULL;
791 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
797 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
799 reconnect_force_reconnect(s->reconnect, time_msec());