jsonrpc: New type "jsonrpc_session", which automatically reconnects.
[cascardo/ovs.git] / lib / jsonrpc.c
1 /*
2  * Copyright (c) 2009 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18
19 #include "jsonrpc.h"
20
21 #include <errno.h>
22
23 #include "byteq.h"
24 #include "dynamic-string.h"
25 #include "json.h"
26 #include "list.h"
27 #include "ofpbuf.h"
28 #include "poll-loop.h"
29 #include "queue.h"
30 #include "reconnect.h"
31 #include "stream.h"
32 #include "timeval.h"
33
34 #define THIS_MODULE VLM_jsonrpc
35 #include "vlog.h"
36 \f
37 struct jsonrpc {
38     struct stream *stream;
39     char *name;
40     int status;
41
42     /* Input. */
43     struct byteq input;
44     struct json_parser *parser;
45     struct jsonrpc_msg *received;
46
47     /* Output. */
48     struct ovs_queue output;
49     size_t backlog;
50 };
51
52 /* Rate limit for error messages. */
53 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
54
55 static void jsonrpc_received(struct jsonrpc *);
56 static void jsonrpc_cleanup(struct jsonrpc *);
57
58 struct jsonrpc *
59 jsonrpc_open(struct stream *stream)
60 {
61     struct jsonrpc *rpc;
62
63     assert(stream != NULL);
64
65     rpc = xzalloc(sizeof *rpc);
66     rpc->name = xstrdup(stream_get_name(stream));
67     rpc->stream = stream;
68     byteq_init(&rpc->input);
69     queue_init(&rpc->output);
70
71     return rpc;
72 }
73
74 void
75 jsonrpc_close(struct jsonrpc *rpc)
76 {
77     if (rpc) {
78         jsonrpc_cleanup(rpc);
79         free(rpc->name);
80         free(rpc);
81     }
82 }
83
84 void
85 jsonrpc_run(struct jsonrpc *rpc)
86 {
87     if (rpc->status) {
88         return;
89     }
90
91     while (!queue_is_empty(&rpc->output)) {
92         struct ofpbuf *buf = rpc->output.head;
93         int retval;
94
95         retval = stream_send(rpc->stream, buf->data, buf->size);
96         if (retval >= 0) {
97             rpc->backlog -= retval;
98             ofpbuf_pull(buf, retval);
99             if (!buf->size) {
100                 ofpbuf_delete(queue_pop_head(&rpc->output));
101             }
102         } else {
103             if (retval != -EAGAIN) {
104                 VLOG_WARN_RL(&rl, "%s: send error: %s",
105                              rpc->name, strerror(-retval));
106                 jsonrpc_error(rpc, -retval);
107             }
108             break;
109         }
110     }
111 }
112
113 void
114 jsonrpc_wait(struct jsonrpc *rpc)
115 {
116     if (!rpc->status && !queue_is_empty(&rpc->output)) {
117         stream_send_wait(rpc->stream);
118     }
119 }
120
121 int
122 jsonrpc_get_status(const struct jsonrpc *rpc)
123 {
124     return rpc->status;
125 }
126
127 size_t
128 jsonrpc_get_backlog(const struct jsonrpc *rpc)
129 {
130     return rpc->status ? 0 : rpc->backlog;
131 }
132
133 const char *
134 jsonrpc_get_name(const struct jsonrpc *rpc)
135 {
136     return rpc->name;
137 }
138
139 static void
140 jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
141                 const struct jsonrpc_msg *msg)
142 {
143     if (VLOG_IS_DBG_ENABLED()) {
144         struct ds s = DS_EMPTY_INITIALIZER;
145         if (msg->method) {
146             ds_put_format(&s, ", method=\"%s\"", msg->method);
147         }
148         if (msg->params) {
149             ds_put_cstr(&s, ", params=");
150             ds_put_and_free_cstr(&s, json_to_string(msg->params, 0));
151         }
152         if (msg->result) {
153             ds_put_cstr(&s, ", result=");
154             ds_put_and_free_cstr(&s, json_to_string(msg->result, 0));
155         }
156         if (msg->error) {
157             ds_put_cstr(&s, ", error=");
158             ds_put_and_free_cstr(&s, json_to_string(msg->error, 0));
159         }
160         if (msg->id) {
161             ds_put_cstr(&s, ", id=");
162             ds_put_and_free_cstr(&s, json_to_string(msg->id, 0));
163         }
164         VLOG_DBG("%s: %s %s%s", rpc->name, title,
165                  jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
166         ds_destroy(&s);
167     }
168 }
169
170 int
171 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
172 {
173     struct ofpbuf *buf;
174     struct json *json;
175     size_t length;
176     char *s;
177
178     if (rpc->status) {
179         jsonrpc_msg_destroy(msg);
180         return rpc->status;
181     }
182
183     jsonrpc_log_msg(rpc, "send", msg);
184
185     json = jsonrpc_msg_to_json(msg);
186     s = json_to_string(json, 0);
187     length = strlen(s);
188     json_destroy(json);
189
190     buf = xmalloc(sizeof *buf);
191     ofpbuf_use(buf, s, length);
192     buf->size = length;
193     queue_push_tail(&rpc->output, buf);
194     rpc->backlog += length;
195
196     if (rpc->output.n == 1) {
197         jsonrpc_run(rpc);
198     }
199     return rpc->status;
200 }
201
202 int
203 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
204 {
205     *msgp = NULL;
206     if (rpc->status) {
207         return rpc->status;
208     }
209
210     while (!rpc->received) {
211         if (byteq_is_empty(&rpc->input)) {
212             size_t chunk;
213             int retval;
214
215             chunk = byteq_headroom(&rpc->input);
216             retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
217             if (retval < 0) {
218                 if (retval == -EAGAIN) {
219                     return EAGAIN;
220                 } else {
221                     VLOG_WARN_RL(&rl, "%s: receive error: %s",
222                                  rpc->name, strerror(-retval));
223                     jsonrpc_error(rpc, -retval);
224                     return rpc->status;
225                 }
226             } else if (retval == 0) {
227                 VLOG_INFO_RL(&rl, "%s: connection closed", rpc->name);
228                 jsonrpc_error(rpc, EOF);
229                 return EOF;
230             }
231             byteq_advance_head(&rpc->input, retval);
232         } else {
233             size_t n, used;
234
235             if (!rpc->parser) {
236                 rpc->parser = json_parser_create(0);
237             }
238             n = byteq_tailroom(&rpc->input);
239             used = json_parser_feed(rpc->parser,
240                                     (char *) byteq_tail(&rpc->input), n);
241             byteq_advance_tail(&rpc->input, used);
242             if (json_parser_is_done(rpc->parser)) {
243                 jsonrpc_received(rpc);
244                 if (rpc->status) {
245                     return rpc->status;
246                 }
247             }
248         }
249     }
250
251     *msgp = rpc->received;
252     rpc->received = NULL;
253     return 0;
254 }
255
256 void
257 jsonrpc_recv_wait(struct jsonrpc *rpc)
258 {
259     if (rpc->status || rpc->received || !byteq_is_empty(&rpc->input)) {
260         poll_immediate_wake();
261     } else {
262         stream_recv_wait(rpc->stream);
263     }
264 }
265
266 int
267 jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
268 {
269     int error;
270
271     error = jsonrpc_send(rpc, msg);
272     if (error) {
273         return error;
274     }
275
276     while (!queue_is_empty(&rpc->output) && !rpc->status) {
277         jsonrpc_run(rpc);
278         jsonrpc_wait(rpc);
279         poll_block();
280     }
281     return rpc->status;
282 }
283
284 int
285 jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
286 {
287     for (;;) {
288         int error = jsonrpc_recv(rpc, msgp);
289         if (error != EAGAIN) {
290             return error;
291         }
292
293         jsonrpc_run(rpc);
294         jsonrpc_wait(rpc);
295         jsonrpc_recv_wait(rpc);
296         poll_block();
297     }
298 }
299
300 int
301 jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
302                        struct jsonrpc_msg **replyp)
303 {
304     struct jsonrpc_msg *reply = NULL;
305     struct json *id;
306     int error;
307
308     id = json_clone(request->id);
309     error = jsonrpc_send_block(rpc, request);
310     if (!error) {
311         for (;;) {
312             error = jsonrpc_recv_block(rpc, &reply);
313             if (error
314                 || (reply->type == JSONRPC_REPLY
315                     && json_equal(id, reply->id))) {
316                 break;
317             }
318             jsonrpc_msg_destroy(reply);
319         }
320     }
321     *replyp = error ? NULL : reply;
322     json_destroy(id);
323     return error;
324 }
325
326 static void
327 jsonrpc_received(struct jsonrpc *rpc)
328 {
329     struct jsonrpc_msg *msg;
330     struct json *json;
331     char *error;
332
333     json = json_parser_finish(rpc->parser);
334     rpc->parser = NULL;
335     if (json->type == JSON_STRING) {
336         VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
337                      rpc->name, json_string(json));
338         jsonrpc_error(rpc, EPROTO);
339         json_destroy(json);
340         return;
341     }
342
343     error = jsonrpc_msg_from_json(json, &msg);
344     if (error) {
345         VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
346                      rpc->name, error);
347         free(error);
348         jsonrpc_error(rpc, EPROTO);
349         return;
350     }
351
352     jsonrpc_log_msg(rpc, "received", msg);
353     rpc->received = msg;
354 }
355
356 void
357 jsonrpc_error(struct jsonrpc *rpc, int error)
358 {
359     assert(error);
360     if (!rpc->status) {
361         rpc->status = error;
362         jsonrpc_cleanup(rpc);
363     }
364 }
365
366 static void
367 jsonrpc_cleanup(struct jsonrpc *rpc)
368 {
369     stream_close(rpc->stream);
370     rpc->stream = NULL;
371
372     json_parser_abort(rpc->parser);
373     rpc->parser = NULL;
374
375     jsonrpc_msg_destroy(rpc->received);
376     rpc->received = NULL;
377
378     queue_clear(&rpc->output);
379     rpc->backlog = 0;
380 }
381 \f
382 static struct jsonrpc_msg *
383 jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
384                 struct json *params, struct json *result, struct json *error,
385                 struct json *id)
386 {
387     struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
388     msg->type = type;
389     msg->method = method ? xstrdup(method) : NULL;
390     msg->params = params;
391     msg->result = result;
392     msg->error = error;
393     msg->id = id;
394     return msg;
395 }
396
397 static struct json *
398 jsonrpc_create_id(void)
399 {
400     static unsigned int id;
401     return json_integer_create(id++);
402 }
403
404 struct jsonrpc_msg *
405 jsonrpc_create_request(const char *method, struct json *params)
406 {
407     return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL,
408                            jsonrpc_create_id());
409 }
410
411 struct jsonrpc_msg *
412 jsonrpc_create_notify(const char *method, struct json *params)
413 {
414     return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
415 }
416
417 struct jsonrpc_msg *
418 jsonrpc_create_reply(struct json *result, const struct json *id)
419 {
420     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
421                            json_clone(id));
422 }
423
424 struct jsonrpc_msg *
425 jsonrpc_create_error(struct json *error, const struct json *id)
426 {
427     return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
428                            json_clone(id));
429 }
430
431 const char *
432 jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
433 {
434     switch (type) {
435     case JSONRPC_REQUEST:
436         return "request";
437
438     case JSONRPC_NOTIFY:
439         return "notification";
440
441     case JSONRPC_REPLY:
442         return "reply";
443
444     case JSONRPC_ERROR:
445         return "error";
446     }
447     return "(null)";
448 }
449
450 char *
451 jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
452 {
453     const char *type_name;
454     unsigned int pattern;
455
456     if (m->params && m->params->type != JSON_ARRAY) {
457         return xstrdup("\"params\" must be JSON array");
458     }
459
460     switch (m->type) {
461     case JSONRPC_REQUEST:
462         pattern = 0x11001;
463         break;
464
465     case JSONRPC_NOTIFY:
466         pattern = 0x11000;
467         break;
468
469     case JSONRPC_REPLY:
470         pattern = 0x00101;
471         break;
472
473     case JSONRPC_ERROR:
474         pattern = 0x00011;
475         break;
476
477     default:
478         return xasprintf("invalid JSON-RPC message type %d", m->type);
479     }
480
481     type_name = jsonrpc_msg_type_to_string(m->type);
482     if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
483         return xasprintf("%s must%s have \"method\"",
484                          type_name, (pattern & 0x10000) ? "" : " not");
485
486     }
487     if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
488         return xasprintf("%s must%s have \"params\"",
489                          type_name, (pattern & 0x1000) ? "" : " not");
490
491     }
492     if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
493         return xasprintf("%s must%s have \"result\"",
494                          type_name, (pattern & 0x100) ? "" : " not");
495
496     }
497     if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
498         return xasprintf("%s must%s have \"error\"",
499                          type_name, (pattern & 0x10) ? "" : " not");
500
501     }
502     if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
503         return xasprintf("%s must%s have \"id\"",
504                          type_name, (pattern & 0x1) ? "" : " not");
505
506     }
507     return NULL;
508 }
509
510 void
511 jsonrpc_msg_destroy(struct jsonrpc_msg *m)
512 {
513     if (m) {
514         free(m->method);
515         json_destroy(m->params);
516         json_destroy(m->result);
517         json_destroy(m->error);
518         json_destroy(m->id);
519         free(m);
520     }
521 }
522
523 static struct json *
524 null_from_json_null(struct json *json)
525 {
526     if (json && json->type == JSON_NULL) {
527         json_destroy(json);
528         return NULL;
529     }
530     return json;
531 }
532
533 char *
534 jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
535 {
536     struct json *method = NULL;
537     struct jsonrpc_msg *msg = NULL;
538     struct shash *object;
539     char *error;
540
541     if (json->type != JSON_OBJECT) {
542         error = xstrdup("message is not a JSON object");
543         goto exit;
544     }
545     object = json_object(json);
546
547     method = shash_find_and_delete(object, "method");
548     if (method && method->type != JSON_STRING) {
549         error = xstrdup("method is not a JSON string");
550         goto exit;
551     }
552
553     msg = xzalloc(sizeof *msg);
554     msg->method = method ? xstrdup(method->u.string) : NULL;
555     msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
556     msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
557     msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
558     msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
559     msg->type = (msg->result ? JSONRPC_REPLY
560                  : msg->error ? JSONRPC_ERROR
561                  : msg->id ? JSONRPC_REQUEST
562                  : JSONRPC_NOTIFY);
563     if (!shash_is_empty(object)) {
564         error = xasprintf("message has unexpected member \"%s\"",
565                           shash_first(object)->name);
566         goto exit;
567     }
568     error = jsonrpc_msg_is_valid(msg);
569     if (error) {
570         goto exit;
571     }
572
573 exit:
574     json_destroy(method);
575     json_destroy(json);
576     if (error) {
577         jsonrpc_msg_destroy(msg);
578         msg = NULL;
579     }
580     *msgp = msg;
581     return error;
582 }
583
584 struct json *
585 jsonrpc_msg_to_json(struct jsonrpc_msg *m)
586 {
587     struct json *json = json_object_create();
588
589     if (m->method) {
590         json_object_put(json, "method", json_string_create_nocopy(m->method));
591     }
592
593     if (m->params) {
594         json_object_put(json, "params", m->params);
595     }
596
597     if (m->result) {
598         json_object_put(json, "result", m->result);
599     } else if (m->type == JSONRPC_ERROR) {
600         json_object_put(json, "result", json_null_create());
601     }
602
603     if (m->error) {
604         json_object_put(json, "error", m->error);
605     } else if (m->type == JSONRPC_REPLY) {
606         json_object_put(json, "error", json_null_create());
607     }
608
609     if (m->id) {
610         json_object_put(json, "id", m->id);
611     } else if (m->type == JSONRPC_NOTIFY) {
612         json_object_put(json, "id", json_null_create());
613     }
614
615     free(m);
616
617     return json;
618 }
619 \f
620 /* A JSON-RPC session with reconnection. */
621
622 struct jsonrpc_session {
623     struct reconnect *reconnect;
624     struct jsonrpc *rpc;
625     struct stream *stream;
626     unsigned int seqno;
627 };
628
629 struct jsonrpc_session *
630 jsonrpc_session_open(const char *name)
631 {
632     struct jsonrpc_session *s;
633
634     s = xmalloc(sizeof *s);
635     s->reconnect = reconnect_create(time_msec());
636     reconnect_set_name(s->reconnect, name);
637     reconnect_enable(s->reconnect, time_msec());
638     s->rpc = NULL;
639     s->stream = NULL;
640     s->seqno = 0;
641
642     return s;
643 }
644
645 void
646 jsonrpc_session_close(struct jsonrpc_session *s)
647 {
648     if (s) {
649         jsonrpc_close(s->rpc);
650         reconnect_destroy(s->reconnect);
651         free(s);
652     }
653 }
654
655 static void
656 jsonrpc_session_disconnect(struct jsonrpc_session *s)
657 {
658     reconnect_disconnected(s->reconnect, time_msec(), 0);
659     if (s->rpc) {
660         jsonrpc_error(s->rpc, EOF);
661         jsonrpc_close(s->rpc);
662         s->rpc = NULL;
663         s->seqno++;
664     } else if (s->stream) {
665         stream_close(s->stream);
666         s->stream = NULL;
667         s->seqno++;
668     }
669 }
670
671 static void
672 jsonrpc_session_connect(struct jsonrpc_session *s)
673 {
674     int error;
675
676     jsonrpc_session_disconnect(s);
677     error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
678     if (error) {
679         reconnect_connect_failed(s->reconnect, time_msec(), error);
680     } else {
681         reconnect_connecting(s->reconnect, time_msec());
682     }
683     s->seqno++;
684 }
685
686 void
687 jsonrpc_session_run(struct jsonrpc_session *s)
688 {
689     if (s->rpc) {
690         int error;
691
692         jsonrpc_run(s->rpc);
693         error = jsonrpc_get_status(s->rpc);
694         if (error) {
695             jsonrpc_session_disconnect(s);
696         }
697     } else if (s->stream) {
698         int error = stream_connect(s->stream);
699         if (!error) {
700             reconnect_connected(s->reconnect, time_msec());
701             s->rpc = jsonrpc_open(s->stream);
702             s->stream = NULL;
703         } else if (error != EAGAIN) {
704             reconnect_connect_failed(s->reconnect, time_msec(), error);
705             stream_close(s->stream);
706             s->stream = NULL;
707         }
708     }
709
710     switch (reconnect_run(s->reconnect, time_msec())) {
711     case RECONNECT_CONNECT:
712         jsonrpc_session_connect(s);
713         break;
714
715     case RECONNECT_DISCONNECT:
716         jsonrpc_session_disconnect(s);
717         break;
718
719     case RECONNECT_PROBE:
720         if (s->rpc) {
721             struct json *params;
722             struct jsonrpc_msg *request;
723
724             params = json_array_create_empty();
725             request = jsonrpc_create_request("echo", params);
726             json_destroy(request->id);
727             request->id = json_string_create("echo");
728             jsonrpc_send(s->rpc, request);
729         }
730         break;
731     }
732 }
733
734 void
735 jsonrpc_session_wait(struct jsonrpc_session *s)
736 {
737     if (s->rpc) {
738         jsonrpc_wait(s->rpc);
739     } else if (s->stream) {
740         stream_connect_wait(s->stream);
741     }
742     reconnect_wait(s->reconnect, time_msec());
743 }
744
745 size_t
746 jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
747 {
748     return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
749 }
750
751 const char *
752 jsonrpc_session_get_name(const struct jsonrpc_session *s)
753 {
754     return reconnect_get_name(s->reconnect);
755 }
756
757 int
758 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
759 {
760     return s->rpc ? jsonrpc_send(s->rpc, msg) : ENOTCONN;
761 }
762
763 struct jsonrpc_msg *
764 jsonrpc_session_recv(struct jsonrpc_session *s)
765 {
766     struct jsonrpc_msg *msg = NULL;
767     if (s->rpc) {
768         jsonrpc_recv(s->rpc, &msg);
769         if (msg) {
770             reconnect_received(s->reconnect, time_msec());
771         }
772     }
773     return msg;
774 }
775
776 void
777 jsonrpc_session_recv_wait(struct jsonrpc_session *s)
778 {
779     if (s->rpc) {
780         jsonrpc_recv_wait(s->rpc);
781     }
782 }
783
784 bool
785 jsonrpc_session_is_connected(const struct jsonrpc_session *s)
786 {
787     return s->rpc != NULL;
788 }
789
790 unsigned int
791 jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
792 {
793     return s->seqno;
794 }
795
796 void
797 jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
798 {
799     reconnect_force_reconnect(s->reconnect, time_msec());
800 }