ceph: improve fragtree change detection
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/fs.h>
5 #include <linux/slab.h>
6 #include <linux/string.h>
7 #include <linux/uaccess.h>
8 #include <linux/kernel.h>
9 #include <linux/writeback.h>
10 #include <linux/vmalloc.h>
11 #include <linux/posix_acl.h>
12 #include <linux/random.h>
13 #include <linux/sort.h>
14
15 #include "super.h"
16 #include "mds_client.h"
17 #include "cache.h"
18 #include <linux/ceph/decode.h>
19
20 /*
21  * Ceph inode operations
22  *
23  * Implement basic inode helpers (get, alloc) and inode ops (getattr,
24  * setattr, etc.), xattr helpers, and helpers for assimilating
25  * metadata returned by the MDS into our cache.
26  *
27  * Also define helpers for doing asynchronous writeback, invalidation,
28  * and truncation for the benefit of those who can't afford to block
29  * (typically because they are in the message handler path).
30  */
31
32 static const struct inode_operations ceph_symlink_iops;
33
34 static void ceph_invalidate_work(struct work_struct *work);
35 static void ceph_writeback_work(struct work_struct *work);
36 static void ceph_vmtruncate_work(struct work_struct *work);
37
38 /*
39  * find or create an inode, given the ceph ino number
40  */
41 static int ceph_set_ino_cb(struct inode *inode, void *data)
42 {
43         ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
44         inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
45         return 0;
46 }
47
48 struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
49 {
50         struct inode *inode;
51         ino_t t = ceph_vino_to_ino(vino);
52
53         inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
54         if (inode == NULL)
55                 return ERR_PTR(-ENOMEM);
56         if (inode->i_state & I_NEW) {
57                 dout("get_inode created new inode %p %llx.%llx ino %llx\n",
58                      inode, ceph_vinop(inode), (u64)inode->i_ino);
59                 unlock_new_inode(inode);
60         }
61
62         dout("get_inode on %lu=%llx.%llx got %p\n", inode->i_ino, vino.ino,
63              vino.snap, inode);
64         return inode;
65 }
66
67 /*
68  * get/construct snapdir inode for a given directory
69  */
70 struct inode *ceph_get_snapdir(struct inode *parent)
71 {
72         struct ceph_vino vino = {
73                 .ino = ceph_ino(parent),
74                 .snap = CEPH_SNAPDIR,
75         };
76         struct inode *inode = ceph_get_inode(parent->i_sb, vino);
77         struct ceph_inode_info *ci = ceph_inode(inode);
78
79         BUG_ON(!S_ISDIR(parent->i_mode));
80         if (IS_ERR(inode))
81                 return inode;
82         inode->i_mode = parent->i_mode;
83         inode->i_uid = parent->i_uid;
84         inode->i_gid = parent->i_gid;
85         inode->i_op = &ceph_snapdir_iops;
86         inode->i_fop = &ceph_snapdir_fops;
87         ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
88         ci->i_rbytes = 0;
89         return inode;
90 }
91
92 const struct inode_operations ceph_file_iops = {
93         .permission = ceph_permission,
94         .setattr = ceph_setattr,
95         .getattr = ceph_getattr,
96         .setxattr = ceph_setxattr,
97         .getxattr = ceph_getxattr,
98         .listxattr = ceph_listxattr,
99         .removexattr = ceph_removexattr,
100         .get_acl = ceph_get_acl,
101         .set_acl = ceph_set_acl,
102 };
103
104
105 /*
106  * We use a 'frag tree' to keep track of the MDS's directory fragments
107  * for a given inode (usually there is just a single fragment).  We
108  * need to know when a child frag is delegated to a new MDS, or when
109  * it is flagged as replicated, so we can direct our requests
110  * accordingly.
111  */
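/*
 * A rough sketch of the frag encoding this file relies on (see
 * include/linux/ceph/ceph_frag.h): a frag packs a split depth into the
 * high 8 bits of a u32 and a value prefix into the low 24 bits, so a
 * one-way split of the root yields two children:
 *
 *	u32 root  = ceph_frag_make(0, 0);             covers all values
 *	u32 left  = ceph_frag_make_child(root, 1, 0); prefix bit clear
 *	u32 right = ceph_frag_make_child(root, 1, 1); prefix bit set
 *
 * ceph_frag_contains_value(left, v) then holds exactly for values v
 * whose top bit (of the 24-bit value space) is clear, and vice versa.
 */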
112
113 /*
114  * find/create a frag in the tree
115  */
116 static struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci,
117                                                     u32 f)
118 {
119         struct rb_node **p;
120         struct rb_node *parent = NULL;
121         struct ceph_inode_frag *frag;
122         int c;
123
124         p = &ci->i_fragtree.rb_node;
125         while (*p) {
126                 parent = *p;
127                 frag = rb_entry(parent, struct ceph_inode_frag, node);
128                 c = ceph_frag_compare(f, frag->frag);
129                 if (c < 0)
130                         p = &(*p)->rb_left;
131                 else if (c > 0)
132                         p = &(*p)->rb_right;
133                 else
134                         return frag;
135         }
136
137         frag = kmalloc(sizeof(*frag), GFP_NOFS);
138         if (!frag) {
139                 pr_err("__get_or_create_frag ENOMEM on %p %llx.%llx "
140                        "frag %x\n", &ci->vfs_inode,
141                        ceph_vinop(&ci->vfs_inode), f);
142                 return ERR_PTR(-ENOMEM);
143         }
144         frag->frag = f;
145         frag->split_by = 0;
146         frag->mds = -1;
147         frag->ndist = 0;
148
149         rb_link_node(&frag->node, parent, p);
150         rb_insert_color(&frag->node, &ci->i_fragtree);
151
152         dout("get_or_create_frag added %llx.%llx frag %x\n",
153              ceph_vinop(&ci->vfs_inode), f);
154         return frag;
155 }
156
157 /*
158  * find a specific frag @f
159  */
160 struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
161 {
162         struct rb_node *n = ci->i_fragtree.rb_node;
163
164         while (n) {
165                 struct ceph_inode_frag *frag =
166                         rb_entry(n, struct ceph_inode_frag, node);
167                 int c = ceph_frag_compare(f, frag->frag);
168                 if (c < 0)
169                         n = n->rb_left;
170                 else if (c > 0)
171                         n = n->rb_right;
172                 else
173                         return frag;
174         }
175         return NULL;
176 }
177
178 /*
179  * Choose frag containing the given value @v.  If @pfrag is
180  * specified, copy the frag delegation info to the caller if
181  * it is present.
182  */
183 static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
184                               struct ceph_inode_frag *pfrag, int *found)
185 {
186         u32 t = ceph_frag_make(0, 0);
187         struct ceph_inode_frag *frag;
188         unsigned nway, i;
189         u32 n;
190
191         if (found)
192                 *found = 0;
193
194         while (1) {
195                 WARN_ON(!ceph_frag_contains_value(t, v));
196                 frag = __ceph_find_frag(ci, t);
197                 if (!frag)
198                         break; /* t is a leaf */
199                 if (frag->split_by == 0) {
200                         if (pfrag)
201                                 memcpy(pfrag, frag, sizeof(*pfrag));
202                         if (found)
203                                 *found = 1;
204                         break;
205                 }
206
207                 /* choose child */
208                 nway = 1 << frag->split_by;
209                 dout("choose_frag(%x) %x splits by %d (%d ways)\n", v, t,
210                      frag->split_by, nway);
211                 for (i = 0; i < nway; i++) {
212                         n = ceph_frag_make_child(t, frag->split_by, i);
213                         if (ceph_frag_contains_value(n, v)) {
214                                 t = n;
215                                 break;
216                         }
217                 }
218                 BUG_ON(i == nway);
219         }
220         dout("choose_frag(%x) = %x\n", v, t);
221
222         return t;
223 }
224
225 u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
226                      struct ceph_inode_frag *pfrag, int *found)
227 {
228         u32 ret;
229         mutex_lock(&ci->i_fragtree_mutex);
230         ret = __ceph_choose_frag(ci, v, pfrag, found);
231         mutex_unlock(&ci->i_fragtree_mutex);
232         return ret;
233 }
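/*
 * Illustrative use (a hypothetical caller, not code from this file):
 * given a dentry-name hash, pick the covering frag and, when delegation
 * info is cached for it, the MDS to contact:
 *
 *	struct ceph_inode_frag fi;
 *	int found;
 *	u32 frag = ceph_choose_frag(ci, hash, &fi, &found);
 *	int mds = found ? fi.mds : -1;   (-1: fall back to the auth MDS)
 */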
234
235 /*
236  * Process dirfrag (delegation) info from the mds.  Include leaf
237  * fragment in tree ONLY if ndist > 0.  Otherwise, only
238  * branches/splits are included in i_fragtree.
239  */
240 static int ceph_fill_dirfrag(struct inode *inode,
241                              struct ceph_mds_reply_dirfrag *dirinfo)
242 {
243         struct ceph_inode_info *ci = ceph_inode(inode);
244         struct ceph_inode_frag *frag;
245         u32 id = le32_to_cpu(dirinfo->frag);
246         int mds = le32_to_cpu(dirinfo->auth);
247         int ndist = le32_to_cpu(dirinfo->ndist);
248         int diri_auth = -1;
249         int i;
250         int err = 0;
251
252         spin_lock(&ci->i_ceph_lock);
253         if (ci->i_auth_cap)
254                 diri_auth = ci->i_auth_cap->mds;
255         spin_unlock(&ci->i_ceph_lock);
256
257         if (mds == -1) /* CDIR_AUTH_PARENT */
258                 mds = diri_auth;
259
260         mutex_lock(&ci->i_fragtree_mutex);
261         if (ndist == 0 && mds == diri_auth) {
262                 /* no delegation info needed. */
263                 frag = __ceph_find_frag(ci, id);
264                 if (!frag)
265                         goto out;
266                 if (frag->split_by == 0) {
267                         /* tree leaf, remove */
268                         dout("fill_dirfrag removed %llx.%llx frag %x"
269                              " (no ref)\n", ceph_vinop(inode), id);
270                         rb_erase(&frag->node, &ci->i_fragtree);
271                         kfree(frag);
272                 } else {
273                         /* tree branch, keep and clear */
274                         dout("fill_dirfrag cleared %llx.%llx frag %x"
275                              " referral\n", ceph_vinop(inode), id);
276                         frag->mds = -1;
277                         frag->ndist = 0;
278                 }
279                 goto out;
280         }
281
282
283         /* find/add this frag to store mds delegation info */
284         frag = __get_or_create_frag(ci, id);
285         if (IS_ERR(frag)) {
286                 /* this is not the end of the world; we can continue
287                    with bad/inaccurate delegation info */
288                 pr_err("fill_dirfrag ENOMEM on mds ref %llx.%llx fg %x\n",
289                        ceph_vinop(inode), le32_to_cpu(dirinfo->frag));
290                 err = -ENOMEM;
291                 goto out;
292         }
293
294         frag->mds = mds;
295         frag->ndist = min_t(u32, ndist, CEPH_MAX_DIRFRAG_REP);
296         for (i = 0; i < frag->ndist; i++)
297                 frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
298         dout("fill_dirfrag %llx.%llx frag %x ndist=%d\n",
299              ceph_vinop(inode), frag->frag, frag->ndist);
300
301 out:
302         mutex_unlock(&ci->i_fragtree_mutex);
303         return err;
304 }
305
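/*
 * sort() comparator: order the MDS-supplied split records by frag so
 * that ceph_fill_fragtree() below can merge them against the rbtree in
 * a single ordered pass.
 */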
306 static int frag_tree_split_cmp(const void *l, const void *r)
307 {
308         struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
309         struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
310         return ceph_frag_compare(ls->frag, rs->frag);
311 }
312
313 static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
314 {
315         if (!frag)
316                 return f == ceph_frag_make(0, 0);
317         if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
318                 return false;
319         return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
320 }
321
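/*
 * Rebuild i_fragtree from the MDS-supplied split list, but only when a
 * change is actually detected: a differing split count, a randomly
 * sampled split missing from our tree, a non-trivial local tree where
 * the MDS reports no splits, or dirfrag info naming a frag our tree
 * cannot produce.  The cheap checks run first; the full sorted merge
 * happens only on a mismatch.
 */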
322 static int ceph_fill_fragtree(struct inode *inode,
323                               struct ceph_frag_tree_head *fragtree,
324                               struct ceph_mds_reply_dirfrag *dirinfo)
325 {
326         struct ceph_inode_info *ci = ceph_inode(inode);
327         struct ceph_inode_frag *frag, *prev_frag = NULL;
328         struct rb_node *rb_node;
329         unsigned i, split_by, nsplits;
330         u32 id;
331         bool update = false;
332
333         mutex_lock(&ci->i_fragtree_mutex);
334         nsplits = le32_to_cpu(fragtree->nsplits);
335         if (nsplits != ci->i_fragtree_nsplits) {
336                 update = true;
337         } else if (nsplits) {
338                 i = prandom_u32() % nsplits;
339                 id = le32_to_cpu(fragtree->splits[i].frag);
340                 if (!__ceph_find_frag(ci, id))
341                         update = true;
342         } else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
343                 rb_node = rb_first(&ci->i_fragtree);
344                 frag = rb_entry(rb_node, struct ceph_inode_frag, node);
345                 if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
346                         update = true;
347         }
348         if (!update && dirinfo) {
349                 id = le32_to_cpu(dirinfo->frag);
350                 if (id != __ceph_choose_frag(ci, id, NULL, NULL))
351                         update = true;
352         }
353         if (!update)
354                 goto out_unlock;
355
356         if (nsplits > 1) {
357                 sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
358                      frag_tree_split_cmp, NULL);
359         }
360
361         dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
362         rb_node = rb_first(&ci->i_fragtree);
363         for (i = 0; i < nsplits; i++) {
364                 id = le32_to_cpu(fragtree->splits[i].frag);
365                 split_by = le32_to_cpu(fragtree->splits[i].by);
366                 if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
367                         pr_err("fill_fragtree %llx.%llx invalid split %d/%u, "
368                                "frag %x split by %d\n", ceph_vinop(inode),
369                                i, nsplits, id, split_by);
370                         continue;
371                 }
372                 frag = NULL;
373                 while (rb_node) {
374                         frag = rb_entry(rb_node, struct ceph_inode_frag, node);
375                         if (ceph_frag_compare(frag->frag, id) >= 0) {
376                                 if (frag->frag != id)
377                                         frag = NULL;
378                                 else
379                                         rb_node = rb_next(rb_node);
380                                 break;
381                         }
382                         rb_node = rb_next(rb_node);
383                         /* delete stale split/leaf node */
384                         if (frag->split_by > 0 ||
385                             !is_frag_child(frag->frag, prev_frag)) {
386                                 rb_erase(&frag->node, &ci->i_fragtree);
387                                 if (frag->split_by > 0)
388                                         ci->i_fragtree_nsplits--;
389                                 kfree(frag);
390                         }
391                         frag = NULL;
392                 }
393                 if (!frag) {
394                         frag = __get_or_create_frag(ci, id);
395                         if (IS_ERR(frag))
396                                 continue;
397                 }
398                 if (frag->split_by == 0)
399                         ci->i_fragtree_nsplits++;
400                 frag->split_by = split_by;
401                 dout(" frag %x split by %d\n", frag->frag, frag->split_by);
402                 prev_frag = frag;
403         }
404         while (rb_node) {
405                 frag = rb_entry(rb_node, struct ceph_inode_frag, node);
406                 rb_node = rb_next(rb_node);
407                 /* delete stale split/leaf node */
408                 if (frag->split_by > 0 ||
409                     !is_frag_child(frag->frag, prev_frag)) {
410                         rb_erase(&frag->node, &ci->i_fragtree);
411                         if (frag->split_by > 0)
412                                 ci->i_fragtree_nsplits--;
413                         kfree(frag);
414                 }
415         }
416 out_unlock:
417         mutex_unlock(&ci->i_fragtree_mutex);
418         return 0;
419 }
420
421 /*
422  * initialize a newly allocated inode.
423  */
424 struct inode *ceph_alloc_inode(struct super_block *sb)
425 {
426         struct ceph_inode_info *ci;
427         int i;
428
429         ci = kmem_cache_alloc(ceph_inode_cachep, GFP_NOFS);
430         if (!ci)
431                 return NULL;
432
433         dout("alloc_inode %p\n", &ci->vfs_inode);
434
435         spin_lock_init(&ci->i_ceph_lock);
436
437         ci->i_version = 0;
438         ci->i_inline_version = 0;
439         ci->i_time_warp_seq = 0;
440         ci->i_ceph_flags = 0;
441         atomic64_set(&ci->i_ordered_count, 1);
442         atomic64_set(&ci->i_release_count, 1);
443         atomic64_set(&ci->i_complete_seq[0], 0);
444         atomic64_set(&ci->i_complete_seq[1], 0);
445         ci->i_symlink = NULL;
446
447         memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
448         ci->i_pool_ns_len = 0;
449
450         ci->i_fragtree = RB_ROOT;
451         mutex_init(&ci->i_fragtree_mutex);
452
453         ci->i_xattrs.blob = NULL;
454         ci->i_xattrs.prealloc_blob = NULL;
455         ci->i_xattrs.dirty = false;
456         ci->i_xattrs.index = RB_ROOT;
457         ci->i_xattrs.count = 0;
458         ci->i_xattrs.names_size = 0;
459         ci->i_xattrs.vals_size = 0;
460         ci->i_xattrs.version = 0;
461         ci->i_xattrs.index_version = 0;
462
463         ci->i_caps = RB_ROOT;
464         ci->i_auth_cap = NULL;
465         ci->i_dirty_caps = 0;
466         ci->i_flushing_caps = 0;
467         INIT_LIST_HEAD(&ci->i_dirty_item);
468         INIT_LIST_HEAD(&ci->i_flushing_item);
469         ci->i_prealloc_cap_flush = NULL;
470         ci->i_cap_flush_tree = RB_ROOT;
471         init_waitqueue_head(&ci->i_cap_wq);
472         ci->i_hold_caps_min = 0;
473         ci->i_hold_caps_max = 0;
474         INIT_LIST_HEAD(&ci->i_cap_delay_list);
475         INIT_LIST_HEAD(&ci->i_cap_snaps);
476         ci->i_head_snapc = NULL;
477         ci->i_snap_caps = 0;
478
479         for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
480                 ci->i_nr_by_mode[i] = 0;
481
482         mutex_init(&ci->i_truncate_mutex);
483         ci->i_truncate_seq = 0;
484         ci->i_truncate_size = 0;
485         ci->i_truncate_pending = 0;
486
487         ci->i_max_size = 0;
488         ci->i_reported_size = 0;
489         ci->i_wanted_max_size = 0;
490         ci->i_requested_max_size = 0;
491
492         ci->i_pin_ref = 0;
493         ci->i_rd_ref = 0;
494         ci->i_rdcache_ref = 0;
495         ci->i_wr_ref = 0;
496         ci->i_wb_ref = 0;
497         ci->i_wrbuffer_ref = 0;
498         ci->i_wrbuffer_ref_head = 0;
499         ci->i_shared_gen = 0;
500         ci->i_rdcache_gen = 0;
501         ci->i_rdcache_revoking = 0;
502
503         INIT_LIST_HEAD(&ci->i_unsafe_writes);
504         INIT_LIST_HEAD(&ci->i_unsafe_dirops);
505         INIT_LIST_HEAD(&ci->i_unsafe_iops);
506         spin_lock_init(&ci->i_unsafe_lock);
507
508         ci->i_snap_realm = NULL;
509         INIT_LIST_HEAD(&ci->i_snap_realm_item);
510         INIT_LIST_HEAD(&ci->i_snap_flush_item);
511
512         INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
513         INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
514
515         INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
516
517         ceph_fscache_inode_init(ci);
518
519         return &ci->vfs_inode;
520 }
521
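/*
 * Final free of the inode, deferred through RCU so that lockless path
 * walks never see the ceph_inode_info vanish beneath them; queued from
 * ceph_destroy_inode() below via call_rcu().
 */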
522 static void ceph_i_callback(struct rcu_head *head)
523 {
524         struct inode *inode = container_of(head, struct inode, i_rcu);
525         struct ceph_inode_info *ci = ceph_inode(inode);
526
527         kmem_cache_free(ceph_inode_cachep, ci);
528 }
529
530 void ceph_destroy_inode(struct inode *inode)
531 {
532         struct ceph_inode_info *ci = ceph_inode(inode);
533         struct ceph_inode_frag *frag;
534         struct rb_node *n;
535
536         dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
537
538         ceph_fscache_unregister_inode_cookie(ci);
539
540         ceph_queue_caps_release(inode);
541
542         /*
543          * we may still have a snap_realm reference if there are stray
544          * caps in i_snap_caps.
545          */
546         if (ci->i_snap_realm) {
547                 struct ceph_mds_client *mdsc =
548                         ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
549                 struct ceph_snap_realm *realm = ci->i_snap_realm;
550
551                 dout(" dropping residual ref to snap realm %p\n", realm);
552                 spin_lock(&realm->inodes_with_caps_lock);
553                 list_del_init(&ci->i_snap_realm_item);
554                 spin_unlock(&realm->inodes_with_caps_lock);
555                 ceph_put_snap_realm(mdsc, realm);
556         }
557
558         kfree(ci->i_symlink);
559         while ((n = rb_first(&ci->i_fragtree)) != NULL) {
560                 frag = rb_entry(n, struct ceph_inode_frag, node);
561                 rb_erase(n, &ci->i_fragtree);
562                 kfree(frag);
563         }
564         ci->i_fragtree_nsplits = 0;
565
566         __ceph_destroy_xattrs(ci);
567         if (ci->i_xattrs.blob)
568                 ceph_buffer_put(ci->i_xattrs.blob);
569         if (ci->i_xattrs.prealloc_blob)
570                 ceph_buffer_put(ci->i_xattrs.prealloc_blob);
571
572         call_rcu(&inode->i_rcu, ceph_i_callback);
573 }
574
575 int ceph_drop_inode(struct inode *inode)
576 {
577         /*
578          * A positive dentry and its corresponding inode always accompany
579          * each other in an MDS reply, so there is no need to keep the
580          * inode in the cache once all of its aliases have been dropped.
581          */
582         return 1;
583 }
584
585 /*
586  * Helpers to fill in size, ctime, mtime, and atime.  We have to be
587  * careful because either the client or MDS may have more up to date
588  * info, depending on which capabilities are held, and whether
589  * time_warp_seq or truncate_seq have increased.  (Ordinarily, mtime
590  * and size are monotonically increasing, except when utimes() or
591  * truncate() increments the corresponding _seq values.)
592  */
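/*
 * A sketch of the resulting ordering rule, assuming the usual wrap-safe
 * sequence compare (ceph_seq_cmp(a, b) acts like (s32)(a - b)):
 *
 *	truncate_seq newer than ours      -> adopt the MDS size
 *	truncate_seq equal, size grew     -> adopt it (ordinary writes)
 *	truncate_seq older than ours      -> ignore; the MDS info is stale
 */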
593 int ceph_fill_file_size(struct inode *inode, int issued,
594                         u32 truncate_seq, u64 truncate_size, u64 size)
595 {
596         struct ceph_inode_info *ci = ceph_inode(inode);
597         int queue_trunc = 0;
598
599         if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
600             (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
601                 dout("size %lld -> %llu\n", inode->i_size, size);
602                 if (size > 0 && S_ISDIR(inode->i_mode)) {
603                         pr_err("fill_file_size non-zero size for directory\n");
604                         size = 0;
605                 }
606                 i_size_write(inode, size);
607                 inode->i_blocks = (size + (1<<9) - 1) >> 9;
608                 ci->i_reported_size = size;
609                 if (truncate_seq != ci->i_truncate_seq) {
610                         dout("truncate_seq %u -> %u\n",
611                              ci->i_truncate_seq, truncate_seq);
612                         ci->i_truncate_seq = truncate_seq;
613
614                         /* the MDS should have revoked these caps */
615                         WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL |
616                                                CEPH_CAP_FILE_RD |
617                                                CEPH_CAP_FILE_WR |
618                                                CEPH_CAP_FILE_LAZYIO));
619                         /*
620                          * If we hold the relevant caps, or if we don't hold
621                          * them but may not be the only client referencing
622                          * this file, we need to check whether the file is
623                          * opened or mmapped.
624                          */
625                         if ((issued & (CEPH_CAP_FILE_CACHE|
626                                        CEPH_CAP_FILE_BUFFER)) ||
627                             mapping_mapped(inode->i_mapping) ||
628                             __ceph_caps_file_wanted(ci)) {
629                                 ci->i_truncate_pending++;
630                                 queue_trunc = 1;
631                         }
632                 }
633         }
634         if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0 &&
635             ci->i_truncate_size != truncate_size) {
636                 dout("truncate_size %lld -> %llu\n", ci->i_truncate_size,
637                      truncate_size);
638                 ci->i_truncate_size = truncate_size;
639         }
640
641         if (queue_trunc)
642                 ceph_fscache_invalidate(inode);
643
644         return queue_trunc;
645 }
646
647 void ceph_fill_file_time(struct inode *inode, int issued,
648                          u64 time_warp_seq, struct timespec *ctime,
649                          struct timespec *mtime, struct timespec *atime)
650 {
651         struct ceph_inode_info *ci = ceph_inode(inode);
652         int warn = 0;
653
654         if (issued & (CEPH_CAP_FILE_EXCL|
655                       CEPH_CAP_FILE_WR|
656                       CEPH_CAP_FILE_BUFFER|
657                       CEPH_CAP_AUTH_EXCL|
658                       CEPH_CAP_XATTR_EXCL)) {
659                 if (timespec_compare(ctime, &inode->i_ctime) > 0) {
660                         dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",
661                              inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
662                              ctime->tv_sec, ctime->tv_nsec);
663                         inode->i_ctime = *ctime;
664                 }
665                 if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) > 0) {
666                         /* the MDS did a utimes() */
667                         dout("mtime %ld.%09ld -> %ld.%09ld "
668                              "tw %d -> %d\n",
669                              inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
670                              mtime->tv_sec, mtime->tv_nsec,
671                              ci->i_time_warp_seq, (int)time_warp_seq);
672
673                         inode->i_mtime = *mtime;
674                         inode->i_atime = *atime;
675                         ci->i_time_warp_seq = time_warp_seq;
676                 } else if (time_warp_seq == ci->i_time_warp_seq) {
677                         /* nobody did utimes(); take the max */
678                         if (timespec_compare(mtime, &inode->i_mtime) > 0) {
679                                 dout("mtime %ld.%09ld -> %ld.%09ld inc\n",
680                                      inode->i_mtime.tv_sec,
681                                      inode->i_mtime.tv_nsec,
682                                      mtime->tv_sec, mtime->tv_nsec);
683                                 inode->i_mtime = *mtime;
684                         }
685                         if (timespec_compare(atime, &inode->i_atime) > 0) {
686                                 dout("atime %ld.%09ld -> %ld.%09ld inc\n",
687                                      inode->i_atime.tv_sec,
688                                      inode->i_atime.tv_nsec,
689                                      atime->tv_sec, atime->tv_nsec);
690                                 inode->i_atime = *atime;
691                         }
692                 } else if (issued & CEPH_CAP_FILE_EXCL) {
693                         /* we did a utimes(); ignore mds values */
694                 } else {
695                         warn = 1;
696                 }
697         } else {
698                 /* we have no write|excl caps; whatever the MDS says is true */
699                 if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
700                         inode->i_ctime = *ctime;
701                         inode->i_mtime = *mtime;
702                         inode->i_atime = *atime;
703                         ci->i_time_warp_seq = time_warp_seq;
704                 } else {
705                         warn = 1;
706                 }
707         }
708         if (warn) /* time_warp_seq shouldn't go backwards */
709                 dout("%p mds time_warp_seq %llu < %u\n",
710                      inode, time_warp_seq, ci->i_time_warp_seq);
711 }
712
713 /*
714  * Populate an inode based on info from mds.  May be called on new or
715  * existing inodes.
716  */
717 static int fill_inode(struct inode *inode, struct page *locked_page,
718                       struct ceph_mds_reply_info_in *iinfo,
719                       struct ceph_mds_reply_dirfrag *dirinfo,
720                       struct ceph_mds_session *session,
721                       unsigned long ttl_from, int cap_fmode,
722                       struct ceph_cap_reservation *caps_reservation)
723 {
724         struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
725         struct ceph_mds_reply_inode *info = iinfo->in;
726         struct ceph_inode_info *ci = ceph_inode(inode);
727         int issued = 0, implemented, new_issued;
728         struct timespec mtime, atime, ctime;
729         struct ceph_buffer *xattr_blob = NULL;
730         struct ceph_cap *new_cap = NULL;
731         int err = 0;
732         bool wake = false;
733         bool queue_trunc = false;
734         bool new_version = false;
735         bool fill_inline = false;
736
737         dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
738              inode, ceph_vinop(inode), le64_to_cpu(info->version),
739              ci->i_version);
740
741         /* prealloc new cap struct */
742         if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
743                 new_cap = ceph_get_cap(mdsc, caps_reservation);
744
745         /*
746          * prealloc xattr data, if it looks like we'll need it.  only
747          * if len > 4 (meaning there are actually xattrs; the first 4
748          * bytes are the xattr count).
749          */
750         if (iinfo->xattr_len > 4) {
751                 xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
752                 if (!xattr_blob)
753                         pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
754                                iinfo->xattr_len);
755         }
756
757         spin_lock(&ci->i_ceph_lock);
758
759         /*
760          * The provided version will be odd if the inode value is
761          * projected, and even if it is stable.  Skip the update if we
762          * have newer stable info (ours >= theirs, e.g. due to racing MDS
763          * replies), unless we are getting projected (unstable) info (in
764          * which case the version is odd, and we want ours > theirs).
765          *   us   them
766          *   2    2     skip
767          *   3    2     skip
768          *   3    3     update
769          */
770         if (ci->i_version == 0 ||
771             ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
772              le64_to_cpu(info->version) > (ci->i_version & ~1)))
773                 new_version = true;
774
775         issued = __ceph_caps_issued(ci, &implemented);
776         issued |= implemented | __ceph_caps_dirty(ci);
777         new_issued = ~issued & le32_to_cpu(info->cap.caps);
778
779         /* update inode */
780         ci->i_version = le64_to_cpu(info->version);
781         inode->i_version++;
782         inode->i_rdev = le32_to_cpu(info->rdev);
783         inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
784
785         if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
786             (issued & CEPH_CAP_AUTH_EXCL) == 0) {
787                 inode->i_mode = le32_to_cpu(info->mode);
788                 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
789                 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
790                 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
791                      from_kuid(&init_user_ns, inode->i_uid),
792                      from_kgid(&init_user_ns, inode->i_gid));
793         }
794
795         if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
796             (issued & CEPH_CAP_LINK_EXCL) == 0)
797                 set_nlink(inode, le32_to_cpu(info->nlink));
798
799         if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
800                 /* be careful with mtime, atime, size */
801                 ceph_decode_timespec(&atime, &info->atime);
802                 ceph_decode_timespec(&mtime, &info->mtime);
803                 ceph_decode_timespec(&ctime, &info->ctime);
804                 ceph_fill_file_time(inode, issued,
805                                 le32_to_cpu(info->time_warp_seq),
806                                 &ctime, &mtime, &atime);
807         }
808
809         if (new_version ||
810             (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
811                 if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
812                         ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
813                 ci->i_layout = info->layout;
814                 ci->i_pool_ns_len = iinfo->pool_ns_len;
815
816                 queue_trunc = ceph_fill_file_size(inode, issued,
817                                         le32_to_cpu(info->truncate_seq),
818                                         le64_to_cpu(info->truncate_size),
819                                         le64_to_cpu(info->size));
820                 /* only update max_size on auth cap */
821                 if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
822                     ci->i_max_size != le64_to_cpu(info->max_size)) {
823                         dout("max_size %lld -> %llu\n", ci->i_max_size,
824                                         le64_to_cpu(info->max_size));
825                         ci->i_max_size = le64_to_cpu(info->max_size);
826                 }
827         }
828
829         /* xattrs */
830         /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
831         if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
832             le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
833                 if (ci->i_xattrs.blob)
834                         ceph_buffer_put(ci->i_xattrs.blob);
835                 ci->i_xattrs.blob = xattr_blob;
836                 if (xattr_blob)
837                         memcpy(ci->i_xattrs.blob->vec.iov_base,
838                                iinfo->xattr_data, iinfo->xattr_len);
839                 ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
840                 ceph_forget_all_cached_acls(inode);
841                 xattr_blob = NULL;
842         }
843
844         inode->i_mapping->a_ops = &ceph_aops;
845
846         switch (inode->i_mode & S_IFMT) {
847         case S_IFIFO:
848         case S_IFBLK:
849         case S_IFCHR:
850         case S_IFSOCK:
851                 init_special_inode(inode, inode->i_mode, inode->i_rdev);
852                 inode->i_op = &ceph_file_iops;
853                 break;
854         case S_IFREG:
855                 inode->i_op = &ceph_file_iops;
856                 inode->i_fop = &ceph_file_fops;
857                 break;
858         case S_IFLNK:
859                 inode->i_op = &ceph_symlink_iops;
860                 if (!ci->i_symlink) {
861                         u32 symlen = iinfo->symlink_len;
862                         char *sym;
863
864                         spin_unlock(&ci->i_ceph_lock);
865
866                         err = -EINVAL;
867                         if (WARN_ON(symlen != i_size_read(inode)))
868                                 goto out;
869
870                         err = -ENOMEM;
871                         sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
872                         if (!sym)
873                                 goto out;
874
875                         spin_lock(&ci->i_ceph_lock);
876                         if (!ci->i_symlink)
877                                 ci->i_symlink = sym;
878                         else
879                                 kfree(sym); /* lost a race */
880                 }
881                 inode->i_link = ci->i_symlink;
882                 break;
883         case S_IFDIR:
884                 inode->i_op = &ceph_dir_iops;
885                 inode->i_fop = &ceph_dir_fops;
886
887                 ci->i_dir_layout = iinfo->dir_layout;
888
889                 ci->i_files = le64_to_cpu(info->files);
890                 ci->i_subdirs = le64_to_cpu(info->subdirs);
891                 ci->i_rbytes = le64_to_cpu(info->rbytes);
892                 ci->i_rfiles = le64_to_cpu(info->rfiles);
893                 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
894                 ceph_decode_timespec(&ci->i_rctime, &info->rctime);
895                 break;
896         default:
897                 pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
898                        ceph_vinop(inode), inode->i_mode);
899         }
900
901         /* were we issued a capability? */
902         if (info->cap.caps) {
903                 if (ceph_snap(inode) == CEPH_NOSNAP) {
904                         unsigned caps = le32_to_cpu(info->cap.caps);
905                         ceph_add_cap(inode, session,
906                                      le64_to_cpu(info->cap.cap_id),
907                                      cap_fmode, caps,
908                                      le32_to_cpu(info->cap.wanted),
909                                      le32_to_cpu(info->cap.seq),
910                                      le32_to_cpu(info->cap.mseq),
911                                      le64_to_cpu(info->cap.realm),
912                                      info->cap.flags, &new_cap);
913
914                         /* set dir completion flag? */
915                         if (S_ISDIR(inode->i_mode) &&
916                             ci->i_files == 0 && ci->i_subdirs == 0 &&
917                             (caps & CEPH_CAP_FILE_SHARED) &&
918                             (issued & CEPH_CAP_FILE_EXCL) == 0 &&
919                             !__ceph_dir_is_complete(ci)) {
920                                 dout(" marking %p complete (empty)\n", inode);
921                                 i_size_write(inode, 0);
922                                 __ceph_dir_set_complete(ci,
923                                         atomic64_read(&ci->i_release_count),
924                                         atomic64_read(&ci->i_ordered_count));
925                         }
926
927                         wake = true;
928                 } else {
929                         dout(" %p got snap_caps %s\n", inode,
930                              ceph_cap_string(le32_to_cpu(info->cap.caps)));
931                         ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
932                         if (cap_fmode >= 0)
933                                 __ceph_get_fmode(ci, cap_fmode);
934                 }
935         } else if (cap_fmode >= 0) {
936                 pr_warn("mds issued no caps on %llx.%llx\n",
937                            ceph_vinop(inode));
938                 __ceph_get_fmode(ci, cap_fmode);
939         }
940
941         if (iinfo->inline_version > 0 &&
942             iinfo->inline_version >= ci->i_inline_version) {
943                 int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
944                 ci->i_inline_version = iinfo->inline_version;
945                 if (ci->i_inline_version != CEPH_INLINE_NONE &&
946                     (locked_page ||
947                      (le32_to_cpu(info->cap.caps) & cache_caps)))
948                         fill_inline = true;
949         }
950
951         spin_unlock(&ci->i_ceph_lock);
952
953         if (fill_inline)
954                 ceph_fill_inline_data(inode, locked_page,
955                                       iinfo->inline_data, iinfo->inline_len);
956
957         if (wake)
958                 wake_up_all(&ci->i_cap_wq);
959
960         /* queue truncate if we saw i_size decrease */
961         if (queue_trunc)
962                 ceph_queue_vmtruncate(inode);
963
964         /* populate frag tree */
965         if (S_ISDIR(inode->i_mode))
966                 ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
967
968         /* update delegation info? */
969         if (dirinfo)
970                 ceph_fill_dirfrag(inode, dirinfo);
971
972         err = 0;
973 out:
974         if (new_cap)
975                 ceph_put_cap(mdsc, new_cap);
976         if (xattr_blob)
977                 ceph_buffer_put(xattr_blob);
978         return err;
979 }
980
981 /*
982  * caller should hold session s_mutex.
983  */
984 static void update_dentry_lease(struct dentry *dentry,
985                                 struct ceph_mds_reply_lease *lease,
986                                 struct ceph_mds_session *session,
987                                 unsigned long from_time)
988 {
989         struct ceph_dentry_info *di = ceph_dentry(dentry);
990         long unsigned duration = le32_to_cpu(lease->duration_ms);
991         long unsigned ttl = from_time + (duration * HZ) / 1000;
992         long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
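        /*
         * duration_ms -> jiffies: e.g. with HZ=1000, a 30000 ms lease
         * gives ttl 30000 jiffies past from_time, with half_ttl at the
         * 15000 jiffies mark, the point at which renewal is worthwhile.
         */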
993         struct inode *dir;
994
995         /* only track leases on regular dentries */
996         if (dentry->d_op != &ceph_dentry_ops)
997                 return;
998
999         spin_lock(&dentry->d_lock);
1000         dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
1001              dentry, duration, ttl);
1002
1003         /* make lease_rdcache_gen match directory */
1004         dir = d_inode(dentry->d_parent);
1005         di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
1006
1007         if (duration == 0)
1008                 goto out_unlock;
1009
1010         if (di->lease_gen == session->s_cap_gen &&
1011             time_before(ttl, dentry->d_time))
1012                 goto out_unlock;  /* we already have a newer lease. */
1013
1014         if (di->lease_session && di->lease_session != session)
1015                 goto out_unlock;
1016
1017         ceph_dentry_lru_touch(dentry);
1018
1019         if (!di->lease_session)
1020                 di->lease_session = ceph_get_mds_session(session);
1021         di->lease_gen = session->s_cap_gen;
1022         di->lease_seq = le32_to_cpu(lease->seq);
1023         di->lease_renew_after = half_ttl;
1024         di->lease_renew_from = 0;
1025         dentry->d_time = ttl;
1026 out_unlock:
1027         spin_unlock(&dentry->d_lock);
1028         return;
1029 }
1030
1031 /*
1032  * splice a dentry to an inode.
1033  * caller must hold directory i_mutex for this to be safe.
1034  */
1035 static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
1036 {
1037         struct dentry *realdn;
1038
1039         BUG_ON(d_inode(dn));
1040
1041         /* dn must be unhashed */
1042         if (!d_unhashed(dn))
1043                 d_drop(dn);
1044         realdn = d_splice_alias(in, dn);
1045         if (IS_ERR(realdn)) {
1046                 pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
1047                        PTR_ERR(realdn), dn, in, ceph_vinop(in));
1048                 dn = realdn; /* note realdn contains the error */
1049                 goto out;
1050         } else if (realdn) {
1051                 dout("dn %p (%d) spliced with %p (%d) "
1052                      "inode %p ino %llx.%llx\n",
1053                      dn, d_count(dn),
1054                      realdn, d_count(realdn),
1055                      d_inode(realdn), ceph_vinop(d_inode(realdn)));
1056                 dput(dn);
1057                 dn = realdn;
1058         } else {
1059                 BUG_ON(!ceph_dentry(dn));
1060                 dout("dn %p attached to %p ino %llx.%llx\n",
1061                      dn, d_inode(dn), ceph_vinop(d_inode(dn)));
1062         }
1063 out:
1064         return dn;
1065 }
1066
1067 /*
1068  * Incorporate results into the local cache.  This is either just
1069  * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
1070  * after a lookup).
1071  *
1072  * A reply may contain
1073  *         a directory inode along with a dentry.
1074  *  and/or a target inode
1075  *
1076  * Called with snap_rwsem (read).
1077  */
1078 int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
1079                     struct ceph_mds_session *session)
1080 {
1081         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1082         struct inode *in = NULL;
1083         struct ceph_vino vino;
1084         struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
1085         int err = 0;
1086
1087         dout("fill_trace %p is_dentry %d is_target %d\n", req,
1088              rinfo->head->is_dentry, rinfo->head->is_target);
1089
1090 #if 0
1091         /*
1092          * Debugging hook:
1093          *
1094          * If we resend completed ops to a recovering mds, we get no
1095          * trace.  Since that is very rare, pretend this is the case
1096          * to ensure the 'no trace' handlers in the callers behave.
1097          *
1098          * Fill in inodes unconditionally to avoid breaking cap
1099          * invariants.
1100          */
1101         if (rinfo->head->op & CEPH_MDS_OP_WRITE) {
1102                 pr_info("fill_trace faking empty trace on %lld %s\n",
1103                         req->r_tid, ceph_mds_op_name(rinfo->head->op));
1104                 if (rinfo->head->is_dentry) {
1105                         rinfo->head->is_dentry = 0;
1106                         err = fill_inode(req->r_locked_dir,
1107                                          &rinfo->diri, rinfo->dirfrag,
1108                                          session, req->r_request_started, -1);
1109                 }
1110                 if (rinfo->head->is_target) {
1111                         rinfo->head->is_target = 0;
1112                         ininfo = rinfo->targeti.in;
1113                         vino.ino = le64_to_cpu(ininfo->ino);
1114                         vino.snap = le64_to_cpu(ininfo->snapid);
1115                         in = ceph_get_inode(sb, vino);
1116                         err = fill_inode(in, &rinfo->targeti, NULL,
1117                                          session, req->r_request_started,
1118                                          req->r_fmode);
1119                         iput(in);
1120                 }
1121         }
1122 #endif
1123
1124         if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
1125                 dout("fill_trace reply is empty!\n");
1126                 if (rinfo->head->result == 0 && req->r_locked_dir)
1127                         ceph_invalidate_dir_request(req);
1128                 return 0;
1129         }
1130
1131         if (rinfo->head->is_dentry) {
1132                 struct inode *dir = req->r_locked_dir;
1133
1134                 if (dir) {
1135                         err = fill_inode(dir, NULL,
1136                                          &rinfo->diri, rinfo->dirfrag,
1137                                          session, req->r_request_started, -1,
1138                                          &req->r_caps_reservation);
1139                         if (err < 0)
1140                                 goto done;
1141                 } else {
1142                         WARN_ON_ONCE(1);
1143                 }
1144
1145                 if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
1146                         struct qstr dname;
1147                         struct dentry *dn, *parent;
1148
1149                         BUG_ON(!rinfo->head->is_target);
1150                         BUG_ON(req->r_dentry);
1151
1152                         parent = d_find_any_alias(dir);
1153                         BUG_ON(!parent);
1154
1155                         dname.name = rinfo->dname;
1156                         dname.len = rinfo->dname_len;
1157                         dname.hash = full_name_hash(dname.name, dname.len);
1158                         vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1159                         vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1160 retry_lookup:
1161                         dn = d_lookup(parent, &dname);
1162                         dout("d_lookup on parent=%p name=%.*s got %p\n",
1163                              parent, dname.len, dname.name, dn);
1164
1165                         if (!dn) {
1166                                 dn = d_alloc(parent, &dname);
1167                                 dout("d_alloc %p '%.*s' = %p\n", parent,
1168                                      dname.len, dname.name, dn);
1169                                 if (dn == NULL) {
1170                                         dput(parent);
1171                                         err = -ENOMEM;
1172                                         goto done;
1173                                 }
1174                                 err = ceph_init_dentry(dn);
1175                                 if (err < 0) {
1176                                         dput(dn);
1177                                         dput(parent);
1178                                         goto done;
1179                                 }
1180                         } else if (d_really_is_positive(dn) &&
1181                                    (ceph_ino(d_inode(dn)) != vino.ino ||
1182                                     ceph_snap(d_inode(dn)) != vino.snap)) {
1183                                 dout(" dn %p points to wrong inode %p\n",
1184                                      dn, d_inode(dn));
1185                                 d_delete(dn);
1186                                 dput(dn);
1187                                 goto retry_lookup;
1188                         }
1189
1190                         req->r_dentry = dn;
1191                         dput(parent);
1192                 }
1193         }
1194
1195         if (rinfo->head->is_target) {
1196                 vino.ino = le64_to_cpu(rinfo->targeti.in->ino);
1197                 vino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
1198
1199                 in = ceph_get_inode(sb, vino);
1200                 if (IS_ERR(in)) {
1201                         err = PTR_ERR(in);
1202                         goto done;
1203                 }
1204                 req->r_target_inode = in;
1205
1206                 err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
1207                                 session, req->r_request_started,
1208                                 (!req->r_aborted && rinfo->head->result == 0) ?
1209                                 req->r_fmode : -1,
1210                                 &req->r_caps_reservation);
1211                 if (err < 0) {
1212                         pr_err("fill_inode badness %p %llx.%llx\n",
1213                                 in, ceph_vinop(in));
1214                         goto done;
1215                 }
1216         }
1217
1218         /*
1219          * ignore null lease/binding on snapdir ENOENT, or else we
1220          * will have trouble splicing in the virtual snapdir later
1221          */
1222         if (rinfo->head->is_dentry && !req->r_aborted &&
1223             req->r_locked_dir &&
1224             (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
1225                                                fsc->mount_options->snapdir_name,
1226                                                req->r_dentry->d_name.len))) {
1227                 /*
1228                  * lookup link rename   : null -> possibly existing inode
1229                  * mknod symlink mkdir  : null -> new inode
1230                  * unlink               : linked -> null
1231                  */
1232                 struct inode *dir = req->r_locked_dir;
1233                 struct dentry *dn = req->r_dentry;
1234                 bool have_dir_cap, have_lease;
1235
1236                 BUG_ON(!dn);
1237                 BUG_ON(!dir);
1238                 BUG_ON(d_inode(dn->d_parent) != dir);
1239                 BUG_ON(ceph_ino(dir) !=
1240                        le64_to_cpu(rinfo->diri.in->ino));
1241                 BUG_ON(ceph_snap(dir) !=
1242                        le64_to_cpu(rinfo->diri.in->snapid));
1243
1244                 /* do we have a lease on the whole dir? */
1245                 have_dir_cap =
1246                         (le32_to_cpu(rinfo->diri.in->cap.caps) &
1247                          CEPH_CAP_FILE_SHARED);
1248
1249                 /* do we have a dn lease? */
1250                 have_lease = have_dir_cap ||
1251                         le32_to_cpu(rinfo->dlease->duration_ms);
1252                 if (!have_lease)
1253                         dout("fill_trace  no dentry lease or dir cap\n");
1254
1255                 /* rename? */
		if (req->r_old_dentry && req->r_op == CEPH_MDS_OP_RENAME) {
			struct inode *olddir = req->r_old_dentry_dir;
			BUG_ON(!olddir);

			dout(" src %p '%pd' dst %p '%pd'\n",
			     req->r_old_dentry,
			     req->r_old_dentry,
			     dn, dn);
			dout("fill_trace doing d_move %p -> %p\n",
			     req->r_old_dentry, dn);

			/* d_move screws up sibling dentries' offsets */
			ceph_dir_clear_ordered(dir);
			ceph_dir_clear_ordered(olddir);

			d_move(req->r_old_dentry, dn);
			dout(" src %p '%pd' dst %p '%pd'\n",
			     req->r_old_dentry,
			     req->r_old_dentry,
			     dn, dn);

			/* ensure target dentry is invalidated, despite
			   rehashing bug in vfs_rename_dir */
			ceph_invalidate_dentry_lease(dn);

			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
			     ceph_dentry(req->r_old_dentry)->offset);

			dn = req->r_old_dentry;  /* use old_dentry */
		}

		/* null dentry? */
		if (!rinfo->head->is_target) {
			dout("fill_trace null dentry\n");
			if (d_really_is_positive(dn)) {
				ceph_dir_clear_ordered(dir);
				dout("d_delete %p\n", dn);
				d_delete(dn);
			} else {
				if (have_lease && d_unhashed(dn))
					d_add(dn, NULL);
				update_dentry_lease(dn, rinfo->dlease,
						    session,
						    req->r_request_started);
			}
			goto done;
		}

		/* attach proper inode */
		if (d_really_is_negative(dn)) {
			ceph_dir_clear_ordered(dir);
			ihold(in);
			dn = splice_dentry(dn, in);
			if (IS_ERR(dn)) {
				err = PTR_ERR(dn);
				goto done;
			}
			req->r_dentry = dn;  /* may have spliced */
		} else if (d_really_is_positive(dn) && d_inode(dn) != in) {
			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
			     ceph_vinop(in));
			d_invalidate(dn);
			have_lease = false;
		}

		if (have_lease)
			update_dentry_lease(dn, rinfo->dlease, session,
					    req->r_request_started);
		dout(" final dn %p\n", dn);
	} else if (!req->r_aborted &&
		   (req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
		    req->r_op == CEPH_MDS_OP_MKSNAP)) {
		struct dentry *dn = req->r_dentry;
		struct inode *dir = req->r_locked_dir;

		/* fill out a snapdir LOOKUPSNAP dentry */
		BUG_ON(!dn);
		BUG_ON(!dir);
		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
		dout(" linking snapped dir %p to dn %p\n", in, dn);
		ceph_dir_clear_ordered(dir);
		ihold(in);
		dn = splice_dentry(dn, in);
		if (IS_ERR(dn)) {
			err = PTR_ERR(dn);
			goto done;
		}
		req->r_dentry = dn;  /* may have spliced */
	}
done:
	dout("fill_trace done err=%d\n", err);
	return err;
}

/*
 * Prepopulate our cache with readdir results, leases, etc.
 *
 * If the request was aborted we cannot safely touch the dcache, so
 * this variant only instantiates and fills in the inodes.
 */
static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
					   struct ceph_mds_session *session)
{
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	int i, err = 0;

	for (i = 0; i < rinfo->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
		struct ceph_vino vino;
		struct inode *in;
		int rc;

		vino.ino = le64_to_cpu(rde->inode.in->ino);
		vino.snap = le64_to_cpu(rde->inode.in->snapid);

		in = ceph_get_inode(req->r_dentry->d_sb, vino);
		if (IS_ERR(in)) {
			err = PTR_ERR(in);
			dout("new_inode badness got %d\n", err);
			continue;
		}
		rc = fill_inode(in, NULL, &rde->inode, NULL, session,
				req->r_request_started, -1,
				&req->r_caps_reservation);
		if (rc < 0) {
			pr_err("fill_inode badness on %p got %d\n", in, rc);
			err = rc;
		}
		iput(in);
	}

	return err;
}

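/*
 * Drop the page (if any) currently pinned and kmapped by a readdir
 * cache fill.
 */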
void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
{
	if (ctl->page) {
		kunmap(ctl->page);
		put_page(ctl->page);
		ctl->page = NULL;
	}
}

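/*
 * The readdir cache is an array of dentry pointers stored in page
 * cache pages of the directory inode itself, PAGE_SIZE /
 * sizeof(struct dentry *) entries per page (e.g. 512 with 4 KiB pages
 * and 8-byte pointers).  ctl->index is the next slot to fill; it is
 * set to -1 to disable caching if the directory changed while the
 * readdir was in flight.
 */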
static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
			      struct ceph_readdir_cache_control *ctl,
			      struct ceph_mds_request *req)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	unsigned nsize = PAGE_SIZE / sizeof(struct dentry*);
	unsigned idx = ctl->index % nsize;
	pgoff_t pgoff = ctl->index / nsize;

	if (!ctl->page || pgoff != page_index(ctl->page)) {
		ceph_readdir_cache_release(ctl);
		if (idx == 0)
			ctl->page = grab_cache_page(&dir->i_data, pgoff);
		else
			ctl->page = find_lock_page(&dir->i_data, pgoff);
		if (!ctl->page) {
			ctl->index = -1;
			return idx == 0 ? -ENOMEM : 0;
		}
		/* reading/filling the cache are serialized by
		 * i_mutex, no need to use page lock */
		unlock_page(ctl->page);
		ctl->dentries = kmap(ctl->page);
		if (idx == 0)
			memset(ctl->dentries, 0, PAGE_SIZE);
	}

	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
	    req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
		dout("readdir cache dn %p idx %d\n", dn, ctl->index);
		ctl->dentries[idx] = dn;
		ctl->index++;
	} else {
		dout("disable readdir cache\n");
		ctl->index = -1;
	}
	return 0;
}

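/*
 * Populate the dcache with the results of a readdir reply: look up or
 * allocate a dentry for each entry, fill in its inode, assign readdir
 * fpos offsets, and (if nothing was skipped) record the dentries in
 * the readdir cache.
 */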
int ceph_readdir_prepopulate(struct ceph_mds_request *req,
			     struct ceph_mds_session *session)
{
	struct dentry *parent = req->r_dentry;
	struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct qstr dname;
	struct dentry *dn;
	struct inode *in;
	int err = 0, skipped = 0, ret, i;
	struct inode *snapdir = NULL;
	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
	u32 last_hash = 0;
	u32 fpos_offset;
	struct ceph_readdir_cache_control cache_ctl = {};

	if (req->r_aborted)
		return readdir_prepopulate_inodes_only(req, session);

	if (rinfo->hash_order && req->r_path2) {
		last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
					  req->r_path2, strlen(req->r_path2));
		last_hash = ceph_frag_value(last_hash);
	}

	if (rinfo->dir_dir &&
	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {
		dout("readdir_prepopulate got new frag %x -> %x\n",
		     frag, le32_to_cpu(rinfo->dir_dir->frag));
		frag = le32_to_cpu(rinfo->dir_dir->frag);
		if (!rinfo->hash_order)
			req->r_readdir_offset = 2;
	}

	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
		snapdir = ceph_get_snapdir(d_inode(parent));
		parent = d_find_alias(snapdir);
		dout("readdir_prepopulate %d items under SNAPDIR dn %p\n",
		     rinfo->dir_nr, parent);
	} else {
		dout("readdir_prepopulate %d items under dn %p\n",
		     rinfo->dir_nr, parent);
		if (rinfo->dir_dir)
			ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
	}

	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
		/* note dir version at start of readdir so we can tell
		 * if any dentries get dropped */
		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
		req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
		req->r_readdir_cache_idx = 0;
	}

	cache_ctl.index = req->r_readdir_cache_idx;
	fpos_offset = req->r_readdir_offset;

	/* FIXME: release caps/leases if error occurs */
	for (i = 0; i < rinfo->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
		struct ceph_vino vino;

		dname.name = rde->name;
		dname.len = rde->name_len;
		dname.hash = full_name_hash(dname.name, dname.len);

		vino.ino = le64_to_cpu(rde->inode.in->ino);
		vino.snap = le64_to_cpu(rde->inode.in->snapid);

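		/*
		 * Compose the fpos returned to userspace: the high bits
		 * hold the frag (or, in hash-order mode, the name hash)
		 * and the low bits a per-frag index; see ceph_make_fpos()
		 * for the exact bit layout.  Indices 0 and 1 appear to be
		 * reserved for "." and "..", hence entries start at 2.
		 */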
		if (rinfo->hash_order) {
			u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						 rde->name, rde->name_len);
			hash = ceph_frag_value(hash);
			if (hash != last_hash)
				fpos_offset = 2;
			last_hash = hash;
			rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
		} else {
			rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
		}

retry_lookup:
		dn = d_lookup(parent, &dname);
		dout("d_lookup on parent=%p name=%.*s got %p\n",
		     parent, dname.len, dname.name, dn);

		if (!dn) {
			dn = d_alloc(parent, &dname);
			dout("d_alloc %p '%.*s' = %p\n", parent,
			     dname.len, dname.name, dn);
			if (dn == NULL) {
				dout("d_alloc badness\n");
				err = -ENOMEM;
				goto out;
			}
			ret = ceph_init_dentry(dn);
			if (ret < 0) {
				dput(dn);
				err = ret;
				goto out;
			}
		} else if (d_really_is_positive(dn) &&
			   (ceph_ino(d_inode(dn)) != vino.ino ||
			    ceph_snap(d_inode(dn)) != vino.snap)) {
			dout(" dn %p points to wrong inode %p\n",
			     dn, d_inode(dn));
			d_delete(dn);
			dput(dn);
			goto retry_lookup;
		}

		/* inode */
		if (d_really_is_positive(dn)) {
			in = d_inode(dn);
		} else {
			in = ceph_get_inode(parent->d_sb, vino);
			if (IS_ERR(in)) {
				dout("new_inode badness\n");
				d_drop(dn);
				dput(dn);
				err = PTR_ERR(in);
				goto out;
			}
		}

		ret = fill_inode(in, NULL, &rde->inode, NULL, session,
				 req->r_request_started, -1,
				 &req->r_caps_reservation);
		if (ret < 0) {
			pr_err("fill_inode badness on %p\n", in);
			if (d_really_is_negative(dn))
				iput(in);
			d_drop(dn);
			err = ret;
			goto next_item;
		}

		if (d_really_is_negative(dn)) {
			struct dentry *realdn;

			if (ceph_security_xattr_deadlock(in)) {
				dout(" skip splicing dn %p to inode %p"
				     " (security xattr deadlock)\n", dn, in);
				iput(in);
				skipped++;
				goto next_item;
			}

			realdn = splice_dentry(dn, in);
			if (IS_ERR(realdn)) {
				err = PTR_ERR(realdn);
				d_drop(dn);
				dn = NULL;
				goto next_item;
			}
			dn = realdn;
		}

		ceph_dentry(dn)->offset = rde->offset;

		update_dentry_lease(dn, rde->lease, req->r_session,
				    req->r_request_started);

		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
			ret = fill_readdir_cache(d_inode(parent), dn,
						 &cache_ctl, req);
			if (ret < 0)
				err = ret;
		}
next_item:
		if (dn)
			dput(dn);
	}
out:
	if (err == 0 && skipped == 0) {
		req->r_did_prepopulate = true;
		req->r_readdir_cache_idx = cache_ctl.index;
	}
	ceph_readdir_cache_release(&cache_ctl);
	if (snapdir) {
		iput(snapdir);
		dput(parent);
	}
	dout("readdir_prepopulate done\n");
	return err;
}

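/*
 * Set i_size and i_blocks.  Returns nonzero if the caller should
 * report the new size back to the MDS: (size << 1) >= i_max_size
 * means the new size has crossed half of max_size, while
 * (i_reported_size << 1) < i_max_size means we have not yet reported
 * a size past that point.
 */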
int ceph_inode_set_size(struct inode *inode, loff_t size)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
	i_size_write(inode, size);
	inode->i_blocks = (size + (1 << 9) - 1) >> 9;

	/* tell the MDS if we are approaching max_size */
	if ((size << 1) >= ci->i_max_size &&
	    (ci->i_reported_size << 1) < ci->i_max_size)
		ret = 1;

	spin_unlock(&ci->i_ceph_lock);
	return ret;
}

/*
 * Write back inode data in a worker thread.  (This can't be done
 * in the message handler context.)
 */
void ceph_queue_writeback(struct inode *inode)
{
	ihold(inode);
	if (queue_work(ceph_inode_to_client(inode)->wb_wq,
		       &ceph_inode(inode)->i_wb_work)) {
		dout("ceph_queue_writeback %p\n", inode);
	} else {
		dout("ceph_queue_writeback %p failed\n", inode);
		iput(inode);
	}
}

static void ceph_writeback_work(struct work_struct *work)
{
	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
						  i_wb_work);
	struct inode *inode = &ci->vfs_inode;

	dout("writeback %p\n", inode);
	filemap_fdatawrite(&inode->i_data);
	iput(inode);
}

/*
 * queue an async invalidation
 */
void ceph_queue_invalidate(struct inode *inode)
{
	ihold(inode);
	if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
		       &ceph_inode(inode)->i_pg_inv_work)) {
		dout("ceph_queue_invalidate %p\n", inode);
	} else {
		dout("ceph_queue_invalidate %p failed\n", inode);
		iput(inode);
	}
}

/*
 * Invalidate inode pages in a worker thread.  (This can't be done
 * in the message handler context.)
 */
static void ceph_invalidate_work(struct work_struct *work)
{
	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
						  i_pg_inv_work);
	struct inode *inode = &ci->vfs_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	u32 orig_gen;
	int check = 0;

	mutex_lock(&ci->i_truncate_mutex);

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
				    inode, ceph_ino(inode));
		mapping_set_error(inode->i_mapping, -EIO);
		truncate_pagecache(inode, 0);
		mutex_unlock(&ci->i_truncate_mutex);
		goto out;
	}

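	/*
	 * i_rdcache_revoking is set to i_rdcache_gen when a FILE_CACHE
	 * revoke queues this work, and i_rdcache_gen moves on when the
	 * cap is granted again; if the two no longer match, this
	 * invalidation is stale.  (A sketch of the protocol as inferred
	 * from the checks below.)
	 */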
	spin_lock(&ci->i_ceph_lock);
	dout("invalidate_pages %p gen %d revoking %d\n", inode,
	     ci->i_rdcache_gen, ci->i_rdcache_revoking);
	if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
		if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
			check = 1;
		spin_unlock(&ci->i_ceph_lock);
		mutex_unlock(&ci->i_truncate_mutex);
		goto out;
	}
	orig_gen = ci->i_rdcache_gen;
	spin_unlock(&ci->i_ceph_lock);

	truncate_pagecache(inode, 0);

	spin_lock(&ci->i_ceph_lock);
	if (orig_gen == ci->i_rdcache_gen &&
	    orig_gen == ci->i_rdcache_revoking) {
		dout("invalidate_pages %p gen %d successful\n", inode,
		     ci->i_rdcache_gen);
		ci->i_rdcache_revoking--;
		check = 1;
	} else {
		dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
		     inode, orig_gen, ci->i_rdcache_gen,
		     ci->i_rdcache_revoking);
		if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
			check = 1;
	}
	spin_unlock(&ci->i_ceph_lock);
	mutex_unlock(&ci->i_truncate_mutex);
out:
	if (check)
		ceph_check_caps(ci, 0, NULL);
	iput(inode);
}


/*
 * Called from trunc_wq: apply a pending truncation in a worker
 * thread, since it can't be done in the message handler context.
 */
static void ceph_vmtruncate_work(struct work_struct *work)
{
	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
						  i_vmtruncate_work);
	struct inode *inode = &ci->vfs_inode;

	dout("vmtruncate_work %p\n", inode);
	__ceph_do_pending_vmtruncate(inode);
	iput(inode);
}

/*
 * Queue an async vmtruncate.  If we fail to queue work, we will handle
 * the truncation the next time we call __ceph_do_pending_vmtruncate.
 */
void ceph_queue_vmtruncate(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);

	ihold(inode);

	if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
		       &ci->i_vmtruncate_work)) {
		dout("ceph_queue_vmtruncate %p\n", inode);
	} else {
		dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
		     inode, ci->i_truncate_pending);
		iput(inode);
	}
}

/*
 * Make sure any pending truncation is applied before doing anything
 * that may depend on it.
 */
void __ceph_do_pending_vmtruncate(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 to;
	int wrbuffer_refs, finish = 0;

	mutex_lock(&ci->i_truncate_mutex);
retry:
	spin_lock(&ci->i_ceph_lock);
	if (ci->i_truncate_pending == 0) {
		dout("__do_pending_vmtruncate %p none pending\n", inode);
		spin_unlock(&ci->i_ceph_lock);
		mutex_unlock(&ci->i_truncate_mutex);
		return;
	}

	/*
	 * make sure any dirty snapped pages are flushed before we
	 * possibly truncate them.. so write AND block!
	 */
	if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
		dout("__do_pending_vmtruncate %p flushing snaps first\n",
		     inode);
		spin_unlock(&ci->i_ceph_lock);
		filemap_write_and_wait_range(&inode->i_data, 0,
					     inode->i_sb->s_maxbytes);
		goto retry;
	}

	/* there should be no reader or writer */
	WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);

	to = ci->i_truncate_size;
	wrbuffer_refs = ci->i_wrbuffer_ref;
	dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
	     ci->i_truncate_pending, to);
	spin_unlock(&ci->i_ceph_lock);

	truncate_pagecache(inode, to);

	spin_lock(&ci->i_ceph_lock);
	if (to == ci->i_truncate_size) {
		ci->i_truncate_pending = 0;
		finish = 1;
	}
	spin_unlock(&ci->i_ceph_lock);
	if (!finish)
		goto retry;

	mutex_unlock(&ci->i_truncate_mutex);

	if (wrbuffer_refs == 0)
		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);

	wake_up_all(&ci->i_cap_wq);
}

/*
 * symlinks
 */
static const struct inode_operations ceph_symlink_iops = {
	.readlink = generic_readlink,
	.get_link = simple_get_link,
	.setattr = ceph_setattr,
	.getattr = ceph_getattr,
	.setxattr = ceph_setxattr,
	.getxattr = ceph_getxattr,
	.listxattr = ceph_listxattr,
	.removexattr = ceph_removexattr,
};

/*
 * setattr
 */
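/*
 * Apply attribute changes.  If we hold the appropriate exclusive
 * caps, the change is made locally and the caps are marked dirty;
 * otherwise a SETATTR request is sent to the auth MDS, releasing the
 * relevant shared caps so other clients observe the update.
 */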
int ceph_setattr(struct dentry *dentry, struct iattr *attr)
{
	struct inode *inode = d_inode(dentry);
	struct ceph_inode_info *ci = ceph_inode(inode);
	const unsigned int ia_valid = attr->ia_valid;
	struct ceph_mds_request *req;
	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
	struct ceph_cap_flush *prealloc_cf;
	int issued;
	int release = 0, dirtied = 0;
	int mask = 0;
	int err = 0;
	int inode_dirty_flags = 0;
	bool lock_snap_rwsem = false;

	if (ceph_snap(inode) != CEPH_NOSNAP)
		return -EROFS;

	err = inode_change_ok(inode, attr);
	if (err != 0)
		return err;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR,
				       USE_AUTH_MDS);
	if (IS_ERR(req)) {
		ceph_free_cap_flush(prealloc_cf);
		return PTR_ERR(req);
	}

	spin_lock(&ci->i_ceph_lock);
	issued = __ceph_caps_issued(ci, NULL);

	if (!ci->i_head_snapc &&
	    (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) {
		lock_snap_rwsem = true;
		if (!down_read_trylock(&mdsc->snap_rwsem)) {
			spin_unlock(&ci->i_ceph_lock);
			down_read(&mdsc->snap_rwsem);
			spin_lock(&ci->i_ceph_lock);
			issued = __ceph_caps_issued(ci, NULL);
		}
	}

	dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));

	if (ia_valid & ATTR_UID) {
		dout("setattr %p uid %d -> %d\n", inode,
		     from_kuid(&init_user_ns, inode->i_uid),
		     from_kuid(&init_user_ns, attr->ia_uid));
		if (issued & CEPH_CAP_AUTH_EXCL) {
			inode->i_uid = attr->ia_uid;
			dirtied |= CEPH_CAP_AUTH_EXCL;
		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
			   !uid_eq(attr->ia_uid, inode->i_uid)) {
			req->r_args.setattr.uid = cpu_to_le32(
				from_kuid(&init_user_ns, attr->ia_uid));
			mask |= CEPH_SETATTR_UID;
			release |= CEPH_CAP_AUTH_SHARED;
		}
	}
	if (ia_valid & ATTR_GID) {
		dout("setattr %p gid %d -> %d\n", inode,
		     from_kgid(&init_user_ns, inode->i_gid),
		     from_kgid(&init_user_ns, attr->ia_gid));
		if (issued & CEPH_CAP_AUTH_EXCL) {
			inode->i_gid = attr->ia_gid;
			dirtied |= CEPH_CAP_AUTH_EXCL;
		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
			   !gid_eq(attr->ia_gid, inode->i_gid)) {
			req->r_args.setattr.gid = cpu_to_le32(
				from_kgid(&init_user_ns, attr->ia_gid));
			mask |= CEPH_SETATTR_GID;
			release |= CEPH_CAP_AUTH_SHARED;
		}
	}
	if (ia_valid & ATTR_MODE) {
		dout("setattr %p mode 0%o -> 0%o\n", inode, inode->i_mode,
		     attr->ia_mode);
		if (issued & CEPH_CAP_AUTH_EXCL) {
			inode->i_mode = attr->ia_mode;
			dirtied |= CEPH_CAP_AUTH_EXCL;
		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||
			   attr->ia_mode != inode->i_mode) {
			inode->i_mode = attr->ia_mode;
			req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);
			mask |= CEPH_SETATTR_MODE;
			release |= CEPH_CAP_AUTH_SHARED;
		}
	}

	if (ia_valid & ATTR_ATIME) {
		dout("setattr %p atime %ld.%ld -> %ld.%ld\n", inode,
		     inode->i_atime.tv_sec, inode->i_atime.tv_nsec,
		     attr->ia_atime.tv_sec, attr->ia_atime.tv_nsec);
		if (issued & CEPH_CAP_FILE_EXCL) {
			ci->i_time_warp_seq++;
			inode->i_atime = attr->ia_atime;
			dirtied |= CEPH_CAP_FILE_EXCL;
		} else if ((issued & CEPH_CAP_FILE_WR) &&
			   timespec_compare(&inode->i_atime,
					    &attr->ia_atime) < 0) {
			inode->i_atime = attr->ia_atime;
			dirtied |= CEPH_CAP_FILE_WR;
		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
			   !timespec_equal(&inode->i_atime, &attr->ia_atime)) {
			ceph_encode_timespec(&req->r_args.setattr.atime,
					     &attr->ia_atime);
			mask |= CEPH_SETATTR_ATIME;
			release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD |
				CEPH_CAP_FILE_WR;
		}
	}
	if (ia_valid & ATTR_MTIME) {
		dout("setattr %p mtime %ld.%ld -> %ld.%ld\n", inode,
		     inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
		     attr->ia_mtime.tv_sec, attr->ia_mtime.tv_nsec);
		if (issued & CEPH_CAP_FILE_EXCL) {
			ci->i_time_warp_seq++;
			inode->i_mtime = attr->ia_mtime;
			dirtied |= CEPH_CAP_FILE_EXCL;
		} else if ((issued & CEPH_CAP_FILE_WR) &&
			   timespec_compare(&inode->i_mtime,
					    &attr->ia_mtime) < 0) {
			inode->i_mtime = attr->ia_mtime;
			dirtied |= CEPH_CAP_FILE_WR;
		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
			   !timespec_equal(&inode->i_mtime, &attr->ia_mtime)) {
			ceph_encode_timespec(&req->r_args.setattr.mtime,
					     &attr->ia_mtime);
			mask |= CEPH_SETATTR_MTIME;
			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
				CEPH_CAP_FILE_WR;
		}
	}
	if (ia_valid & ATTR_SIZE) {
		dout("setattr %p size %lld -> %lld\n", inode,
		     inode->i_size, attr->ia_size);
		if ((issued & CEPH_CAP_FILE_EXCL) &&
		    attr->ia_size > inode->i_size) {
			i_size_write(inode, attr->ia_size);
			inode->i_blocks =
				(attr->ia_size + (1 << 9) - 1) >> 9;
			inode->i_ctime = attr->ia_ctime;
			ci->i_reported_size = attr->ia_size;
			dirtied |= CEPH_CAP_FILE_EXCL;
		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
			   attr->ia_size != inode->i_size) {
			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
			req->r_args.setattr.old_size =
				cpu_to_le64(inode->i_size);
			mask |= CEPH_SETATTR_SIZE;
			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
				CEPH_CAP_FILE_WR;
		}
	}

	/* ctime is not covered by a dedicated cap; ATTR_FILE does nothing */
	if (ia_valid & ATTR_CTIME) {
		bool only = (ia_valid & (ATTR_SIZE|ATTR_MTIME|ATTR_ATIME|
					 ATTR_MODE|ATTR_UID|ATTR_GID)) == 0;
		dout("setattr %p ctime %ld.%ld -> %ld.%ld (%s)\n", inode,
		     inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
		     attr->ia_ctime.tv_sec, attr->ia_ctime.tv_nsec,
		     only ? "ctime only" : "ignored");
		inode->i_ctime = attr->ia_ctime;
		if (only) {
			/*
			 * if kernel wants to dirty ctime but nothing else,
			 * we need to choose a cap to dirty under, or do
			 * an almost-no-op setattr
			 */
			if (issued & CEPH_CAP_AUTH_EXCL)
				dirtied |= CEPH_CAP_AUTH_EXCL;
			else if (issued & CEPH_CAP_FILE_EXCL)
				dirtied |= CEPH_CAP_FILE_EXCL;
			else if (issued & CEPH_CAP_XATTR_EXCL)
				dirtied |= CEPH_CAP_XATTR_EXCL;
			else
				mask |= CEPH_SETATTR_CTIME;
		}
	}
	if (ia_valid & ATTR_FILE)
		dout("setattr %p ATTR_FILE ... hrm!\n", inode);

	if (dirtied) {
		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
							   &prealloc_cf);
		inode->i_ctime = current_fs_time(inode->i_sb);
	}

	release &= issued;
	spin_unlock(&ci->i_ceph_lock);
	if (lock_snap_rwsem)
		up_read(&mdsc->snap_rwsem);

	if (inode_dirty_flags)
		__mark_inode_dirty(inode, inode_dirty_flags);

	if (ia_valid & ATTR_MODE) {
		err = posix_acl_chmod(inode, attr->ia_mode);
		if (err)
			goto out_put;
	}

	if (mask) {
		req->r_inode = inode;
		ihold(inode);
		req->r_inode_drop = release;
		req->r_args.setattr.mask = cpu_to_le32(mask);
		req->r_num_caps = 1;
		err = ceph_mdsc_do_request(mdsc, NULL, req);
	}
	dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
	     ceph_cap_string(dirtied), mask);

	ceph_mdsc_put_request(req);
	if (mask & CEPH_SETATTR_SIZE)
		__ceph_do_pending_vmtruncate(inode);
	ceph_free_cap_flush(prealloc_cf);
	return err;
out_put:
	ceph_mdsc_put_request(req);
	ceph_free_cap_flush(prealloc_cf);
	return err;
}

/*
 * Verify that we have a lease on the given mask.  If not,
 * do a getattr against an MDS.
 */
int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
		      int mask, bool force)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int err;

	if (ceph_snap(inode) == CEPH_SNAPDIR) {
		dout("do_getattr inode %p SNAPDIR\n", inode);
		return 0;
	}

	dout("do_getattr inode %p mask %s mode 0%o\n",
	     inode, ceph_cap_string(mask), inode->i_mode);
	if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
		return 0;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);
	req->r_inode = inode;
	ihold(inode);
	req->r_num_caps = 1;
	req->r_args.getattr.mask = cpu_to_le32(mask);
	req->r_locked_page = locked_page;
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (locked_page && err == 0) {
		u64 inline_version = req->r_reply_info.targeti.inline_version;
		if (inline_version == 0) {
			/* the reply is supposed to contain inline data */
			err = -EINVAL;
		} else if (inline_version == CEPH_INLINE_NONE) {
			err = -ENODATA;
		} else {
			err = req->r_reply_info.targeti.inline_len;
		}
	}
	ceph_mdsc_put_request(req);
	dout("do_getattr result=%d\n", err);
	return err;
}


/*
 * Check inode permissions.  We verify we have a valid value for
 * the AUTH cap, then call the generic handler.
 */
int ceph_permission(struct inode *inode, int mask)
{
	int err;

	if (mask & MAY_NOT_BLOCK)
		return -ECHILD;

	err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false);

	if (!err)
		err = generic_permission(inode, mask);
	return err;
}

/*
 * Get all attributes.  Hopefully someday we'll have a statlite()
 * and can limit the fields we require to be accurate.
 */
int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
		 struct kstat *stat)
{
	struct inode *inode = d_inode(dentry);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int err;

	err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
	if (!err) {
		generic_fillattr(inode, stat);
		stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
		if (ceph_snap(inode) != CEPH_NOSNAP)
			stat->dev = ceph_snap(inode);
		else
			stat->dev = 0;
		if (S_ISDIR(inode->i_mode)) {
			if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
						RBYTES))
				stat->size = ci->i_rbytes;
			else
				stat->size = ci->i_files + ci->i_subdirs;
			stat->blocks = 0;
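			/* directories have no real block size; 64 KiB is
			 * advertised as a (presumably generous) I/O hint */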
			stat->blksize = 65536;
		}
	}
	return err;
}