Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs
[cascardo/linux.git] / drivers / staging / lustre / lustre / llite / statahead.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #include <linux/fs.h>
34 #include <linux/sched.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/pagemap.h>
38
39 #define DEBUG_SUBSYSTEM S_LLITE
40
41 #include "../include/obd_support.h"
42 #include "../include/lustre_dlm.h"
43 #include "llite_internal.h"
44
45 #define SA_OMITTED_ENTRY_MAX 8ULL
46
/* statahead entry state, stored in sa_entry::se_state */
enum se_stat {
        /** negative values are for error cases */
        SA_ENTRY_INIT = 0,      /* initial state, async stat not finished yet */
        SA_ENTRY_SUCC = 1,      /* async stat succeeded, entry usable */
        SA_ENTRY_INVA = 2,      /* entry invalid, async stat failed */
};
53
54 /*
55  * sa_entry is not refcounted: statahead thread allocates it and do async stat,
56  * and in async stat callback ll_statahead_interpret() will add it into
57  * sai_interim_entries, later statahead thread will call sa_handle_callback() to
58  * instantiate entry and move it into sai_entries, and then only scanner process
59  * can access and free it.
60  */
61 struct sa_entry {
62         /* link into sai_interim_entries or sai_entries */
63         struct list_head              se_list;
64         /* link into sai hash table locally */
65         struct list_head              se_hash;
66         /* entry index in the sai */
67         __u64              se_index;
68         /* low layer ldlm lock handle */
69         __u64              se_handle;
70         /* entry status */
71         enum se_stat            se_state;
72         /* entry size, contains name */
73         int                  se_size;
74         /* pointer to async getattr enqueue info */
75         struct md_enqueue_info *se_minfo;
76         /* pointer to the async getattr request */
77         struct ptlrpc_request  *se_req;
78         /* pointer to the target inode */
79         struct inode       *se_inode;
80         /* entry name */
81         struct qstr          se_qstr;
82 };
83
84 static unsigned int sai_generation;
85 static DEFINE_SPINLOCK(sai_generation_lock);
86
/*
 * sa_entry is ready to use: its state has left SA_ENTRY_INIT.
 *
 * The read barrier orders this se_state load against later reads of the
 * entry's fields; NOTE(review): the pairing write side is presumably the
 * lli_sa_lock-protected state update in __sa_make_ready() — confirm.
 */
static inline int sa_ready(struct sa_entry *entry)
{
        smp_rmb();
        return (entry->se_state != SA_ENTRY_INIT);
}
93
/* hash value to put in sai_cache: mask the qstr hash into a bucket index */
static inline int sa_hash(int val)
{
        return val & LL_SA_CACHE_MASK;
}
99
100 /* hash entry into sai_cache */
101 static inline void
102 sa_rehash(struct ll_statahead_info *sai, struct sa_entry *entry)
103 {
104         int i = sa_hash(entry->se_qstr.hash);
105
106         spin_lock(&sai->sai_cache_lock[i]);
107         list_add_tail(&entry->se_hash, &sai->sai_cache[i]);
108         spin_unlock(&sai->sai_cache_lock[i]);
109 }
110
111 /*
112  * Remove entry from SA table.
113  */
114 static inline void
115 sa_unhash(struct ll_statahead_info *sai, struct sa_entry *entry)
116 {
117         int i = sa_hash(entry->se_qstr.hash);
118
119         spin_lock(&sai->sai_cache_lock[i]);
120         list_del_init(&entry->se_hash);
121         spin_unlock(&sai->sai_cache_lock[i]);
122 }
123
124 static inline int agl_should_run(struct ll_statahead_info *sai,
125                                  struct inode *inode)
126 {
127         return (inode && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
128 }
129
/* statahead window is full: as many entries cached as sai_max allows */
static inline int sa_sent_full(struct ll_statahead_info *sai)
{
        return atomic_read(&sai->sai_cache_count) >= sai->sai_max;
}
135
/* got async stat replies waiting to be instantiated by the statahead thread */
static inline int sa_has_callback(struct ll_statahead_info *sai)
{
        return !list_empty(&sai->sai_interim_entries);
}
141
/* no inodes queued for async glimpse */
static inline int agl_list_empty(struct ll_statahead_info *sai)
{
        return list_empty(&sai->sai_agls);
}
146
147 /**
148  * (1) hit ratio less than 80%
149  * or
150  * (2) consecutive miss more than 8
151  * then means low hit.
152  */
153 static inline int sa_low_hit(struct ll_statahead_info *sai)
154 {
155         return ((sai->sai_hit > 7 && sai->sai_hit < 4 * sai->sai_miss) ||
156                 (sai->sai_consecutive_miss > 8));
157 }
158
159 /*
160  * if the given index is behind of statahead window more than
161  * SA_OMITTED_ENTRY_MAX, then it is old.
162  */
163 static inline int is_omitted_entry(struct ll_statahead_info *sai, __u64 index)
164 {
165         return ((__u64)sai->sai_max + index + SA_OMITTED_ENTRY_MAX <
166                  sai->sai_index);
167 }
168
/*
 * allocate sa_entry and hash it to allow scanner process to find it
 *
 * The entry name is stored inline right after the struct; the allocation
 * size rounds the name up to a 4-byte boundary including the NUL.
 * Returns the new entry or ERR_PTR(-ENOMEM).
 */
static struct sa_entry *
sa_alloc(struct dentry *parent, struct ll_statahead_info *sai, __u64 index,
         const char *name, int len)
{
        struct ll_inode_info *lli;
        struct sa_entry   *entry;
        int                entry_size;
        char             *dname;

        /* (len & ~3) + 4 == len rounded up to a multiple of 4, NUL included */
        entry_size = sizeof(struct sa_entry) + (len & ~3) + 4;
        entry = kzalloc(entry_size, GFP_NOFS);
        if (unlikely(!entry))
                return ERR_PTR(-ENOMEM);

        CDEBUG(D_READA, "alloc sa entry %.*s(%p) index %llu\n",
               len, name, entry, index);

        entry->se_index = index;
        entry->se_state = SA_ENTRY_INIT;
        entry->se_size = entry_size;
        /* copy the name into the inline buffer after the struct */
        dname = (char *)entry + sizeof(struct sa_entry);
        memcpy(dname, name, len);
        dname[len] = 0;

        entry->se_qstr.hash = full_name_hash(parent, name, len);
        entry->se_qstr.len = len;
        entry->se_qstr.name = dname;

        /* publish the entry in the hash under lli_sa_lock so the scanner
         * cannot observe a half-initialized entry
         */
        lli = ll_i2info(sai->sai_dentry->d_inode);
        spin_lock(&lli->lli_sa_lock);
        INIT_LIST_HEAD(&entry->se_list);
        sa_rehash(sai, entry);
        spin_unlock(&lli->lli_sa_lock);

        atomic_inc(&sai->sai_cache_count);

        return entry;
}
208
/* free sa_entry, which should have been unhashed and not in any list */
static void sa_free(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        CDEBUG(D_READA, "free sa entry %.*s(%p) index %llu\n",
               entry->se_qstr.len, entry->se_qstr.name, entry,
               entry->se_index);

        /* caller must have unhashed and unlinked the entry already */
        LASSERT(list_empty(&entry->se_list));
        LASSERT(list_empty(&entry->se_hash));

        kfree(entry);
        atomic_dec(&sai->sai_cache_count);
}
222
223 /*
224  * find sa_entry by name, used by directory scanner, lock is not needed because
225  * only scanner can remove the entry from cache.
226  */
227 static struct sa_entry *
228 sa_get(struct ll_statahead_info *sai, const struct qstr *qstr)
229 {
230         struct sa_entry *entry;
231         int i = sa_hash(qstr->hash);
232
233         list_for_each_entry(entry, &sai->sai_cache[i], se_hash) {
234                 if (entry->se_qstr.hash == qstr->hash &&
235                     entry->se_qstr.len == qstr->len &&
236                     memcmp(entry->se_qstr.name, qstr->name, qstr->len) == 0)
237                         return entry;
238         }
239         return NULL;
240 }
241
242 /* unhash and unlink sa_entry, and then free it */
243 static inline void
244 sa_kill(struct ll_statahead_info *sai, struct sa_entry *entry)
245 {
246         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
247
248         LASSERT(!list_empty(&entry->se_hash));
249         LASSERT(!list_empty(&entry->se_list));
250         LASSERT(sa_ready(entry));
251
252         sa_unhash(sai, entry);
253
254         spin_lock(&lli->lli_sa_lock);
255         list_del_init(&entry->se_list);
256         spin_unlock(&lli->lli_sa_lock);
257
258         if (entry->se_inode)
259                 iput(entry->se_inode);
260
261         sa_free(sai, entry);
262 }
263
/*
 * called by scanner after use, sa_entry will be killed
 *
 * Also maintains the hit/miss statistics that drive the adaptive window:
 * a hit doubles sai_max (capped at ll_sa_max), and reaps entries that fell
 * too far behind the window.
 */
static void
sa_put(struct ll_statahead_info *sai, struct sa_entry *entry)
{
        struct sa_entry *tmp, *next;

        if (entry && entry->se_state == SA_ENTRY_SUCC) {
                struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);

                /* hit: grow the statahead window, capped by ll_sa_max */
                sai->sai_hit++;
                sai->sai_consecutive_miss = 0;
                sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max);
        } else {
                sai->sai_miss++;
                sai->sai_consecutive_miss++;
        }

        if (entry)
                sa_kill(sai, entry);

        /*
         * kill old completed entries, only scanner process does this, no need
         * to lock
         */
        list_for_each_entry_safe(tmp, next, &sai->sai_entries, se_list) {
                if (!is_omitted_entry(sai, tmp->se_index))
                        break;
                sa_kill(sai, tmp);
        }

        /* statahead thread may be waiting for room in the window */
        wake_up(&sai->sai_thread.t_ctl_waitq);
}
296
297 /*
298  * update state and sort add entry to sai_entries by index, return true if
299  * scanner is waiting on this entry.
300  */
301 static bool
302 __sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
303 {
304         struct list_head *pos = &sai->sai_entries;
305         __u64 index = entry->se_index;
306         struct sa_entry *se;
307
308         LASSERT(!sa_ready(entry));
309         LASSERT(list_empty(&entry->se_list));
310
311         list_for_each_entry_reverse(se, &sai->sai_entries, se_list) {
312                 if (se->se_index < entry->se_index) {
313                         pos = &se->se_list;
314                         break;
315                 }
316         }
317         list_add(&entry->se_list, pos);
318         entry->se_state = ret < 0 ? SA_ENTRY_INVA : SA_ENTRY_SUCC;
319
320         return (index == sai->sai_index_wait);
321 }
322
323 /*
324  * release resources used in async stat RPC, update entry state and wakeup if
325  * scanner process it waiting on this entry.
326  */
327 static void
328 sa_make_ready(struct ll_statahead_info *sai, struct sa_entry *entry, int ret)
329 {
330         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
331         struct md_enqueue_info *minfo = entry->se_minfo;
332         struct ptlrpc_request *req = entry->se_req;
333         bool wakeup;
334
335         /* release resources used in RPC */
336         if (minfo) {
337                 entry->se_minfo = NULL;
338                 ll_intent_release(&minfo->mi_it);
339                 iput(minfo->mi_dir);
340                 kfree(minfo);
341         }
342
343         if (req) {
344                 entry->se_req = NULL;
345                 ptlrpc_req_finished(req);
346         }
347
348         spin_lock(&lli->lli_sa_lock);
349         wakeup = __sa_make_ready(sai, entry, ret);
350         spin_unlock(&lli->lli_sa_lock);
351
352         if (wakeup)
353                 wake_up(&sai->sai_waitq);
354 }
355
/*
 * Insert inode into the list of sai_agls.
 *
 * Skips the inode if it is already queued (lli_agl_index != 0); wakes the
 * AGL thread only when the list transitions from empty to non-empty.
 */
static void ll_agl_add(struct ll_statahead_info *sai,
                       struct inode *inode, int index)
{
        struct ll_inode_info *child  = ll_i2info(inode);
        struct ll_inode_info *parent = ll_i2info(sai->sai_dentry->d_inode);
        int                added  = 0;

        spin_lock(&child->lli_agl_lock);
        if (child->lli_agl_index == 0) {
                child->lli_agl_index = index;
                spin_unlock(&child->lli_agl_lock);

                LASSERT(list_empty(&child->lli_agl_list));

                /* hold an inode ref while queued; dropped by ll_agl_trigger */
                igrab(inode);
                spin_lock(&parent->lli_agl_lock);
                if (list_empty(&sai->sai_agls))
                        added = 1;
                list_add_tail(&child->lli_agl_list, &sai->sai_agls);
                spin_unlock(&parent->lli_agl_lock);
        } else {
                spin_unlock(&child->lli_agl_lock);
        }

        if (added > 0)
                wake_up(&sai->sai_agl_thread.t_ctl_waitq);
}
384
/*
 * allocate sai
 *
 * Takes a reference on @dentry and initializes the statahead bookkeeping;
 * returns NULL on allocation failure. The initial refcount is 1.
 */
static struct ll_statahead_info *ll_sai_alloc(struct dentry *dentry)
{
        struct ll_inode_info *lli = ll_i2info(dentry->d_inode);
        struct ll_statahead_info *sai;
        int                    i;

        sai = kzalloc(sizeof(*sai), GFP_NOFS);
        if (!sai)
                return NULL;

        sai->sai_dentry = dget(dentry);
        atomic_set(&sai->sai_refcount, 1);

        /* window starts small and grows on hits, see sa_put() */
        sai->sai_max = LL_SA_RPC_MIN;
        sai->sai_index = 1;
        init_waitqueue_head(&sai->sai_waitq);
        init_waitqueue_head(&sai->sai_thread.t_ctl_waitq);
        init_waitqueue_head(&sai->sai_agl_thread.t_ctl_waitq);

        INIT_LIST_HEAD(&sai->sai_interim_entries);
        INIT_LIST_HEAD(&sai->sai_entries);
        INIT_LIST_HEAD(&sai->sai_agls);

        for (i = 0; i < LL_SA_CACHE_SIZE; i++) {
                INIT_LIST_HEAD(&sai->sai_cache[i]);
                spin_lock_init(&sai->sai_cache_lock[i]);
        }
        atomic_set(&sai->sai_cache_count, 0);

        /* bump the global generation; skip 0, which means "no generation" */
        spin_lock(&sai_generation_lock);
        lli->lli_sa_generation = ++sai_generation;
        if (unlikely(!sai_generation))
                lli->lli_sa_generation = ++sai_generation;
        spin_unlock(&sai_generation_lock);

        return sai;
}
423
/* free sai and drop the dentry reference taken in ll_sai_alloc() */
static inline void ll_sai_free(struct ll_statahead_info *sai)
{
        LASSERT(sai->sai_dentry);
        dput(sai->sai_dentry);
        kfree(sai);
}
431
432 /*
433  * take refcount of sai if sai for @dir exists, which means statahead is on for
434  * this directory.
435  */
436 static inline struct ll_statahead_info *ll_sai_get(struct inode *dir)
437 {
438         struct ll_inode_info *lli = ll_i2info(dir);
439         struct ll_statahead_info *sai = NULL;
440
441         spin_lock(&lli->lli_sa_lock);
442         sai = lli->lli_sai;
443         if (sai)
444                 atomic_inc(&sai->sai_refcount);
445         spin_unlock(&lli->lli_sa_lock);
446
447         return sai;
448 }
449
450 /*
451  * put sai refcount after use, if refcount reaches zero, free sai and sa_entries
452  * attached to it.
453  */
454 static void ll_sai_put(struct ll_statahead_info *sai)
455 {
456         struct ll_inode_info *lli = ll_i2info(sai->sai_dentry->d_inode);
457
458         if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_sa_lock)) {
459                 struct ll_sb_info *sbi = ll_i2sbi(sai->sai_dentry->d_inode);
460                 struct sa_entry *entry, *next;
461
462                 lli->lli_sai = NULL;
463                 spin_unlock(&lli->lli_sa_lock);
464
465                 LASSERT(thread_is_stopped(&sai->sai_thread));
466                 LASSERT(thread_is_stopped(&sai->sai_agl_thread));
467                 LASSERT(sai->sai_sent == sai->sai_replied);
468                 LASSERT(!sa_has_callback(sai));
469
470                 list_for_each_entry_safe(entry, next, &sai->sai_entries,
471                                          se_list)
472                         sa_kill(sai, entry);
473
474                 LASSERT(atomic_read(&sai->sai_cache_count) == 0);
475                 LASSERT(list_empty(&sai->sai_agls));
476
477                 ll_sai_free(sai);
478                 atomic_dec(&sbi->ll_sa_running);
479         }
480 }
481
/*
 * Trigger one async glimpse for @inode, then drop the inode reference that
 * ll_agl_add() took when queueing it.
 *
 * Do NOT forget to drop inode refcount when into sai_agls.
 */
static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai)
{
        struct ll_inode_info *lli   = ll_i2info(inode);
        __u64            index = lli->lli_agl_index;
        int                rc;

        LASSERT(list_empty(&lli->lli_agl_list));

        /* AGL maybe fall behind statahead with one entry */
        if (is_omitted_entry(sai, index + 1)) {
                lli->lli_agl_index = 0;
                iput(inode);
                return;
        }

        /* Someone is in glimpse (sync or async), do nothing. */
        rc = down_write_trylock(&lli->lli_glimpse_sem);
        if (rc == 0) {
                lli->lli_agl_index = 0;
                iput(inode);
                return;
        }

        /*
         * Someone triggered glimpse within 1 sec before.
         * 1) The former glimpse succeeded with glimpse lock granted by OST, and
         *    if the lock is still cached on client, AGL needs to do nothing. If
         *    it is cancelled by other client, AGL maybe cannot obtain new lock
         *    for no glimpse callback triggered by AGL.
         * 2) The former glimpse succeeded, but OST did not grant glimpse lock.
         *    Under such case, it is quite possible that the OST will not grant
         *    glimpse lock for AGL also.
         * 3) The former glimpse failed, compared with other two cases, it is
         *    relative rare. AGL can ignore such case, and it will not muchly
         *    affect the performance.
         */
        if (lli->lli_glimpse_time != 0 &&
            time_before(cfs_time_shift(-1), lli->lli_glimpse_time)) {
                up_write(&lli->lli_glimpse_sem);
                lli->lli_agl_index = 0;
                iput(inode);
                return;
        }

        CDEBUG(D_READA, "Handling (init) async glimpse: inode = "
               DFID", idx = %llu\n", PFID(&lli->lli_fid), index);

        cl_agl(inode);
        lli->lli_agl_index = 0;
        lli->lli_glimpse_time = cfs_time_current();
        up_write(&lli->lli_glimpse_sem);

        CDEBUG(D_READA, "Handled (init) async glimpse: inode= "
               DFID", idx = %llu, rc = %d\n",
               PFID(&lli->lli_fid), index, rc);

        iput(inode);
}
541
542 /*
543  * prepare inode for sa entry, add it into agl list, now sa_entry is ready
544  * to be used by scanner process.
545  */
546 static void sa_instantiate(struct ll_statahead_info *sai,
547                            struct sa_entry *entry)
548 {
549         struct inode *dir = sai->sai_dentry->d_inode;
550         struct inode       *child;
551         struct md_enqueue_info *minfo;
552         struct lookup_intent   *it;
553         struct ptlrpc_request  *req;
554         struct mdt_body *body;
555         int                  rc    = 0;
556
557         LASSERT(entry->se_handle != 0);
558
559         minfo = entry->se_minfo;
560         it = &minfo->mi_it;
561         req = entry->se_req;
562         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
563         if (!body) {
564                 rc = -EFAULT;
565                 goto out;
566         }
567
568         child = entry->se_inode;
569         if (!child) {
570                 /*
571                  * lookup.
572                  */
573                 LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
574
575                 /* XXX: No fid in reply, this is probably cross-ref case.
576                  * SA can't handle it yet.
577                  */
578                 if (body->mbo_valid & OBD_MD_MDS) {
579                         rc = -EAGAIN;
580                         goto out;
581                 }
582         } else {
583                 /*
584                  * revalidate.
585                  */
586                 /* unlinked and re-created with the same name */
587                 if (unlikely(!lu_fid_eq(&minfo->mi_data.op_fid2, &body->mbo_fid1))) {
588                         entry->se_inode = NULL;
589                         iput(child);
590                         child = NULL;
591                 }
592         }
593
594         it->it_lock_handle = entry->se_handle;
595         rc = md_revalidate_lock(ll_i2mdexp(dir), it, ll_inode2fid(dir), NULL);
596         if (rc != 1) {
597                 rc = -EAGAIN;
598                 goto out;
599         }
600
601         rc = ll_prep_inode(&child, req, dir->i_sb, it);
602         if (rc)
603                 goto out;
604
605         CDEBUG(D_READA, "%s: setting %.*s" DFID " l_data to inode %p\n",
606                ll_get_fsname(child->i_sb, NULL, 0),
607                entry->se_qstr.len, entry->se_qstr.name,
608                PFID(ll_inode2fid(child)), child);
609         ll_set_lock_data(ll_i2sbi(dir)->ll_md_exp, child, it, NULL);
610
611         entry->se_inode = child;
612
613         if (agl_should_run(sai, child))
614                 ll_agl_add(sai, child, entry->se_index);
615
616 out:
617         /*
618          * sa_make_ready() will drop ldlm ibits lock refcount by calling
619          * ll_intent_drop_lock() in spite of failures. Do not worry about
620          * calling ll_intent_drop_lock() more than once.
621          */
622         sa_make_ready(sai, entry, rc);
623 }
624
625 /* once there are async stat replies, instantiate sa_entry from replies */
626 static void sa_handle_callback(struct ll_statahead_info *sai)
627 {
628         struct ll_inode_info *lli;
629
630         lli = ll_i2info(sai->sai_dentry->d_inode);
631
632         while (sa_has_callback(sai)) {
633                 struct sa_entry *entry;
634
635                 spin_lock(&lli->lli_sa_lock);
636                 if (unlikely(!sa_has_callback(sai))) {
637                         spin_unlock(&lli->lli_sa_lock);
638                         break;
639                 }
640                 entry = list_entry(sai->sai_interim_entries.next,
641                                    struct sa_entry, se_list);
642                 list_del_init(&entry->se_list);
643                 spin_unlock(&lli->lli_sa_lock);
644
645                 sa_instantiate(sai, entry);
646         }
647 }
648
649 /*
650  * callback for async stat, because this is called in ptlrpcd context, we only
651  * put sa_entry in sai_cb_entries list, and let sa_handle_callback() to really
652  * prepare inode and instantiate sa_entry later.
653  */
654 static int ll_statahead_interpret(struct ptlrpc_request *req,
655                                   struct md_enqueue_info *minfo, int rc)
656 {
657         struct lookup_intent     *it  = &minfo->mi_it;
658         struct inode         *dir = minfo->mi_dir;
659         struct ll_inode_info     *lli = ll_i2info(dir);
660         struct ll_statahead_info *sai = lli->lli_sai;
661         struct sa_entry *entry = (struct sa_entry *)minfo->mi_cbdata;
662         __u64 handle = 0;
663         bool wakeup;
664
665         if (it_disposition(it, DISP_LOOKUP_NEG))
666                 rc = -ENOENT;
667
668         /*
669          * because statahead thread will wait for all inflight RPC to finish,
670          * sai should be always valid, no need to refcount
671          */
672         LASSERT(sai);
673         LASSERT(!thread_is_stopped(&sai->sai_thread));
674         LASSERT(entry);
675
676         CDEBUG(D_READA, "sa_entry %.*s rc %d\n",
677                entry->se_qstr.len, entry->se_qstr.name, rc);
678
679         if (rc) {
680                 ll_intent_release(it);
681                 iput(dir);
682                 kfree(minfo);
683         } else {
684                 /*
685                  * release ibits lock ASAP to avoid deadlock when statahead
686                  * thread enqueues lock on parent in readdir and another
687                  * process enqueues lock on child with parent lock held, eg.
688                  * unlink.
689                  */
690                 handle = it->it_lock_handle;
691                 ll_intent_drop_lock(it);
692         }
693
694         spin_lock(&lli->lli_sa_lock);
695         if (rc) {
696                 wakeup = __sa_make_ready(sai, entry, rc);
697         } else {
698                 entry->se_minfo = minfo;
699                 entry->se_req = ptlrpc_request_addref(req);
700                 /*
701                  * Release the async ibits lock ASAP to avoid deadlock
702                  * when statahead thread tries to enqueue lock on parent
703                  * for readpage and other tries to enqueue lock on child
704                  * with parent's lock held, for example: unlink.
705                  */
706                 entry->se_handle = handle;
707                 wakeup = !sa_has_callback(sai);
708                 list_add_tail(&entry->se_list, &sai->sai_interim_entries);
709         }
710         sai->sai_replied++;
711
712         if (wakeup)
713                 wake_up(&sai->sai_thread.t_ctl_waitq);
714         spin_unlock(&lli->lli_sa_lock);
715
716         return rc;
717 }
718
/* finish async stat RPC arguments: undo sa_prep_data() on enqueue failure */
static void sa_fini_data(struct md_enqueue_info *minfo,
                         struct ldlm_enqueue_info *einfo)
{
        LASSERT(minfo && einfo);
        /* drop the dir reference taken via igrab() in sa_prep_data() */
        iput(minfo->mi_dir);
        kfree(minfo);
        kfree(einfo);
}
728
729 /**
730  * prepare arguments for async stat RPC.
731  */
732 static int sa_prep_data(struct inode *dir, struct inode *child,
733                         struct sa_entry *entry, struct md_enqueue_info **pmi,
734                         struct ldlm_enqueue_info **pei)
735 {
736         const struct qstr      *qstr = &entry->se_qstr;
737         struct md_enqueue_info   *minfo;
738         struct ldlm_enqueue_info *einfo;
739         struct md_op_data       *op_data;
740
741         einfo = kzalloc(sizeof(*einfo), GFP_NOFS);
742         if (!einfo)
743                 return -ENOMEM;
744
745         minfo = kzalloc(sizeof(*minfo), GFP_NOFS);
746         if (!minfo) {
747                 kfree(einfo);
748                 return -ENOMEM;
749         }
750
751         op_data = ll_prep_md_op_data(&minfo->mi_data, dir, child, qstr->name,
752                                      qstr->len, 0, LUSTRE_OPC_ANY, NULL);
753         if (IS_ERR(op_data)) {
754                 kfree(einfo);
755                 kfree(minfo);
756                 return PTR_ERR(op_data);
757         }
758
759         minfo->mi_it.it_op = IT_GETATTR;
760         minfo->mi_dir = igrab(dir);
761         minfo->mi_cb = ll_statahead_interpret;
762         minfo->mi_cbdata = entry;
763
764         einfo->ei_type   = LDLM_IBITS;
765         einfo->ei_mode   = it_to_lock_mode(&minfo->mi_it);
766         einfo->ei_cb_bl  = ll_md_blocking_ast;
767         einfo->ei_cb_cp  = ldlm_completion_ast;
768         einfo->ei_cb_gl  = NULL;
769         einfo->ei_cbdata = NULL;
770
771         *pmi = minfo;
772         *pei = einfo;
773
774         return 0;
775 }
776
777 /* async stat for file not found in dcache */
778 static int sa_lookup(struct inode *dir, struct sa_entry *entry)
779 {
780         struct md_enqueue_info   *minfo;
781         struct ldlm_enqueue_info *einfo;
782         int                    rc;
783
784         rc = sa_prep_data(dir, NULL, entry, &minfo, &einfo);
785         if (rc)
786                 return rc;
787
788         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
789         if (rc)
790                 sa_fini_data(minfo, einfo);
791
792         return rc;
793 }
794
795 /**
796  * async stat for file found in dcache, similar to .revalidate
797  *
798  * \retval      1 dentry valid, no RPC sent
799  * \retval      0 dentry invalid, will send async stat RPC
800  * \retval      negative number upon error
801  */
802 static int sa_revalidate(struct inode *dir, struct sa_entry *entry,
803                          struct dentry *dentry)
804 {
805         struct inode         *inode = d_inode(dentry);
806         struct lookup_intent      it = { .it_op = IT_GETATTR,
807                                          .it_lock_handle = 0 };
808         struct md_enqueue_info   *minfo;
809         struct ldlm_enqueue_info *einfo;
810         int rc;
811
812         if (unlikely(!inode))
813                 return 1;
814
815         if (d_mountpoint(dentry))
816                 return 1;
817
818         entry->se_inode = igrab(inode);
819         rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode),
820                                 NULL);
821         if (rc == 1) {
822                 entry->se_handle = it.it_lock_handle;
823                 ll_intent_release(&it);
824                 return 1;
825         }
826
827         rc = sa_prep_data(dir, inode, entry, &minfo, &einfo);
828         if (rc) {
829                 entry->se_inode = NULL;
830                 iput(inode);
831                 return rc;
832         }
833
834         rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo);
835         if (rc) {
836                 entry->se_inode = NULL;
837                 iput(inode);
838                 sa_fini_data(minfo, einfo);
839         }
840
841         return rc;
842 }
843
/*
 * async stat for file with @name
 *
 * Allocates the sa_entry for the next statahead index, then either looks it
 * up (not in dcache) or revalidates it (in dcache). On submission failure
 * the entry is immediately marked ready with the error.
 */
static void sa_statahead(struct dentry *parent, const char *name, int len)
{
        struct inode         *dir    = d_inode(parent);
        struct ll_inode_info     *lli    = ll_i2info(dir);
        struct ll_statahead_info *sai    = lli->lli_sai;
        struct dentry       *dentry = NULL;
        struct sa_entry *entry;
        int                    rc;

        entry = sa_alloc(parent, sai, sai->sai_index, name, len);
        if (IS_ERR(entry))
                return;

        dentry = d_lookup(parent, &entry->se_qstr);
        if (!dentry) {
                rc = sa_lookup(dir, entry);
        } else {
                rc = sa_revalidate(dir, entry, dentry);
                /* dentry was valid without an RPC: queue async glimpse now */
                if (rc == 1 && agl_should_run(sai, d_inode(dentry)))
                        ll_agl_add(sai, d_inode(dentry), entry->se_index);
        }

        if (dentry)
                dput(dentry);

        /* rc != 0 covers both errors and the "already valid" rc == 1 case */
        if (rc)
                sa_make_ready(sai, entry, rc);
        else
                sai->sai_sent++;

        sai->sai_index++;
}
877
/* async glimpse (agl) thread main function: pops inodes queued on
 * sai->sai_agls and triggers a glimpse (size/attribute fetch) for each,
 * until told to stop; on exit it drains the queue and drops the inode
 * references held by queued entries.
 */
static int ll_agl_thread(void *arg)
{
        struct dentry       *parent = arg;
        struct inode         *dir    = d_inode(parent);
        struct ll_inode_info     *plli   = ll_i2info(dir);
        struct ll_inode_info     *clli;
        struct ll_sb_info       *sbi    = ll_i2sbi(dir);
        struct ll_statahead_info *sai;
        struct ptlrpc_thread *thread;
        struct l_wait_info      lwi    = { 0 };

        sai = ll_sai_get(dir);
        thread = &sai->sai_agl_thread;
        thread->t_pid = current_pid();
        CDEBUG(D_READA, "agl thread started: sai %p, parent %pd\n",
               sai, parent);

        atomic_inc(&sbi->ll_agl_total);
        spin_lock(&plli->lli_agl_lock);
        sai->sai_agl_valid = 1;
        if (thread_is_init(thread))
                /* If someone else has changed the thread state
                 * (e.g. already changed to SVC_STOPPING), we can't just
                 * blindly overwrite that setting.
                 */
                thread_set_flags(thread, SVC_RUNNING);
        spin_unlock(&plli->lli_agl_lock);
        /* starter (ll_start_agl) is blocked until we report running/stopped */
        wake_up(&thread->t_ctl_waitq);

        while (1) {
                l_wait_event(thread->t_ctl_waitq,
                             !list_empty(&sai->sai_agls) ||
                             !thread_is_running(thread),
                             &lwi);

                if (!thread_is_running(thread))
                        break;

                spin_lock(&plli->lli_agl_lock);
                /* The statahead thread may also help to process AGL entries,
                 * so the list can have been emptied since the wakeup; check
                 * again under the lock.
                 */
                if (!list_empty(&sai->sai_agls)) {
                        clli = list_entry(sai->sai_agls.next,
                                          struct ll_inode_info, lli_agl_list);
                        list_del_init(&clli->lli_agl_list);
                        /* drop the lock across the glimpse RPC */
                        spin_unlock(&plli->lli_agl_lock);
                        ll_agl_trigger(&clli->lli_vfs_inode, sai);
                } else {
                        spin_unlock(&plli->lli_agl_lock);
                }
        }

        /* shutting down: drain remaining queued inodes without glimpsing,
         * dropping the reference ll_agl_add took on each
         */
        spin_lock(&plli->lli_agl_lock);
        sai->sai_agl_valid = 0;
        while (!list_empty(&sai->sai_agls)) {
                clli = list_entry(sai->sai_agls.next,
                                  struct ll_inode_info, lli_agl_list);
                list_del_init(&clli->lli_agl_list);
                /* iput may take time/sleep; don't hold the spinlock over it */
                spin_unlock(&plli->lli_agl_lock);
                clli->lli_agl_index = 0;
                iput(&clli->lli_vfs_inode);
                spin_lock(&plli->lli_agl_lock);
        }
        thread_set_flags(thread, SVC_STOPPED);
        spin_unlock(&plli->lli_agl_lock);
        /* waiters in ll_statahead_thread block on thread_is_stopped() */
        wake_up(&thread->t_ctl_waitq);
        ll_sai_put(sai);
        CDEBUG(D_READA, "agl thread stopped: sai %p, parent %pd\n",
               sai, parent);
        return 0;
}
951
952 /* start agl thread */
953 static void ll_start_agl(struct dentry *parent, struct ll_statahead_info *sai)
954 {
955         struct ptlrpc_thread *thread = &sai->sai_agl_thread;
956         struct l_wait_info    lwi    = { 0 };
957         struct ll_inode_info  *plli;
958         struct task_struct *task;
959
960         CDEBUG(D_READA, "start agl thread: sai %p, parent %pd\n",
961                sai, parent);
962
963         plli = ll_i2info(d_inode(parent));
964         task = kthread_run(ll_agl_thread, parent, "ll_agl_%u",
965                            plli->lli_opendir_pid);
966         if (IS_ERR(task)) {
967                 CERROR("can't start ll_agl thread, rc: %ld\n", PTR_ERR(task));
968                 thread_set_flags(thread, SVC_STOPPED);
969                 return;
970         }
971
972         l_wait_event(thread->t_ctl_waitq,
973                      thread_is_running(thread) || thread_is_stopped(thread),
974                      &lwi);
975 }
976
/* statahead thread main function: walks the directory pages of @arg's inode,
 * issuing an async stat (sa_statahead) for each entry ahead of the
 * application's "ls -l"-style traversal; stops when the directory ends, the
 * hit ratio drops too low, or it is told to quit, then performs an ordered
 * teardown (stop agl thread, wait for inflight RPCs, flush callbacks).
 */
static int ll_statahead_thread(void *arg)
{
        struct dentry       *parent = arg;
        struct inode         *dir    = d_inode(parent);
        struct ll_inode_info     *lli   = ll_i2info(dir);
        struct ll_sb_info       *sbi    = ll_i2sbi(dir);
        struct ll_statahead_info *sai;
        struct ptlrpc_thread *sa_thread;
        struct ptlrpc_thread *agl_thread;
        struct page           *page = NULL;
        __u64                pos    = 0;
        int                    first  = 0;
        int                    rc     = 0;
        struct md_op_data *op_data;
        struct l_wait_info      lwi    = { 0 };

        sai = ll_sai_get(dir);
        sa_thread = &sai->sai_thread;
        agl_thread = &sai->sai_agl_thread;
        sa_thread->t_pid = current_pid();
        CDEBUG(D_READA, "statahead thread starting: sai %p, parent %pd\n",
               sai, parent);

        op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
                                     LUSTRE_OPC_ANY, dir);
        if (IS_ERR(op_data)) {
                rc = PTR_ERR(op_data);
                goto out;
        }

        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;

        if (sbi->ll_flags & LL_SBI_AGL_ENABLED)
                ll_start_agl(parent, sai);

        atomic_inc(&sbi->ll_sa_total);
        spin_lock(&lli->lli_sa_lock);
        if (thread_is_init(sa_thread))
                /* If someone else has changed the thread state
                 * (e.g. already changed to SVC_STOPPING), we can't just
                 * blindly overwrite that setting.
                 */
                thread_set_flags(sa_thread, SVC_RUNNING);
        spin_unlock(&lli->lli_sa_lock);
        /* unblock start_statahead_thread(), which waits for running/stopped */
        wake_up(&sa_thread->t_ctl_waitq);

        /* walk directory pages by hash offset until end-of-dir or stop */
        while (pos != MDS_DIR_END_OFF && thread_is_running(sa_thread)) {
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;

                /* flag lets revalidate_statahead_dentry() know we may be
                 * blocked in readpage and help run callbacks for us
                 */
                sai->sai_in_readpage = 1;
                page = ll_get_dir_page(dir, op_data, pos);
                sai->sai_in_readpage = 0;
                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
                        CDEBUG(D_READA, "error reading dir "DFID" at %llu/%llu: opendir_pid = %u: rc = %d\n",
                               PFID(ll_inode2fid(dir)), pos, sai->sai_index,
                               lli->lli_opendir_pid, rc);
                        break;
                }

                dp = page_address(page);
                for (ent = lu_dirent_start(dp);
                     ent && thread_is_running(sa_thread) && !sa_low_hit(sai);
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
                        int namelen;
                        char *name;

                        hash = le64_to_cpu(ent->lde_hash);
                        if (unlikely(hash < pos))
                                /*
                                 * Skip until we find target hash value.
                                 */
                                continue;

                        namelen = le16_to_cpu(ent->lde_namelen);
                        if (unlikely(namelen == 0))
                                /*
                                 * Skip dummy record.
                                 */
                                continue;

                        name = ent->lde_name;
                        if (name[0] == '.') {
                                if (namelen == 1) {
                                        /*
                                         * skip "."
                                         */
                                        continue;
                                } else if (name[1] == '.' && namelen == 2) {
                                        /*
                                         * skip ".."
                                         */
                                        continue;
                                } else if (!sai->sai_ls_all) {
                                        /*
                                         * skip hidden files.
                                         */
                                        sai->sai_skip_hidden++;
                                        continue;
                                }
                        }

                        /*
                         * don't stat-ahead first entry.
                         */
                        if (unlikely(++first == 1))
                                continue;

                        /* wait for spare statahead window; while waiting,
                         * run completion callbacks and help the agl thread
                         * drain its queue so neither side stalls
                         */
                        do {
                                l_wait_event(sa_thread->t_ctl_waitq,
                                             !sa_sent_full(sai) ||
                                             sa_has_callback(sai) ||
                                             !list_empty(&sai->sai_agls) ||
                                             !thread_is_running(sa_thread),
                                             &lwi);
                                sa_handle_callback(sai);

                                spin_lock(&lli->lli_agl_lock);
                                while (sa_sent_full(sai) &&
                                       !agl_list_empty(sai)) {
                                        struct ll_inode_info *clli;

                                        clli = list_entry(sai->sai_agls.next,
                                                          struct ll_inode_info, lli_agl_list);
                                        list_del_init(&clli->lli_agl_list);
                                        /* drop lock across the glimpse RPC */
                                        spin_unlock(&lli->lli_agl_lock);

                                        ll_agl_trigger(&clli->lli_vfs_inode,
                                                       sai);

                                        spin_lock(&lli->lli_agl_lock);
                                }
                                spin_unlock(&lli->lli_agl_lock);
                        } while (sa_sent_full(sai) &&
                                 thread_is_running(sa_thread));

                        sa_statahead(parent, name, namelen);
                }

                /* advance to the next directory page */
                pos = le64_to_cpu(dp->ldp_hash_end);
                ll_release_page(dir, page,
                                le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);

                if (sa_low_hit(sai)) {
                        /* cache hit ratio too low: statahead is wasted work,
                         * give up and stop the thread
                         */
                        rc = -EFAULT;
                        atomic_inc(&sbi->ll_sa_wrong);
                        CDEBUG(D_READA, "Statahead for dir "DFID" hit ratio too low: hit/miss %llu/%llu, sent/replied %llu/%llu, stopping statahead thread: pid %d\n",
                               PFID(&lli->lli_fid), sai->sai_hit,
                               sai->sai_miss, sai->sai_sent,
                               sai->sai_replied, current_pid());
                        break;
                }
        }
        ll_finish_md_op_data(op_data);

        if (rc < 0) {
                /* on error also disable statahead for this dir so later
                 * stats don't keep trying
                 */
                spin_lock(&lli->lli_sa_lock);
                thread_set_flags(sa_thread, SVC_STOPPING);
                lli->lli_sa_enabled = 0;
                spin_unlock(&lli->lli_sa_lock);
        }

        /*
         * statahead is finished, but statahead entries need to be cached, wait
         * for file release to stop me.
         */
        while (thread_is_running(sa_thread)) {
                l_wait_event(sa_thread->t_ctl_waitq,
                             sa_has_callback(sai) ||
                             !agl_list_empty(sai) ||
                             !thread_is_running(sa_thread),
                             &lwi);

                sa_handle_callback(sai);
        }
out:
        /* stop the agl thread (if it ever started) and wait for it */
        if (sai->sai_agl_valid) {
                spin_lock(&lli->lli_agl_lock);
                thread_set_flags(agl_thread, SVC_STOPPING);
                spin_unlock(&lli->lli_agl_lock);
                wake_up(&agl_thread->t_ctl_waitq);

                CDEBUG(D_READA, "stop agl thread: sai %p pid %u\n",
                       sai, (unsigned int)agl_thread->t_pid);
                l_wait_event(agl_thread->t_ctl_waitq,
                             thread_is_stopped(agl_thread),
                             &lwi);
        } else {
                /* Set agl_thread flags anyway. */
                thread_set_flags(agl_thread, SVC_STOPPED);
        }

        /*
         * wait for inflight statahead RPCs to finish, and then we can free sai
         * safely because statahead RPC will access sai data
         */
        while (sai->sai_sent != sai->sai_replied) {
                /* in case we're not woken up, timeout wait */
                lwi = LWI_TIMEOUT(msecs_to_jiffies(MSEC_PER_SEC >> 3),
                                  NULL, NULL);
                l_wait_event(sa_thread->t_ctl_waitq,
                             sai->sai_sent == sai->sai_replied, &lwi);
        }

        /* release resources held by statahead RPCs */
        sa_handle_callback(sai);

        spin_lock(&lli->lli_sa_lock);
        thread_set_flags(sa_thread, SVC_STOPPED);
        spin_unlock(&lli->lli_sa_lock);

        CDEBUG(D_READA, "statahead thread stopped: sai %p, parent %pd\n",
               sai, parent);

        /* wake anyone blocked on entries or on thread state, then drop our
         * sai reference (taken by ll_sai_get above)
         */
        wake_up(&sai->sai_waitq);
        wake_up(&sa_thread->t_ctl_waitq);
        ll_sai_put(sai);

        return rc;
}
1201
1202 /* authorize opened dir handle @key to statahead */
1203 void ll_authorize_statahead(struct inode *dir, void *key)
1204 {
1205         struct ll_inode_info *lli = ll_i2info(dir);
1206
1207         spin_lock(&lli->lli_sa_lock);
1208         if (!lli->lli_opendir_key && !lli->lli_sai) {
1209                 /*
1210                  * if lli_sai is not NULL, it means previous statahead is not
1211                  * finished yet, we'd better not start a new statahead for now.
1212                  */
1213                 LASSERT(!lli->lli_opendir_pid);
1214                 lli->lli_opendir_key = key;
1215                 lli->lli_opendir_pid = current_pid();
1216                 lli->lli_sa_enabled = 1;
1217         }
1218         spin_unlock(&lli->lli_sa_lock);
1219 }
1220
1221 /*
1222  * deauthorize opened dir handle @key to statahead, but statahead thread may
1223  * still be running, notify it to quit.
1224  */
1225 void ll_deauthorize_statahead(struct inode *dir, void *key)
1226 {
1227         struct ll_inode_info *lli = ll_i2info(dir);
1228         struct ll_statahead_info *sai;
1229
1230         LASSERT(lli->lli_opendir_key == key);
1231         LASSERT(lli->lli_opendir_pid);
1232
1233         CDEBUG(D_READA, "deauthorize statahead for "DFID"\n",
1234                PFID(&lli->lli_fid));
1235
1236         spin_lock(&lli->lli_sa_lock);
1237         lli->lli_opendir_key = NULL;
1238         lli->lli_opendir_pid = 0;
1239         lli->lli_sa_enabled = 0;
1240         sai = lli->lli_sai;
1241         if (sai && thread_is_running(&sai->sai_thread)) {
1242                 /*
1243                  * statahead thread may not quit yet because it needs to cache
1244                  * entries, now it's time to tell it to quit.
1245                  */
1246                 thread_set_flags(&sai->sai_thread, SVC_STOPPING);
1247                 wake_up(&sai->sai_thread.t_ctl_waitq);
1248         }
1249         spin_unlock(&lli->lli_sa_lock);
1250 }
1251
/*
 * classification of a dirent's position, returned by is_first_dirent():
 * statahead is only worth starting when the stat that triggered it targets
 * the first entry of the directory (i.e. an "ls"/"ls -a" style traversal).
 */
enum {
        /**
         * not first dirent, or is "."
         */
        LS_NOT_FIRST_DE = 0,
        /**
         * the first non-hidden dirent
         */
        LS_FIRST_DE,
        /**
         * the first hidden dirent, that is "."
         */
        LS_FIRST_DOT_DE
};
1266
/* check whether @dentry is the first dirent under @dir by scanning the
 * directory pages from hash offset 0; returns one of the LS_* values above,
 * or a negative errno if reading the directory failed.
 */
static int is_first_dirent(struct inode *dir, struct dentry *dentry)
{
        const struct qstr  *target = &dentry->d_name;
        struct md_op_data *op_data;
        struct page       *page;
        __u64            pos    = 0;
        int                dot_de;
        int rc = LS_NOT_FIRST_DE;

        op_data = ll_prep_md_op_data(NULL, dir, dir, NULL, 0, 0,
                                     LUSTRE_OPC_ANY, dir);
        if (IS_ERR(op_data))
                return PTR_ERR(op_data);
        /**
         * FIXME choose the start offset of the readdir
         */
        op_data->op_max_pages = ll_i2sbi(dir)->ll_md_brw_pages;

        page = ll_get_dir_page(dir, op_data, pos);

        while (1) {
                struct lu_dirpage *dp;
                struct lu_dirent  *ent;

                if (IS_ERR(page)) {
                        struct ll_inode_info *lli = ll_i2info(dir);

                        rc = PTR_ERR(page);
                        CERROR("%s: error reading dir "DFID" at %llu: opendir_pid = %u : rc = %d\n",
                               ll_get_fsname(dir->i_sb, NULL, 0),
                               PFID(ll_inode2fid(dir)), pos,
                               lli->lli_opendir_pid, rc);
                        break;
                }

                dp = page_address(page);
                for (ent = lu_dirent_start(dp); ent;
                     ent = lu_dirent_next(ent)) {
                        __u64 hash;
                        int namelen;
                        char *name;

                        hash = le64_to_cpu(ent->lde_hash);
                        /* The ll_get_dir_page() can return any page containing
                         * the given hash which may be not the start hash.
                         */
                        if (unlikely(hash < pos))
                                continue;

                        namelen = le16_to_cpu(ent->lde_namelen);
                        if (unlikely(namelen == 0))
                                /*
                                 * skip dummy record.
                                 */
                                continue;

                        name = ent->lde_name;
                        if (name[0] == '.') {
                                if (namelen == 1)
                                        /*
                                         * skip "."
                                         */
                                        continue;
                                else if (name[1] == '.' && namelen == 2)
                                        /*
                                         * skip ".."
                                         */
                                        continue;
                                else
                                        dot_de = 1;
                        } else {
                                dot_de = 0;
                        }

                        /* a hidden entry before a non-hidden target does not
                         * count: "ls" (without -a) never stats hidden files
                         */
                        if (dot_de && target->name[0] != '.') {
                                CDEBUG(D_READA, "%.*s skip hidden file %.*s\n",
                                       target->len, target->name,
                                       namelen, name);
                                continue;
                        }

                        /* first candidate entry reached: classify and stop */
                        if (target->len != namelen ||
                            memcmp(target->name, name, namelen) != 0)
                                rc = LS_NOT_FIRST_DE;
                        else if (!dot_de)
                                rc = LS_FIRST_DE;
                        else
                                rc = LS_FIRST_DOT_DE;

                        ll_release_page(dir, page, false);
                        goto out;
                }
                pos = le64_to_cpu(dp->ldp_hash_end);
                if (pos == MDS_DIR_END_OFF) {
                        /*
                         * End of directory reached.
                         */
                        ll_release_page(dir, page, false);
                        goto out;
                } else {
                        /*
                         * chain is exhausted
                         * Normal case: continue to the next page.
                         */
                        ll_release_page(dir, page,
                                        le32_to_cpu(dp->ldp_flags) &
                                        LDF_COLLIDE);
                        page = ll_get_dir_page(dir, op_data, pos);
                }
        }
out:
        ll_finish_md_op_data(op_data);
        return rc;
}
1382
1383 /**
1384  * revalidate @dentryp from statahead cache
1385  *
1386  * \param[in]  dir      parent directory
1387  * \param[in]  sai      sai structure
1388  * \param[out] dentryp  pointer to dentry which will be revalidated
1389  * \param[in]  unplug   unplug statahead window only (normally for negative
1390  *                      dentry)
1391  * \retval              1 on success, dentry is saved in @dentryp
1392  * \retval              0 if revalidation failed (no proper lock on client)
1393  * \retval              negative number upon error
1394  */
1395 static int revalidate_statahead_dentry(struct inode *dir,
1396                                        struct ll_statahead_info *sai,
1397                                        struct dentry **dentryp,
1398                                        bool unplug)
1399 {
1400         struct sa_entry *entry = NULL;
1401         struct l_wait_info lwi = { 0 };
1402         struct ll_dentry_data *ldd;
1403         struct ll_inode_info *lli;
1404         int rc = 0;
1405
1406         if ((*dentryp)->d_name.name[0] == '.') {
1407                 if (sai->sai_ls_all ||
1408                     sai->sai_miss_hidden >= sai->sai_skip_hidden) {
1409                         /*
1410                          * Hidden dentry is the first one, or statahead
1411                          * thread does not skip so many hidden dentries
1412                          * before "sai_ls_all" enabled as below.
1413                          */
1414                 } else {
1415                         if (!sai->sai_ls_all)
1416                                 /*
1417                                  * It maybe because hidden dentry is not
1418                                  * the first one, "sai_ls_all" was not
1419                                  * set, then "ls -al" missed. Enable
1420                                  * "sai_ls_all" for such case.
1421                                  */
1422                                 sai->sai_ls_all = 1;
1423
1424                         /*
1425                          * Such "getattr" has been skipped before
1426                          * "sai_ls_all" enabled as above.
1427                          */
1428                         sai->sai_miss_hidden++;
1429                         return -EAGAIN;
1430                 }
1431         }
1432
1433         if (unplug) {
1434                 rc = 1;
1435                 goto out_unplug;
1436         }
1437
1438         entry = sa_get(sai, &(*dentryp)->d_name);
1439         if (!entry) {
1440                 rc = -EAGAIN;
1441                 goto out_unplug;
1442         }
1443
1444         /* if statahead is busy in readdir, help it do post-work */
1445         if (!sa_ready(entry) && sai->sai_in_readpage)
1446                 sa_handle_callback(sai);
1447
1448         if (!sa_ready(entry)) {
1449                 sai->sai_index_wait = entry->se_index;
1450                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(30), NULL,
1451                                        LWI_ON_SIGNAL_NOOP, NULL);
1452                 rc = l_wait_event(sai->sai_waitq, sa_ready(entry), &lwi);
1453                 if (rc < 0) {
1454                         /*
1455                          * entry may not be ready, so it may be used by inflight
1456                          * statahead RPC, don't free it.
1457                          */
1458                         entry = NULL;
1459                         rc = -EAGAIN;
1460                         goto out_unplug;
1461                 }
1462         }
1463
1464         if (entry->se_state == SA_ENTRY_SUCC && entry->se_inode) {
1465                 struct inode *inode = entry->se_inode;
1466                 struct lookup_intent it = { .it_op = IT_GETATTR,
1467                                             .it_lock_handle = entry->se_handle };
1468                 __u64 bits;
1469
1470                 rc = md_revalidate_lock(ll_i2mdexp(dir), &it,
1471                                         ll_inode2fid(inode), &bits);
1472                 if (rc == 1) {
1473                         if (!(*dentryp)->d_inode) {
1474                                 struct dentry *alias;
1475
1476                                 alias = ll_splice_alias(inode, *dentryp);
1477                                 if (IS_ERR(alias)) {
1478                                         rc = PTR_ERR(alias);
1479                                         goto out_unplug;
1480                                 }
1481                                 *dentryp = alias;
1482                                 /**
1483                                  * statahead prepared this inode, transfer inode
1484                                  * refcount from sa_entry to dentry
1485                                  */
1486                                 entry->se_inode = NULL;
1487                         } else if ((*dentryp)->d_inode != inode) {
1488                                 /* revalidate, but inode is recreated */
1489                                 CDEBUG(D_READA,
1490                                        "%s: stale dentry %pd inode "DFID", statahead inode "DFID"\n",
1491                                        ll_get_fsname((*dentryp)->d_inode->i_sb,
1492                                                      NULL, 0),
1493                                        *dentryp,
1494                                        PFID(ll_inode2fid((*dentryp)->d_inode)),
1495                                        PFID(ll_inode2fid(inode)));
1496                                 rc = -ESTALE;
1497                                 goto out_unplug;
1498                         }
1499
1500                         if ((bits & MDS_INODELOCK_LOOKUP) &&
1501                             d_lustre_invalid(*dentryp))
1502                                 d_lustre_revalidate(*dentryp);
1503                         ll_intent_release(&it);
1504                 }
1505         }
1506 out_unplug:
1507         /*
1508          * statahead cached sa_entry can be used only once, and will be killed
1509          * right after use, so if lookup/revalidate accessed statahead cache,
1510          * set dentry ldd_sa_generation to parent lli_sa_generation, later if we
1511          * stat this file again, we know we've done statahead before, see
1512          * dentry_may_statahead().
1513          */
1514         ldd = ll_d2d(*dentryp);
1515         lli = ll_i2info(dir);
1516         /* ldd can be NULL if llite lookup failed. */
1517         if (ldd)
1518                 ldd->lld_sa_generation = lli->lli_sa_generation;
1519         sa_put(sai, entry);
1520         return rc;
1521 }
1522
1523 /**
1524  * start statahead thread
1525  *
1526  * \param[in] dir       parent directory
1527  * \param[in] dentry    dentry that triggers statahead, normally the first
1528  *                      dirent under @dir
1529  * \retval              -EAGAIN on success, because when this function is
1530  *                      called, it's already in lookup call, so client should
1531  *                      do it itself instead of waiting for statahead thread
1532  *                      to do it asynchronously.
1533  * \retval              negative number upon error
1534  */
1535 static int start_statahead_thread(struct inode *dir, struct dentry *dentry)
1536 {
1537         struct ll_inode_info *lli = ll_i2info(dir);
1538         struct ll_statahead_info *sai = NULL;
1539         struct l_wait_info lwi = { 0 };
1540         struct ptlrpc_thread *thread;
1541         struct task_struct *task;
1542         struct dentry *parent = dentry->d_parent;
1543         int rc;
1544
1545         /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */
1546         rc = is_first_dirent(dir, dentry);
1547         if (rc == LS_NOT_FIRST_DE) {
1548                 /* It is not "ls -{a}l" operation, no need statahead for it. */
1549                 rc = -EFAULT;
1550                 goto out;
1551         }
1552
1553         sai = ll_sai_alloc(parent);
1554         if (!sai) {
1555                 rc = -ENOMEM;
1556                 goto out;
1557         }
1558
1559         sai->sai_ls_all = (rc == LS_FIRST_DOT_DE);
1560         /*
1561          * if current lli_opendir_key was deauthorized, or dir re-opened by
1562          * another process, don't start statahead, otherwise the newly spawned
1563          * statahead thread won't be notified to quit.
1564          */
1565         spin_lock(&lli->lli_sa_lock);
1566         if (unlikely(lli->lli_sai || lli->lli_opendir_key ||
1567                      lli->lli_opendir_pid != current->pid)) {
1568                 spin_unlock(&lli->lli_sa_lock);
1569                 rc = -EPERM;
1570                 goto out;
1571         }
1572         lli->lli_sai = sai;
1573         spin_unlock(&lli->lli_sa_lock);
1574
1575         atomic_inc(&ll_i2sbi(parent->d_inode)->ll_sa_running);
1576
1577         CDEBUG(D_READA, "start statahead thread: [pid %d] [parent %pd]\n",
1578                current_pid(), parent);
1579
1580         task = kthread_run(ll_statahead_thread, parent, "ll_sa_%u",
1581                            lli->lli_opendir_pid);
1582         thread = &sai->sai_thread;
1583         if (IS_ERR(task)) {
1584                 rc = PTR_ERR(task);
1585                 CERROR("can't start ll_sa thread, rc : %d\n", rc);
1586                 goto out;
1587         }
1588
1589         l_wait_event(thread->t_ctl_waitq,
1590                      thread_is_running(thread) || thread_is_stopped(thread),
1591                      &lwi);
1592         ll_sai_put(sai);
1593
1594         /*
1595          * We don't stat-ahead for the first dirent since we are already in
1596          * lookup.
1597          */
1598         return -EAGAIN;
1599
1600 out:
1601         /*
1602          * once we start statahead thread failed, disable statahead so
1603          * that subsequent stat won't waste time to try it.
1604          */
1605         spin_lock(&lli->lli_sa_lock);
1606         lli->lli_sa_enabled = 0;
1607         lli->lli_sai = NULL;
1608         spin_unlock(&lli->lli_sa_lock);
1609         if (sai)
1610                 ll_sai_free(sai);
1611         return rc;
1612 }
1613
1614 /**
1615  * statahead entry function, this is called when client getattr on a file, it
1616  * will start statahead thread if this is the first dir entry, else revalidate
1617  * dentry from statahead cache.
1618  *
1619  * \param[in]  dir      parent directory
1620  * \param[out] dentryp  dentry to getattr
1621  * \param[in]  unplug   unplug statahead window only (normally for negative
1622  *                      dentry)
1623  * \retval              1 on success
1624  * \retval              0 revalidation from statahead cache failed, caller needs
1625  *                      to getattr from server directly
1626  * \retval              negative number on error, caller often ignores this and
1627  *                      then getattr from server
1628  */
1629 int ll_statahead(struct inode *dir, struct dentry **dentryp, bool unplug)
1630 {
1631         struct ll_statahead_info *sai;
1632
1633         sai = ll_sai_get(dir);
1634         if (sai) {
1635                 int rc;
1636
1637                 rc = revalidate_statahead_dentry(dir, sai, dentryp, unplug);
1638                 CDEBUG(D_READA, "revalidate statahead %pd: %d.\n",
1639                        *dentryp, rc);
1640                 ll_sai_put(sai);
1641                 return rc;
1642         }
1643         return start_statahead_thread(dir, *dentryp);
1644 }