netfilter: nfnetlink: silence warning if CONFIG_PROVE_RCU isn't set
[cascardo/linux.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9
10 #include "super.h"
11 #include "mds_client.h"
12
13 #include <linux/ceph/ceph_features.h>
14 #include <linux/ceph/messenger.h>
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/pagelist.h>
17 #include <linux/ceph/auth.h>
18 #include <linux/ceph/debugfs.h>
19
20 /*
21  * A cluster of MDS (metadata server) daemons is responsible for
22  * managing the file system namespace (the directory hierarchy and
23  * inodes) and for coordinating shared access to storage.  Metadata is
24  * partitioned hierarchically across a number of servers, and that
25  * partition varies over time as the cluster adjusts the distribution
26  * in order to balance load.
27  *
28  * The MDS client is primarily responsible for managing synchronous
29  * metadata requests for operations like open, unlink, and so forth.
30  * If there is a MDS failure, we find out about it when we (possibly
31  * request and) receive a new MDS map, and can resubmit affected
32  * requests.
33  *
34  * For the most part, though, we take advantage of a lossless
35  * communications channel to the MDS, and do not need to worry about
36  * timing out or resubmitting requests.
37  *
38  * We maintain a stateful "session" with each MDS we interact with.
39  * Within each session, we send periodic heartbeat messages to ensure
40  * any capabilities or leases we have been issued remain valid.  If
41  * the session times out and goes stale, our leases and capabilities
42  * are no longer valid.
43  */
44
45 struct ceph_reconnect_state {
46         struct ceph_pagelist *pagelist;
47         bool flock;
48 };
49
50 static void __wake_requests(struct ceph_mds_client *mdsc,
51                             struct list_head *head);
52
53 static const struct ceph_connection_operations mds_con_ops;
54
55
56 /*
57  * mds reply parsing
58  */
59
60 /*
61  * parse individual inode info
62  */
63 static int parse_reply_info_in(void **p, void *end,
64                                struct ceph_mds_reply_info_in *info,
65                                int features)
66 {
67         int err = -EIO;
68
69         info->in = *p;
70         *p += sizeof(struct ceph_mds_reply_inode) +
71                 sizeof(*info->in->fragtree.splits) *
72                 le32_to_cpu(info->in->fragtree.nsplits);
73
74         ceph_decode_32_safe(p, end, info->symlink_len, bad);
75         ceph_decode_need(p, end, info->symlink_len, bad);
76         info->symlink = *p;
77         *p += info->symlink_len;
78
79         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
80                 ceph_decode_copy_safe(p, end, &info->dir_layout,
81                                       sizeof(info->dir_layout), bad);
82         else
83                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
84
85         ceph_decode_32_safe(p, end, info->xattr_len, bad);
86         ceph_decode_need(p, end, info->xattr_len, bad);
87         info->xattr_data = *p;
88         *p += info->xattr_len;
89         return 0;
90 bad:
91         return err;
92 }
93
94 /*
95  * parse a normal reply, which may contain a (dir+)dentry and/or a
96  * target inode.
97  */
98 static int parse_reply_info_trace(void **p, void *end,
99                                   struct ceph_mds_reply_info_parsed *info,
100                                   int features)
101 {
102         int err;
103
104         if (info->head->is_dentry) {
105                 err = parse_reply_info_in(p, end, &info->diri, features);
106                 if (err < 0)
107                         goto out_bad;
108
109                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
110                         goto bad;
111                 info->dirfrag = *p;
112                 *p += sizeof(*info->dirfrag) +
113                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
114                 if (unlikely(*p > end))
115                         goto bad;
116
117                 ceph_decode_32_safe(p, end, info->dname_len, bad);
118                 ceph_decode_need(p, end, info->dname_len, bad);
119                 info->dname = *p;
120                 *p += info->dname_len;
121                 info->dlease = *p;
122                 *p += sizeof(*info->dlease);
123         }
124
125         if (info->head->is_target) {
126                 err = parse_reply_info_in(p, end, &info->targeti, features);
127                 if (err < 0)
128                         goto out_bad;
129         }
130
131         if (unlikely(*p != end))
132                 goto bad;
133         return 0;
134
135 bad:
136         err = -EIO;
137 out_bad:
138         pr_err("problem parsing mds trace %d\n", err);
139         return err;
140 }
141
142 /*
143  * parse readdir results
144  */
145 static int parse_reply_info_dir(void **p, void *end,
146                                 struct ceph_mds_reply_info_parsed *info,
147                                 int features)
148 {
149         u32 num, i = 0;
150         int err;
151
152         info->dir_dir = *p;
153         if (*p + sizeof(*info->dir_dir) > end)
154                 goto bad;
155         *p += sizeof(*info->dir_dir) +
156                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
157         if (*p > end)
158                 goto bad;
159
160         ceph_decode_need(p, end, sizeof(num) + 2, bad);
161         num = ceph_decode_32(p);
162         info->dir_end = ceph_decode_8(p);
163         info->dir_complete = ceph_decode_8(p);
164         if (num == 0)
165                 goto done;
166
167         /* alloc large array */
168         info->dir_nr = num;
169         info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
170                                sizeof(*info->dir_dname) +
171                                sizeof(*info->dir_dname_len) +
172                                sizeof(*info->dir_dlease),
173                                GFP_NOFS);
174         if (info->dir_in == NULL) {
175                 err = -ENOMEM;
176                 goto out_bad;
177         }
178         info->dir_dname = (void *)(info->dir_in + num);
179         info->dir_dname_len = (void *)(info->dir_dname + num);
180         info->dir_dlease = (void *)(info->dir_dname_len + num);
181
182         while (num) {
183                 /* dentry */
184                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
185                 info->dir_dname_len[i] = ceph_decode_32(p);
186                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
187                 info->dir_dname[i] = *p;
188                 *p += info->dir_dname_len[i];
189                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
190                      info->dir_dname[i]);
191                 info->dir_dlease[i] = *p;
192                 *p += sizeof(struct ceph_mds_reply_lease);
193
194                 /* inode */
195                 err = parse_reply_info_in(p, end, &info->dir_in[i], features);
196                 if (err < 0)
197                         goto out_bad;
198                 i++;
199                 num--;
200         }
201
202 done:
203         if (*p != end)
204                 goto bad;
205         return 0;
206
207 bad:
208         err = -EIO;
209 out_bad:
210         pr_err("problem parsing dir contents %d\n", err);
211         return err;
212 }
213
214 /*
215  * parse fcntl F_GETLK results
216  */
217 static int parse_reply_info_filelock(void **p, void *end,
218                                      struct ceph_mds_reply_info_parsed *info,
219                                      int features)
220 {
221         if (*p + sizeof(*info->filelock_reply) > end)
222                 goto bad;
223
224         info->filelock_reply = *p;
225         *p += sizeof(*info->filelock_reply);
226
227         if (unlikely(*p != end))
228                 goto bad;
229         return 0;
230
231 bad:
232         return -EIO;
233 }
234
235 /*
236  * parse extra results
237  */
238 static int parse_reply_info_extra(void **p, void *end,
239                                   struct ceph_mds_reply_info_parsed *info,
240                                   int features)
241 {
242         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
243                 return parse_reply_info_filelock(p, end, info, features);
244         else
245                 return parse_reply_info_dir(p, end, info, features);
246 }
247
248 /*
249  * parse entire mds reply
250  */
251 static int parse_reply_info(struct ceph_msg *msg,
252                             struct ceph_mds_reply_info_parsed *info,
253                             int features)
254 {
255         void *p, *end;
256         u32 len;
257         int err;
258
259         info->head = msg->front.iov_base;
260         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
261         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
262
263         /* trace */
264         ceph_decode_32_safe(&p, end, len, bad);
265         if (len > 0) {
266                 ceph_decode_need(&p, end, len, bad);
267                 err = parse_reply_info_trace(&p, p+len, info, features);
268                 if (err < 0)
269                         goto out_bad;
270         }
271
272         /* extra */
273         ceph_decode_32_safe(&p, end, len, bad);
274         if (len > 0) {
275                 ceph_decode_need(&p, end, len, bad);
276                 err = parse_reply_info_extra(&p, p+len, info, features);
277                 if (err < 0)
278                         goto out_bad;
279         }
280
281         /* snap blob */
282         ceph_decode_32_safe(&p, end, len, bad);
283         info->snapblob_len = len;
284         info->snapblob = p;
285         p += len;
286
287         if (p != end)
288                 goto bad;
289         return 0;
290
291 bad:
292         err = -EIO;
293 out_bad:
294         pr_err("mds parse_reply err %d\n", err);
295         return err;
296 }
297
298 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
299 {
300         kfree(info->dir_in);
301 }
302
303
304 /*
305  * sessions
306  */
307 static const char *session_state_name(int s)
308 {
309         switch (s) {
310         case CEPH_MDS_SESSION_NEW: return "new";
311         case CEPH_MDS_SESSION_OPENING: return "opening";
312         case CEPH_MDS_SESSION_OPEN: return "open";
313         case CEPH_MDS_SESSION_HUNG: return "hung";
314         case CEPH_MDS_SESSION_CLOSING: return "closing";
315         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
316         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
317         default: return "???";
318         }
319 }
320
321 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
322 {
323         if (atomic_inc_not_zero(&s->s_ref)) {
324                 dout("mdsc get_session %p %d -> %d\n", s,
325                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
326                 return s;
327         } else {
328                 dout("mdsc get_session %p 0 -- FAIL", s);
329                 return NULL;
330         }
331 }
332
333 void ceph_put_mds_session(struct ceph_mds_session *s)
334 {
335         dout("mdsc put_session %p %d -> %d\n", s,
336              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
337         if (atomic_dec_and_test(&s->s_ref)) {
338                 if (s->s_auth.authorizer)
339                      s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
340                              s->s_mdsc->fsc->client->monc.auth,
341                              s->s_auth.authorizer);
342                 kfree(s);
343         }
344 }
345
346 /*
347  * called under mdsc->mutex
348  */
349 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
350                                                    int mds)
351 {
352         struct ceph_mds_session *session;
353
354         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
355                 return NULL;
356         session = mdsc->sessions[mds];
357         dout("lookup_mds_session %p %d\n", session,
358              atomic_read(&session->s_ref));
359         get_session(session);
360         return session;
361 }
362
363 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
364 {
365         if (mds >= mdsc->max_sessions)
366                 return false;
367         return mdsc->sessions[mds];
368 }
369
370 static int __verify_registered_session(struct ceph_mds_client *mdsc,
371                                        struct ceph_mds_session *s)
372 {
373         if (s->s_mds >= mdsc->max_sessions ||
374             mdsc->sessions[s->s_mds] != s)
375                 return -ENOENT;
376         return 0;
377 }
378
379 /*
380  * create+register a new session for given mds.
381  * called under mdsc->mutex.
382  */
383 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
384                                                  int mds)
385 {
386         struct ceph_mds_session *s;
387
388         s = kzalloc(sizeof(*s), GFP_NOFS);
389         if (!s)
390                 return ERR_PTR(-ENOMEM);
391         s->s_mdsc = mdsc;
392         s->s_mds = mds;
393         s->s_state = CEPH_MDS_SESSION_NEW;
394         s->s_ttl = 0;
395         s->s_seq = 0;
396         mutex_init(&s->s_mutex);
397
398         ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
399
400         spin_lock_init(&s->s_gen_ttl_lock);
401         s->s_cap_gen = 0;
402         s->s_cap_ttl = jiffies - 1;
403
404         spin_lock_init(&s->s_cap_lock);
405         s->s_renew_requested = 0;
406         s->s_renew_seq = 0;
407         INIT_LIST_HEAD(&s->s_caps);
408         s->s_nr_caps = 0;
409         s->s_trim_caps = 0;
410         atomic_set(&s->s_ref, 1);
411         INIT_LIST_HEAD(&s->s_waiting);
412         INIT_LIST_HEAD(&s->s_unsafe);
413         s->s_num_cap_releases = 0;
414         s->s_cap_iterator = NULL;
415         INIT_LIST_HEAD(&s->s_cap_releases);
416         INIT_LIST_HEAD(&s->s_cap_releases_done);
417         INIT_LIST_HEAD(&s->s_cap_flushing);
418         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
419
420         dout("register_session mds%d\n", mds);
421         if (mds >= mdsc->max_sessions) {
422                 int newmax = 1 << get_count_order(mds+1);
423                 struct ceph_mds_session **sa;
424
425                 dout("register_session realloc to %d\n", newmax);
426                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
427                 if (sa == NULL)
428                         goto fail_realloc;
429                 if (mdsc->sessions) {
430                         memcpy(sa, mdsc->sessions,
431                                mdsc->max_sessions * sizeof(void *));
432                         kfree(mdsc->sessions);
433                 }
434                 mdsc->sessions = sa;
435                 mdsc->max_sessions = newmax;
436         }
437         mdsc->sessions[mds] = s;
438         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
439
440         ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
441                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
442
443         return s;
444
445 fail_realloc:
446         kfree(s);
447         return ERR_PTR(-ENOMEM);
448 }
449
450 /*
451  * called under mdsc->mutex
452  */
453 static void __unregister_session(struct ceph_mds_client *mdsc,
454                                struct ceph_mds_session *s)
455 {
456         dout("__unregister_session mds%d %p\n", s->s_mds, s);
457         BUG_ON(mdsc->sessions[s->s_mds] != s);
458         mdsc->sessions[s->s_mds] = NULL;
459         ceph_con_close(&s->s_con);
460         ceph_put_mds_session(s);
461 }
462
463 /*
464  * drop session refs in request.
465  *
466  * should be last request ref, or hold mdsc->mutex
467  */
468 static void put_request_session(struct ceph_mds_request *req)
469 {
470         if (req->r_session) {
471                 ceph_put_mds_session(req->r_session);
472                 req->r_session = NULL;
473         }
474 }
475
476 void ceph_mdsc_release_request(struct kref *kref)
477 {
478         struct ceph_mds_request *req = container_of(kref,
479                                                     struct ceph_mds_request,
480                                                     r_kref);
481         if (req->r_request)
482                 ceph_msg_put(req->r_request);
483         if (req->r_reply) {
484                 ceph_msg_put(req->r_reply);
485                 destroy_reply_info(&req->r_reply_info);
486         }
487         if (req->r_inode) {
488                 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
489                 iput(req->r_inode);
490         }
491         if (req->r_locked_dir)
492                 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
493         if (req->r_target_inode)
494                 iput(req->r_target_inode);
495         if (req->r_dentry)
496                 dput(req->r_dentry);
497         if (req->r_old_dentry) {
498                 /*
499                  * track (and drop pins for) r_old_dentry_dir
500                  * separately, since r_old_dentry's d_parent may have
501                  * changed between the dir mutex being dropped and
502                  * this request being freed.
503                  */
504                 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
505                                   CEPH_CAP_PIN);
506                 dput(req->r_old_dentry);
507                 iput(req->r_old_dentry_dir);
508         }
509         kfree(req->r_path1);
510         kfree(req->r_path2);
511         put_request_session(req);
512         ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
513         kfree(req);
514 }
515
516 /*
517  * lookup session, bump ref if found.
518  *
519  * called under mdsc->mutex.
520  */
521 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
522                                              u64 tid)
523 {
524         struct ceph_mds_request *req;
525         struct rb_node *n = mdsc->request_tree.rb_node;
526
527         while (n) {
528                 req = rb_entry(n, struct ceph_mds_request, r_node);
529                 if (tid < req->r_tid)
530                         n = n->rb_left;
531                 else if (tid > req->r_tid)
532                         n = n->rb_right;
533                 else {
534                         ceph_mdsc_get_request(req);
535                         return req;
536                 }
537         }
538         return NULL;
539 }
540
541 static void __insert_request(struct ceph_mds_client *mdsc,
542                              struct ceph_mds_request *new)
543 {
544         struct rb_node **p = &mdsc->request_tree.rb_node;
545         struct rb_node *parent = NULL;
546         struct ceph_mds_request *req = NULL;
547
548         while (*p) {
549                 parent = *p;
550                 req = rb_entry(parent, struct ceph_mds_request, r_node);
551                 if (new->r_tid < req->r_tid)
552                         p = &(*p)->rb_left;
553                 else if (new->r_tid > req->r_tid)
554                         p = &(*p)->rb_right;
555                 else
556                         BUG();
557         }
558
559         rb_link_node(&new->r_node, parent, p);
560         rb_insert_color(&new->r_node, &mdsc->request_tree);
561 }
562
563 /*
564  * Register an in-flight request, and assign a tid.  Link to directory
565  * are modifying (if any).
566  *
567  * Called under mdsc->mutex.
568  */
569 static void __register_request(struct ceph_mds_client *mdsc,
570                                struct ceph_mds_request *req,
571                                struct inode *dir)
572 {
573         req->r_tid = ++mdsc->last_tid;
574         if (req->r_num_caps)
575                 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
576                                   req->r_num_caps);
577         dout("__register_request %p tid %lld\n", req, req->r_tid);
578         ceph_mdsc_get_request(req);
579         __insert_request(mdsc, req);
580
581         req->r_uid = current_fsuid();
582         req->r_gid = current_fsgid();
583
584         if (dir) {
585                 struct ceph_inode_info *ci = ceph_inode(dir);
586
587                 ihold(dir);
588                 spin_lock(&ci->i_unsafe_lock);
589                 req->r_unsafe_dir = dir;
590                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
591                 spin_unlock(&ci->i_unsafe_lock);
592         }
593 }
594
595 static void __unregister_request(struct ceph_mds_client *mdsc,
596                                  struct ceph_mds_request *req)
597 {
598         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
599         rb_erase(&req->r_node, &mdsc->request_tree);
600         RB_CLEAR_NODE(&req->r_node);
601
602         if (req->r_unsafe_dir) {
603                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
604
605                 spin_lock(&ci->i_unsafe_lock);
606                 list_del_init(&req->r_unsafe_dir_item);
607                 spin_unlock(&ci->i_unsafe_lock);
608
609                 iput(req->r_unsafe_dir);
610                 req->r_unsafe_dir = NULL;
611         }
612
613         ceph_mdsc_put_request(req);
614 }
615
616 /*
617  * Choose mds to send request to next.  If there is a hint set in the
618  * request (e.g., due to a prior forward hint from the mds), use that.
619  * Otherwise, consult frag tree and/or caps to identify the
620  * appropriate mds.  If all else fails, choose randomly.
621  *
622  * Called under mdsc->mutex.
623  */
624 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
625 {
626         /*
627          * we don't need to worry about protecting the d_parent access
628          * here because we never renaming inside the snapped namespace
629          * except to resplice to another snapdir, and either the old or new
630          * result is a valid result.
631          */
632         while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
633                 dentry = dentry->d_parent;
634         return dentry;
635 }
636
637 static int __choose_mds(struct ceph_mds_client *mdsc,
638                         struct ceph_mds_request *req)
639 {
640         struct inode *inode;
641         struct ceph_inode_info *ci;
642         struct ceph_cap *cap;
643         int mode = req->r_direct_mode;
644         int mds = -1;
645         u32 hash = req->r_direct_hash;
646         bool is_hash = req->r_direct_is_hash;
647
648         /*
649          * is there a specific mds we should try?  ignore hint if we have
650          * no session and the mds is not up (active or recovering).
651          */
652         if (req->r_resend_mds >= 0 &&
653             (__have_session(mdsc, req->r_resend_mds) ||
654              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
655                 dout("choose_mds using resend_mds mds%d\n",
656                      req->r_resend_mds);
657                 return req->r_resend_mds;
658         }
659
660         if (mode == USE_RANDOM_MDS)
661                 goto random;
662
663         inode = NULL;
664         if (req->r_inode) {
665                 inode = req->r_inode;
666         } else if (req->r_dentry) {
667                 /* ignore race with rename; old or new d_parent is okay */
668                 struct dentry *parent = req->r_dentry->d_parent;
669                 struct inode *dir = parent->d_inode;
670
671                 if (dir->i_sb != mdsc->fsc->sb) {
672                         /* not this fs! */
673                         inode = req->r_dentry->d_inode;
674                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
675                         /* direct snapped/virtual snapdir requests
676                          * based on parent dir inode */
677                         struct dentry *dn = get_nonsnap_parent(parent);
678                         inode = dn->d_inode;
679                         dout("__choose_mds using nonsnap parent %p\n", inode);
680                 } else if (req->r_dentry->d_inode) {
681                         /* dentry target */
682                         inode = req->r_dentry->d_inode;
683                 } else {
684                         /* dir + name */
685                         inode = dir;
686                         hash = ceph_dentry_hash(dir, req->r_dentry);
687                         is_hash = true;
688                 }
689         }
690
691         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
692              (int)hash, mode);
693         if (!inode)
694                 goto random;
695         ci = ceph_inode(inode);
696
697         if (is_hash && S_ISDIR(inode->i_mode)) {
698                 struct ceph_inode_frag frag;
699                 int found;
700
701                 ceph_choose_frag(ci, hash, &frag, &found);
702                 if (found) {
703                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
704                                 u8 r;
705
706                                 /* choose a random replica */
707                                 get_random_bytes(&r, 1);
708                                 r %= frag.ndist;
709                                 mds = frag.dist[r];
710                                 dout("choose_mds %p %llx.%llx "
711                                      "frag %u mds%d (%d/%d)\n",
712                                      inode, ceph_vinop(inode),
713                                      frag.frag, mds,
714                                      (int)r, frag.ndist);
715                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
716                                     CEPH_MDS_STATE_ACTIVE)
717                                         return mds;
718                         }
719
720                         /* since this file/dir wasn't known to be
721                          * replicated, then we want to look for the
722                          * authoritative mds. */
723                         mode = USE_AUTH_MDS;
724                         if (frag.mds >= 0) {
725                                 /* choose auth mds */
726                                 mds = frag.mds;
727                                 dout("choose_mds %p %llx.%llx "
728                                      "frag %u mds%d (auth)\n",
729                                      inode, ceph_vinop(inode), frag.frag, mds);
730                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
731                                     CEPH_MDS_STATE_ACTIVE)
732                                         return mds;
733                         }
734                 }
735         }
736
737         spin_lock(&ci->i_ceph_lock);
738         cap = NULL;
739         if (mode == USE_AUTH_MDS)
740                 cap = ci->i_auth_cap;
741         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
742                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
743         if (!cap) {
744                 spin_unlock(&ci->i_ceph_lock);
745                 goto random;
746         }
747         mds = cap->session->s_mds;
748         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
749              inode, ceph_vinop(inode), mds,
750              cap == ci->i_auth_cap ? "auth " : "", cap);
751         spin_unlock(&ci->i_ceph_lock);
752         return mds;
753
754 random:
755         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
756         dout("choose_mds chose random mds%d\n", mds);
757         return mds;
758 }
759
760
761 /*
762  * session messages
763  */
764 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
765 {
766         struct ceph_msg *msg;
767         struct ceph_mds_session_head *h;
768
769         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
770                            false);
771         if (!msg) {
772                 pr_err("create_session_msg ENOMEM creating msg\n");
773                 return NULL;
774         }
775         h = msg->front.iov_base;
776         h->op = cpu_to_le32(op);
777         h->seq = cpu_to_le64(seq);
778         return msg;
779 }
780
781 /*
782  * send session open request.
783  *
784  * called under mdsc->mutex
785  */
786 static int __open_session(struct ceph_mds_client *mdsc,
787                           struct ceph_mds_session *session)
788 {
789         struct ceph_msg *msg;
790         int mstate;
791         int mds = session->s_mds;
792
793         /* wait for mds to go active? */
794         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
795         dout("open_session to mds%d (%s)\n", mds,
796              ceph_mds_state_name(mstate));
797         session->s_state = CEPH_MDS_SESSION_OPENING;
798         session->s_renew_requested = jiffies;
799
800         /* send connect message */
801         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
802         if (!msg)
803                 return -ENOMEM;
804         ceph_con_send(&session->s_con, msg);
805         return 0;
806 }
807
808 /*
809  * open sessions for any export targets for the given mds
810  *
811  * called under mdsc->mutex
812  */
813 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
814                                           struct ceph_mds_session *session)
815 {
816         struct ceph_mds_info *mi;
817         struct ceph_mds_session *ts;
818         int i, mds = session->s_mds;
819         int target;
820
821         if (mds >= mdsc->mdsmap->m_max_mds)
822                 return;
823         mi = &mdsc->mdsmap->m_info[mds];
824         dout("open_export_target_sessions for mds%d (%d targets)\n",
825              session->s_mds, mi->num_export_targets);
826
827         for (i = 0; i < mi->num_export_targets; i++) {
828                 target = mi->export_targets[i];
829                 ts = __ceph_lookup_mds_session(mdsc, target);
830                 if (!ts) {
831                         ts = register_session(mdsc, target);
832                         if (IS_ERR(ts))
833                                 return;
834                 }
835                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
836                     session->s_state == CEPH_MDS_SESSION_CLOSING)
837                         __open_session(mdsc, session);
838                 else
839                         dout(" mds%d target mds%d %p is %s\n", session->s_mds,
840                              i, ts, session_state_name(ts->s_state));
841                 ceph_put_mds_session(ts);
842         }
843 }
844
845 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
846                                            struct ceph_mds_session *session)
847 {
848         mutex_lock(&mdsc->mutex);
849         __open_export_target_sessions(mdsc, session);
850         mutex_unlock(&mdsc->mutex);
851 }
852
853 /*
854  * session caps
855  */
856
857 /*
858  * Free preallocated cap messages assigned to this session
859  */
860 static void cleanup_cap_releases(struct ceph_mds_session *session)
861 {
862         struct ceph_msg *msg;
863
864         spin_lock(&session->s_cap_lock);
865         while (!list_empty(&session->s_cap_releases)) {
866                 msg = list_first_entry(&session->s_cap_releases,
867                                        struct ceph_msg, list_head);
868                 list_del_init(&msg->list_head);
869                 ceph_msg_put(msg);
870         }
871         while (!list_empty(&session->s_cap_releases_done)) {
872                 msg = list_first_entry(&session->s_cap_releases_done,
873                                        struct ceph_msg, list_head);
874                 list_del_init(&msg->list_head);
875                 ceph_msg_put(msg);
876         }
877         spin_unlock(&session->s_cap_lock);
878 }
879
880 /*
881  * Helper to safely iterate over all caps associated with a session, with
882  * special care taken to handle a racing __ceph_remove_cap().
883  *
884  * Caller must hold session s_mutex.
885  */
886 static int iterate_session_caps(struct ceph_mds_session *session,
887                                  int (*cb)(struct inode *, struct ceph_cap *,
888                                             void *), void *arg)
889 {
890         struct list_head *p;
891         struct ceph_cap *cap;
892         struct inode *inode, *last_inode = NULL;
893         struct ceph_cap *old_cap = NULL;
894         int ret;
895
896         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
897         spin_lock(&session->s_cap_lock);
898         p = session->s_caps.next;
899         while (p != &session->s_caps) {
900                 cap = list_entry(p, struct ceph_cap, session_caps);
901                 inode = igrab(&cap->ci->vfs_inode);
902                 if (!inode) {
903                         p = p->next;
904                         continue;
905                 }
906                 session->s_cap_iterator = cap;
907                 spin_unlock(&session->s_cap_lock);
908
909                 if (last_inode) {
910                         iput(last_inode);
911                         last_inode = NULL;
912                 }
913                 if (old_cap) {
914                         ceph_put_cap(session->s_mdsc, old_cap);
915                         old_cap = NULL;
916                 }
917
918                 ret = cb(inode, cap, arg);
919                 last_inode = inode;
920
921                 spin_lock(&session->s_cap_lock);
922                 p = p->next;
923                 if (cap->ci == NULL) {
924                         dout("iterate_session_caps  finishing cap %p removal\n",
925                              cap);
926                         BUG_ON(cap->session != session);
927                         list_del_init(&cap->session_caps);
928                         session->s_nr_caps--;
929                         cap->session = NULL;
930                         old_cap = cap;  /* put_cap it w/o locks held */
931                 }
932                 if (ret < 0)
933                         goto out;
934         }
935         ret = 0;
936 out:
937         session->s_cap_iterator = NULL;
938         spin_unlock(&session->s_cap_lock);
939
940         if (last_inode)
941                 iput(last_inode);
942         if (old_cap)
943                 ceph_put_cap(session->s_mdsc, old_cap);
944
945         return ret;
946 }
947
948 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
949                                   void *arg)
950 {
951         struct ceph_inode_info *ci = ceph_inode(inode);
952         int drop = 0;
953
954         dout("removing cap %p, ci is %p, inode is %p\n",
955              cap, ci, &ci->vfs_inode);
956         spin_lock(&ci->i_ceph_lock);
957         __ceph_remove_cap(cap);
958         if (!__ceph_is_any_real_caps(ci)) {
959                 struct ceph_mds_client *mdsc =
960                         ceph_sb_to_client(inode->i_sb)->mdsc;
961
962                 spin_lock(&mdsc->cap_dirty_lock);
963                 if (!list_empty(&ci->i_dirty_item)) {
964                         pr_info(" dropping dirty %s state for %p %lld\n",
965                                 ceph_cap_string(ci->i_dirty_caps),
966                                 inode, ceph_ino(inode));
967                         ci->i_dirty_caps = 0;
968                         list_del_init(&ci->i_dirty_item);
969                         drop = 1;
970                 }
971                 if (!list_empty(&ci->i_flushing_item)) {
972                         pr_info(" dropping dirty+flushing %s state for %p %lld\n",
973                                 ceph_cap_string(ci->i_flushing_caps),
974                                 inode, ceph_ino(inode));
975                         ci->i_flushing_caps = 0;
976                         list_del_init(&ci->i_flushing_item);
977                         mdsc->num_cap_flushing--;
978                         drop = 1;
979                 }
980                 if (drop && ci->i_wrbuffer_ref) {
981                         pr_info(" dropping dirty data for %p %lld\n",
982                                 inode, ceph_ino(inode));
983                         ci->i_wrbuffer_ref = 0;
984                         ci->i_wrbuffer_ref_head = 0;
985                         drop++;
986                 }
987                 spin_unlock(&mdsc->cap_dirty_lock);
988         }
989         spin_unlock(&ci->i_ceph_lock);
990         while (drop--)
991                 iput(inode);
992         return 0;
993 }
994
995 /*
996  * caller must hold session s_mutex
997  */
998 static void remove_session_caps(struct ceph_mds_session *session)
999 {
1000         dout("remove_session_caps on %p\n", session);
1001         iterate_session_caps(session, remove_session_caps_cb, NULL);
1002         BUG_ON(session->s_nr_caps > 0);
1003         BUG_ON(!list_empty(&session->s_cap_flushing));
1004         cleanup_cap_releases(session);
1005 }
1006
1007 /*
1008  * wake up any threads waiting on this session's caps.  if the cap is
1009  * old (didn't get renewed on the client reconnect), remove it now.
1010  *
1011  * caller must hold s_mutex.
1012  */
1013 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1014                               void *arg)
1015 {
1016         struct ceph_inode_info *ci = ceph_inode(inode);
1017
1018         wake_up_all(&ci->i_cap_wq);
1019         if (arg) {
1020                 spin_lock(&ci->i_ceph_lock);
1021                 ci->i_wanted_max_size = 0;
1022                 ci->i_requested_max_size = 0;
1023                 spin_unlock(&ci->i_ceph_lock);
1024         }
1025         return 0;
1026 }
1027
1028 static void wake_up_session_caps(struct ceph_mds_session *session,
1029                                  int reconnect)
1030 {
1031         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1032         iterate_session_caps(session, wake_up_session_cb,
1033                              (void *)(unsigned long)reconnect);
1034 }
1035
1036 /*
1037  * Send periodic message to MDS renewing all currently held caps.  The
1038  * ack will reset the expiration for all caps from this session.
1039  *
1040  * caller holds s_mutex
1041  */
1042 static int send_renew_caps(struct ceph_mds_client *mdsc,
1043                            struct ceph_mds_session *session)
1044 {
1045         struct ceph_msg *msg;
1046         int state;
1047
1048         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1049             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1050                 pr_info("mds%d caps stale\n", session->s_mds);
1051         session->s_renew_requested = jiffies;
1052
1053         /* do not try to renew caps until a recovering mds has reconnected
1054          * with its clients. */
1055         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1056         if (state < CEPH_MDS_STATE_RECONNECT) {
1057                 dout("send_renew_caps ignoring mds%d (%s)\n",
1058                      session->s_mds, ceph_mds_state_name(state));
1059                 return 0;
1060         }
1061
1062         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1063                 ceph_mds_state_name(state));
1064         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1065                                  ++session->s_renew_seq);
1066         if (!msg)
1067                 return -ENOMEM;
1068         ceph_con_send(&session->s_con, msg);
1069         return 0;
1070 }
1071
1072 /*
1073  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1074  *
1075  * Called under session->s_mutex
1076  */
1077 static void renewed_caps(struct ceph_mds_client *mdsc,
1078                          struct ceph_mds_session *session, int is_renew)
1079 {
1080         int was_stale;
1081         int wake = 0;
1082
1083         spin_lock(&session->s_cap_lock);
1084         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1085
1086         session->s_cap_ttl = session->s_renew_requested +
1087                 mdsc->mdsmap->m_session_timeout*HZ;
1088
1089         if (was_stale) {
1090                 if (time_before(jiffies, session->s_cap_ttl)) {
1091                         pr_info("mds%d caps renewed\n", session->s_mds);
1092                         wake = 1;
1093                 } else {
1094                         pr_info("mds%d caps still stale\n", session->s_mds);
1095                 }
1096         }
1097         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1098              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1099              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1100         spin_unlock(&session->s_cap_lock);
1101
1102         if (wake)
1103                 wake_up_session_caps(session, 0);
1104 }
1105
1106 /*
1107  * send a session close request
1108  */
1109 static int request_close_session(struct ceph_mds_client *mdsc,
1110                                  struct ceph_mds_session *session)
1111 {
1112         struct ceph_msg *msg;
1113
1114         dout("request_close_session mds%d state %s seq %lld\n",
1115              session->s_mds, session_state_name(session->s_state),
1116              session->s_seq);
1117         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1118         if (!msg)
1119                 return -ENOMEM;
1120         ceph_con_send(&session->s_con, msg);
1121         return 0;
1122 }
1123
1124 /*
1125  * Called with s_mutex held.
1126  */
1127 static int __close_session(struct ceph_mds_client *mdsc,
1128                          struct ceph_mds_session *session)
1129 {
1130         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1131                 return 0;
1132         session->s_state = CEPH_MDS_SESSION_CLOSING;
1133         return request_close_session(mdsc, session);
1134 }
1135
1136 /*
1137  * Trim old(er) caps.
1138  *
1139  * Because we can't cache an inode without one or more caps, we do
1140  * this indirectly: if a cap is unused, we prune its aliases, at which
1141  * point the inode will hopefully get dropped to.
1142  *
1143  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1144  * memory pressure from the MDS, though, so it needn't be perfect.
1145  */
1146 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1147 {
1148         struct ceph_mds_session *session = arg;
1149         struct ceph_inode_info *ci = ceph_inode(inode);
1150         int used, oissued, mine;
1151
1152         if (session->s_trim_caps <= 0)
1153                 return -1;
1154
1155         spin_lock(&ci->i_ceph_lock);
1156         mine = cap->issued | cap->implemented;
1157         used = __ceph_caps_used(ci);
1158         oissued = __ceph_caps_issued_other(ci, cap);
1159
1160         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1161              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1162              ceph_cap_string(used));
1163         if (ci->i_dirty_caps)
1164                 goto out;   /* dirty caps */
1165         if ((used & ~oissued) & mine)
1166                 goto out;   /* we need these caps */
1167
1168         session->s_trim_caps--;
1169         if (oissued) {
1170                 /* we aren't the only cap.. just remove us */
1171                 __ceph_remove_cap(cap);
1172         } else {
1173                 /* try to drop referring dentries */
1174                 spin_unlock(&ci->i_ceph_lock);
1175                 d_prune_aliases(inode);
1176                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1177                      inode, cap, atomic_read(&inode->i_count));
1178                 return 0;
1179         }
1180
1181 out:
1182         spin_unlock(&ci->i_ceph_lock);
1183         return 0;
1184 }
1185
1186 /*
1187  * Trim session cap count down to some max number.
1188  */
1189 static int trim_caps(struct ceph_mds_client *mdsc,
1190                      struct ceph_mds_session *session,
1191                      int max_caps)
1192 {
1193         int trim_caps = session->s_nr_caps - max_caps;
1194
1195         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1196              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1197         if (trim_caps > 0) {
1198                 session->s_trim_caps = trim_caps;
1199                 iterate_session_caps(session, trim_caps_cb, session);
1200                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1201                      session->s_mds, session->s_nr_caps, max_caps,
1202                         trim_caps - session->s_trim_caps);
1203                 session->s_trim_caps = 0;
1204         }
1205         return 0;
1206 }
1207
1208 /*
1209  * Allocate cap_release messages.  If there is a partially full message
1210  * in the queue, try to allocate enough to cover it's remainder, so that
1211  * we can send it immediately.
1212  *
1213  * Called under s_mutex.
1214  */
1215 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1216                           struct ceph_mds_session *session)
1217 {
1218         struct ceph_msg *msg, *partial = NULL;
1219         struct ceph_mds_cap_release *head;
1220         int err = -ENOMEM;
1221         int extra = mdsc->fsc->mount_options->cap_release_safety;
1222         int num;
1223
1224         dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1225              extra);
1226
1227         spin_lock(&session->s_cap_lock);
1228
1229         if (!list_empty(&session->s_cap_releases)) {
1230                 msg = list_first_entry(&session->s_cap_releases,
1231                                        struct ceph_msg,
1232                                  list_head);
1233                 head = msg->front.iov_base;
1234                 num = le32_to_cpu(head->num);
1235                 if (num) {
1236                         dout(" partial %p with (%d/%d)\n", msg, num,
1237                              (int)CEPH_CAPS_PER_RELEASE);
1238                         extra += CEPH_CAPS_PER_RELEASE - num;
1239                         partial = msg;
1240                 }
1241         }
1242         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1243                 spin_unlock(&session->s_cap_lock);
1244                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1245                                    GFP_NOFS, false);
1246                 if (!msg)
1247                         goto out_unlocked;
1248                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
1249                      (int)msg->front.iov_len);
1250                 head = msg->front.iov_base;
1251                 head->num = cpu_to_le32(0);
1252                 msg->front.iov_len = sizeof(*head);
1253                 spin_lock(&session->s_cap_lock);
1254                 list_add(&msg->list_head, &session->s_cap_releases);
1255                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1256         }
1257
1258         if (partial) {
1259                 head = partial->front.iov_base;
1260                 num = le32_to_cpu(head->num);
1261                 dout(" queueing partial %p with %d/%d\n", partial, num,
1262                      (int)CEPH_CAPS_PER_RELEASE);
1263                 list_move_tail(&partial->list_head,
1264                                &session->s_cap_releases_done);
1265                 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1266         }
1267         err = 0;
1268         spin_unlock(&session->s_cap_lock);
1269 out_unlocked:
1270         return err;
1271 }
1272
1273 /*
1274  * flush all dirty inode data to disk.
1275  *
1276  * returns true if we've flushed through want_flush_seq
1277  */
1278 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1279 {
1280         int mds, ret = 1;
1281
1282         dout("check_cap_flush want %lld\n", want_flush_seq);
1283         mutex_lock(&mdsc->mutex);
1284         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1285                 struct ceph_mds_session *session = mdsc->sessions[mds];
1286
1287                 if (!session)
1288                         continue;
1289                 get_session(session);
1290                 mutex_unlock(&mdsc->mutex);
1291
1292                 mutex_lock(&session->s_mutex);
1293                 if (!list_empty(&session->s_cap_flushing)) {
1294                         struct ceph_inode_info *ci =
1295                                 list_entry(session->s_cap_flushing.next,
1296                                            struct ceph_inode_info,
1297                                            i_flushing_item);
1298                         struct inode *inode = &ci->vfs_inode;
1299
1300                         spin_lock(&ci->i_ceph_lock);
1301                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1302                                 dout("check_cap_flush still flushing %p "
1303                                      "seq %lld <= %lld to mds%d\n", inode,
1304                                      ci->i_cap_flush_seq, want_flush_seq,
1305                                      session->s_mds);
1306                                 ret = 0;
1307                         }
1308                         spin_unlock(&ci->i_ceph_lock);
1309                 }
1310                 mutex_unlock(&session->s_mutex);
1311                 ceph_put_mds_session(session);
1312
1313                 if (!ret)
1314                         return ret;
1315                 mutex_lock(&mdsc->mutex);
1316         }
1317
1318         mutex_unlock(&mdsc->mutex);
1319         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1320         return ret;
1321 }
1322
1323 /*
1324  * called under s_mutex
1325  */
1326 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1327                             struct ceph_mds_session *session)
1328 {
1329         struct ceph_msg *msg;
1330
1331         dout("send_cap_releases mds%d\n", session->s_mds);
1332         spin_lock(&session->s_cap_lock);
1333         while (!list_empty(&session->s_cap_releases_done)) {
1334                 msg = list_first_entry(&session->s_cap_releases_done,
1335                                  struct ceph_msg, list_head);
1336                 list_del_init(&msg->list_head);
1337                 spin_unlock(&session->s_cap_lock);
1338                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1339                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1340                 ceph_con_send(&session->s_con, msg);
1341                 spin_lock(&session->s_cap_lock);
1342         }
1343         spin_unlock(&session->s_cap_lock);
1344 }
1345
1346 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1347                                  struct ceph_mds_session *session)
1348 {
1349         struct ceph_msg *msg;
1350         struct ceph_mds_cap_release *head;
1351         unsigned num;
1352
1353         dout("discard_cap_releases mds%d\n", session->s_mds);
1354         spin_lock(&session->s_cap_lock);
1355
1356         /* zero out the in-progress message */
1357         msg = list_first_entry(&session->s_cap_releases,
1358                                struct ceph_msg, list_head);
1359         head = msg->front.iov_base;
1360         num = le32_to_cpu(head->num);
1361         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1362         head->num = cpu_to_le32(0);
1363         session->s_num_cap_releases += num;
1364
1365         /* requeue completed messages */
1366         while (!list_empty(&session->s_cap_releases_done)) {
1367                 msg = list_first_entry(&session->s_cap_releases_done,
1368                                  struct ceph_msg, list_head);
1369                 list_del_init(&msg->list_head);
1370
1371                 head = msg->front.iov_base;
1372                 num = le32_to_cpu(head->num);
1373                 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1374                      num);
1375                 session->s_num_cap_releases += num;
1376                 head->num = cpu_to_le32(0);
1377                 msg->front.iov_len = sizeof(*head);
1378                 list_add(&msg->list_head, &session->s_cap_releases);
1379         }
1380
1381         spin_unlock(&session->s_cap_lock);
1382 }
1383
1384 /*
1385  * requests
1386  */
1387
1388 /*
1389  * Create an mds request.
1390  */
1391 struct ceph_mds_request *
1392 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1393 {
1394         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1395
1396         if (!req)
1397                 return ERR_PTR(-ENOMEM);
1398
1399         mutex_init(&req->r_fill_mutex);
1400         req->r_mdsc = mdsc;
1401         req->r_started = jiffies;
1402         req->r_resend_mds = -1;
1403         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1404         req->r_fmode = -1;
1405         kref_init(&req->r_kref);
1406         INIT_LIST_HEAD(&req->r_wait);
1407         init_completion(&req->r_completion);
1408         init_completion(&req->r_safe_completion);
1409         INIT_LIST_HEAD(&req->r_unsafe_item);
1410
1411         req->r_op = op;
1412         req->r_direct_mode = mode;
1413         return req;
1414 }
1415
1416 /*
1417  * return oldest (lowest) request, tid in request tree, 0 if none.
1418  *
1419  * called under mdsc->mutex.
1420  */
1421 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1422 {
1423         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1424                 return NULL;
1425         return rb_entry(rb_first(&mdsc->request_tree),
1426                         struct ceph_mds_request, r_node);
1427 }
1428
1429 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1430 {
1431         struct ceph_mds_request *req = __get_oldest_req(mdsc);
1432
1433         if (req)
1434                 return req->r_tid;
1435         return 0;
1436 }
1437
1438 /*
1439  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1440  * on build_path_from_dentry in fs/cifs/dir.c.
1441  *
1442  * If @stop_on_nosnap, generate path relative to the first non-snapped
1443  * inode.
1444  *
1445  * Encode hidden .snap dirs as a double /, i.e.
1446  *   foo/.snap/bar -> foo//bar
1447  */
1448 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1449                            int stop_on_nosnap)
1450 {
1451         struct dentry *temp;
1452         char *path;
1453         int len, pos;
1454         unsigned seq;
1455
1456         if (dentry == NULL)
1457                 return ERR_PTR(-EINVAL);
1458
1459 retry:
1460         len = 0;
1461         seq = read_seqbegin(&rename_lock);
1462         rcu_read_lock();
1463         for (temp = dentry; !IS_ROOT(temp);) {
1464                 struct inode *inode = temp->d_inode;
1465                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1466                         len++;  /* slash only */
1467                 else if (stop_on_nosnap && inode &&
1468                          ceph_snap(inode) == CEPH_NOSNAP)
1469                         break;
1470                 else
1471                         len += 1 + temp->d_name.len;
1472                 temp = temp->d_parent;
1473         }
1474         rcu_read_unlock();
1475         if (len)
1476                 len--;  /* no leading '/' */
1477
1478         path = kmalloc(len+1, GFP_NOFS);
1479         if (path == NULL)
1480                 return ERR_PTR(-ENOMEM);
1481         pos = len;
1482         path[pos] = 0;  /* trailing null */
1483         rcu_read_lock();
1484         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1485                 struct inode *inode;
1486
1487                 spin_lock(&temp->d_lock);
1488                 inode = temp->d_inode;
1489                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1490                         dout("build_path path+%d: %p SNAPDIR\n",
1491                              pos, temp);
1492                 } else if (stop_on_nosnap && inode &&
1493                            ceph_snap(inode) == CEPH_NOSNAP) {
1494                         spin_unlock(&temp->d_lock);
1495                         break;
1496                 } else {
1497                         pos -= temp->d_name.len;
1498                         if (pos < 0) {
1499                                 spin_unlock(&temp->d_lock);
1500                                 break;
1501                         }
1502                         strncpy(path + pos, temp->d_name.name,
1503                                 temp->d_name.len);
1504                 }
1505                 spin_unlock(&temp->d_lock);
1506                 if (pos)
1507                         path[--pos] = '/';
1508                 temp = temp->d_parent;
1509         }
1510         rcu_read_unlock();
1511         if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1512                 pr_err("build_path did not end path lookup where "
1513                        "expected, namelen is %d, pos is %d\n", len, pos);
1514                 /* presumably this is only possible if racing with a
1515                    rename of one of the parent directories (we can not
1516                    lock the dentries above us to prevent this, but
1517                    retrying should be harmless) */
1518                 kfree(path);
1519                 goto retry;
1520         }
1521
1522         *base = ceph_ino(temp->d_inode);
1523         *plen = len;
1524         dout("build_path on %p %d built %llx '%.*s'\n",
1525              dentry, dentry->d_count, *base, len, path);
1526         return path;
1527 }
1528
1529 static int build_dentry_path(struct dentry *dentry,
1530                              const char **ppath, int *ppathlen, u64 *pino,
1531                              int *pfreepath)
1532 {
1533         char *path;
1534
1535         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1536                 *pino = ceph_ino(dentry->d_parent->d_inode);
1537                 *ppath = dentry->d_name.name;
1538                 *ppathlen = dentry->d_name.len;
1539                 return 0;
1540         }
1541         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1542         if (IS_ERR(path))
1543                 return PTR_ERR(path);
1544         *ppath = path;
1545         *pfreepath = 1;
1546         return 0;
1547 }
1548
1549 static int build_inode_path(struct inode *inode,
1550                             const char **ppath, int *ppathlen, u64 *pino,
1551                             int *pfreepath)
1552 {
1553         struct dentry *dentry;
1554         char *path;
1555
1556         if (ceph_snap(inode) == CEPH_NOSNAP) {
1557                 *pino = ceph_ino(inode);
1558                 *ppathlen = 0;
1559                 return 0;
1560         }
1561         dentry = d_find_alias(inode);
1562         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1563         dput(dentry);
1564         if (IS_ERR(path))
1565                 return PTR_ERR(path);
1566         *ppath = path;
1567         *pfreepath = 1;
1568         return 0;
1569 }
1570
1571 /*
1572  * request arguments may be specified via an inode *, a dentry *, or
1573  * an explicit ino+path.
1574  */
1575 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1576                                   const char *rpath, u64 rino,
1577                                   const char **ppath, int *pathlen,
1578                                   u64 *ino, int *freepath)
1579 {
1580         int r = 0;
1581
1582         if (rinode) {
1583                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1584                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1585                      ceph_snap(rinode));
1586         } else if (rdentry) {
1587                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1588                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1589                      *ppath);
1590         } else if (rpath || rino) {
1591                 *ino = rino;
1592                 *ppath = rpath;
1593                 *pathlen = rpath ? strlen(rpath) : 0;
1594                 dout(" path %.*s\n", *pathlen, rpath);
1595         }
1596
1597         return r;
1598 }
1599
1600 /*
1601  * called under mdsc->mutex
1602  */
1603 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1604                                                struct ceph_mds_request *req,
1605                                                int mds)
1606 {
1607         struct ceph_msg *msg;
1608         struct ceph_mds_request_head *head;
1609         const char *path1 = NULL;
1610         const char *path2 = NULL;
1611         u64 ino1 = 0, ino2 = 0;
1612         int pathlen1 = 0, pathlen2 = 0;
1613         int freepath1 = 0, freepath2 = 0;
1614         int len;
1615         u16 releases;
1616         void *p, *end;
1617         int ret;
1618
1619         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1620                               req->r_path1, req->r_ino1.ino,
1621                               &path1, &pathlen1, &ino1, &freepath1);
1622         if (ret < 0) {
1623                 msg = ERR_PTR(ret);
1624                 goto out;
1625         }
1626
1627         ret = set_request_path_attr(NULL, req->r_old_dentry,
1628                               req->r_path2, req->r_ino2.ino,
1629                               &path2, &pathlen2, &ino2, &freepath2);
1630         if (ret < 0) {
1631                 msg = ERR_PTR(ret);
1632                 goto out_free1;
1633         }
1634
1635         len = sizeof(*head) +
1636                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1637
1638         /* calculate (max) length for cap releases */
1639         len += sizeof(struct ceph_mds_request_release) *
1640                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1641                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1642         if (req->r_dentry_drop)
1643                 len += req->r_dentry->d_name.len;
1644         if (req->r_old_dentry_drop)
1645                 len += req->r_old_dentry->d_name.len;
1646
1647         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1648         if (!msg) {
1649                 msg = ERR_PTR(-ENOMEM);
1650                 goto out_free2;
1651         }
1652
1653         msg->hdr.tid = cpu_to_le64(req->r_tid);
1654
1655         head = msg->front.iov_base;
1656         p = msg->front.iov_base + sizeof(*head);
1657         end = msg->front.iov_base + msg->front.iov_len;
1658
1659         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1660         head->op = cpu_to_le32(req->r_op);
1661         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1662         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1663         head->args = req->r_args;
1664
1665         ceph_encode_filepath(&p, end, ino1, path1);
1666         ceph_encode_filepath(&p, end, ino2, path2);
1667
1668         /* make note of release offset, in case we need to replay */
1669         req->r_request_release_offset = p - msg->front.iov_base;
1670
1671         /* cap releases */
1672         releases = 0;
1673         if (req->r_inode_drop)
1674                 releases += ceph_encode_inode_release(&p,
1675                       req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1676                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1677         if (req->r_dentry_drop)
1678                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1679                        mds, req->r_dentry_drop, req->r_dentry_unless);
1680         if (req->r_old_dentry_drop)
1681                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1682                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1683         if (req->r_old_inode_drop)
1684                 releases += ceph_encode_inode_release(&p,
1685                       req->r_old_dentry->d_inode,
1686                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1687         head->num_releases = cpu_to_le16(releases);
1688
1689         BUG_ON(p > end);
1690         msg->front.iov_len = p - msg->front.iov_base;
1691         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1692
1693         msg->pages = req->r_pages;
1694         msg->nr_pages = req->r_num_pages;
1695         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1696         msg->hdr.data_off = cpu_to_le16(0);
1697
1698 out_free2:
1699         if (freepath2)
1700                 kfree((char *)path2);
1701 out_free1:
1702         if (freepath1)
1703                 kfree((char *)path1);
1704 out:
1705         return msg;
1706 }
1707
1708 /*
1709  * called under mdsc->mutex if error, under no mutex if
1710  * success.
1711  */
1712 static void complete_request(struct ceph_mds_client *mdsc,
1713                              struct ceph_mds_request *req)
1714 {
1715         if (req->r_callback)
1716                 req->r_callback(mdsc, req);
1717         else
1718                 complete_all(&req->r_completion);
1719 }
1720
1721 /*
1722  * called under mdsc->mutex
1723  */
1724 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1725                                   struct ceph_mds_request *req,
1726                                   int mds)
1727 {
1728         struct ceph_mds_request_head *rhead;
1729         struct ceph_msg *msg;
1730         int flags = 0;
1731
1732         req->r_attempts++;
1733         if (req->r_inode) {
1734                 struct ceph_cap *cap =
1735                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1736
1737                 if (cap)
1738                         req->r_sent_on_mseq = cap->mseq;
1739                 else
1740                         req->r_sent_on_mseq = -1;
1741         }
1742         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1743              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1744
1745         if (req->r_got_unsafe) {
1746                 /*
1747                  * Replay.  Do not regenerate message (and rebuild
1748                  * paths, etc.); just use the original message.
1749                  * Rebuilding paths will break for renames because
1750                  * d_move mangles the src name.
1751                  */
1752                 msg = req->r_request;
1753                 rhead = msg->front.iov_base;
1754
1755                 flags = le32_to_cpu(rhead->flags);
1756                 flags |= CEPH_MDS_FLAG_REPLAY;
1757                 rhead->flags = cpu_to_le32(flags);
1758
1759                 if (req->r_target_inode)
1760                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1761
1762                 rhead->num_retry = req->r_attempts - 1;
1763
1764                 /* remove cap/dentry releases from message */
1765                 rhead->num_releases = 0;
1766                 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1767                 msg->front.iov_len = req->r_request_release_offset;
1768                 return 0;
1769         }
1770
1771         if (req->r_request) {
1772                 ceph_msg_put(req->r_request);
1773                 req->r_request = NULL;
1774         }
1775         msg = create_request_message(mdsc, req, mds);
1776         if (IS_ERR(msg)) {
1777                 req->r_err = PTR_ERR(msg);
1778                 complete_request(mdsc, req);
1779                 return PTR_ERR(msg);
1780         }
1781         req->r_request = msg;
1782
1783         rhead = msg->front.iov_base;
1784         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1785         if (req->r_got_unsafe)
1786                 flags |= CEPH_MDS_FLAG_REPLAY;
1787         if (req->r_locked_dir)
1788                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1789         rhead->flags = cpu_to_le32(flags);
1790         rhead->num_fwd = req->r_num_fwd;
1791         rhead->num_retry = req->r_attempts - 1;
1792         rhead->ino = 0;
1793
1794         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1795         return 0;
1796 }
1797
1798 /*
1799  * send request, or put it on the appropriate wait list.
1800  */
1801 static int __do_request(struct ceph_mds_client *mdsc,
1802                         struct ceph_mds_request *req)
1803 {
1804         struct ceph_mds_session *session = NULL;
1805         int mds = -1;
1806         int err = -EAGAIN;
1807
1808         if (req->r_err || req->r_got_result)
1809                 goto out;
1810
1811         if (req->r_timeout &&
1812             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1813                 dout("do_request timed out\n");
1814                 err = -EIO;
1815                 goto finish;
1816         }
1817
1818         put_request_session(req);
1819
1820         mds = __choose_mds(mdsc, req);
1821         if (mds < 0 ||
1822             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1823                 dout("do_request no mds or not active, waiting for map\n");
1824                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1825                 goto out;
1826         }
1827
1828         /* get, open session */
1829         session = __ceph_lookup_mds_session(mdsc, mds);
1830         if (!session) {
1831                 session = register_session(mdsc, mds);
1832                 if (IS_ERR(session)) {
1833                         err = PTR_ERR(session);
1834                         goto finish;
1835                 }
1836         }
1837         req->r_session = get_session(session);
1838
1839         dout("do_request mds%d session %p state %s\n", mds, session,
1840              session_state_name(session->s_state));
1841         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1842             session->s_state != CEPH_MDS_SESSION_HUNG) {
1843                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1844                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1845                         __open_session(mdsc, session);
1846                 list_add(&req->r_wait, &session->s_waiting);
1847                 goto out_session;
1848         }
1849
1850         /* send request */
1851         req->r_resend_mds = -1;   /* forget any previous mds hint */
1852
1853         if (req->r_request_started == 0)   /* note request start time */
1854                 req->r_request_started = jiffies;
1855
1856         err = __prepare_send_request(mdsc, req, mds);
1857         if (!err) {
1858                 ceph_msg_get(req->r_request);
1859                 ceph_con_send(&session->s_con, req->r_request);
1860         }
1861
1862 out_session:
1863         ceph_put_mds_session(session);
1864 out:
1865         return err;
1866
1867 finish:
1868         req->r_err = err;
1869         complete_request(mdsc, req);
1870         goto out;
1871 }
1872
1873 /*
1874  * called under mdsc->mutex
1875  */
1876 static void __wake_requests(struct ceph_mds_client *mdsc,
1877                             struct list_head *head)
1878 {
1879         struct ceph_mds_request *req;
1880         LIST_HEAD(tmp_list);
1881
1882         list_splice_init(head, &tmp_list);
1883
1884         while (!list_empty(&tmp_list)) {
1885                 req = list_entry(tmp_list.next,
1886                                  struct ceph_mds_request, r_wait);
1887                 list_del_init(&req->r_wait);
1888                 __do_request(mdsc, req);
1889         }
1890 }
1891
1892 /*
1893  * Wake up threads with requests pending for @mds, so that they can
1894  * resubmit their requests to a possibly different mds.
1895  */
1896 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1897 {
1898         struct ceph_mds_request *req;
1899         struct rb_node *p;
1900
1901         dout("kick_requests mds%d\n", mds);
1902         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1903                 req = rb_entry(p, struct ceph_mds_request, r_node);
1904                 if (req->r_got_unsafe)
1905                         continue;
1906                 if (req->r_session &&
1907                     req->r_session->s_mds == mds) {
1908                         dout(" kicking tid %llu\n", req->r_tid);
1909                         __do_request(mdsc, req);
1910                 }
1911         }
1912 }
1913
1914 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1915                               struct ceph_mds_request *req)
1916 {
1917         dout("submit_request on %p\n", req);
1918         mutex_lock(&mdsc->mutex);
1919         __register_request(mdsc, req, NULL);
1920         __do_request(mdsc, req);
1921         mutex_unlock(&mdsc->mutex);
1922 }
1923
1924 /*
1925  * Synchrously perform an mds request.  Take care of all of the
1926  * session setup, forwarding, retry details.
1927  */
1928 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1929                          struct inode *dir,
1930                          struct ceph_mds_request *req)
1931 {
1932         int err;
1933
1934         dout("do_request on %p\n", req);
1935
1936         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1937         if (req->r_inode)
1938                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1939         if (req->r_locked_dir)
1940                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1941         if (req->r_old_dentry)
1942                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
1943                                   CEPH_CAP_PIN);
1944
1945         /* issue */
1946         mutex_lock(&mdsc->mutex);
1947         __register_request(mdsc, req, dir);
1948         __do_request(mdsc, req);
1949
1950         if (req->r_err) {
1951                 err = req->r_err;
1952                 __unregister_request(mdsc, req);
1953                 dout("do_request early error %d\n", err);
1954                 goto out;
1955         }
1956
1957         /* wait */
1958         mutex_unlock(&mdsc->mutex);
1959         dout("do_request waiting\n");
1960         if (req->r_timeout) {
1961                 err = (long)wait_for_completion_killable_timeout(
1962                         &req->r_completion, req->r_timeout);
1963                 if (err == 0)
1964                         err = -EIO;
1965         } else {
1966                 err = wait_for_completion_killable(&req->r_completion);
1967         }
1968         dout("do_request waited, got %d\n", err);
1969         mutex_lock(&mdsc->mutex);
1970
1971         /* only abort if we didn't race with a real reply */
1972         if (req->r_got_result) {
1973                 err = le32_to_cpu(req->r_reply_info.head->result);
1974         } else if (err < 0) {
1975                 dout("aborted request %lld with %d\n", req->r_tid, err);
1976
1977                 /*
1978                  * ensure we aren't running concurrently with
1979                  * ceph_fill_trace or ceph_readdir_prepopulate, which
1980                  * rely on locks (dir mutex) held by our caller.
1981                  */
1982                 mutex_lock(&req->r_fill_mutex);
1983                 req->r_err = err;
1984                 req->r_aborted = true;
1985                 mutex_unlock(&req->r_fill_mutex);
1986
1987                 if (req->r_locked_dir &&
1988                     (req->r_op & CEPH_MDS_OP_WRITE))
1989                         ceph_invalidate_dir_request(req);
1990         } else {
1991                 err = req->r_err;
1992         }
1993
1994 out:
1995         mutex_unlock(&mdsc->mutex);
1996         dout("do_request %p done, result %d\n", req, err);
1997         return err;
1998 }
1999
2000 /*
2001  * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS
2002  * namespace request.
2003  */
2004 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2005 {
2006         struct inode *inode = req->r_locked_dir;
2007         struct ceph_inode_info *ci = ceph_inode(inode);
2008
2009         dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
2010         spin_lock(&ci->i_ceph_lock);
2011         ceph_dir_clear_complete(inode);
2012         ci->i_release_count++;
2013         spin_unlock(&ci->i_ceph_lock);
2014
2015         if (req->r_dentry)
2016                 ceph_invalidate_dentry_lease(req->r_dentry);
2017         if (req->r_old_dentry)
2018                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2019 }
2020
2021 /*
2022  * Handle mds reply.
2023  *
2024  * We take the session mutex and parse and process the reply immediately.
2025  * This preserves the logical ordering of replies, capabilities, etc., sent
2026  * by the MDS as they are applied to our local cache.
2027  */
2028 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2029 {
2030         struct ceph_mds_client *mdsc = session->s_mdsc;
2031         struct ceph_mds_request *req;
2032         struct ceph_mds_reply_head *head = msg->front.iov_base;
2033         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2034         u64 tid;
2035         int err, result;
2036         int mds = session->s_mds;
2037
2038         if (msg->front.iov_len < sizeof(*head)) {
2039                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2040                 ceph_msg_dump(msg);
2041                 return;
2042         }
2043
2044         /* get request, session */
2045         tid = le64_to_cpu(msg->hdr.tid);
2046         mutex_lock(&mdsc->mutex);
2047         req = __lookup_request(mdsc, tid);
2048         if (!req) {
2049                 dout("handle_reply on unknown tid %llu\n", tid);
2050                 mutex_unlock(&mdsc->mutex);
2051                 return;
2052         }
2053         dout("handle_reply %p\n", req);
2054
2055         /* correct session? */
2056         if (req->r_session != session) {
2057                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2058                        " not mds%d\n", tid, session->s_mds,
2059                        req->r_session ? req->r_session->s_mds : -1);
2060                 mutex_unlock(&mdsc->mutex);
2061                 goto out;
2062         }
2063
2064         /* dup? */
2065         if ((req->r_got_unsafe && !head->safe) ||
2066             (req->r_got_safe && head->safe)) {
2067                 pr_warning("got a dup %s reply on %llu from mds%d\n",
2068                            head->safe ? "safe" : "unsafe", tid, mds);
2069                 mutex_unlock(&mdsc->mutex);
2070                 goto out;
2071         }
2072         if (req->r_got_safe && !head->safe) {
2073                 pr_warning("got unsafe after safe on %llu from mds%d\n",
2074                            tid, mds);
2075                 mutex_unlock(&mdsc->mutex);
2076                 goto out;
2077         }
2078
2079         result = le32_to_cpu(head->result);
2080
2081         /*
2082          * Handle an ESTALE
2083          * if we're not talking to the authority, send to them
2084          * if the authority has changed while we weren't looking,
2085          * send to new authority
2086          * Otherwise we just have to return an ESTALE
2087          */
2088         if (result == -ESTALE) {
2089                 dout("got ESTALE on request %llu", req->r_tid);
2090                 if (!req->r_inode) {
2091                         /* do nothing; not an authority problem */
2092                 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2093                         dout("not using auth, setting for that now");
2094                         req->r_direct_mode = USE_AUTH_MDS;
2095                         __do_request(mdsc, req);
2096                         mutex_unlock(&mdsc->mutex);
2097                         goto out;
2098                 } else  {
2099                         struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2100                         struct ceph_cap *cap = NULL;
2101
2102                         if (req->r_session)
2103                                 cap = ceph_get_cap_for_mds(ci,
2104                                                    req->r_session->s_mds);
2105
2106                         dout("already using auth");
2107                         if ((!cap || cap != ci->i_auth_cap) ||
2108                             (cap->mseq != req->r_sent_on_mseq)) {
2109                                 dout("but cap changed, so resending");
2110                                 __do_request(mdsc, req);
2111                                 mutex_unlock(&mdsc->mutex);
2112                                 goto out;
2113                         }
2114                 }
2115                 dout("have to return ESTALE on request %llu", req->r_tid);
2116         }
2117
2118
2119         if (head->safe) {
2120                 req->r_got_safe = true;
2121                 __unregister_request(mdsc, req);
2122                 complete_all(&req->r_safe_completion);
2123
2124                 if (req->r_got_unsafe) {
2125                         /*
2126                          * We already handled the unsafe response, now do the
2127                          * cleanup.  No need to examine the response; the MDS
2128                          * doesn't include any result info in the safe
2129                          * response.  And even if it did, there is nothing
2130                          * useful we could do with a revised return value.
2131                          */
2132                         dout("got safe reply %llu, mds%d\n", tid, mds);
2133                         list_del_init(&req->r_unsafe_item);
2134
2135                         /* last unsafe request during umount? */
2136                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2137                                 complete_all(&mdsc->safe_umount_waiters);
2138                         mutex_unlock(&mdsc->mutex);
2139                         goto out;
2140                 }
2141         } else {
2142                 req->r_got_unsafe = true;
2143                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2144         }
2145
2146         dout("handle_reply tid %lld result %d\n", tid, result);
2147         rinfo = &req->r_reply_info;
2148         err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2149         mutex_unlock(&mdsc->mutex);
2150
2151         mutex_lock(&session->s_mutex);
2152         if (err < 0) {
2153                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2154                 ceph_msg_dump(msg);
2155                 goto out_err;
2156         }
2157
2158         /* snap trace */
2159         if (rinfo->snapblob_len) {
2160                 down_write(&mdsc->snap_rwsem);
2161                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2162                                rinfo->snapblob + rinfo->snapblob_len,
2163                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2164                 downgrade_write(&mdsc->snap_rwsem);
2165         } else {
2166                 down_read(&mdsc->snap_rwsem);
2167         }
2168
2169         /* insert trace into our cache */
2170         mutex_lock(&req->r_fill_mutex);
2171         err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2172         if (err == 0) {
2173                 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2174                     rinfo->dir_nr)
2175                         ceph_readdir_prepopulate(req, req->r_session);
2176                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2177         }
2178         mutex_unlock(&req->r_fill_mutex);
2179
2180         up_read(&mdsc->snap_rwsem);
2181 out_err:
2182         mutex_lock(&mdsc->mutex);
2183         if (!req->r_aborted) {
2184                 if (err) {
2185                         req->r_err = err;
2186                 } else {
2187                         req->r_reply = msg;
2188                         ceph_msg_get(msg);
2189                         req->r_got_result = true;
2190                 }
2191         } else {
2192                 dout("reply arrived after request %lld was aborted\n", tid);
2193         }
2194         mutex_unlock(&mdsc->mutex);
2195
2196         ceph_add_cap_releases(mdsc, req->r_session);
2197         mutex_unlock(&session->s_mutex);
2198
2199         /* kick calling process */
2200         complete_request(mdsc, req);
2201 out:
2202         ceph_mdsc_put_request(req);
2203         return;
2204 }
2205
2206
2207
2208 /*
2209  * handle mds notification that our request has been forwarded.
2210  */
2211 static void handle_forward(struct ceph_mds_client *mdsc,
2212                            struct ceph_mds_session *session,
2213                            struct ceph_msg *msg)
2214 {
2215         struct ceph_mds_request *req;
2216         u64 tid = le64_to_cpu(msg->hdr.tid);
2217         u32 next_mds;
2218         u32 fwd_seq;
2219         int err = -EINVAL;
2220         void *p = msg->front.iov_base;
2221         void *end = p + msg->front.iov_len;
2222
2223         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2224         next_mds = ceph_decode_32(&p);
2225         fwd_seq = ceph_decode_32(&p);
2226
2227         mutex_lock(&mdsc->mutex);
2228         req = __lookup_request(mdsc, tid);
2229         if (!req) {
2230                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2231                 goto out;  /* dup reply? */
2232         }
2233
2234         if (req->r_aborted) {
2235                 dout("forward tid %llu aborted, unregistering\n", tid);
2236                 __unregister_request(mdsc, req);
2237         } else if (fwd_seq <= req->r_num_fwd) {
2238                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2239                      tid, next_mds, req->r_num_fwd, fwd_seq);
2240         } else {
2241                 /* resend. forward race not possible; mds would drop */
2242                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2243                 BUG_ON(req->r_err);
2244                 BUG_ON(req->r_got_result);
2245                 req->r_num_fwd = fwd_seq;
2246                 req->r_resend_mds = next_mds;
2247                 put_request_session(req);
2248                 __do_request(mdsc, req);
2249         }
2250         ceph_mdsc_put_request(req);
2251 out:
2252         mutex_unlock(&mdsc->mutex);
2253         return;
2254
2255 bad:
2256         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2257 }
2258
2259 /*
2260  * handle a mds session control message
2261  */
2262 static void handle_session(struct ceph_mds_session *session,
2263                            struct ceph_msg *msg)
2264 {
2265         struct ceph_mds_client *mdsc = session->s_mdsc;
2266         u32 op;
2267         u64 seq;
2268         int mds = session->s_mds;
2269         struct ceph_mds_session_head *h = msg->front.iov_base;
2270         int wake = 0;
2271
2272         /* decode */
2273         if (msg->front.iov_len != sizeof(*h))
2274                 goto bad;
2275         op = le32_to_cpu(h->op);
2276         seq = le64_to_cpu(h->seq);
2277
2278         mutex_lock(&mdsc->mutex);
2279         if (op == CEPH_SESSION_CLOSE)
2280                 __unregister_session(mdsc, session);
2281         /* FIXME: this ttl calculation is generous */
2282         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2283         mutex_unlock(&mdsc->mutex);
2284
2285         mutex_lock(&session->s_mutex);
2286
2287         dout("handle_session mds%d %s %p state %s seq %llu\n",
2288              mds, ceph_session_op_name(op), session,
2289              session_state_name(session->s_state), seq);
2290
2291         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2292                 session->s_state = CEPH_MDS_SESSION_OPEN;
2293                 pr_info("mds%d came back\n", session->s_mds);
2294         }
2295
2296         switch (op) {
2297         case CEPH_SESSION_OPEN:
2298                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2299                         pr_info("mds%d reconnect success\n", session->s_mds);
2300                 session->s_state = CEPH_MDS_SESSION_OPEN;
2301                 renewed_caps(mdsc, session, 0);
2302                 wake = 1;
2303                 if (mdsc->stopping)
2304                         __close_session(mdsc, session);
2305                 break;
2306
2307         case CEPH_SESSION_RENEWCAPS:
2308                 if (session->s_renew_seq == seq)
2309                         renewed_caps(mdsc, session, 1);
2310                 break;
2311
2312         case CEPH_SESSION_CLOSE:
2313                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2314                         pr_info("mds%d reconnect denied\n", session->s_mds);
2315                 remove_session_caps(session);
2316                 wake = 1; /* for good measure */
2317                 wake_up_all(&mdsc->session_close_wq);
2318                 kick_requests(mdsc, mds);
2319                 break;
2320
2321         case CEPH_SESSION_STALE:
2322                 pr_info("mds%d caps went stale, renewing\n",
2323                         session->s_mds);
2324                 spin_lock(&session->s_gen_ttl_lock);
2325                 session->s_cap_gen++;
2326                 session->s_cap_ttl = jiffies - 1;
2327                 spin_unlock(&session->s_gen_ttl_lock);
2328                 send_renew_caps(mdsc, session);
2329                 break;
2330
2331         case CEPH_SESSION_RECALL_STATE:
2332                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2333                 break;
2334
2335         default:
2336                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2337                 WARN_ON(1);
2338         }
2339
2340         mutex_unlock(&session->s_mutex);
2341         if (wake) {
2342                 mutex_lock(&mdsc->mutex);
2343                 __wake_requests(mdsc, &session->s_waiting);
2344                 mutex_unlock(&mdsc->mutex);
2345         }
2346         return;
2347
2348 bad:
2349         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2350                (int)msg->front.iov_len);
2351         ceph_msg_dump(msg);
2352         return;
2353 }
2354
2355
2356 /*
2357  * called under session->mutex.
2358  */
2359 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2360                                    struct ceph_mds_session *session)
2361 {
2362         struct ceph_mds_request *req, *nreq;
2363         int err;
2364
2365         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2366
2367         mutex_lock(&mdsc->mutex);
2368         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2369                 err = __prepare_send_request(mdsc, req, session->s_mds);
2370                 if (!err) {
2371                         ceph_msg_get(req->r_request);
2372                         ceph_con_send(&session->s_con, req->r_request);
2373                 }
2374         }
2375         mutex_unlock(&mdsc->mutex);
2376 }
2377
2378 /*
2379  * Encode information about a cap for a reconnect with the MDS.
2380  */
2381 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2382                           void *arg)
2383 {
2384         union {
2385                 struct ceph_mds_cap_reconnect v2;
2386                 struct ceph_mds_cap_reconnect_v1 v1;
2387         } rec;
2388         size_t reclen;
2389         struct ceph_inode_info *ci;
2390         struct ceph_reconnect_state *recon_state = arg;
2391         struct ceph_pagelist *pagelist = recon_state->pagelist;
2392         char *path;
2393         int pathlen, err;
2394         u64 pathbase;
2395         struct dentry *dentry;
2396
2397         ci = cap->ci;
2398
2399         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2400              inode, ceph_vinop(inode), cap, cap->cap_id,
2401              ceph_cap_string(cap->issued));
2402         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2403         if (err)
2404                 return err;
2405
2406         dentry = d_find_alias(inode);
2407         if (dentry) {
2408                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2409                 if (IS_ERR(path)) {
2410                         err = PTR_ERR(path);
2411                         goto out_dput;
2412                 }
2413         } else {
2414                 path = NULL;
2415                 pathlen = 0;
2416         }
2417         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2418         if (err)
2419                 goto out_free;
2420
2421         spin_lock(&ci->i_ceph_lock);
2422         cap->seq = 0;        /* reset cap seq */
2423         cap->issue_seq = 0;  /* and issue_seq */
2424
2425         if (recon_state->flock) {
2426                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2427                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2428                 rec.v2.issued = cpu_to_le32(cap->issued);
2429                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2430                 rec.v2.pathbase = cpu_to_le64(pathbase);
2431                 rec.v2.flock_len = 0;
2432                 reclen = sizeof(rec.v2);
2433         } else {
2434                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2435                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2436                 rec.v1.issued = cpu_to_le32(cap->issued);
2437                 rec.v1.size = cpu_to_le64(inode->i_size);
2438                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2439                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2440                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2441                 rec.v1.pathbase = cpu_to_le64(pathbase);
2442                 reclen = sizeof(rec.v1);
2443         }
2444         spin_unlock(&ci->i_ceph_lock);
2445
2446         if (recon_state->flock) {
2447                 int num_fcntl_locks, num_flock_locks;
2448                 struct ceph_pagelist_cursor trunc_point;
2449
2450                 ceph_pagelist_set_cursor(pagelist, &trunc_point);
2451                 do {
2452                         lock_flocks();
2453                         ceph_count_locks(inode, &num_fcntl_locks,
2454                                          &num_flock_locks);
2455                         rec.v2.flock_len = (2*sizeof(u32) +
2456                                             (num_fcntl_locks+num_flock_locks) *
2457                                             sizeof(struct ceph_filelock));
2458                         unlock_flocks();
2459
2460                         /* pre-alloc pagelist */
2461                         ceph_pagelist_truncate(pagelist, &trunc_point);
2462                         err = ceph_pagelist_append(pagelist, &rec, reclen);
2463                         if (!err)
2464                                 err = ceph_pagelist_reserve(pagelist,
2465                                                             rec.v2.flock_len);
2466
2467                         /* encode locks */
2468                         if (!err) {
2469                                 lock_flocks();
2470                                 err = ceph_encode_locks(inode,
2471                                                         pagelist,
2472                                                         num_fcntl_locks,
2473                                                         num_flock_locks);
2474                                 unlock_flocks();
2475                         }
2476                 } while (err == -ENOSPC);
2477         } else {
2478                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2479         }
2480
2481 out_free:
2482         kfree(path);
2483 out_dput:
2484         dput(dentry);
2485         return err;
2486 }
2487
2488
2489 /*
2490  * If an MDS fails and recovers, clients need to reconnect in order to
2491  * reestablish shared state.  This includes all caps issued through
2492  * this session _and_ the snap_realm hierarchy.  Because it's not
2493  * clear which snap realms the mds cares about, we send everything we
2494  * know about.. that ensures we'll then get any new info the
2495  * recovering MDS might have.
2496  *
2497  * This is a relatively heavyweight operation, but it's rare.
2498  *
2499  * called with mdsc->mutex held.
2500  */
2501 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2502                                struct ceph_mds_session *session)
2503 {
2504         struct ceph_msg *reply;
2505         struct rb_node *p;
2506         int mds = session->s_mds;
2507         int err = -ENOMEM;
2508         struct ceph_pagelist *pagelist;
2509         struct ceph_reconnect_state recon_state;
2510
2511         pr_info("mds%d reconnect start\n", mds);
2512
2513         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2514         if (!pagelist)
2515                 goto fail_nopagelist;
2516         ceph_pagelist_init(pagelist);
2517
2518         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2519         if (!reply)
2520                 goto fail_nomsg;
2521
2522         mutex_lock(&session->s_mutex);
2523         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2524         session->s_seq = 0;
2525
2526         ceph_con_close(&session->s_con);
2527         ceph_con_open(&session->s_con,
2528                       CEPH_ENTITY_TYPE_MDS, mds,
2529                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2530
2531         /* replay unsafe requests */
2532         replay_unsafe_requests(mdsc, session);
2533
2534         down_read(&mdsc->snap_rwsem);
2535
2536         dout("session %p state %s\n", session,
2537              session_state_name(session->s_state));
2538
2539         /* drop old cap expires; we're about to reestablish that state */
2540         discard_cap_releases(mdsc, session);
2541
2542         /* traverse this session's caps */
2543         err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2544         if (err)
2545                 goto fail;
2546
2547         recon_state.pagelist = pagelist;
2548         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2549         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2550         if (err < 0)
2551                 goto fail;
2552
2553         /*
2554          * snaprealms.  we provide mds with the ino, seq (version), and
2555          * parent for all of our realms.  If the mds has any newer info,
2556          * it will tell us.
2557          */
2558         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2559                 struct ceph_snap_realm *realm =
2560                         rb_entry(p, struct ceph_snap_realm, node);
2561                 struct ceph_mds_snaprealm_reconnect sr_rec;
2562
2563                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2564                      realm->ino, realm->seq, realm->parent_ino);
2565                 sr_rec.ino = cpu_to_le64(realm->ino);
2566                 sr_rec.seq = cpu_to_le64(realm->seq);
2567                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2568                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2569                 if (err)
2570                         goto fail;
2571         }
2572
2573         reply->pagelist = pagelist;
2574         if (recon_state.flock)
2575                 reply->hdr.version = cpu_to_le16(2);
2576         reply->hdr.data_len = cpu_to_le32(pagelist->length);
2577         reply->nr_pages = calc_pages_for(0, pagelist->length);
2578         ceph_con_send(&session->s_con, reply);
2579
2580         mutex_unlock(&session->s_mutex);
2581
2582         mutex_lock(&mdsc->mutex);
2583         __wake_requests(mdsc, &session->s_waiting);
2584         mutex_unlock(&mdsc->mutex);
2585
2586         up_read(&mdsc->snap_rwsem);
2587         return;
2588
2589 fail:
2590         ceph_msg_put(reply);
2591         up_read(&mdsc->snap_rwsem);
2592         mutex_unlock(&session->s_mutex);
2593 fail_nomsg:
2594         ceph_pagelist_release(pagelist);
2595         kfree(pagelist);
2596 fail_nopagelist:
2597         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2598         return;
2599 }
2600
2601
2602 /*
2603  * compare old and new mdsmaps, kicking requests
2604  * and closing out old connections as necessary
2605  *
2606  * called under mdsc->mutex.
2607  */
2608 static void check_new_map(struct ceph_mds_client *mdsc,
2609                           struct ceph_mdsmap *newmap,
2610                           struct ceph_mdsmap *oldmap)
2611 {
2612         int i;
2613         int oldstate, newstate;
2614         struct ceph_mds_session *s;
2615
2616         dout("check_new_map new %u old %u\n",
2617              newmap->m_epoch, oldmap->m_epoch);
2618
2619         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2620                 if (mdsc->sessions[i] == NULL)
2621                         continue;
2622                 s = mdsc->sessions[i];
2623                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2624                 newstate = ceph_mdsmap_get_state(newmap, i);
2625
2626                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2627                      i, ceph_mds_state_name(oldstate),
2628                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2629                      ceph_mds_state_name(newstate),
2630                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2631                      session_state_name(s->s_state));
2632
2633                 if (i >= newmap->m_max_mds ||
2634                     memcmp(ceph_mdsmap_get_addr(oldmap, i),
2635                            ceph_mdsmap_get_addr(newmap, i),
2636                            sizeof(struct ceph_entity_addr))) {
2637                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2638                                 /* the session never opened, just close it
2639                                  * out now */
2640                                 __wake_requests(mdsc, &s->s_waiting);
2641                                 __unregister_session(mdsc, s);
2642                         } else {
2643                                 /* just close it */
2644                                 mutex_unlock(&mdsc->mutex);
2645                                 mutex_lock(&s->s_mutex);
2646                                 mutex_lock(&mdsc->mutex);
2647                                 ceph_con_close(&s->s_con);
2648                                 mutex_unlock(&s->s_mutex);
2649                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2650                         }
2651
2652                         /* kick any requests waiting on the recovering mds */
2653                         kick_requests(mdsc, i);
2654                 } else if (oldstate == newstate) {
2655                         continue;  /* nothing new with this mds */
2656                 }
2657
2658                 /*
2659                  * send reconnect?
2660                  */
2661                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2662                     newstate >= CEPH_MDS_STATE_RECONNECT) {
2663                         mutex_unlock(&mdsc->mutex);
2664                         send_mds_reconnect(mdsc, s);
2665                         mutex_lock(&mdsc->mutex);
2666                 }
2667
2668                 /*
2669                  * kick request on any mds that has gone active.
2670                  */
2671                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2672                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2673                         if (oldstate != CEPH_MDS_STATE_CREATING &&
2674                             oldstate != CEPH_MDS_STATE_STARTING)
2675                                 pr_info("mds%d recovery completed\n", s->s_mds);
2676                         kick_requests(mdsc, i);
2677                         ceph_kick_flushing_caps(mdsc, s);
2678                         wake_up_session_caps(s, 1);
2679                 }
2680         }
2681
2682         for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2683                 s = mdsc->sessions[i];
2684                 if (!s)
2685                         continue;
2686                 if (!ceph_mdsmap_is_laggy(newmap, i))
2687                         continue;
2688                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2689                     s->s_state == CEPH_MDS_SESSION_HUNG ||
2690                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
2691                         dout(" connecting to export targets of laggy mds%d\n",
2692                              i);
2693                         __open_export_target_sessions(mdsc, s);
2694                 }
2695         }
2696 }
2697
2698
2699
2700 /*
2701  * leases
2702  */
2703
2704 /*
2705  * caller must hold session s_mutex, dentry->d_lock
2706  */
2707 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2708 {
2709         struct ceph_dentry_info *di = ceph_dentry(dentry);
2710
2711         ceph_put_mds_session(di->lease_session);
2712         di->lease_session = NULL;
2713 }
2714
2715 static void handle_lease(struct ceph_mds_client *mdsc,
2716                          struct ceph_mds_session *session,
2717                          struct ceph_msg *msg)
2718 {
2719         struct super_block *sb = mdsc->fsc->sb;
2720         struct inode *inode;
2721         struct dentry *parent, *dentry;
2722         struct ceph_dentry_info *di;
2723         int mds = session->s_mds;
2724         struct ceph_mds_lease *h = msg->front.iov_base;
2725         u32 seq;
2726         struct ceph_vino vino;
2727         struct qstr dname;
2728         int release = 0;
2729
2730         dout("handle_lease from mds%d\n", mds);
2731
2732         /* decode */
2733         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2734                 goto bad;
2735         vino.ino = le64_to_cpu(h->ino);
2736         vino.snap = CEPH_NOSNAP;
2737         seq = le32_to_cpu(h->seq);
2738         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2739         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2740         if (dname.len != get_unaligned_le32(h+1))
2741                 goto bad;
2742
2743         mutex_lock(&session->s_mutex);
2744         session->s_seq++;
2745
2746         /* lookup inode */
2747         inode = ceph_find_inode(sb, vino);
2748         dout("handle_lease %s, ino %llx %p %.*s\n",
2749              ceph_lease_op_name(h->action), vino.ino, inode,
2750              dname.len, dname.name);
2751         if (inode == NULL) {
2752                 dout("handle_lease no inode %llx\n", vino.ino);
2753                 goto release;
2754         }
2755
2756         /* dentry */
2757         parent = d_find_alias(inode);
2758         if (!parent) {
2759                 dout("no parent dentry on inode %p\n", inode);
2760                 WARN_ON(1);
2761                 goto release;  /* hrm... */
2762         }
2763         dname.hash = full_name_hash(dname.name, dname.len);
2764         dentry = d_lookup(parent, &dname);
2765         dput(parent);
2766         if (!dentry)
2767                 goto release;
2768
2769         spin_lock(&dentry->d_lock);
2770         di = ceph_dentry(dentry);
2771         switch (h->action) {
2772         case CEPH_MDS_LEASE_REVOKE:
2773                 if (di->lease_session == session) {
2774                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2775                                 h->seq = cpu_to_le32(di->lease_seq);
2776                         __ceph_mdsc_drop_dentry_lease(dentry);
2777                 }
2778                 release = 1;
2779                 break;
2780
2781         case CEPH_MDS_LEASE_RENEW:
2782                 if (di->lease_session == session &&
2783                     di->lease_gen == session->s_cap_gen &&
2784                     di->lease_renew_from &&
2785                     di->lease_renew_after == 0) {
2786                         unsigned long duration =
2787                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2788
2789                         di->lease_seq = seq;
2790                         dentry->d_time = di->lease_renew_from + duration;
2791                         di->lease_renew_after = di->lease_renew_from +
2792                                 (duration >> 1);
2793                         di->lease_renew_from = 0;
2794                 }
2795                 break;
2796         }
2797         spin_unlock(&dentry->d_lock);
2798         dput(dentry);
2799
2800         if (!release)
2801                 goto out;
2802
2803 release:
2804         /* let's just reuse the same message */
2805         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2806         ceph_msg_get(msg);
2807         ceph_con_send(&session->s_con, msg);
2808
2809 out:
2810         iput(inode);
2811         mutex_unlock(&session->s_mutex);
2812         return;
2813
2814 bad:
2815         pr_err("corrupt lease message\n");
2816         ceph_msg_dump(msg);
2817 }
2818
2819 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2820                               struct inode *inode,
2821                               struct dentry *dentry, char action,
2822                               u32 seq)
2823 {
2824         struct ceph_msg *msg;
2825         struct ceph_mds_lease *lease;
2826         int len = sizeof(*lease) + sizeof(u32);
2827         int dnamelen = 0;
2828
2829         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2830              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2831         dnamelen = dentry->d_name.len;
2832         len += dnamelen;
2833
2834         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2835         if (!msg)
2836                 return;
2837         lease = msg->front.iov_base;
2838         lease->action = action;
2839         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2840         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2841         lease->seq = cpu_to_le32(seq);
2842         put_unaligned_le32(dnamelen, lease + 1);
2843         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2844
2845         /*
2846          * if this is a preemptive lease RELEASE, no need to
2847          * flush request stream, since the actual request will
2848          * soon follow.
2849          */
2850         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2851
2852         ceph_con_send(&session->s_con, msg);
2853 }
2854
2855 /*
2856  * Preemptively release a lease we expect to invalidate anyway.
2857  * Pass @inode always, @dentry is optional.
2858  */
2859 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2860                              struct dentry *dentry)
2861 {
2862         struct ceph_dentry_info *di;
2863         struct ceph_mds_session *session;
2864         u32 seq;
2865
2866         BUG_ON(inode == NULL);
2867         BUG_ON(dentry == NULL);
2868
2869         /* is dentry lease valid? */
2870         spin_lock(&dentry->d_lock);
2871         di = ceph_dentry(dentry);
2872         if (!di || !di->lease_session ||
2873             di->lease_session->s_mds < 0 ||
2874             di->lease_gen != di->lease_session->s_cap_gen ||
2875             !time_before(jiffies, dentry->d_time)) {
2876                 dout("lease_release inode %p dentry %p -- "
2877                      "no lease\n",
2878                      inode, dentry);
2879                 spin_unlock(&dentry->d_lock);
2880                 return;
2881         }
2882
2883         /* we do have a lease on this dentry; note mds and seq */
2884         session = ceph_get_mds_session(di->lease_session);
2885         seq = di->lease_seq;
2886         __ceph_mdsc_drop_dentry_lease(dentry);
2887         spin_unlock(&dentry->d_lock);
2888
2889         dout("lease_release inode %p dentry %p to mds%d\n",
2890              inode, dentry, session->s_mds);
2891         ceph_mdsc_lease_send_msg(session, inode, dentry,
2892                                  CEPH_MDS_LEASE_RELEASE, seq);
2893         ceph_put_mds_session(session);
2894 }
2895
2896 /*
2897  * drop all leases (and dentry refs) in preparation for umount
2898  */
2899 static void drop_leases(struct ceph_mds_client *mdsc)
2900 {
2901         int i;
2902
2903         dout("drop_leases\n");
2904         mutex_lock(&mdsc->mutex);
2905         for (i = 0; i < mdsc->max_sessions; i++) {
2906                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2907                 if (!s)
2908                         continue;
2909                 mutex_unlock(&mdsc->mutex);
2910                 mutex_lock(&s->s_mutex);
2911                 mutex_unlock(&s->s_mutex);
2912                 ceph_put_mds_session(s);
2913                 mutex_lock(&mdsc->mutex);
2914         }
2915         mutex_unlock(&mdsc->mutex);
2916 }
2917
2918
2919
2920 /*
2921  * delayed work -- periodically trim expired leases, renew caps with mds
2922  */
2923 static void schedule_delayed(struct ceph_mds_client *mdsc)
2924 {
2925         int delay = 5;
2926         unsigned hz = round_jiffies_relative(HZ * delay);
2927         schedule_delayed_work(&mdsc->delayed_work, hz);
2928 }
2929
2930 static void delayed_work(struct work_struct *work)
2931 {
2932         int i;
2933         struct ceph_mds_client *mdsc =
2934                 container_of(work, struct ceph_mds_client, delayed_work.work);
2935         int renew_interval;
2936         int renew_caps;
2937
2938         dout("mdsc delayed_work\n");
2939         ceph_check_delayed_caps(mdsc);
2940
2941         mutex_lock(&mdsc->mutex);
2942         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2943         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2944                                    mdsc->last_renew_caps);
2945         if (renew_caps)
2946                 mdsc->last_renew_caps = jiffies;
2947
2948         for (i = 0; i < mdsc->max_sessions; i++) {
2949                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2950                 if (s == NULL)
2951                         continue;
2952                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2953                         dout("resending session close request for mds%d\n",
2954                              s->s_mds);
2955                         request_close_session(mdsc, s);
2956                         ceph_put_mds_session(s);
2957                         continue;
2958                 }
2959                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2960                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2961                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2962                                 pr_info("mds%d hung\n", s->s_mds);
2963                         }
2964                 }
2965                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2966                         /* this mds is failed or recovering, just wait */
2967                         ceph_put_mds_session(s);
2968                         continue;
2969                 }
2970                 mutex_unlock(&mdsc->mutex);
2971
2972                 mutex_lock(&s->s_mutex);
2973                 if (renew_caps)
2974                         send_renew_caps(mdsc, s);
2975                 else
2976                         ceph_con_keepalive(&s->s_con);
2977                 ceph_add_cap_releases(mdsc, s);
2978                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2979                     s->s_state == CEPH_MDS_SESSION_HUNG)
2980                         ceph_send_cap_releases(mdsc, s);
2981                 mutex_unlock(&s->s_mutex);
2982                 ceph_put_mds_session(s);
2983
2984                 mutex_lock(&mdsc->mutex);
2985         }
2986         mutex_unlock(&mdsc->mutex);
2987
2988         schedule_delayed(mdsc);
2989 }
2990
2991 int ceph_mdsc_init(struct ceph_fs_client *fsc)
2992
2993 {
2994         struct ceph_mds_client *mdsc;
2995
2996         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2997         if (!mdsc)
2998                 return -ENOMEM;
2999         mdsc->fsc = fsc;
3000         fsc->mdsc = mdsc;
3001         mutex_init(&mdsc->mutex);
3002         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3003         if (mdsc->mdsmap == NULL)
3004                 return -ENOMEM;
3005
3006         init_completion(&mdsc->safe_umount_waiters);
3007         init_waitqueue_head(&mdsc->session_close_wq);
3008         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3009         mdsc->sessions = NULL;
3010         mdsc->max_sessions = 0;
3011         mdsc->stopping = 0;
3012         init_rwsem(&mdsc->snap_rwsem);
3013         mdsc->snap_realms = RB_ROOT;
3014         INIT_LIST_HEAD(&mdsc->snap_empty);
3015         spin_lock_init(&mdsc->snap_empty_lock);
3016         mdsc->last_tid = 0;
3017         mdsc->request_tree = RB_ROOT;
3018         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3019         mdsc->last_renew_caps = jiffies;
3020         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3021         spin_lock_init(&mdsc->cap_delay_lock);
3022         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3023         spin_lock_init(&mdsc->snap_flush_lock);
3024         mdsc->cap_flush_seq = 0;
3025         INIT_LIST_HEAD(&mdsc->cap_dirty);
3026         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3027         mdsc->num_cap_flushing = 0;
3028         spin_lock_init(&mdsc->cap_dirty_lock);
3029         init_waitqueue_head(&mdsc->cap_flushing_wq);
3030         spin_lock_init(&mdsc->dentry_lru_lock);
3031         INIT_LIST_HEAD(&mdsc->dentry_lru);
3032
3033         ceph_caps_init(mdsc);
3034         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3035
3036         return 0;
3037 }
3038
3039 /*
3040  * Wait for safe replies on open mds requests.  If we time out, drop
3041  * all requests from the tree to avoid dangling dentry refs.
3042  */
3043 static void wait_requests(struct ceph_mds_client *mdsc)
3044 {
3045         struct ceph_mds_request *req;
3046         struct ceph_fs_client *fsc = mdsc->fsc;
3047
3048         mutex_lock(&mdsc->mutex);
3049         if (__get_oldest_req(mdsc)) {
3050                 mutex_unlock(&mdsc->mutex);
3051
3052                 dout("wait_requests waiting for requests\n");
3053                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3054                                     fsc->client->options->mount_timeout * HZ);
3055
3056                 /* tear down remaining requests */
3057                 mutex_lock(&mdsc->mutex);
3058                 while ((req = __get_oldest_req(mdsc))) {
3059                         dout("wait_requests timed out on tid %llu\n",
3060                              req->r_tid);
3061                         __unregister_request(mdsc, req);
3062                 }
3063         }
3064         mutex_unlock(&mdsc->mutex);
3065         dout("wait_requests done\n");
3066 }
3067
3068 /*
3069  * called before mount is ro, and before dentries are torn down.
3070  * (hmm, does this still race with new lookups?)
3071  */
3072 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3073 {
3074         dout("pre_umount\n");
3075         mdsc->stopping = 1;
3076
3077         drop_leases(mdsc);
3078         ceph_flush_dirty_caps(mdsc);
3079         wait_requests(mdsc);
3080
3081         /*
3082          * wait for reply handlers to drop their request refs and
3083          * their inode/dcache refs
3084          */
3085         ceph_msgr_flush();
3086 }
3087
3088 /*
3089  * wait for all write mds requests to flush.
3090  */
3091 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3092 {
3093         struct ceph_mds_request *req = NULL, *nextreq;
3094         struct rb_node *n;
3095
3096         mutex_lock(&mdsc->mutex);
3097         dout("wait_unsafe_requests want %lld\n", want_tid);
3098 restart:
3099         req = __get_oldest_req(mdsc);
3100         while (req && req->r_tid <= want_tid) {
3101                 /* find next request */
3102                 n = rb_next(&req->r_node);
3103                 if (n)
3104                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3105                 else
3106                         nextreq = NULL;
3107                 if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3108                         /* write op */
3109                         ceph_mdsc_get_request(req);
3110                         if (nextreq)
3111                                 ceph_mdsc_get_request(nextreq);
3112                         mutex_unlock(&mdsc->mutex);
3113                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3114                              req->r_tid, want_tid);
3115                         wait_for_completion(&req->r_safe_completion);
3116                         mutex_lock(&mdsc->mutex);
3117                         ceph_mdsc_put_request(req);
3118                         if (!nextreq)
3119                                 break;  /* next dne before, so we're done! */
3120                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
3121                                 /* next request was removed from tree */
3122                                 ceph_mdsc_put_request(nextreq);
3123                                 goto restart;
3124                         }
3125                         ceph_mdsc_put_request(nextreq);  /* won't go away */
3126                 }
3127                 req = nextreq;
3128         }
3129         mutex_unlock(&mdsc->mutex);
3130         dout("wait_unsafe_requests done\n");
3131 }
3132
3133 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3134 {
3135         u64 want_tid, want_flush;
3136
3137         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3138                 return;
3139
3140         dout("sync\n");
3141         mutex_lock(&mdsc->mutex);
3142         want_tid = mdsc->last_tid;
3143         want_flush = mdsc->cap_flush_seq;
3144         mutex_unlock(&mdsc->mutex);
3145         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3146
3147         ceph_flush_dirty_caps(mdsc);
3148
3149         wait_unsafe_requests(mdsc, want_tid);
3150         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3151 }
3152
3153 /*
3154  * true if all sessions are closed, or we force unmount
3155  */
3156 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3157 {
3158         int i, n = 0;
3159
3160         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3161                 return true;
3162
3163         mutex_lock(&mdsc->mutex);
3164         for (i = 0; i < mdsc->max_sessions; i++)
3165                 if (mdsc->sessions[i])
3166                         n++;
3167         mutex_unlock(&mdsc->mutex);
3168         return n == 0;
3169 }
3170
3171 /*
3172  * called after sb is ro.
3173  */
3174 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3175 {
3176         struct ceph_mds_session *session;
3177         int i;
3178         struct ceph_fs_client *fsc = mdsc->fsc;
3179         unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3180
3181         dout("close_sessions\n");
3182
3183         /* close sessions */
3184         mutex_lock(&mdsc->mutex);
3185         for (i = 0; i < mdsc->max_sessions; i++) {
3186                 session = __ceph_lookup_mds_session(mdsc, i);
3187                 if (!session)
3188                         continue;
3189                 mutex_unlock(&mdsc->mutex);
3190                 mutex_lock(&session->s_mutex);
3191                 __close_session(mdsc, session);
3192                 mutex_unlock(&session->s_mutex);
3193                 ceph_put_mds_session(session);
3194                 mutex_lock(&mdsc->mutex);
3195         }
3196         mutex_unlock(&mdsc->mutex);
3197
3198         dout("waiting for sessions to close\n");
3199         wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3200                            timeout);
3201
3202         /* tear down remaining sessions */
3203         mutex_lock(&mdsc->mutex);
3204         for (i = 0; i < mdsc->max_sessions; i++) {
3205                 if (mdsc->sessions[i]) {
3206                         session = get_session(mdsc->sessions[i]);
3207                         __unregister_session(mdsc, session);
3208                         mutex_unlock(&mdsc->mutex);
3209                         mutex_lock(&session->s_mutex);
3210                         remove_session_caps(session);
3211                         mutex_unlock(&session->s_mutex);
3212                         ceph_put_mds_session(session);
3213                         mutex_lock(&mdsc->mutex);
3214                 }
3215         }
3216         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3217         mutex_unlock(&mdsc->mutex);
3218
3219         ceph_cleanup_empty_realms(mdsc);
3220
3221         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3222
3223         dout("stopped\n");
3224 }
3225
3226 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3227 {
3228         dout("stop\n");
3229         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3230         if (mdsc->mdsmap)
3231                 ceph_mdsmap_destroy(mdsc->mdsmap);
3232         kfree(mdsc->sessions);
3233         ceph_caps_finalize(mdsc);
3234 }
3235
3236 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3237 {
3238         struct ceph_mds_client *mdsc = fsc->mdsc;
3239
3240         dout("mdsc_destroy %p\n", mdsc);
3241         ceph_mdsc_stop(mdsc);
3242
3243         /* flush out any connection work with references to us */
3244         ceph_msgr_flush();
3245
3246         fsc->mdsc = NULL;
3247         kfree(mdsc);
3248         dout("mdsc_destroy %p done\n", mdsc);
3249 }
3250
3251
3252 /*
3253  * handle mds map update.
3254  */
3255 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3256 {
3257         u32 epoch;
3258         u32 maplen;
3259         void *p = msg->front.iov_base;
3260         void *end = p + msg->front.iov_len;
3261         struct ceph_mdsmap *newmap, *oldmap;
3262         struct ceph_fsid fsid;
3263         int err = -EINVAL;
3264
3265         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3266         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3267         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3268                 return;
3269         epoch = ceph_decode_32(&p);
3270         maplen = ceph_decode_32(&p);
3271         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3272
3273         /* do we need it? */
3274         ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3275         mutex_lock(&mdsc->mutex);
3276         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3277                 dout("handle_map epoch %u <= our %u\n",
3278                      epoch, mdsc->mdsmap->m_epoch);
3279                 mutex_unlock(&mdsc->mutex);
3280                 return;
3281         }
3282
3283         newmap = ceph_mdsmap_decode(&p, end);
3284         if (IS_ERR(newmap)) {
3285                 err = PTR_ERR(newmap);
3286                 goto bad_unlock;
3287         }
3288
3289         /* swap into place */
3290         if (mdsc->mdsmap) {
3291                 oldmap = mdsc->mdsmap;
3292                 mdsc->mdsmap = newmap;
3293                 check_new_map(mdsc, newmap, oldmap);
3294                 ceph_mdsmap_destroy(oldmap);
3295         } else {
3296                 mdsc->mdsmap = newmap;  /* first mds map */
3297         }
3298         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3299
3300         __wake_requests(mdsc, &mdsc->waiting_for_map);
3301
3302         mutex_unlock(&mdsc->mutex);
3303         schedule_delayed(mdsc);
3304         return;
3305
3306 bad_unlock:
3307         mutex_unlock(&mdsc->mutex);
3308 bad:
3309         pr_err("error decoding mdsmap %d\n", err);
3310         return;
3311 }
3312
3313 static struct ceph_connection *con_get(struct ceph_connection *con)
3314 {
3315         struct ceph_mds_session *s = con->private;
3316
3317         if (get_session(s)) {
3318                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3319                 return con;
3320         }
3321         dout("mdsc con_get %p FAIL\n", s);
3322         return NULL;
3323 }
3324
3325 static void con_put(struct ceph_connection *con)
3326 {
3327         struct ceph_mds_session *s = con->private;
3328
3329         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3330         ceph_put_mds_session(s);
3331 }
3332
3333 /*
3334  * if the client is unresponsive for long enough, the mds will kill
3335  * the session entirely.
3336  */
3337 static void peer_reset(struct ceph_connection *con)
3338 {
3339         struct ceph_mds_session *s = con->private;
3340         struct ceph_mds_client *mdsc = s->s_mdsc;
3341
3342         pr_warning("mds%d closed our session\n", s->s_mds);
3343         send_mds_reconnect(mdsc, s);
3344 }
3345
3346 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3347 {
3348         struct ceph_mds_session *s = con->private;
3349         struct ceph_mds_client *mdsc = s->s_mdsc;
3350         int type = le16_to_cpu(msg->hdr.type);
3351
3352         mutex_lock(&mdsc->mutex);
3353         if (__verify_registered_session(mdsc, s) < 0) {
3354                 mutex_unlock(&mdsc->mutex);
3355                 goto out;
3356         }
3357         mutex_unlock(&mdsc->mutex);
3358
3359         switch (type) {
3360         case CEPH_MSG_MDS_MAP:
3361                 ceph_mdsc_handle_map(mdsc, msg);
3362                 break;
3363         case CEPH_MSG_CLIENT_SESSION:
3364                 handle_session(s, msg);
3365                 break;
3366         case CEPH_MSG_CLIENT_REPLY:
3367                 handle_reply(s, msg);
3368                 break;
3369         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3370                 handle_forward(mdsc, s, msg);
3371                 break;
3372         case CEPH_MSG_CLIENT_CAPS:
3373                 ceph_handle_caps(s, msg);
3374                 break;
3375         case CEPH_MSG_CLIENT_SNAP:
3376                 ceph_handle_snap(mdsc, s, msg);
3377                 break;
3378         case CEPH_MSG_CLIENT_LEASE:
3379                 handle_lease(mdsc, s, msg);
3380                 break;
3381
3382         default:
3383                 pr_err("received unknown message type %d %s\n", type,
3384                        ceph_msg_type_name(type));
3385         }
3386 out:
3387         ceph_msg_put(msg);
3388 }
3389
3390 /*
3391  * authentication
3392  */
3393
3394 /*
3395  * Note: returned pointer is the address of a structure that's
3396  * managed separately.  Caller must *not* attempt to free it.
3397  */
3398 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3399                                         int *proto, int force_new)
3400 {
3401         struct ceph_mds_session *s = con->private;
3402         struct ceph_mds_client *mdsc = s->s_mdsc;
3403         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3404         struct ceph_auth_handshake *auth = &s->s_auth;
3405
3406         if (force_new && auth->authorizer) {
3407                 if (ac->ops && ac->ops->destroy_authorizer)
3408                         ac->ops->destroy_authorizer(ac, auth->authorizer);
3409                 auth->authorizer = NULL;
3410         }
3411         if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
3412                 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3413                                                         auth);
3414                 if (ret)
3415                         return ERR_PTR(ret);
3416         }
3417         *proto = ac->protocol;
3418
3419         return auth;
3420 }
3421
3422
3423 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3424 {
3425         struct ceph_mds_session *s = con->private;
3426         struct ceph_mds_client *mdsc = s->s_mdsc;
3427         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3428
3429         return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3430 }
3431
3432 static int invalidate_authorizer(struct ceph_connection *con)
3433 {
3434         struct ceph_mds_session *s = con->private;
3435         struct ceph_mds_client *mdsc = s->s_mdsc;
3436         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3437
3438         if (ac->ops->invalidate_authorizer)
3439                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3440
3441         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3442 }
3443
3444 static const struct ceph_connection_operations mds_con_ops = {
3445         .get = con_get,
3446         .put = con_put,
3447         .dispatch = dispatch,
3448         .get_authorizer = get_authorizer,
3449         .verify_authorizer_reply = verify_authorizer_reply,
3450         .invalidate_authorizer = invalidate_authorizer,
3451         .peer_reset = peer_reset,
3452 };
3453
3454 /* eof */