Merge branch 'timers-core-for-linus' of git://git.kernel.org/pub/scm/linux/kernel...
[cascardo/linux.git] / net / ceph / osd_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
13
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
20
21 #define OSD_OP_FRONT_LEN        4096
22 #define OSD_OPREPLY_FRONT_LEN   512
23
24 static const struct ceph_connection_operations osd_con_ops;
25
26 static void send_queued(struct ceph_osd_client *osdc);
27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28 static void __register_request(struct ceph_osd_client *osdc,
29                                struct ceph_osd_request *req);
30 static void __unregister_linger_request(struct ceph_osd_client *osdc,
31                                         struct ceph_osd_request *req);
32 static void __send_request(struct ceph_osd_client *osdc,
33                            struct ceph_osd_request *req);
34
35 static int op_needs_trail(int op)
36 {
37         switch (op) {
38         case CEPH_OSD_OP_GETXATTR:
39         case CEPH_OSD_OP_SETXATTR:
40         case CEPH_OSD_OP_CMPXATTR:
41         case CEPH_OSD_OP_CALL:
42         case CEPH_OSD_OP_NOTIFY:
43                 return 1;
44         default:
45                 return 0;
46         }
47 }
48
49 static int op_has_extent(int op)
50 {
51         return (op == CEPH_OSD_OP_READ ||
52                 op == CEPH_OSD_OP_WRITE);
53 }
54
55 int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
56                         struct ceph_file_layout *layout,
57                         u64 snapid,
58                         u64 off, u64 *plen, u64 *bno,
59                         struct ceph_osd_request *req,
60                         struct ceph_osd_req_op *op)
61 {
62         struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63         u64 orig_len = *plen;
64         u64 objoff, objlen;    /* extent in object */
65         int r;
66
67         reqhead->snapid = cpu_to_le64(snapid);
68
69         /* object extent? */
70         r = ceph_calc_file_object_mapping(layout, off, plen, bno,
71                                           &objoff, &objlen);
72         if (r < 0)
73                 return r;
74         if (*plen < orig_len)
75                 dout(" skipping last %llu, final file extent %llu~%llu\n",
76                      orig_len - *plen, off, *plen);
77
78         if (op_has_extent(op->op)) {
79                 op->extent.offset = objoff;
80                 op->extent.length = objlen;
81         }
82         req->r_num_pages = calc_pages_for(off, *plen);
83         req->r_page_alignment = off & ~PAGE_MASK;
84         if (op->op == CEPH_OSD_OP_WRITE)
85                 op->payload_len = *plen;
86
87         dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
88              *bno, objoff, objlen, req->r_num_pages);
89         return 0;
90 }
91 EXPORT_SYMBOL(ceph_calc_raw_layout);
92
93 /*
94  * Implement client access to distributed object storage cluster.
95  *
96  * All data objects are stored within a cluster/cloud of OSDs, or
97  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
98  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
99  * remote daemons serving up and coordinating consistent and safe
100  * access to storage.
101  *
102  * Cluster membership and the mapping of data objects onto storage devices
103  * are described by the osd map.
104  *
105  * We keep track of pending OSD requests (read, write), resubmit
106  * requests to different OSDs when the cluster topology/data layout
107  * change, or retry the affected requests when the communications
108  * channel with an OSD is reset.
109  */
110
111 /*
112  * calculate the mapping of a file extent onto an object, and fill out the
113  * request accordingly.  shorten extent as necessary if it crosses an
114  * object boundary.
115  *
116  * fill osd op in request message.
117  */
118 static int calc_layout(struct ceph_osd_client *osdc,
119                        struct ceph_vino vino,
120                        struct ceph_file_layout *layout,
121                        u64 off, u64 *plen,
122                        struct ceph_osd_request *req,
123                        struct ceph_osd_req_op *op)
124 {
125         u64 bno;
126         int r;
127
128         r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
129                                  plen, &bno, req, op);
130         if (r < 0)
131                 return r;
132
133         snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
134         req->r_oid_len = strlen(req->r_oid);
135
136         return r;
137 }
138
139 /*
140  * requests
141  */
142 void ceph_osdc_release_request(struct kref *kref)
143 {
144         struct ceph_osd_request *req = container_of(kref,
145                                                     struct ceph_osd_request,
146                                                     r_kref);
147
148         if (req->r_request)
149                 ceph_msg_put(req->r_request);
150         if (req->r_con_filling_msg) {
151                 dout("%s revoking pages %p from con %p\n", __func__,
152                      req->r_pages, req->r_con_filling_msg);
153                 ceph_msg_revoke_incoming(req->r_reply);
154                 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
155         }
156         if (req->r_reply)
157                 ceph_msg_put(req->r_reply);
158         if (req->r_own_pages)
159                 ceph_release_page_vector(req->r_pages,
160                                          req->r_num_pages);
161 #ifdef CONFIG_BLOCK
162         if (req->r_bio)
163                 bio_put(req->r_bio);
164 #endif
165         ceph_put_snap_context(req->r_snapc);
166         if (req->r_trail) {
167                 ceph_pagelist_release(req->r_trail);
168                 kfree(req->r_trail);
169         }
170         if (req->r_mempool)
171                 mempool_free(req, req->r_osdc->req_mempool);
172         else
173                 kfree(req);
174 }
175 EXPORT_SYMBOL(ceph_osdc_release_request);
176
177 static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178 {
179         int i = 0;
180
181         if (needs_trail)
182                 *needs_trail = 0;
183         while (ops[i].op) {
184                 if (needs_trail && op_needs_trail(ops[i].op))
185                         *needs_trail = 1;
186                 i++;
187         }
188
189         return i;
190 }
191
192 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193                                                int flags,
194                                                struct ceph_snap_context *snapc,
195                                                struct ceph_osd_req_op *ops,
196                                                bool use_mempool,
197                                                gfp_t gfp_flags,
198                                                struct page **pages,
199                                                struct bio *bio)
200 {
201         struct ceph_osd_request *req;
202         struct ceph_msg *msg;
203         int needs_trail;
204         int num_op = get_num_ops(ops, &needs_trail);
205         size_t msg_size = sizeof(struct ceph_osd_request_head);
206
207         msg_size += num_op*sizeof(struct ceph_osd_op);
208
209         if (use_mempool) {
210                 req = mempool_alloc(osdc->req_mempool, gfp_flags);
211                 memset(req, 0, sizeof(*req));
212         } else {
213                 req = kzalloc(sizeof(*req), gfp_flags);
214         }
215         if (req == NULL)
216                 return NULL;
217
218         req->r_osdc = osdc;
219         req->r_mempool = use_mempool;
220
221         kref_init(&req->r_kref);
222         init_completion(&req->r_completion);
223         init_completion(&req->r_safe_completion);
224         INIT_LIST_HEAD(&req->r_unsafe_item);
225         INIT_LIST_HEAD(&req->r_linger_item);
226         INIT_LIST_HEAD(&req->r_linger_osd);
227         INIT_LIST_HEAD(&req->r_req_lru_item);
228         INIT_LIST_HEAD(&req->r_osd_item);
229
230         req->r_flags = flags;
231
232         WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
233
234         /* create reply message */
235         if (use_mempool)
236                 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
237         else
238                 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
239                                    OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
240         if (!msg) {
241                 ceph_osdc_put_request(req);
242                 return NULL;
243         }
244         req->r_reply = msg;
245
246         /* allocate space for the trailing data */
247         if (needs_trail) {
248                 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
249                 if (!req->r_trail) {
250                         ceph_osdc_put_request(req);
251                         return NULL;
252                 }
253                 ceph_pagelist_init(req->r_trail);
254         }
255
256         /* create request message; allow space for oid */
257         msg_size += MAX_OBJ_NAME_SIZE;
258         if (snapc)
259                 msg_size += sizeof(u64) * snapc->num_snaps;
260         if (use_mempool)
261                 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
262         else
263                 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
264         if (!msg) {
265                 ceph_osdc_put_request(req);
266                 return NULL;
267         }
268
269         memset(msg->front.iov_base, 0, msg->front.iov_len);
270
271         req->r_request = msg;
272         req->r_pages = pages;
273 #ifdef CONFIG_BLOCK
274         if (bio) {
275                 req->r_bio = bio;
276                 bio_get(req->r_bio);
277         }
278 #endif
279
280         return req;
281 }
282 EXPORT_SYMBOL(ceph_osdc_alloc_request);
283
284 static void osd_req_encode_op(struct ceph_osd_request *req,
285                               struct ceph_osd_op *dst,
286                               struct ceph_osd_req_op *src)
287 {
288         dst->op = cpu_to_le16(src->op);
289
290         switch (src->op) {
291         case CEPH_OSD_OP_READ:
292         case CEPH_OSD_OP_WRITE:
293                 dst->extent.offset =
294                         cpu_to_le64(src->extent.offset);
295                 dst->extent.length =
296                         cpu_to_le64(src->extent.length);
297                 dst->extent.truncate_size =
298                         cpu_to_le64(src->extent.truncate_size);
299                 dst->extent.truncate_seq =
300                         cpu_to_le32(src->extent.truncate_seq);
301                 break;
302
303         case CEPH_OSD_OP_GETXATTR:
304         case CEPH_OSD_OP_SETXATTR:
305         case CEPH_OSD_OP_CMPXATTR:
306                 BUG_ON(!req->r_trail);
307
308                 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
309                 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
310                 dst->xattr.cmp_op = src->xattr.cmp_op;
311                 dst->xattr.cmp_mode = src->xattr.cmp_mode;
312                 ceph_pagelist_append(req->r_trail, src->xattr.name,
313                                      src->xattr.name_len);
314                 ceph_pagelist_append(req->r_trail, src->xattr.val,
315                                      src->xattr.value_len);
316                 break;
317         case CEPH_OSD_OP_CALL:
318                 BUG_ON(!req->r_trail);
319
320                 dst->cls.class_len = src->cls.class_len;
321                 dst->cls.method_len = src->cls.method_len;
322                 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
323
324                 ceph_pagelist_append(req->r_trail, src->cls.class_name,
325                                      src->cls.class_len);
326                 ceph_pagelist_append(req->r_trail, src->cls.method_name,
327                                      src->cls.method_len);
328                 ceph_pagelist_append(req->r_trail, src->cls.indata,
329                                      src->cls.indata_len);
330                 break;
331         case CEPH_OSD_OP_ROLLBACK:
332                 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
333                 break;
334         case CEPH_OSD_OP_STARTSYNC:
335                 break;
336         case CEPH_OSD_OP_NOTIFY:
337                 {
338                         __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
339                         __le32 timeout = cpu_to_le32(src->watch.timeout);
340
341                         BUG_ON(!req->r_trail);
342
343                         ceph_pagelist_append(req->r_trail,
344                                                 &prot_ver, sizeof(prot_ver));
345                         ceph_pagelist_append(req->r_trail,
346                                                 &timeout, sizeof(timeout));
347                 }
348         case CEPH_OSD_OP_NOTIFY_ACK:
349         case CEPH_OSD_OP_WATCH:
350                 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
351                 dst->watch.ver = cpu_to_le64(src->watch.ver);
352                 dst->watch.flag = src->watch.flag;
353                 break;
354         default:
355                 pr_err("unrecognized osd opcode %d\n", dst->op);
356                 WARN_ON(1);
357                 break;
358         }
359         dst->payload_len = cpu_to_le32(src->payload_len);
360 }
361
362 /*
363  * build new request AND message
364  *
365  */
366 void ceph_osdc_build_request(struct ceph_osd_request *req,
367                              u64 off, u64 *plen,
368                              struct ceph_osd_req_op *src_ops,
369                              struct ceph_snap_context *snapc,
370                              struct timespec *mtime,
371                              const char *oid,
372                              int oid_len)
373 {
374         struct ceph_msg *msg = req->r_request;
375         struct ceph_osd_request_head *head;
376         struct ceph_osd_req_op *src_op;
377         struct ceph_osd_op *op;
378         void *p;
379         int num_op = get_num_ops(src_ops, NULL);
380         size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
381         int flags = req->r_flags;
382         u64 data_len = 0;
383         int i;
384
385         head = msg->front.iov_base;
386         op = (void *)(head + 1);
387         p = (void *)(op + num_op);
388
389         req->r_snapc = ceph_get_snap_context(snapc);
390
391         head->client_inc = cpu_to_le32(1); /* always, for now. */
392         head->flags = cpu_to_le32(flags);
393         if (flags & CEPH_OSD_FLAG_WRITE)
394                 ceph_encode_timespec(&head->mtime, mtime);
395         head->num_ops = cpu_to_le16(num_op);
396
397
398         /* fill in oid */
399         head->object_len = cpu_to_le32(oid_len);
400         memcpy(p, oid, oid_len);
401         p += oid_len;
402
403         src_op = src_ops;
404         while (src_op->op) {
405                 osd_req_encode_op(req, op, src_op);
406                 src_op++;
407                 op++;
408         }
409
410         if (req->r_trail)
411                 data_len += req->r_trail->length;
412
413         if (snapc) {
414                 head->snap_seq = cpu_to_le64(snapc->seq);
415                 head->num_snaps = cpu_to_le32(snapc->num_snaps);
416                 for (i = 0; i < snapc->num_snaps; i++) {
417                         put_unaligned_le64(snapc->snaps[i], p);
418                         p += sizeof(u64);
419                 }
420         }
421
422         if (flags & CEPH_OSD_FLAG_WRITE) {
423                 req->r_request->hdr.data_off = cpu_to_le16(off);
424                 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
425         } else if (data_len) {
426                 req->r_request->hdr.data_off = 0;
427                 req->r_request->hdr.data_len = cpu_to_le32(data_len);
428         }
429
430         req->r_request->page_alignment = req->r_page_alignment;
431
432         BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
433         msg_size = p - msg->front.iov_base;
434         msg->front.iov_len = msg_size;
435         msg->hdr.front_len = cpu_to_le32(msg_size);
436         return;
437 }
438 EXPORT_SYMBOL(ceph_osdc_build_request);
439
440 /*
441  * build new request AND message, calculate layout, and adjust file
442  * extent as needed.
443  *
444  * if the file was recently truncated, we include information about its
445  * old and new size so that the object can be updated appropriately.  (we
446  * avoid synchronously deleting truncated objects because it's slow.)
447  *
448  * if @do_sync, include a 'startsync' command so that the osd will flush
449  * data quickly.
450  */
451 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
452                                                struct ceph_file_layout *layout,
453                                                struct ceph_vino vino,
454                                                u64 off, u64 *plen,
455                                                int opcode, int flags,
456                                                struct ceph_snap_context *snapc,
457                                                int do_sync,
458                                                u32 truncate_seq,
459                                                u64 truncate_size,
460                                                struct timespec *mtime,
461                                                bool use_mempool, int num_reply,
462                                                int page_align)
463 {
464         struct ceph_osd_req_op ops[3];
465         struct ceph_osd_request *req;
466         int r;
467
468         ops[0].op = opcode;
469         ops[0].extent.truncate_seq = truncate_seq;
470         ops[0].extent.truncate_size = truncate_size;
471         ops[0].payload_len = 0;
472
473         if (do_sync) {
474                 ops[1].op = CEPH_OSD_OP_STARTSYNC;
475                 ops[1].payload_len = 0;
476                 ops[2].op = 0;
477         } else
478                 ops[1].op = 0;
479
480         req = ceph_osdc_alloc_request(osdc, flags,
481                                          snapc, ops,
482                                          use_mempool,
483                                          GFP_NOFS, NULL, NULL);
484         if (!req)
485                 return ERR_PTR(-ENOMEM);
486
487         /* calculate max write size */
488         r = calc_layout(osdc, vino, layout, off, plen, req, ops);
489         if (r < 0)
490                 return ERR_PTR(r);
491         req->r_file_layout = *layout;  /* keep a copy */
492
493         /* in case it differs from natural (file) alignment that
494            calc_layout filled in for us */
495         req->r_num_pages = calc_pages_for(page_align, *plen);
496         req->r_page_alignment = page_align;
497
498         ceph_osdc_build_request(req, off, plen, ops,
499                                 snapc,
500                                 mtime,
501                                 req->r_oid, req->r_oid_len);
502
503         return req;
504 }
505 EXPORT_SYMBOL(ceph_osdc_new_request);
506
507 /*
508  * We keep osd requests in an rbtree, sorted by ->r_tid.
509  */
510 static void __insert_request(struct ceph_osd_client *osdc,
511                              struct ceph_osd_request *new)
512 {
513         struct rb_node **p = &osdc->requests.rb_node;
514         struct rb_node *parent = NULL;
515         struct ceph_osd_request *req = NULL;
516
517         while (*p) {
518                 parent = *p;
519                 req = rb_entry(parent, struct ceph_osd_request, r_node);
520                 if (new->r_tid < req->r_tid)
521                         p = &(*p)->rb_left;
522                 else if (new->r_tid > req->r_tid)
523                         p = &(*p)->rb_right;
524                 else
525                         BUG();
526         }
527
528         rb_link_node(&new->r_node, parent, p);
529         rb_insert_color(&new->r_node, &osdc->requests);
530 }
531
532 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
533                                                  u64 tid)
534 {
535         struct ceph_osd_request *req;
536         struct rb_node *n = osdc->requests.rb_node;
537
538         while (n) {
539                 req = rb_entry(n, struct ceph_osd_request, r_node);
540                 if (tid < req->r_tid)
541                         n = n->rb_left;
542                 else if (tid > req->r_tid)
543                         n = n->rb_right;
544                 else
545                         return req;
546         }
547         return NULL;
548 }
549
550 static struct ceph_osd_request *
551 __lookup_request_ge(struct ceph_osd_client *osdc,
552                     u64 tid)
553 {
554         struct ceph_osd_request *req;
555         struct rb_node *n = osdc->requests.rb_node;
556
557         while (n) {
558                 req = rb_entry(n, struct ceph_osd_request, r_node);
559                 if (tid < req->r_tid) {
560                         if (!n->rb_left)
561                                 return req;
562                         n = n->rb_left;
563                 } else if (tid > req->r_tid) {
564                         n = n->rb_right;
565                 } else {
566                         return req;
567                 }
568         }
569         return NULL;
570 }
571
572 /*
573  * Resubmit requests pending on the given osd.
574  */
575 static void __kick_osd_requests(struct ceph_osd_client *osdc,
576                                 struct ceph_osd *osd)
577 {
578         struct ceph_osd_request *req, *nreq;
579         int err;
580
581         dout("__kick_osd_requests osd%d\n", osd->o_osd);
582         err = __reset_osd(osdc, osd);
583         if (err == -EAGAIN)
584                 return;
585
586         list_for_each_entry(req, &osd->o_requests, r_osd_item) {
587                 list_move(&req->r_req_lru_item, &osdc->req_unsent);
588                 dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
589                      osd->o_osd);
590                 if (!req->r_linger)
591                         req->r_flags |= CEPH_OSD_FLAG_RETRY;
592         }
593
594         list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
595                                  r_linger_osd) {
596                 /*
597                  * reregister request prior to unregistering linger so
598                  * that r_osd is preserved.
599                  */
600                 BUG_ON(!list_empty(&req->r_req_lru_item));
601                 __register_request(osdc, req);
602                 list_add(&req->r_req_lru_item, &osdc->req_unsent);
603                 list_add(&req->r_osd_item, &req->r_osd->o_requests);
604                 __unregister_linger_request(osdc, req);
605                 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
606                      osd->o_osd);
607         }
608 }
609
610 static void kick_osd_requests(struct ceph_osd_client *osdc,
611                               struct ceph_osd *kickosd)
612 {
613         mutex_lock(&osdc->request_mutex);
614         __kick_osd_requests(osdc, kickosd);
615         mutex_unlock(&osdc->request_mutex);
616 }
617
618 /*
619  * If the osd connection drops, we need to resubmit all requests.
620  */
621 static void osd_reset(struct ceph_connection *con)
622 {
623         struct ceph_osd *osd = con->private;
624         struct ceph_osd_client *osdc;
625
626         if (!osd)
627                 return;
628         dout("osd_reset osd%d\n", osd->o_osd);
629         osdc = osd->o_osdc;
630         down_read(&osdc->map_sem);
631         kick_osd_requests(osdc, osd);
632         send_queued(osdc);
633         up_read(&osdc->map_sem);
634 }
635
636 /*
637  * Track open sessions with osds.
638  */
639 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
640 {
641         struct ceph_osd *osd;
642
643         osd = kzalloc(sizeof(*osd), GFP_NOFS);
644         if (!osd)
645                 return NULL;
646
647         atomic_set(&osd->o_ref, 1);
648         osd->o_osdc = osdc;
649         osd->o_osd = onum;
650         INIT_LIST_HEAD(&osd->o_requests);
651         INIT_LIST_HEAD(&osd->o_linger_requests);
652         INIT_LIST_HEAD(&osd->o_osd_lru);
653         osd->o_incarnation = 1;
654
655         ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
656
657         INIT_LIST_HEAD(&osd->o_keepalive_item);
658         return osd;
659 }
660
661 static struct ceph_osd *get_osd(struct ceph_osd *osd)
662 {
663         if (atomic_inc_not_zero(&osd->o_ref)) {
664                 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
665                      atomic_read(&osd->o_ref));
666                 return osd;
667         } else {
668                 dout("get_osd %p FAIL\n", osd);
669                 return NULL;
670         }
671 }
672
673 static void put_osd(struct ceph_osd *osd)
674 {
675         dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
676              atomic_read(&osd->o_ref) - 1);
677         if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
678                 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
679
680                 if (ac->ops && ac->ops->destroy_authorizer)
681                         ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
682                 kfree(osd);
683         }
684 }
685
686 /*
687  * remove an osd from our map
688  */
689 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
690 {
691         dout("__remove_osd %p\n", osd);
692         BUG_ON(!list_empty(&osd->o_requests));
693         rb_erase(&osd->o_node, &osdc->osds);
694         list_del_init(&osd->o_osd_lru);
695         ceph_con_close(&osd->o_con);
696         put_osd(osd);
697 }
698
699 static void remove_all_osds(struct ceph_osd_client *osdc)
700 {
701         dout("%s %p\n", __func__, osdc);
702         mutex_lock(&osdc->request_mutex);
703         while (!RB_EMPTY_ROOT(&osdc->osds)) {
704                 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
705                                                 struct ceph_osd, o_node);
706                 __remove_osd(osdc, osd);
707         }
708         mutex_unlock(&osdc->request_mutex);
709 }
710
711 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
712                               struct ceph_osd *osd)
713 {
714         dout("__move_osd_to_lru %p\n", osd);
715         BUG_ON(!list_empty(&osd->o_osd_lru));
716         list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
717         osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
718 }
719
720 static void __remove_osd_from_lru(struct ceph_osd *osd)
721 {
722         dout("__remove_osd_from_lru %p\n", osd);
723         if (!list_empty(&osd->o_osd_lru))
724                 list_del_init(&osd->o_osd_lru);
725 }
726
727 static void remove_old_osds(struct ceph_osd_client *osdc)
728 {
729         struct ceph_osd *osd, *nosd;
730
731         dout("__remove_old_osds %p\n", osdc);
732         mutex_lock(&osdc->request_mutex);
733         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
734                 if (time_before(jiffies, osd->lru_ttl))
735                         break;
736                 __remove_osd(osdc, osd);
737         }
738         mutex_unlock(&osdc->request_mutex);
739 }
740
741 /*
742  * reset osd connect
743  */
744 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
745 {
746         struct ceph_osd_request *req;
747         int ret = 0;
748
749         dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
750         if (list_empty(&osd->o_requests) &&
751             list_empty(&osd->o_linger_requests)) {
752                 __remove_osd(osdc, osd);
753         } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
754                           &osd->o_con.peer_addr,
755                           sizeof(osd->o_con.peer_addr)) == 0 &&
756                    !ceph_con_opened(&osd->o_con)) {
757                 dout(" osd addr hasn't changed and connection never opened,"
758                      " letting msgr retry");
759                 /* touch each r_stamp for handle_timeout()'s benfit */
760                 list_for_each_entry(req, &osd->o_requests, r_osd_item)
761                         req->r_stamp = jiffies;
762                 ret = -EAGAIN;
763         } else {
764                 ceph_con_close(&osd->o_con);
765                 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
766                               &osdc->osdmap->osd_addr[osd->o_osd]);
767                 osd->o_incarnation++;
768         }
769         return ret;
770 }
771
772 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
773 {
774         struct rb_node **p = &osdc->osds.rb_node;
775         struct rb_node *parent = NULL;
776         struct ceph_osd *osd = NULL;
777
778         dout("__insert_osd %p osd%d\n", new, new->o_osd);
779         while (*p) {
780                 parent = *p;
781                 osd = rb_entry(parent, struct ceph_osd, o_node);
782                 if (new->o_osd < osd->o_osd)
783                         p = &(*p)->rb_left;
784                 else if (new->o_osd > osd->o_osd)
785                         p = &(*p)->rb_right;
786                 else
787                         BUG();
788         }
789
790         rb_link_node(&new->o_node, parent, p);
791         rb_insert_color(&new->o_node, &osdc->osds);
792 }
793
794 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
795 {
796         struct ceph_osd *osd;
797         struct rb_node *n = osdc->osds.rb_node;
798
799         while (n) {
800                 osd = rb_entry(n, struct ceph_osd, o_node);
801                 if (o < osd->o_osd)
802                         n = n->rb_left;
803                 else if (o > osd->o_osd)
804                         n = n->rb_right;
805                 else
806                         return osd;
807         }
808         return NULL;
809 }
810
811 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
812 {
813         schedule_delayed_work(&osdc->timeout_work,
814                         osdc->client->options->osd_keepalive_timeout * HZ);
815 }
816
817 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
818 {
819         cancel_delayed_work(&osdc->timeout_work);
820 }
821
822 /*
823  * Register request, assign tid.  If this is the first request, set up
824  * the timeout event.
825  */
826 static void __register_request(struct ceph_osd_client *osdc,
827                                struct ceph_osd_request *req)
828 {
829         req->r_tid = ++osdc->last_tid;
830         req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
831         dout("__register_request %p tid %lld\n", req, req->r_tid);
832         __insert_request(osdc, req);
833         ceph_osdc_get_request(req);
834         osdc->num_requests++;
835         if (osdc->num_requests == 1) {
836                 dout(" first request, scheduling timeout\n");
837                 __schedule_osd_timeout(osdc);
838         }
839 }
840
841 static void register_request(struct ceph_osd_client *osdc,
842                              struct ceph_osd_request *req)
843 {
844         mutex_lock(&osdc->request_mutex);
845         __register_request(osdc, req);
846         mutex_unlock(&osdc->request_mutex);
847 }
848
849 /*
850  * called under osdc->request_mutex
851  */
852 static void __unregister_request(struct ceph_osd_client *osdc,
853                                  struct ceph_osd_request *req)
854 {
855         if (RB_EMPTY_NODE(&req->r_node)) {
856                 dout("__unregister_request %p tid %lld not registered\n",
857                         req, req->r_tid);
858                 return;
859         }
860
861         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
862         rb_erase(&req->r_node, &osdc->requests);
863         osdc->num_requests--;
864
865         if (req->r_osd) {
866                 /* make sure the original request isn't in flight. */
867                 ceph_msg_revoke(req->r_request);
868
869                 list_del_init(&req->r_osd_item);
870                 if (list_empty(&req->r_osd->o_requests) &&
871                     list_empty(&req->r_osd->o_linger_requests)) {
872                         dout("moving osd to %p lru\n", req->r_osd);
873                         __move_osd_to_lru(osdc, req->r_osd);
874                 }
875                 if (list_empty(&req->r_linger_item))
876                         req->r_osd = NULL;
877         }
878
879         ceph_osdc_put_request(req);
880
881         list_del_init(&req->r_req_lru_item);
882         if (osdc->num_requests == 0) {
883                 dout(" no requests, canceling timeout\n");
884                 __cancel_osd_timeout(osdc);
885         }
886 }
887
888 /*
889  * Cancel a previously queued request message
890  */
891 static void __cancel_request(struct ceph_osd_request *req)
892 {
893         if (req->r_sent && req->r_osd) {
894                 ceph_msg_revoke(req->r_request);
895                 req->r_sent = 0;
896         }
897 }
898
899 static void __register_linger_request(struct ceph_osd_client *osdc,
900                                     struct ceph_osd_request *req)
901 {
902         dout("__register_linger_request %p\n", req);
903         list_add_tail(&req->r_linger_item, &osdc->req_linger);
904         if (req->r_osd)
905                 list_add_tail(&req->r_linger_osd,
906                               &req->r_osd->o_linger_requests);
907 }
908
909 static void __unregister_linger_request(struct ceph_osd_client *osdc,
910                                         struct ceph_osd_request *req)
911 {
912         dout("__unregister_linger_request %p\n", req);
913         if (req->r_osd) {
914                 list_del_init(&req->r_linger_item);
915                 list_del_init(&req->r_linger_osd);
916
917                 if (list_empty(&req->r_osd->o_requests) &&
918                     list_empty(&req->r_osd->o_linger_requests)) {
919                         dout("moving osd to %p lru\n", req->r_osd);
920                         __move_osd_to_lru(osdc, req->r_osd);
921                 }
922                 if (list_empty(&req->r_osd_item))
923                         req->r_osd = NULL;
924         }
925 }
926
927 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
928                                          struct ceph_osd_request *req)
929 {
930         mutex_lock(&osdc->request_mutex);
931         if (req->r_linger) {
932                 __unregister_linger_request(osdc, req);
933                 ceph_osdc_put_request(req);
934         }
935         mutex_unlock(&osdc->request_mutex);
936 }
937 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
938
939 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
940                                   struct ceph_osd_request *req)
941 {
942         if (!req->r_linger) {
943                 dout("set_request_linger %p\n", req);
944                 req->r_linger = 1;
945                 /*
946                  * caller is now responsible for calling
947                  * unregister_linger_request
948                  */
949                 ceph_osdc_get_request(req);
950         }
951 }
952 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
953
954 /*
955  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
956  * (as needed), and set the request r_osd appropriately.  If there is
957  * no up osd, set r_osd to NULL.  Move the request to the appropriate list
958  * (unsent, homeless) or leave on in-flight lru.
959  *
960  * Return 0 if unchanged, 1 if changed, or negative on error.
961  *
962  * Caller should hold map_sem for read and request_mutex.
963  */
964 static int __map_request(struct ceph_osd_client *osdc,
965                          struct ceph_osd_request *req, int force_resend)
966 {
967         struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
968         struct ceph_pg pgid;
969         int acting[CEPH_PG_MAX_SIZE];
970         int o = -1, num = 0;
971         int err;
972
973         dout("map_request %p tid %lld\n", req, req->r_tid);
974         err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
975                                       &req->r_file_layout, osdc->osdmap);
976         if (err) {
977                 list_move(&req->r_req_lru_item, &osdc->req_notarget);
978                 return err;
979         }
980         pgid = reqhead->layout.ol_pgid;
981         req->r_pgid = pgid;
982
983         err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
984         if (err > 0) {
985                 o = acting[0];
986                 num = err;
987         }
988
989         if ((!force_resend &&
990              req->r_osd && req->r_osd->o_osd == o &&
991              req->r_sent >= req->r_osd->o_incarnation &&
992              req->r_num_pg_osds == num &&
993              memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
994             (req->r_osd == NULL && o == -1))
995                 return 0;  /* no change */
996
997         dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
998              req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
999              req->r_osd ? req->r_osd->o_osd : -1);
1000
1001         /* record full pg acting set */
1002         memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
1003         req->r_num_pg_osds = num;
1004
1005         if (req->r_osd) {
1006                 __cancel_request(req);
1007                 list_del_init(&req->r_osd_item);
1008                 req->r_osd = NULL;
1009         }
1010
1011         req->r_osd = __lookup_osd(osdc, o);
1012         if (!req->r_osd && o >= 0) {
1013                 err = -ENOMEM;
1014                 req->r_osd = create_osd(osdc, o);
1015                 if (!req->r_osd) {
1016                         list_move(&req->r_req_lru_item, &osdc->req_notarget);
1017                         goto out;
1018                 }
1019
1020                 dout("map_request osd %p is osd%d\n", req->r_osd, o);
1021                 __insert_osd(osdc, req->r_osd);
1022
1023                 ceph_con_open(&req->r_osd->o_con,
1024                               CEPH_ENTITY_TYPE_OSD, o,
1025                               &osdc->osdmap->osd_addr[o]);
1026         }
1027
1028         if (req->r_osd) {
1029                 __remove_osd_from_lru(req->r_osd);
1030                 list_add(&req->r_osd_item, &req->r_osd->o_requests);
1031                 list_move(&req->r_req_lru_item, &osdc->req_unsent);
1032         } else {
1033                 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1034         }
1035         err = 1;   /* osd or pg changed */
1036
1037 out:
1038         return err;
1039 }
1040
1041 /*
1042  * caller should hold map_sem (for read) and request_mutex
1043  */
1044 static void __send_request(struct ceph_osd_client *osdc,
1045                            struct ceph_osd_request *req)
1046 {
1047         struct ceph_osd_request_head *reqhead;
1048
1049         dout("send_request %p tid %llu to osd%d flags %d\n",
1050              req, req->r_tid, req->r_osd->o_osd, req->r_flags);
1051
1052         reqhead = req->r_request->front.iov_base;
1053         reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
1054         reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
1055         reqhead->reassert_version = req->r_reassert_version;
1056
1057         req->r_stamp = jiffies;
1058         list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1059
1060         ceph_msg_get(req->r_request); /* send consumes a ref */
1061         ceph_con_send(&req->r_osd->o_con, req->r_request);
1062         req->r_sent = req->r_osd->o_incarnation;
1063 }
1064
1065 /*
1066  * Send any requests in the queue (req_unsent).
1067  */
1068 static void send_queued(struct ceph_osd_client *osdc)
1069 {
1070         struct ceph_osd_request *req, *tmp;
1071
1072         dout("send_queued\n");
1073         mutex_lock(&osdc->request_mutex);
1074         list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1075                 __send_request(osdc, req);
1076         }
1077         mutex_unlock(&osdc->request_mutex);
1078 }
1079
1080 /*
1081  * Timeout callback, called every N seconds when 1 or more osd
1082  * requests has been active for more than N seconds.  When this
1083  * happens, we ping all OSDs with requests who have timed out to
1084  * ensure any communications channel reset is detected.  Reset the
1085  * request timeouts another N seconds in the future as we go.
1086  * Reschedule the timeout event another N seconds in future (unless
1087  * there are no open requests).
1088  */
1089 static void handle_timeout(struct work_struct *work)
1090 {
1091         struct ceph_osd_client *osdc =
1092                 container_of(work, struct ceph_osd_client, timeout_work.work);
1093         struct ceph_osd_request *req, *last_req = NULL;
1094         struct ceph_osd *osd;
1095         unsigned long timeout = osdc->client->options->osd_timeout * HZ;
1096         unsigned long keepalive =
1097                 osdc->client->options->osd_keepalive_timeout * HZ;
1098         unsigned long last_stamp = 0;
1099         struct list_head slow_osds;
1100         dout("timeout\n");
1101         down_read(&osdc->map_sem);
1102
1103         ceph_monc_request_next_osdmap(&osdc->client->monc);
1104
1105         mutex_lock(&osdc->request_mutex);
1106
1107         /*
1108          * reset osds that appear to be _really_ unresponsive.  this
1109          * is a failsafe measure.. we really shouldn't be getting to
1110          * this point if the system is working properly.  the monitors
1111          * should mark the osd as failed and we should find out about
1112          * it from an updated osd map.
1113          */
1114         while (timeout && !list_empty(&osdc->req_lru)) {
1115                 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1116                                  r_req_lru_item);
1117
1118                 /* hasn't been long enough since we sent it? */
1119                 if (time_before(jiffies, req->r_stamp + timeout))
1120                         break;
1121
1122                 /* hasn't been long enough since it was acked? */
1123                 if (req->r_request->ack_stamp == 0 ||
1124                     time_before(jiffies, req->r_request->ack_stamp + timeout))
1125                         break;
1126
1127                 BUG_ON(req == last_req && req->r_stamp == last_stamp);
1128                 last_req = req;
1129                 last_stamp = req->r_stamp;
1130
1131                 osd = req->r_osd;
1132                 BUG_ON(!osd);
1133                 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
1134                            req->r_tid, osd->o_osd);
1135                 __kick_osd_requests(osdc, osd);
1136         }
1137
1138         /*
1139          * ping osds that are a bit slow.  this ensures that if there
1140          * is a break in the TCP connection we will notice, and reopen
1141          * a connection with that osd (from the fault callback).
1142          */
1143         INIT_LIST_HEAD(&slow_osds);
1144         list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1145                 if (time_before(jiffies, req->r_stamp + keepalive))
1146                         break;
1147
1148                 osd = req->r_osd;
1149                 BUG_ON(!osd);
1150                 dout(" tid %llu is slow, will send keepalive on osd%d\n",
1151                      req->r_tid, osd->o_osd);
1152                 list_move_tail(&osd->o_keepalive_item, &slow_osds);
1153         }
1154         while (!list_empty(&slow_osds)) {
1155                 osd = list_entry(slow_osds.next, struct ceph_osd,
1156                                  o_keepalive_item);
1157                 list_del_init(&osd->o_keepalive_item);
1158                 ceph_con_keepalive(&osd->o_con);
1159         }
1160
1161         __schedule_osd_timeout(osdc);
1162         mutex_unlock(&osdc->request_mutex);
1163         send_queued(osdc);
1164         up_read(&osdc->map_sem);
1165 }
1166
1167 static void handle_osds_timeout(struct work_struct *work)
1168 {
1169         struct ceph_osd_client *osdc =
1170                 container_of(work, struct ceph_osd_client,
1171                              osds_timeout_work.work);
1172         unsigned long delay =
1173                 osdc->client->options->osd_idle_ttl * HZ >> 2;
1174
1175         dout("osds timeout\n");
1176         down_read(&osdc->map_sem);
1177         remove_old_osds(osdc);
1178         up_read(&osdc->map_sem);
1179
1180         schedule_delayed_work(&osdc->osds_timeout_work,
1181                               round_jiffies_relative(delay));
1182 }
1183
1184 static void complete_request(struct ceph_osd_request *req)
1185 {
1186         if (req->r_safe_callback)
1187                 req->r_safe_callback(req, NULL);
1188         complete_all(&req->r_safe_completion);  /* fsync waiter */
1189 }
1190
1191 /*
1192  * handle osd op reply.  either call the callback if it is specified,
1193  * or do the completion to wake up the waiting thread.
1194  */
1195 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1196                          struct ceph_connection *con)
1197 {
1198         struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1199         struct ceph_osd_request *req;
1200         u64 tid;
1201         int numops, object_len, flags;
1202         s32 result;
1203
1204         tid = le64_to_cpu(msg->hdr.tid);
1205         if (msg->front.iov_len < sizeof(*rhead))
1206                 goto bad;
1207         numops = le32_to_cpu(rhead->num_ops);
1208         object_len = le32_to_cpu(rhead->object_len);
1209         result = le32_to_cpu(rhead->result);
1210         if (msg->front.iov_len != sizeof(*rhead) + object_len +
1211             numops * sizeof(struct ceph_osd_op))
1212                 goto bad;
1213         dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1214         /* lookup */
1215         mutex_lock(&osdc->request_mutex);
1216         req = __lookup_request(osdc, tid);
1217         if (req == NULL) {
1218                 dout("handle_reply tid %llu dne\n", tid);
1219                 mutex_unlock(&osdc->request_mutex);
1220                 return;
1221         }
1222         ceph_osdc_get_request(req);
1223         flags = le32_to_cpu(rhead->flags);
1224
1225         /*
1226          * if this connection filled our message, drop our reference now, to
1227          * avoid a (safe but slower) revoke later.
1228          */
1229         if (req->r_con_filling_msg == con && req->r_reply == msg) {
1230                 dout(" dropping con_filling_msg ref %p\n", con);
1231                 req->r_con_filling_msg = NULL;
1232                 con->ops->put(con);
1233         }
1234
1235         if (!req->r_got_reply) {
1236                 unsigned int bytes;
1237
1238                 req->r_result = le32_to_cpu(rhead->result);
1239                 bytes = le32_to_cpu(msg->hdr.data_len);
1240                 dout("handle_reply result %d bytes %d\n", req->r_result,
1241                      bytes);
1242                 if (req->r_result == 0)
1243                         req->r_result = bytes;
1244
1245                 /* in case this is a write and we need to replay, */
1246                 req->r_reassert_version = rhead->reassert_version;
1247
1248                 req->r_got_reply = 1;
1249         } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1250                 dout("handle_reply tid %llu dup ack\n", tid);
1251                 mutex_unlock(&osdc->request_mutex);
1252                 goto done;
1253         }
1254
1255         dout("handle_reply tid %llu flags %d\n", tid, flags);
1256
1257         if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1258                 __register_linger_request(osdc, req);
1259
1260         /* either this is a read, or we got the safe response */
1261         if (result < 0 ||
1262             (flags & CEPH_OSD_FLAG_ONDISK) ||
1263             ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1264                 __unregister_request(osdc, req);
1265
1266         mutex_unlock(&osdc->request_mutex);
1267
1268         if (req->r_callback)
1269                 req->r_callback(req, msg);
1270         else
1271                 complete_all(&req->r_completion);
1272
1273         if (flags & CEPH_OSD_FLAG_ONDISK)
1274                 complete_request(req);
1275
1276 done:
1277         dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1278         ceph_osdc_put_request(req);
1279         return;
1280
1281 bad:
1282         pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1283                (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1284                (int)sizeof(*rhead));
1285         ceph_msg_dump(msg);
1286 }
1287
1288 static void reset_changed_osds(struct ceph_osd_client *osdc)
1289 {
1290         struct rb_node *p, *n;
1291
1292         for (p = rb_first(&osdc->osds); p; p = n) {
1293                 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1294
1295                 n = rb_next(p);
1296                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1297                     memcmp(&osd->o_con.peer_addr,
1298                            ceph_osd_addr(osdc->osdmap,
1299                                          osd->o_osd),
1300                            sizeof(struct ceph_entity_addr)) != 0)
1301                         __reset_osd(osdc, osd);
1302         }
1303 }
1304
1305 /*
1306  * Requeue requests whose mapping to an OSD has changed.  If requests map to
1307  * no osd, request a new map.
1308  *
1309  * Caller should hold map_sem for read and request_mutex.
1310  */
1311 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1312 {
1313         struct ceph_osd_request *req, *nreq;
1314         struct rb_node *p;
1315         int needmap = 0;
1316         int err;
1317
1318         dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1319         mutex_lock(&osdc->request_mutex);
1320         for (p = rb_first(&osdc->requests); p; ) {
1321                 req = rb_entry(p, struct ceph_osd_request, r_node);
1322                 p = rb_next(p);
1323                 err = __map_request(osdc, req, force_resend);
1324                 if (err < 0)
1325                         continue;  /* error */
1326                 if (req->r_osd == NULL) {
1327                         dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1328                         needmap++;  /* request a newer map */
1329                 } else if (err > 0) {
1330                         if (!req->r_linger) {
1331                                 dout("%p tid %llu requeued on osd%d\n", req,
1332                                      req->r_tid,
1333                                      req->r_osd ? req->r_osd->o_osd : -1);
1334                                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1335                         }
1336                 }
1337                 if (req->r_linger && list_empty(&req->r_linger_item)) {
1338                         /*
1339                          * register as a linger so that we will
1340                          * re-submit below and get a new tid
1341                          */
1342                         dout("%p tid %llu restart on osd%d\n",
1343                              req, req->r_tid,
1344                              req->r_osd ? req->r_osd->o_osd : -1);
1345                         __register_linger_request(osdc, req);
1346                         __unregister_request(osdc, req);
1347                 }
1348         }
1349
1350         list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1351                                  r_linger_item) {
1352                 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1353
1354                 err = __map_request(osdc, req, force_resend);
1355                 if (err == 0)
1356                         continue;  /* no change and no osd was specified */
1357                 if (err < 0)
1358                         continue;  /* hrm! */
1359                 if (req->r_osd == NULL) {
1360                         dout("tid %llu maps to no valid osd\n", req->r_tid);
1361                         needmap++;  /* request a newer map */
1362                         continue;
1363                 }
1364
1365                 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1366                      req->r_osd ? req->r_osd->o_osd : -1);
1367                 __unregister_linger_request(osdc, req);
1368                 __register_request(osdc, req);
1369         }
1370         mutex_unlock(&osdc->request_mutex);
1371
1372         if (needmap) {
1373                 dout("%d requests for down osds, need new map\n", needmap);
1374                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1375         }
1376 }
1377
1378
1379 /*
1380  * Process updated osd map.
1381  *
1382  * The message contains any number of incremental and full maps, normally
1383  * indicating some sort of topology change in the cluster.  Kick requests
1384  * off to different OSDs as needed.
1385  */
1386 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1387 {
1388         void *p, *end, *next;
1389         u32 nr_maps, maplen;
1390         u32 epoch;
1391         struct ceph_osdmap *newmap = NULL, *oldmap;
1392         int err;
1393         struct ceph_fsid fsid;
1394
1395         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1396         p = msg->front.iov_base;
1397         end = p + msg->front.iov_len;
1398
1399         /* verify fsid */
1400         ceph_decode_need(&p, end, sizeof(fsid), bad);
1401         ceph_decode_copy(&p, &fsid, sizeof(fsid));
1402         if (ceph_check_fsid(osdc->client, &fsid) < 0)
1403                 return;
1404
1405         down_write(&osdc->map_sem);
1406
1407         /* incremental maps */
1408         ceph_decode_32_safe(&p, end, nr_maps, bad);
1409         dout(" %d inc maps\n", nr_maps);
1410         while (nr_maps > 0) {
1411                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1412                 epoch = ceph_decode_32(&p);
1413                 maplen = ceph_decode_32(&p);
1414                 ceph_decode_need(&p, end, maplen, bad);
1415                 next = p + maplen;
1416                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1417                         dout("applying incremental map %u len %d\n",
1418                              epoch, maplen);
1419                         newmap = osdmap_apply_incremental(&p, next,
1420                                                           osdc->osdmap,
1421                                                           &osdc->client->msgr);
1422                         if (IS_ERR(newmap)) {
1423                                 err = PTR_ERR(newmap);
1424                                 goto bad;
1425                         }
1426                         BUG_ON(!newmap);
1427                         if (newmap != osdc->osdmap) {
1428                                 ceph_osdmap_destroy(osdc->osdmap);
1429                                 osdc->osdmap = newmap;
1430                         }
1431                         kick_requests(osdc, 0);
1432                         reset_changed_osds(osdc);
1433                 } else {
1434                         dout("ignoring incremental map %u len %d\n",
1435                              epoch, maplen);
1436                 }
1437                 p = next;
1438                 nr_maps--;
1439         }
1440         if (newmap)
1441                 goto done;
1442
1443         /* full maps */
1444         ceph_decode_32_safe(&p, end, nr_maps, bad);
1445         dout(" %d full maps\n", nr_maps);
1446         while (nr_maps) {
1447                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1448                 epoch = ceph_decode_32(&p);
1449                 maplen = ceph_decode_32(&p);
1450                 ceph_decode_need(&p, end, maplen, bad);
1451                 if (nr_maps > 1) {
1452                         dout("skipping non-latest full map %u len %d\n",
1453                              epoch, maplen);
1454                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1455                         dout("skipping full map %u len %d, "
1456                              "older than our %u\n", epoch, maplen,
1457                              osdc->osdmap->epoch);
1458                 } else {
1459                         int skipped_map = 0;
1460
1461                         dout("taking full map %u len %d\n", epoch, maplen);
1462                         newmap = osdmap_decode(&p, p+maplen);
1463                         if (IS_ERR(newmap)) {
1464                                 err = PTR_ERR(newmap);
1465                                 goto bad;
1466                         }
1467                         BUG_ON(!newmap);
1468                         oldmap = osdc->osdmap;
1469                         osdc->osdmap = newmap;
1470                         if (oldmap) {
1471                                 if (oldmap->epoch + 1 < newmap->epoch)
1472                                         skipped_map = 1;
1473                                 ceph_osdmap_destroy(oldmap);
1474                         }
1475                         kick_requests(osdc, skipped_map);
1476                 }
1477                 p += maplen;
1478                 nr_maps--;
1479         }
1480
1481 done:
1482         downgrade_write(&osdc->map_sem);
1483         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1484
1485         /*
1486          * subscribe to subsequent osdmap updates if full to ensure
1487          * we find out when we are no longer full and stop returning
1488          * ENOSPC.
1489          */
1490         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1491                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1492
1493         send_queued(osdc);
1494         up_read(&osdc->map_sem);
1495         wake_up_all(&osdc->client->auth_wq);
1496         return;
1497
1498 bad:
1499         pr_err("osdc handle_map corrupt msg\n");
1500         ceph_msg_dump(msg);
1501         up_write(&osdc->map_sem);
1502         return;
1503 }
1504
1505 /*
1506  * watch/notify callback event infrastructure
1507  *
1508  * These callbacks are used both for watch and notify operations.
1509  */
1510 static void __release_event(struct kref *kref)
1511 {
1512         struct ceph_osd_event *event =
1513                 container_of(kref, struct ceph_osd_event, kref);
1514
1515         dout("__release_event %p\n", event);
1516         kfree(event);
1517 }
1518
1519 static void get_event(struct ceph_osd_event *event)
1520 {
1521         kref_get(&event->kref);
1522 }
1523
1524 void ceph_osdc_put_event(struct ceph_osd_event *event)
1525 {
1526         kref_put(&event->kref, __release_event);
1527 }
1528 EXPORT_SYMBOL(ceph_osdc_put_event);
1529
1530 static void __insert_event(struct ceph_osd_client *osdc,
1531                              struct ceph_osd_event *new)
1532 {
1533         struct rb_node **p = &osdc->event_tree.rb_node;
1534         struct rb_node *parent = NULL;
1535         struct ceph_osd_event *event = NULL;
1536
1537         while (*p) {
1538                 parent = *p;
1539                 event = rb_entry(parent, struct ceph_osd_event, node);
1540                 if (new->cookie < event->cookie)
1541                         p = &(*p)->rb_left;
1542                 else if (new->cookie > event->cookie)
1543                         p = &(*p)->rb_right;
1544                 else
1545                         BUG();
1546         }
1547
1548         rb_link_node(&new->node, parent, p);
1549         rb_insert_color(&new->node, &osdc->event_tree);
1550 }
1551
1552 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1553                                                 u64 cookie)
1554 {
1555         struct rb_node **p = &osdc->event_tree.rb_node;
1556         struct rb_node *parent = NULL;
1557         struct ceph_osd_event *event = NULL;
1558
1559         while (*p) {
1560                 parent = *p;
1561                 event = rb_entry(parent, struct ceph_osd_event, node);
1562                 if (cookie < event->cookie)
1563                         p = &(*p)->rb_left;
1564                 else if (cookie > event->cookie)
1565                         p = &(*p)->rb_right;
1566                 else
1567                         return event;
1568         }
1569         return NULL;
1570 }
1571
1572 static void __remove_event(struct ceph_osd_event *event)
1573 {
1574         struct ceph_osd_client *osdc = event->osdc;
1575
1576         if (!RB_EMPTY_NODE(&event->node)) {
1577                 dout("__remove_event removed %p\n", event);
1578                 rb_erase(&event->node, &osdc->event_tree);
1579                 ceph_osdc_put_event(event);
1580         } else {
1581                 dout("__remove_event didn't remove %p\n", event);
1582         }
1583 }
1584
1585 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1586                            void (*event_cb)(u64, u64, u8, void *),
1587                            int one_shot, void *data,
1588                            struct ceph_osd_event **pevent)
1589 {
1590         struct ceph_osd_event *event;
1591
1592         event = kmalloc(sizeof(*event), GFP_NOIO);
1593         if (!event)
1594                 return -ENOMEM;
1595
1596         dout("create_event %p\n", event);
1597         event->cb = event_cb;
1598         event->one_shot = one_shot;
1599         event->data = data;
1600         event->osdc = osdc;
1601         INIT_LIST_HEAD(&event->osd_node);
1602         kref_init(&event->kref);   /* one ref for us */
1603         kref_get(&event->kref);    /* one ref for the caller */
1604         init_completion(&event->completion);
1605
1606         spin_lock(&osdc->event_lock);
1607         event->cookie = ++osdc->event_count;
1608         __insert_event(osdc, event);
1609         spin_unlock(&osdc->event_lock);
1610
1611         *pevent = event;
1612         return 0;
1613 }
1614 EXPORT_SYMBOL(ceph_osdc_create_event);
1615
1616 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1617 {
1618         struct ceph_osd_client *osdc = event->osdc;
1619
1620         dout("cancel_event %p\n", event);
1621         spin_lock(&osdc->event_lock);
1622         __remove_event(event);
1623         spin_unlock(&osdc->event_lock);
1624         ceph_osdc_put_event(event); /* caller's */
1625 }
1626 EXPORT_SYMBOL(ceph_osdc_cancel_event);
1627
1628
1629 static void do_event_work(struct work_struct *work)
1630 {
1631         struct ceph_osd_event_work *event_work =
1632                 container_of(work, struct ceph_osd_event_work, work);
1633         struct ceph_osd_event *event = event_work->event;
1634         u64 ver = event_work->ver;
1635         u64 notify_id = event_work->notify_id;
1636         u8 opcode = event_work->opcode;
1637
1638         dout("do_event_work completing %p\n", event);
1639         event->cb(ver, notify_id, opcode, event->data);
1640         complete(&event->completion);
1641         dout("do_event_work completed %p\n", event);
1642         ceph_osdc_put_event(event);
1643         kfree(event_work);
1644 }
1645
1646
1647 /*
1648  * Process osd watch notifications
1649  */
1650 void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1651 {
1652         void *p, *end;
1653         u8 proto_ver;
1654         u64 cookie, ver, notify_id;
1655         u8 opcode;
1656         struct ceph_osd_event *event;
1657         struct ceph_osd_event_work *event_work;
1658
1659         p = msg->front.iov_base;
1660         end = p + msg->front.iov_len;
1661
1662         ceph_decode_8_safe(&p, end, proto_ver, bad);
1663         ceph_decode_8_safe(&p, end, opcode, bad);
1664         ceph_decode_64_safe(&p, end, cookie, bad);
1665         ceph_decode_64_safe(&p, end, ver, bad);
1666         ceph_decode_64_safe(&p, end, notify_id, bad);
1667
1668         spin_lock(&osdc->event_lock);
1669         event = __find_event(osdc, cookie);
1670         if (event) {
1671                 get_event(event);
1672                 if (event->one_shot)
1673                         __remove_event(event);
1674         }
1675         spin_unlock(&osdc->event_lock);
1676         dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1677              cookie, ver, event);
1678         if (event) {
1679                 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1680                 if (!event_work) {
1681                         dout("ERROR: could not allocate event_work\n");
1682                         goto done_err;
1683                 }
1684                 INIT_WORK(&event_work->work, do_event_work);
1685                 event_work->event = event;
1686                 event_work->ver = ver;
1687                 event_work->notify_id = notify_id;
1688                 event_work->opcode = opcode;
1689                 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1690                         dout("WARNING: failed to queue notify event work\n");
1691                         goto done_err;
1692                 }
1693         }
1694
1695         return;
1696
1697 done_err:
1698         complete(&event->completion);
1699         ceph_osdc_put_event(event);
1700         return;
1701
1702 bad:
1703         pr_err("osdc handle_watch_notify corrupt msg\n");
1704         return;
1705 }
1706
1707 int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1708 {
1709         int err;
1710
1711         dout("wait_event %p\n", event);
1712         err = wait_for_completion_interruptible_timeout(&event->completion,
1713                                                         timeout * HZ);
1714         ceph_osdc_put_event(event);
1715         if (err > 0)
1716                 err = 0;
1717         dout("wait_event %p returns %d\n", event, err);
1718         return err;
1719 }
1720 EXPORT_SYMBOL(ceph_osdc_wait_event);
1721
1722 /*
1723  * Register request, send initial attempt.
1724  */
1725 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1726                             struct ceph_osd_request *req,
1727                             bool nofail)
1728 {
1729         int rc = 0;
1730
1731         req->r_request->pages = req->r_pages;
1732         req->r_request->nr_pages = req->r_num_pages;
1733 #ifdef CONFIG_BLOCK
1734         req->r_request->bio = req->r_bio;
1735 #endif
1736         req->r_request->trail = req->r_trail;
1737
1738         register_request(osdc, req);
1739
1740         down_read(&osdc->map_sem);
1741         mutex_lock(&osdc->request_mutex);
1742         /*
1743          * a racing kick_requests() may have sent the message for us
1744          * while we dropped request_mutex above, so only send now if
1745          * the request still han't been touched yet.
1746          */
1747         if (req->r_sent == 0) {
1748                 rc = __map_request(osdc, req, 0);
1749                 if (rc < 0) {
1750                         if (nofail) {
1751                                 dout("osdc_start_request failed map, "
1752                                      " will retry %lld\n", req->r_tid);
1753                                 rc = 0;
1754                         }
1755                         goto out_unlock;
1756                 }
1757                 if (req->r_osd == NULL) {
1758                         dout("send_request %p no up osds in pg\n", req);
1759                         ceph_monc_request_next_osdmap(&osdc->client->monc);
1760                 } else {
1761                         __send_request(osdc, req);
1762                 }
1763                 rc = 0;
1764         }
1765
1766 out_unlock:
1767         mutex_unlock(&osdc->request_mutex);
1768         up_read(&osdc->map_sem);
1769         return rc;
1770 }
1771 EXPORT_SYMBOL(ceph_osdc_start_request);
1772
1773 /*
1774  * wait for a request to complete
1775  */
1776 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1777                            struct ceph_osd_request *req)
1778 {
1779         int rc;
1780
1781         rc = wait_for_completion_interruptible(&req->r_completion);
1782         if (rc < 0) {
1783                 mutex_lock(&osdc->request_mutex);
1784                 __cancel_request(req);
1785                 __unregister_request(osdc, req);
1786                 mutex_unlock(&osdc->request_mutex);
1787                 complete_request(req);
1788                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1789                 return rc;
1790         }
1791
1792         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1793         return req->r_result;
1794 }
1795 EXPORT_SYMBOL(ceph_osdc_wait_request);
1796
1797 /*
1798  * sync - wait for all in-flight requests to flush.  avoid starvation.
1799  */
1800 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1801 {
1802         struct ceph_osd_request *req;
1803         u64 last_tid, next_tid = 0;
1804
1805         mutex_lock(&osdc->request_mutex);
1806         last_tid = osdc->last_tid;
1807         while (1) {
1808                 req = __lookup_request_ge(osdc, next_tid);
1809                 if (!req)
1810                         break;
1811                 if (req->r_tid > last_tid)
1812                         break;
1813
1814                 next_tid = req->r_tid + 1;
1815                 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1816                         continue;
1817
1818                 ceph_osdc_get_request(req);
1819                 mutex_unlock(&osdc->request_mutex);
1820                 dout("sync waiting on tid %llu (last is %llu)\n",
1821                      req->r_tid, last_tid);
1822                 wait_for_completion(&req->r_safe_completion);
1823                 mutex_lock(&osdc->request_mutex);
1824                 ceph_osdc_put_request(req);
1825         }
1826         mutex_unlock(&osdc->request_mutex);
1827         dout("sync done (thru tid %llu)\n", last_tid);
1828 }
1829 EXPORT_SYMBOL(ceph_osdc_sync);
1830
1831 /*
1832  * init, shutdown
1833  */
1834 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1835 {
1836         int err;
1837
1838         dout("init\n");
1839         osdc->client = client;
1840         osdc->osdmap = NULL;
1841         init_rwsem(&osdc->map_sem);
1842         init_completion(&osdc->map_waiters);
1843         osdc->last_requested_map = 0;
1844         mutex_init(&osdc->request_mutex);
1845         osdc->last_tid = 0;
1846         osdc->osds = RB_ROOT;
1847         INIT_LIST_HEAD(&osdc->osd_lru);
1848         osdc->requests = RB_ROOT;
1849         INIT_LIST_HEAD(&osdc->req_lru);
1850         INIT_LIST_HEAD(&osdc->req_unsent);
1851         INIT_LIST_HEAD(&osdc->req_notarget);
1852         INIT_LIST_HEAD(&osdc->req_linger);
1853         osdc->num_requests = 0;
1854         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1855         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1856         spin_lock_init(&osdc->event_lock);
1857         osdc->event_tree = RB_ROOT;
1858         osdc->event_count = 0;
1859
1860         schedule_delayed_work(&osdc->osds_timeout_work,
1861            round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1862
1863         err = -ENOMEM;
1864         osdc->req_mempool = mempool_create_kmalloc_pool(10,
1865                                         sizeof(struct ceph_osd_request));
1866         if (!osdc->req_mempool)
1867                 goto out;
1868
1869         err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1870                                 OSD_OP_FRONT_LEN, 10, true,
1871                                 "osd_op");
1872         if (err < 0)
1873                 goto out_mempool;
1874         err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1875                                 OSD_OPREPLY_FRONT_LEN, 10, true,
1876                                 "osd_op_reply");
1877         if (err < 0)
1878                 goto out_msgpool;
1879
1880         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1881         if (IS_ERR(osdc->notify_wq)) {
1882                 err = PTR_ERR(osdc->notify_wq);
1883                 osdc->notify_wq = NULL;
1884                 goto out_msgpool;
1885         }
1886         return 0;
1887
1888 out_msgpool:
1889         ceph_msgpool_destroy(&osdc->msgpool_op);
1890 out_mempool:
1891         mempool_destroy(osdc->req_mempool);
1892 out:
1893         return err;
1894 }
1895 EXPORT_SYMBOL(ceph_osdc_init);
1896
1897 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1898 {
1899         flush_workqueue(osdc->notify_wq);
1900         destroy_workqueue(osdc->notify_wq);
1901         cancel_delayed_work_sync(&osdc->timeout_work);
1902         cancel_delayed_work_sync(&osdc->osds_timeout_work);
1903         if (osdc->osdmap) {
1904                 ceph_osdmap_destroy(osdc->osdmap);
1905                 osdc->osdmap = NULL;
1906         }
1907         remove_all_osds(osdc);
1908         mempool_destroy(osdc->req_mempool);
1909         ceph_msgpool_destroy(&osdc->msgpool_op);
1910         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1911 }
1912 EXPORT_SYMBOL(ceph_osdc_stop);
1913
1914 /*
1915  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1916  * *plen.  Return number of bytes read, or error.
1917  */
1918 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1919                         struct ceph_vino vino, struct ceph_file_layout *layout,
1920                         u64 off, u64 *plen,
1921                         u32 truncate_seq, u64 truncate_size,
1922                         struct page **pages, int num_pages, int page_align)
1923 {
1924         struct ceph_osd_request *req;
1925         int rc = 0;
1926
1927         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1928              vino.snap, off, *plen);
1929         req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1930                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1931                                     NULL, 0, truncate_seq, truncate_size, NULL,
1932                                     false, 1, page_align);
1933         if (IS_ERR(req))
1934                 return PTR_ERR(req);
1935
1936         /* it may be a short read due to an object boundary */
1937         req->r_pages = pages;
1938
1939         dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1940              off, *plen, req->r_num_pages, page_align);
1941
1942         rc = ceph_osdc_start_request(osdc, req, false);
1943         if (!rc)
1944                 rc = ceph_osdc_wait_request(osdc, req);
1945
1946         ceph_osdc_put_request(req);
1947         dout("readpages result %d\n", rc);
1948         return rc;
1949 }
1950 EXPORT_SYMBOL(ceph_osdc_readpages);
1951
1952 /*
1953  * do a synchronous write on N pages
1954  */
1955 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1956                          struct ceph_file_layout *layout,
1957                          struct ceph_snap_context *snapc,
1958                          u64 off, u64 len,
1959                          u32 truncate_seq, u64 truncate_size,
1960                          struct timespec *mtime,
1961                          struct page **pages, int num_pages,
1962                          int flags, int do_sync, bool nofail)
1963 {
1964         struct ceph_osd_request *req;
1965         int rc = 0;
1966         int page_align = off & ~PAGE_MASK;
1967
1968         BUG_ON(vino.snap != CEPH_NOSNAP);
1969         req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1970                                     CEPH_OSD_OP_WRITE,
1971                                     flags | CEPH_OSD_FLAG_ONDISK |
1972                                             CEPH_OSD_FLAG_WRITE,
1973                                     snapc, do_sync,
1974                                     truncate_seq, truncate_size, mtime,
1975                                     nofail, 1, page_align);
1976         if (IS_ERR(req))
1977                 return PTR_ERR(req);
1978
1979         /* it may be a short write due to an object boundary */
1980         req->r_pages = pages;
1981         dout("writepages %llu~%llu (%d pages)\n", off, len,
1982              req->r_num_pages);
1983
1984         rc = ceph_osdc_start_request(osdc, req, nofail);
1985         if (!rc)
1986                 rc = ceph_osdc_wait_request(osdc, req);
1987
1988         ceph_osdc_put_request(req);
1989         if (rc == 0)
1990                 rc = len;
1991         dout("writepages result %d\n", rc);
1992         return rc;
1993 }
1994 EXPORT_SYMBOL(ceph_osdc_writepages);
1995
1996 /*
1997  * handle incoming message
1998  */
1999 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2000 {
2001         struct ceph_osd *osd = con->private;
2002         struct ceph_osd_client *osdc;
2003         int type = le16_to_cpu(msg->hdr.type);
2004
2005         if (!osd)
2006                 goto out;
2007         osdc = osd->o_osdc;
2008
2009         switch (type) {
2010         case CEPH_MSG_OSD_MAP:
2011                 ceph_osdc_handle_map(osdc, msg);
2012                 break;
2013         case CEPH_MSG_OSD_OPREPLY:
2014                 handle_reply(osdc, msg, con);
2015                 break;
2016         case CEPH_MSG_WATCH_NOTIFY:
2017                 handle_watch_notify(osdc, msg);
2018                 break;
2019
2020         default:
2021                 pr_err("received unknown message type %d %s\n", type,
2022                        ceph_msg_type_name(type));
2023         }
2024 out:
2025         ceph_msg_put(msg);
2026 }
2027
2028 /*
2029  * lookup and return message for incoming reply.  set up reply message
2030  * pages.
2031  */
2032 static struct ceph_msg *get_reply(struct ceph_connection *con,
2033                                   struct ceph_msg_header *hdr,
2034                                   int *skip)
2035 {
2036         struct ceph_osd *osd = con->private;
2037         struct ceph_osd_client *osdc = osd->o_osdc;
2038         struct ceph_msg *m;
2039         struct ceph_osd_request *req;
2040         int front = le32_to_cpu(hdr->front_len);
2041         int data_len = le32_to_cpu(hdr->data_len);
2042         u64 tid;
2043
2044         tid = le64_to_cpu(hdr->tid);
2045         mutex_lock(&osdc->request_mutex);
2046         req = __lookup_request(osdc, tid);
2047         if (!req) {
2048                 *skip = 1;
2049                 m = NULL;
2050                 dout("get_reply unknown tid %llu from osd%d\n", tid,
2051                      osd->o_osd);
2052                 goto out;
2053         }
2054
2055         if (req->r_con_filling_msg) {
2056                 dout("%s revoking msg %p from old con %p\n", __func__,
2057                      req->r_reply, req->r_con_filling_msg);
2058                 ceph_msg_revoke_incoming(req->r_reply);
2059                 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2060                 req->r_con_filling_msg = NULL;
2061         }
2062
2063         if (front > req->r_reply->front.iov_len) {
2064                 pr_warning("get_reply front %d > preallocated %d\n",
2065                            front, (int)req->r_reply->front.iov_len);
2066                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2067                 if (!m)
2068                         goto out;
2069                 ceph_msg_put(req->r_reply);
2070                 req->r_reply = m;
2071         }
2072         m = ceph_msg_get(req->r_reply);
2073
2074         if (data_len > 0) {
2075                 int want = calc_pages_for(req->r_page_alignment, data_len);
2076
2077                 if (unlikely(req->r_num_pages < want)) {
2078                         pr_warning("tid %lld reply has %d bytes %d pages, we"
2079                                    " had only %d pages ready\n", tid, data_len,
2080                                    want, req->r_num_pages);
2081                         *skip = 1;
2082                         ceph_msg_put(m);
2083                         m = NULL;
2084                         goto out;
2085                 }
2086                 m->pages = req->r_pages;
2087                 m->nr_pages = req->r_num_pages;
2088                 m->page_alignment = req->r_page_alignment;
2089 #ifdef CONFIG_BLOCK
2090                 m->bio = req->r_bio;
2091 #endif
2092         }
2093         *skip = 0;
2094         req->r_con_filling_msg = con->ops->get(con);
2095         dout("get_reply tid %lld %p\n", tid, m);
2096
2097 out:
2098         mutex_unlock(&osdc->request_mutex);
2099         return m;
2100
2101 }
2102
2103 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2104                                   struct ceph_msg_header *hdr,
2105                                   int *skip)
2106 {
2107         struct ceph_osd *osd = con->private;
2108         int type = le16_to_cpu(hdr->type);
2109         int front = le32_to_cpu(hdr->front_len);
2110
2111         *skip = 0;
2112         switch (type) {
2113         case CEPH_MSG_OSD_MAP:
2114         case CEPH_MSG_WATCH_NOTIFY:
2115                 return ceph_msg_new(type, front, GFP_NOFS, false);
2116         case CEPH_MSG_OSD_OPREPLY:
2117                 return get_reply(con, hdr, skip);
2118         default:
2119                 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2120                         osd->o_osd);
2121                 *skip = 1;
2122                 return NULL;
2123         }
2124 }
2125
2126 /*
2127  * Wrappers to refcount containing ceph_osd struct
2128  */
2129 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2130 {
2131         struct ceph_osd *osd = con->private;
2132         if (get_osd(osd))
2133                 return con;
2134         return NULL;
2135 }
2136
2137 static void put_osd_con(struct ceph_connection *con)
2138 {
2139         struct ceph_osd *osd = con->private;
2140         put_osd(osd);
2141 }
2142
2143 /*
2144  * authentication
2145  */
2146 /*
2147  * Note: returned pointer is the address of a structure that's
2148  * managed separately.  Caller must *not* attempt to free it.
2149  */
2150 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2151                                         int *proto, int force_new)
2152 {
2153         struct ceph_osd *o = con->private;
2154         struct ceph_osd_client *osdc = o->o_osdc;
2155         struct ceph_auth_client *ac = osdc->client->monc.auth;
2156         struct ceph_auth_handshake *auth = &o->o_auth;
2157
2158         if (force_new && auth->authorizer) {
2159                 if (ac->ops && ac->ops->destroy_authorizer)
2160                         ac->ops->destroy_authorizer(ac, auth->authorizer);
2161                 auth->authorizer = NULL;
2162         }
2163         if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2164                 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2165                                                         auth);
2166                 if (ret)
2167                         return ERR_PTR(ret);
2168         }
2169         *proto = ac->protocol;
2170
2171         return auth;
2172 }
2173
2174
2175 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2176 {
2177         struct ceph_osd *o = con->private;
2178         struct ceph_osd_client *osdc = o->o_osdc;
2179         struct ceph_auth_client *ac = osdc->client->monc.auth;
2180
2181         /*
2182          * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2183          * XXX which do we do:  succeed or fail?
2184          */
2185         return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2186 }
2187
2188 static int invalidate_authorizer(struct ceph_connection *con)
2189 {
2190         struct ceph_osd *o = con->private;
2191         struct ceph_osd_client *osdc = o->o_osdc;
2192         struct ceph_auth_client *ac = osdc->client->monc.auth;
2193
2194         if (ac->ops && ac->ops->invalidate_authorizer)
2195                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2196
2197         return ceph_monc_validate_auth(&osdc->client->monc);
2198 }
2199
2200 static const struct ceph_connection_operations osd_con_ops = {
2201         .get = get_osd_con,
2202         .put = put_osd_con,
2203         .dispatch = dispatch,
2204         .get_authorizer = get_authorizer,
2205         .verify_authorizer_reply = verify_authorizer_reply,
2206         .invalidate_authorizer = invalidate_authorizer,
2207         .alloc_msg = alloc_msg,
2208         .fault = osd_reset,
2209 };