drivers/staging/lustre/lustre/obdclass/lu_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/lu_object.c
33  *
34  * Lustre Object.
35  * These are the only exported functions, they provide some generic
36  * infrastructure for managing object devices
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42
43 #include "../../include/linux/libcfs/libcfs.h"
44
45 #include <linux/module.h>
46
47 /* hash_long() */
48 #include "../../include/linux/libcfs/libcfs_hash.h"
49 #include "../include/obd_class.h"
50 #include "../include/obd_support.h"
51 #include "../include/lustre_disk.h"
52 #include "../include/lustre_fid.h"
53 #include "../include/lu_object.h"
54 #include "../include/cl_object.h"
55 #include "../include/lu_ref.h"
56 #include <linux/list.h>
57
58 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
59 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
60
61 /**
62  * Decrease reference counter on object. If last reference is freed, return
63  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
64  * case, free object immediately.
65  */
66 void lu_object_put(const struct lu_env *env, struct lu_object *o)
67 {
68         struct lu_site_bkt_data *bkt;
69         struct lu_object_header *top;
70         struct lu_site    *site;
71         struct lu_object        *orig;
72         struct cfs_hash_bd          bd;
73         const struct lu_fid     *fid;
74
75         top  = o->lo_header;
76         site = o->lo_dev->ld_site;
77         orig = o;
78
79         /*
80          * Until fids-on-OST is fully implemented, anonymous objects are
81          * possible in OSP. Such an object is not listed in the site, so
82          * we should not remove it from the site.
83          */
84         fid = lu_object_fid(o);
85         if (fid_is_zero(fid)) {
86                 LASSERT(!top->loh_hash.next && !top->loh_hash.pprev);
87                 LASSERT(list_empty(&top->loh_lru));
88                 if (!atomic_dec_and_test(&top->loh_ref))
89                         return;
90                 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
91                         if (o->lo_ops->loo_object_release)
92                                 o->lo_ops->loo_object_release(env, o);
93                 }
94                 lu_object_free(env, orig);
95                 return;
96         }
97
98         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
99         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
100
101         if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
102                 if (lu_object_is_dying(top)) {
103                         /*
104                          * somebody may be waiting for this, currently only
105                          * used for cl_object, see cl_object_put_last().
106                          */
107                         wake_up_all(&bkt->lsb_marche_funebre);
108                 }
109                 return;
110         }
111
112         /*
113          * When last reference is released, iterate over object
114          * layers, and notify them that object is no longer busy.
115          */
116         list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
117                 if (o->lo_ops->loo_object_release)
118                         o->lo_ops->loo_object_release(env, o);
119         }
120
121         if (!lu_object_is_dying(top)) {
122                 LASSERT(list_empty(&top->loh_lru));
123                 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
124                 bkt->lsb_lru_len++;
125                 lprocfs_counter_incr(site->ls_stats, LU_SS_LRU_LEN);
126                 CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, lru_len: %ld\n",
127                        o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
128                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
129                 return;
130         }
131
132         /*
133          * If the object is dying (will not be cached), then remove it
134          * from the hash table and LRU.
135          *
136          * This is done with the hash table and LRU lists locked. As the
137          * only way to acquire the first reference to a previously
138          * unreferenced object is through hash-table lookup
139          * (lu_object_find()) or LRU scanning (lu_site_purge()), both done
140          * under the hash-table and LRU lock, no race with a concurrent
141          * lookup is possible and we can safely destroy the object below.
142          */
143         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
144                 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
145         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
146         /*
147          * The object was already removed from the hash and LRU above,
148          * so it can be freed now.
149          */
150         lu_object_free(env, orig);
151 }
152 EXPORT_SYMBOL(lu_object_put);
153
154 /**
155  * Kill the object and take it out of LRU cache.
156  * Currently used by client code for layout change.
157  */
158 void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
159 {
160         struct lu_object_header *top;
161
162         top = o->lo_header;
163         set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
164         if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
165                 struct lu_site *site = o->lo_dev->ld_site;
166                 struct cfs_hash *obj_hash = site->ls_obj_hash;
167                 struct cfs_hash_bd bd;
168
169                 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
170                 if (!list_empty(&top->loh_lru)) {
171                         struct lu_site_bkt_data *bkt;
172
173                         list_del_init(&top->loh_lru);
174                         bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
175                         bkt->lsb_lru_len--;
176                         lprocfs_counter_decr(site->ls_stats, LU_SS_LRU_LEN);
177                 }
178                 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
179                 cfs_hash_bd_unlock(obj_hash, &bd, 1);
180         }
181 }
182 EXPORT_SYMBOL(lu_object_unhash);
183
184 /**
185  * Allocate new object.
186  *
187  * This follows the object creation protocol described in the comment
188  * within the struct lu_device_operations definition.
189  */
190 static struct lu_object *lu_object_alloc(const struct lu_env *env,
191                                          struct lu_device *dev,
192                                          const struct lu_fid *f,
193                                          const struct lu_object_conf *conf)
194 {
195         struct lu_object *scan;
196         struct lu_object *top;
197         struct list_head *layers;
198         unsigned int init_mask = 0;
199         unsigned int init_flag;
200         int clean;
201         int result;
202
203         /*
204          * Create top-level object slice. This will also create
205          * lu_object_header.
206          */
207         top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
208         if (!top)
209                 return ERR_PTR(-ENOMEM);
210         if (IS_ERR(top))
211                 return top;
212         /*
213          * This is the only place where object fid is assigned. It's constant
214          * after this point.
215          */
216         top->lo_header->loh_fid = *f;
217         layers = &top->lo_header->loh_layers;
218
219         do {
220                 /*
221                  * Call ->loo_object_init() repeatedly, until no more new
222                  * object slices are created.
223                  */
224                 clean = 1;
225                 init_flag = 1;
226                 list_for_each_entry(scan, layers, lo_linkage) {
227                         if (init_mask & init_flag)
228                                 goto next;
229                         clean = 0;
230                         scan->lo_header = top->lo_header;
231                         result = scan->lo_ops->loo_object_init(env, scan, conf);
232                         if (result != 0) {
233                                 lu_object_free(env, top);
234                                 return ERR_PTR(result);
235                         }
236                         init_mask |= init_flag;
237 next:
238                         init_flag <<= 1;
239                 }
240         } while (!clean);
241
242         list_for_each_entry_reverse(scan, layers, lo_linkage) {
243                 if (scan->lo_ops->loo_object_start) {
244                         result = scan->lo_ops->loo_object_start(env, scan);
245                         if (result != 0) {
246                                 lu_object_free(env, top);
247                                 return ERR_PTR(result);
248                         }
249                 }
250         }
251
252         lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
253         return top;
254 }
255
256 /**
257  * Free an object.
258  */
259 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
260 {
261         struct lu_site_bkt_data *bkt;
262         struct lu_site    *site;
263         struct lu_object        *scan;
264         struct list_head              *layers;
265         struct list_head               splice;
266
267         site   = o->lo_dev->ld_site;
268         layers = &o->lo_header->loh_layers;
269         bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
270         /*
271          * First call ->loo_object_delete() method to release all resources.
272          */
273         list_for_each_entry_reverse(scan, layers, lo_linkage) {
274                 if (scan->lo_ops->loo_object_delete)
275                         scan->lo_ops->loo_object_delete(env, scan);
276         }
277
278         /*
279          * Then, splice object layers into stand-alone list, and call
280          * ->loo_object_free() on all layers to free memory. Splice is
281          * necessary, because lu_object_header is freed together with the
282          * top-level slice.
283          */
284         INIT_LIST_HEAD(&splice);
285         list_splice_init(layers, &splice);
286         while (!list_empty(&splice)) {
287                 /*
288                  * Free layers in bottom-to-top order, so that object header
289                  * lives as long as possible and ->loo_object_free() methods
290                  * can look at its contents.
291                  */
292                 o = container_of0(splice.prev, struct lu_object, lo_linkage);
293                 list_del_init(&o->lo_linkage);
294                 o->lo_ops->loo_object_free(env, o);
295         }
296
297         if (waitqueue_active(&bkt->lsb_marche_funebre))
298                 wake_up_all(&bkt->lsb_marche_funebre);
299 }
300
301 /**
302  * Free \a nr objects from the cold end of the site LRU list.
303  */
304 int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
305 {
306         struct lu_object_header *h;
307         struct lu_object_header *temp;
308         struct lu_site_bkt_data *bkt;
309         struct cfs_hash_bd          bd;
310         struct cfs_hash_bd          bd2;
311         struct list_head               dispose;
312         int                   did_sth;
313         int                   start;
314         int                   count;
315         int                   bnr;
316         int                   i;
317
318         if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
319                 return 0;
320
321         INIT_LIST_HEAD(&dispose);
322         /*
323          * Under LRU list lock, scan LRU list and move unreferenced objects to
324          * the dispose list, removing them from LRU and hash table.
325          */
326         start = s->ls_purge_start;
327         bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
328  again:
329         did_sth = 0;
330         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
331                 if (i < start)
332                         continue;
333                 count = bnr;
334                 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
335                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
336
337                 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
338                         LASSERT(atomic_read(&h->loh_ref) == 0);
339
340                         cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
341                         LASSERT(bd.bd_bucket == bd2.bd_bucket);
342
343                         cfs_hash_bd_del_locked(s->ls_obj_hash,
344                                                &bd2, &h->loh_hash);
345                         list_move(&h->loh_lru, &dispose);
346                         bkt->lsb_lru_len--;
347                         lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
348                         if (did_sth == 0)
349                                 did_sth = 1;
350
351                         if (nr != ~0 && --nr == 0)
352                                 break;
353
354                         if (count > 0 && --count == 0)
355                                 break;
356                 }
357                 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
358                 cond_resched();
359                 /*
360                  * Free everything on the dispose list. This is safe against
361                  * races due to the reasons described in lu_object_put().
362                  */
363                 while (!list_empty(&dispose)) {
364                         h = container_of0(dispose.next,
365                                           struct lu_object_header, loh_lru);
366                         list_del_init(&h->loh_lru);
367                         lu_object_free(env, lu_object_top(h));
368                         lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
369                 }
370
371                 if (nr == 0)
372                         break;
373         }
374
375         if (nr != 0 && did_sth && start != 0) {
376                 start = 0; /* restart from the first bucket */
377                 goto again;
378         }
379         /* race on s->ls_purge_start, but nobody cares */
380         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
381
382         return nr;
383 }
384 EXPORT_SYMBOL(lu_site_purge);
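/*
 * Usage sketch (illustrative only, based on callers within this file): pass
 * ~0 to drain every unreferenced object from the LRU, as lu_stack_fini()
 * does further below, or a finite count to shrink the cache partially; for
 * a finite count the return value is the part of the request that could not
 * be satisfied.
 *
 *	lu_site_purge(env, site, ~0);
 *	remainder = lu_site_purge(env, site, 128);
 */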
385
386 /*
387  * Object printing.
388  *
389  * The code below has to jump through certain hoops to output an object
390  * description into the libcfs_debug_msg-based log. The problem is that
391  * lu_object_print() composes the object description from strings that are
392  * parts of _lines_ of output (i.e., strings that are not terminated by a
393  * newline). This does not fit well with the libcfs_debug_msg() interface,
394  * which assumes that each message supplied to it is a self-contained line.
395  *
396  * To work around this, strings are collected in a temporary buffer
397  * (implemented as the value of the lu_global_key context key) until a
398  * terminating newline character is detected.
399  *
400  */
401
402 enum {
403         /**
404          * Maximal line size.
405          *
406          * XXX overflow is not handled correctly.
407          */
408         LU_CDEBUG_LINE = 512
409 };
410
411 struct lu_cdebug_data {
412         /**
413          * Temporary buffer.
414          */
415         char lck_area[LU_CDEBUG_LINE];
416 };
417
418 /* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
419 LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
420
421 /**
422  * Key, holding temporary buffer. This key is registered very early by
423  * lu_global_init().
424  */
425 static struct lu_context_key lu_global_key = {
426         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
427                     LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
428         .lct_init = lu_global_key_init,
429         .lct_fini = lu_global_key_fini
430 };
431
432 /**
433  * Printer function emitting messages through libcfs_debug_msg().
434  */
435 int lu_cdebug_printer(const struct lu_env *env,
436                       void *cookie, const char *format, ...)
437 {
438         struct libcfs_debug_msg_data *msgdata = cookie;
439         struct lu_cdebug_data   *key;
440         int used;
441         int complete;
442         va_list args;
443
444         va_start(args, format);
445
446         key = lu_context_key_get(&env->le_ctx, &lu_global_key);
447
448         used = strlen(key->lck_area);
449         complete = format[strlen(format) - 1] == '\n';
450         /*
451          * Append new chunk to the buffer.
452          */
453         vsnprintf(key->lck_area + used,
454                   ARRAY_SIZE(key->lck_area) - used, format, args);
455         if (complete) {
456                 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
457                         libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
458                 key->lck_area[0] = 0;
459         }
460         va_end(args);
461         return 0;
462 }
463 EXPORT_SYMBOL(lu_cdebug_printer);
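/*
 * Illustrative example of the buffering described above (not code that
 * exists elsewhere in this file): successive calls append to the
 * per-context buffer, and a format string ending in '\n' flushes the
 * accumulated line to libcfs_debug_msg(). Here "msgdata" stands for a
 * struct libcfs_debug_msg_data prepared by the caller, matching the cast
 * of the cookie argument above.
 *
 *	lu_cdebug_printer(env, &msgdata, "header@%p", hdr);
 *	lu_cdebug_printer(env, &msgdata, " exists");
 *	lu_cdebug_printer(env, &msgdata, "\n");	  <- emits the whole line
 */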
464
465 /**
466  * Print object header.
467  */
468 void lu_object_header_print(const struct lu_env *env, void *cookie,
469                             lu_printer_t printer,
470                             const struct lu_object_header *hdr)
471 {
472         (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
473                    hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
474                    PFID(&hdr->loh_fid),
475                    hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
476                    list_empty((struct list_head *)&hdr->loh_lru) ? \
477                    "" : " lru",
478                    hdr->loh_attr & LOHA_EXISTS ? " exist":"");
479 }
480 EXPORT_SYMBOL(lu_object_header_print);
481
482 /**
483  * Print human readable representation of the \a o to the \a printer.
484  */
485 void lu_object_print(const struct lu_env *env, void *cookie,
486                      lu_printer_t printer, const struct lu_object *o)
487 {
488         static const char ruler[] = "........................................";
489         struct lu_object_header *top;
490         int depth = 4;
491
492         top = o->lo_header;
493         lu_object_header_print(env, cookie, printer, top);
494         (*printer)(env, cookie, "{\n");
495
496         list_for_each_entry(o, &top->loh_layers, lo_linkage) {
497                 /*
498                  * print `.' \a depth times followed by type name and address
499                  */
500                 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
501                            o->lo_dev->ld_type->ldt_name, o);
502
503                 if (o->lo_ops->loo_object_print)
504                         (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
505
506                 (*printer)(env, cookie, "\n");
507         }
508
509         (*printer)(env, cookie, "} header@%p\n", top);
510 }
511 EXPORT_SYMBOL(lu_object_print);
512
513 static struct lu_object *htable_lookup(struct lu_site *s,
514                                        struct cfs_hash_bd *bd,
515                                        const struct lu_fid *f,
516                                        wait_queue_t *waiter,
517                                        __u64 *version)
518 {
519         struct lu_site_bkt_data *bkt;
520         struct lu_object_header *h;
521         struct hlist_node       *hnode;
522         __u64  ver = cfs_hash_bd_version_get(bd);
523
524         if (*version == ver)
525                 return ERR_PTR(-ENOENT);
526
527         *version = ver;
528         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
529         /* cfs_hash_bd_peek_locked() is a somewhat "internal" function
530          * of cfs_hash; it does not take a reference on the object.
531          */
532         hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
533         if (!hnode) {
534                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
535                 return ERR_PTR(-ENOENT);
536         }
537
538         h = container_of0(hnode, struct lu_object_header, loh_hash);
539         if (likely(!lu_object_is_dying(h))) {
540                 cfs_hash_get(s->ls_obj_hash, hnode);
541                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
542                 if (!list_empty(&h->loh_lru)) {
543                         list_del_init(&h->loh_lru);
544                         bkt->lsb_lru_len--;
545                         lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
546                 }
547                 return lu_object_top(h);
548         }
549
550         /*
551          * Lookup found an object being destroyed; this object cannot be
552          * returned (to ensure that references to dying objects are eventually
553          * drained), and moreover, the lookup has to wait until the object is freed.
554          */
555
556         init_waitqueue_entry(waiter, current);
557         add_wait_queue(&bkt->lsb_marche_funebre, waiter);
558         set_current_state(TASK_UNINTERRUPTIBLE);
559         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
560         return ERR_PTR(-EAGAIN);
561 }
562
563 /**
564  * Search the cache for an object with the fid \a f. If such an object is
565  * found, return it. Otherwise, create a new object, insert it into the cache
566  * and return it. In any case, an additional reference is acquired on the returned object.
567  */
568 static struct lu_object *lu_object_find(const struct lu_env *env,
569                                         struct lu_device *dev,
570                                         const struct lu_fid *f,
571                                         const struct lu_object_conf *conf)
572 {
573         return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
574 }
575
576 static struct lu_object *lu_object_new(const struct lu_env *env,
577                                        struct lu_device *dev,
578                                        const struct lu_fid *f,
579                                        const struct lu_object_conf *conf)
580 {
581         struct lu_object        *o;
582         struct cfs_hash       *hs;
583         struct cfs_hash_bd          bd;
584
585         o = lu_object_alloc(env, dev, f, conf);
586         if (IS_ERR(o))
587                 return o;
588
589         hs = dev->ld_site->ls_obj_hash;
590         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
591         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
592         cfs_hash_bd_unlock(hs, &bd, 1);
593         return o;
594 }
595
596 /**
597  * Core logic of lu_object_find*() functions.
598  */
599 static struct lu_object *lu_object_find_try(const struct lu_env *env,
600                                             struct lu_device *dev,
601                                             const struct lu_fid *f,
602                                             const struct lu_object_conf *conf,
603                                             wait_queue_t *waiter)
604 {
605         struct lu_object      *o;
606         struct lu_object      *shadow;
607         struct lu_site  *s;
608         struct cfs_hash     *hs;
609         struct cfs_hash_bd        bd;
610         __u64             version = 0;
611
612         /*
613          * This uses standard index maintenance protocol:
614          *
615          *     - search index under lock, and return object if found;
616          *     - otherwise, unlock index, allocate new object;
617          *     - lock index and search again;
618          *     - if nothing is found (usual case), insert newly created
619          *       object into index;
620          *     - otherwise (race: other thread inserted object), free
621          *       object just allocated.
622          *     - unlock index;
623          *     - return object.
624          *
625          * For the "LOC_F_NEW" case, we are sure the object is newly
626          * established, so it is unnecessary to perform
627          * lookup-alloc-lookup-insert; just allocate and insert directly.
628          *
629          * If a dying object is found during the index search, add @waiter
630          * to the site wait-queue and return ERR_PTR(-EAGAIN).
631          */
632         if (conf && conf->loc_flags & LOC_F_NEW)
633                 return lu_object_new(env, dev, f, conf);
634
635         s  = dev->ld_site;
636         hs = s->ls_obj_hash;
637         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
638         o = htable_lookup(s, &bd, f, waiter, &version);
639         cfs_hash_bd_unlock(hs, &bd, 1);
640         if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
641                 return o;
642
643         /*
644          * Allocate new object. This may result in rather complicated
645          * operations, including fld queries, inode loading, etc.
646          */
647         o = lu_object_alloc(env, dev, f, conf);
648         if (IS_ERR(o))
649                 return o;
650
651         LASSERT(lu_fid_eq(lu_object_fid(o), f));
652
653         cfs_hash_bd_lock(hs, &bd, 1);
654
655         shadow = htable_lookup(s, &bd, f, waiter, &version);
656         if (likely(PTR_ERR(shadow) == -ENOENT)) {
657                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
658                 cfs_hash_bd_unlock(hs, &bd, 1);
659                 return o;
660         }
661
662         lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
663         cfs_hash_bd_unlock(hs, &bd, 1);
664         lu_object_free(env, o);
665         return shadow;
666 }
667
668 /**
669  * Much like lu_object_find(), but the top-level device of the object is
670  * specifically \a dev rather than the top-level device of the site. This
671  * allows objects with different "stackings" to be created within the same site.
672  */
673 struct lu_object *lu_object_find_at(const struct lu_env *env,
674                                     struct lu_device *dev,
675                                     const struct lu_fid *f,
676                                     const struct lu_object_conf *conf)
677 {
678         struct lu_site_bkt_data *bkt;
679         struct lu_object        *obj;
680         wait_queue_t       wait;
681
682         while (1) {
683                 obj = lu_object_find_try(env, dev, f, conf, &wait);
684                 if (obj != ERR_PTR(-EAGAIN))
685                         return obj;
686                 /*
687                  * lu_object_find_try() already added waiter into the
688                  * wait queue.
689                  */
690                 schedule();
691                 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
692                 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
693         }
694 }
695 EXPORT_SYMBOL(lu_object_find_at);
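/*
 * Typical caller pattern (a sketch, assuming no special lu_object_conf is
 * needed): every successful lookup returns an object carrying an extra
 * reference that must eventually be dropped with lu_object_put().
 *
 *	o = lu_object_find_at(env, dev, fid, NULL);
 *	if (IS_ERR(o))
 *		return PTR_ERR(o);
 *	...use the object...
 *	lu_object_put(env, o);
 */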
696
697 /**
698  * Find object with given fid, and return its slice belonging to given device.
699  */
700 struct lu_object *lu_object_find_slice(const struct lu_env *env,
701                                        struct lu_device *dev,
702                                        const struct lu_fid *f,
703                                        const struct lu_object_conf *conf)
704 {
705         struct lu_object *top;
706         struct lu_object *obj;
707
708         top = lu_object_find(env, dev, f, conf);
709         if (!IS_ERR(top)) {
710                 obj = lu_object_locate(top->lo_header, dev->ld_type);
711                 if (!obj)
712                         lu_object_put(env, top);
713         } else {
714                 obj = top;
715         }
716         return obj;
717 }
718 EXPORT_SYMBOL(lu_object_find_slice);
719
720 /**
721  * Global list of all device types.
722  */
723 static LIST_HEAD(lu_device_types);
724
725 int lu_device_type_init(struct lu_device_type *ldt)
726 {
727         int result = 0;
728
729         INIT_LIST_HEAD(&ldt->ldt_linkage);
730         if (ldt->ldt_ops->ldto_init)
731                 result = ldt->ldt_ops->ldto_init(ldt);
732         if (result == 0)
733                 list_add(&ldt->ldt_linkage, &lu_device_types);
734         return result;
735 }
736 EXPORT_SYMBOL(lu_device_type_init);
737
738 void lu_device_type_fini(struct lu_device_type *ldt)
739 {
740         list_del_init(&ldt->ldt_linkage);
741         if (ldt->ldt_ops->ldto_fini)
742                 ldt->ldt_ops->ldto_fini(ldt);
743 }
744 EXPORT_SYMBOL(lu_device_type_fini);
745
746 void lu_types_stop(void)
747 {
748         struct lu_device_type *ldt;
749
750         list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
751                 if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
752                         ldt->ldt_ops->ldto_stop(ldt);
753         }
754 }
755 EXPORT_SYMBOL(lu_types_stop);
756
757 /**
758  * Global list of all sites on this node
759  */
760 static LIST_HEAD(lu_sites);
761 static DEFINE_MUTEX(lu_sites_guard);
762
763 /**
764  * Global environment used by site shrinker.
765  */
766 static struct lu_env lu_shrink_env;
767
768 struct lu_site_print_arg {
769         struct lu_env   *lsp_env;
770         void        *lsp_cookie;
771         lu_printer_t     lsp_printer;
772 };
773
774 static int
775 lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
776                   struct hlist_node *hnode, void *data)
777 {
778         struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
779         struct lu_object_header  *h;
780
781         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
782         if (!list_empty(&h->loh_layers)) {
783                 const struct lu_object *o;
784
785                 o = lu_object_top(h);
786                 lu_object_print(arg->lsp_env, arg->lsp_cookie,
787                                 arg->lsp_printer, o);
788         } else {
789                 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
790                                        arg->lsp_printer, h);
791         }
792         return 0;
793 }
794
795 /**
796  * Print all objects in \a s.
797  */
798 void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
799                    lu_printer_t printer)
800 {
801         struct lu_site_print_arg arg = {
802                 .lsp_env     = (struct lu_env *)env,
803                 .lsp_cookie  = cookie,
804                 .lsp_printer = printer,
805         };
806
807         cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
808 }
809 EXPORT_SYMBOL(lu_site_print);
810
811 enum {
812         LU_CACHE_PERCENT_MAX     = 50,
813         LU_CACHE_PERCENT_DEFAULT = 20
814 };
815
816 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
817 module_param(lu_cache_percent, int, 0644);
818 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
819
820 /**
821  * Return desired hash table order.
822  */
823 static int lu_htable_order(void)
824 {
825         unsigned long cache_size;
826         int bits;
827
828         /*
829          * Calculate hash table size, assuming that we want reasonable
830          * performance when 20% of total memory is occupied by cache of
831          * lu_objects.
832          *
833          * The size of an lu_object is (arbitrarily) taken as 1K (together with the inode).
834          */
835         cache_size = totalram_pages;
836
837 #if BITS_PER_LONG == 32
838         /* limit hashtable size for lowmem systems to 3/4 of 1 GiB of pages */
839         if (cache_size > 1 << (30 - PAGE_SHIFT))
840                 cache_size = (1 << (30 - PAGE_SHIFT)) * 3 / 4;
841 #endif
842
843         /* reject an unreasonable cache setting and fall back to the default */
844         if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
845                 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
846                       lu_cache_percent, LU_CACHE_PERCENT_MAX,
847                       LU_CACHE_PERCENT_DEFAULT);
848
849                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
850         }
851         cache_size = cache_size / 100 * lu_cache_percent *
852                 (PAGE_SIZE / 1024);
853
854         for (bits = 1; (1 << bits) < cache_size; ++bits) {
855                 ;
856         }
857         return bits;
858 }
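/*
 * Worked example of the sizing above (illustrative numbers, assuming a
 * 64-bit machine with 4 GiB of RAM and 4 KiB pages): totalram_pages = 2^20,
 * so with the default lu_cache_percent of 20 the estimated object count is
 * 2^20 / 100 * 20 * (4096 / 1024) = 838800 (integer arithmetic), and the
 * loop returns the first order with 2^bits >= 838800, i.e. bits = 20.
 */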
859
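/*
 * The lu_obj_hop_*() helpers below are the cfs_hash callbacks wired into
 * lu_site_hash_ops further down: hashing the fid key, mapping a hash node
 * back to its lu_object_header, comparing keys, and taking references on
 * lookup.
 */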
860 static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
861                                 const void *key, unsigned mask)
862 {
863         struct lu_fid  *fid = (struct lu_fid *)key;
864         __u32      hash;
865
866         hash = fid_flatten32(fid);
867         hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
868         hash = hash_long(hash, hs->hs_bkt_bits);
869
870         /* give me another random factor */
871         hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
872
873         hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
874         hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
875
876         return hash & mask;
877 }
878
879 static void *lu_obj_hop_object(struct hlist_node *hnode)
880 {
881         return hlist_entry(hnode, struct lu_object_header, loh_hash);
882 }
883
884 static void *lu_obj_hop_key(struct hlist_node *hnode)
885 {
886         struct lu_object_header *h;
887
888         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
889         return &h->loh_fid;
890 }
891
892 static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
893 {
894         struct lu_object_header *h;
895
896         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
897         return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
898 }
899
900 static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
901 {
902         struct lu_object_header *h;
903
904         h = hlist_entry(hnode, struct lu_object_header, loh_hash);
905         atomic_inc(&h->loh_ref);
906 }
907
908 static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
909 {
910         LBUG(); /* this callback should never be invoked */
911 }
912
913 static struct cfs_hash_ops lu_site_hash_ops = {
914         .hs_hash        = lu_obj_hop_hash,
915         .hs_key         = lu_obj_hop_key,
916         .hs_keycmp      = lu_obj_hop_keycmp,
917         .hs_object      = lu_obj_hop_object,
918         .hs_get         = lu_obj_hop_get,
919         .hs_put_locked  = lu_obj_hop_put_locked,
920 };
921
922 static void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
923 {
924         spin_lock(&s->ls_ld_lock);
925         if (list_empty(&d->ld_linkage))
926                 list_add(&d->ld_linkage, &s->ls_ld_linkage);
927         spin_unlock(&s->ls_ld_lock);
928 }
929
930 /**
931  * Initialize site \a s, with \a top as the top-level device.
932  */
933 #define LU_SITE_BITS_MIN    12
934 #define LU_SITE_BITS_MAX    19
935 /**
936  * 256 buckets in total; we don't want too many buckets because they
937  * - consume too much memory
938  * - lead to unbalanced LRU lists
939  */
940 #define LU_SITE_BKT_BITS    8
941
942 int lu_site_init(struct lu_site *s, struct lu_device *top)
943 {
944         struct lu_site_bkt_data *bkt;
945         struct cfs_hash_bd bd;
946         char name[16];
947         int bits;
948         int i;
949
950         memset(s, 0, sizeof(*s));
951         bits = lu_htable_order();
952         snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
953         for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
954              bits >= LU_SITE_BITS_MIN; bits--) {
955                 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
956                                                  bits - LU_SITE_BKT_BITS,
957                                                  sizeof(*bkt), 0, 0,
958                                                  &lu_site_hash_ops,
959                                                  CFS_HASH_SPIN_BKTLOCK |
960                                                  CFS_HASH_NO_ITEMREF |
961                                                  CFS_HASH_DEPTH |
962                                                  CFS_HASH_ASSERT_EMPTY);
963                 if (s->ls_obj_hash)
964                         break;
965         }
966
967         if (!s->ls_obj_hash) {
968                 CERROR("failed to create lu_site hash with bits: %d\n", bits);
969                 return -ENOMEM;
970         }
971
972         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
973                 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
974                 INIT_LIST_HEAD(&bkt->lsb_lru);
975                 init_waitqueue_head(&bkt->lsb_marche_funebre);
976         }
977
978         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
979         if (!s->ls_stats) {
980                 cfs_hash_putref(s->ls_obj_hash);
981                 s->ls_obj_hash = NULL;
982                 return -ENOMEM;
983         }
984
985         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
986                              0, "created", "created");
987         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
988                              0, "cache_hit", "cache_hit");
989         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
990                              0, "cache_miss", "cache_miss");
991         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
992                              0, "cache_race", "cache_race");
993         lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
994                              0, "cache_death_race", "cache_death_race");
995         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
996                              0, "lru_purged", "lru_purged");
997         /*
998          * Unlike other counters, lru_len can be decremented, so we
999          * need lc_sum instead of just lc_count.
1000          */
1001         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_LEN,
1002                              LPROCFS_CNTR_AVGMINMAX, "lru_len", "lru_len");
1003
1004         INIT_LIST_HEAD(&s->ls_linkage);
1005         s->ls_top_dev = top;
1006         top->ld_site = s;
1007         lu_device_get(top);
1008         lu_ref_add(&top->ld_reference, "site-top", s);
1009
1010         INIT_LIST_HEAD(&s->ls_ld_linkage);
1011         spin_lock_init(&s->ls_ld_lock);
1012
1013         lu_dev_add_linkage(s, top);
1014
1015         return 0;
1016 }
1017 EXPORT_SYMBOL(lu_site_init);
1018
1019 /**
1020  * Finalize \a s and release its resources.
1021  */
1022 void lu_site_fini(struct lu_site *s)
1023 {
1024         mutex_lock(&lu_sites_guard);
1025         list_del_init(&s->ls_linkage);
1026         mutex_unlock(&lu_sites_guard);
1027
1028         if (s->ls_obj_hash) {
1029                 cfs_hash_putref(s->ls_obj_hash);
1030                 s->ls_obj_hash = NULL;
1031         }
1032
1033         if (s->ls_top_dev) {
1034                 s->ls_top_dev->ld_site = NULL;
1035                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1036                 lu_device_put(s->ls_top_dev);
1037                 s->ls_top_dev = NULL;
1038         }
1039
1040         if (s->ls_stats)
1041                 lprocfs_free_stats(&s->ls_stats);
1042 }
1043 EXPORT_SYMBOL(lu_site_fini);
1044
1045 /**
1046  * Called when initialization of stack for this site is completed.
1047  */
1048 int lu_site_init_finish(struct lu_site *s)
1049 {
1050         int result;
1051
1052         mutex_lock(&lu_sites_guard);
1053         result = lu_context_refill(&lu_shrink_env.le_ctx);
1054         if (result == 0)
1055                 list_add(&s->ls_linkage, &lu_sites);
1056         mutex_unlock(&lu_sites_guard);
1057         return result;
1058 }
1059 EXPORT_SYMBOL(lu_site_init_finish);
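/*
 * Order of operations for a site (a sketch implied by the functions above):
 * lu_site_init() sets up the hash table and stats, lu_site_init_finish()
 * publishes the site on the global lu_sites list once the device stack is
 * complete, and lu_site_fini() removes it and releases its resources.
 */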
1060
1061 /**
1062  * Acquire an additional reference on device \a d.
1063  */
1064 void lu_device_get(struct lu_device *d)
1065 {
1066         atomic_inc(&d->ld_ref);
1067 }
1068 EXPORT_SYMBOL(lu_device_get);
1069
1070 /**
1071  * Release reference on device \a d.
1072  */
1073 void lu_device_put(struct lu_device *d)
1074 {
1075         LASSERT(atomic_read(&d->ld_ref) > 0);
1076         atomic_dec(&d->ld_ref);
1077 }
1078 EXPORT_SYMBOL(lu_device_put);
1079
1080 /**
1081  * Initialize device \a d of type \a t.
1082  */
1083 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1084 {
1085         if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start)
1086                 t->ldt_ops->ldto_start(t);
1087         memset(d, 0, sizeof(*d));
1088         atomic_set(&d->ld_ref, 0);
1089         d->ld_type = t;
1090         lu_ref_init(&d->ld_reference);
1091         INIT_LIST_HEAD(&d->ld_linkage);
1092         return 0;
1093 }
1094 EXPORT_SYMBOL(lu_device_init);
1095
1096 /**
1097  * Finalize device \a d.
1098  */
1099 void lu_device_fini(struct lu_device *d)
1100 {
1101         struct lu_device_type *t;
1102
1103         t = d->ld_type;
1104         if (d->ld_obd) {
1105                 d->ld_obd->obd_lu_dev = NULL;
1106                 d->ld_obd = NULL;
1107         }
1108
1109         lu_ref_fini(&d->ld_reference);
1110         LASSERTF(atomic_read(&d->ld_ref) == 0,
1111                  "Refcount is %u\n", atomic_read(&d->ld_ref));
1112         LASSERT(t->ldt_device_nr > 0);
1113         if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop)
1114                 t->ldt_ops->ldto_stop(t);
1115 }
1116 EXPORT_SYMBOL(lu_device_fini);
1117
1118 /**
1119  * Initialize object \a o that is part of compound object \a h and was created
1120  * by device \a d.
1121  */
1122 int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1123                    struct lu_device *d)
1124 {
1125         memset(o, 0, sizeof(*o));
1126         o->lo_header = h;
1127         o->lo_dev = d;
1128         lu_device_get(d);
1129         lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1130         INIT_LIST_HEAD(&o->lo_linkage);
1131
1132         return 0;
1133 }
1134 EXPORT_SYMBOL(lu_object_init);
1135
1136 /**
1137  * Finalize object and release its resources.
1138  */
1139 void lu_object_fini(struct lu_object *o)
1140 {
1141         struct lu_device *dev = o->lo_dev;
1142
1143         LASSERT(list_empty(&o->lo_linkage));
1144
1145         if (dev) {
1146                 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1147                               "lu_object", o);
1148                 lu_device_put(dev);
1149                 o->lo_dev = NULL;
1150         }
1151 }
1152 EXPORT_SYMBOL(lu_object_fini);
1153
1154 /**
1155  * Add object \a o as the first layer of compound object \a h.
1156  *
1157  * This is typically called by the ->ldo_object_alloc() method of the
1158  * top-level device.
1159  */
1160 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1161 {
1162         list_move(&o->lo_linkage, &h->loh_layers);
1163 }
1164 EXPORT_SYMBOL(lu_object_add_top);
1165
1166 /**
1167  * Add object \a o as a layer of compound object, going after \a before.
1168  *
1169  * This is typically called by the ->ldo_object_alloc() method of \a
1170  * before->lo_dev.
1171  */
1172 void lu_object_add(struct lu_object *before, struct lu_object *o)
1173 {
1174         list_move(&o->lo_linkage, &before->lo_linkage);
1175 }
1176 EXPORT_SYMBOL(lu_object_add);
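/*
 * Sketch of how the two helpers above combine when a device stack allocates
 * a compound object (illustrative, following the comments above): the
 * top-level device's ->ldo_object_alloc() attaches its slice with
 * lu_object_add_top(h, top_slice), and each lower device then chains its
 * slice after the one above it with lu_object_add(upper_slice, lower_slice).
 */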
1177
1178 /**
1179  * Initialize compound object.
1180  */
1181 int lu_object_header_init(struct lu_object_header *h)
1182 {
1183         memset(h, 0, sizeof(*h));
1184         atomic_set(&h->loh_ref, 1);
1185         INIT_HLIST_NODE(&h->loh_hash);
1186         INIT_LIST_HEAD(&h->loh_lru);
1187         INIT_LIST_HEAD(&h->loh_layers);
1188         lu_ref_init(&h->loh_reference);
1189         return 0;
1190 }
1191 EXPORT_SYMBOL(lu_object_header_init);
1192
1193 /**
1194  * Finalize compound object.
1195  */
1196 void lu_object_header_fini(struct lu_object_header *h)
1197 {
1198         LASSERT(list_empty(&h->loh_layers));
1199         LASSERT(list_empty(&h->loh_lru));
1200         LASSERT(hlist_unhashed(&h->loh_hash));
1201         lu_ref_fini(&h->loh_reference);
1202 }
1203 EXPORT_SYMBOL(lu_object_header_fini);
1204
1205 /**
1206  * Given a compound object, find its slice, corresponding to the device type
1207  * \a dtype.
1208  */
1209 struct lu_object *lu_object_locate(struct lu_object_header *h,
1210                                    const struct lu_device_type *dtype)
1211 {
1212         struct lu_object *o;
1213
1214         list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1215                 if (o->lo_dev->ld_type == dtype)
1216                         return o;
1217         }
1218         return NULL;
1219 }
1220 EXPORT_SYMBOL(lu_object_locate);
1221
1222 /**
1223  * Finalize and free devices in the device stack.
1224  *
1225  * Finalize device stack by purging object cache, and calling
1226  * lu_device_type_operations::ldto_device_fini() and
1227  * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1228  */
1229 void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1230 {
1231         struct lu_site   *site = top->ld_site;
1232         struct lu_device *scan;
1233         struct lu_device *next;
1234
1235         lu_site_purge(env, site, ~0);
1236         for (scan = top; scan; scan = next) {
1237                 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1238                 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1239                 lu_device_put(scan);
1240         }
1241
1242         /* purge again. */
1243         lu_site_purge(env, site, ~0);
1244
1245         for (scan = top; scan; scan = next) {
1246                 const struct lu_device_type *ldt = scan->ld_type;
1247                 struct obd_type      *type;
1248
1249                 next = ldt->ldt_ops->ldto_device_free(env, scan);
1250                 type = ldt->ldt_obd_type;
1251                 if (type) {
1252                         type->typ_refcnt--;
1253                         class_put_type(type);
1254                 }
1255         }
1256 }
1257 EXPORT_SYMBOL(lu_stack_fini);
1258
1259 enum {
1260         /**
1261          * Maximal number of TLD (thread-local data) slots.
1262          */
1263         LU_CONTEXT_KEY_NR = 40
1264 };
1265
1266 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1267
1268 static DEFINE_SPINLOCK(lu_keys_guard);
1269
1270 /**
1271  * Global counter incremented whenever a key is registered, unregistered,
1272  * revived or quiesced. This is used to avoid unnecessary calls to
1273  * lu_context_refill(). No locking is provided, as initialization and shutdown
1274  * are supposed to be externally serialized.
1275  */
1276 static unsigned key_set_version;
1277
1278 /**
1279  * Register new key.
1280  */
1281 int lu_context_key_register(struct lu_context_key *key)
1282 {
1283         int result;
1284         int i;
1285
1286         LASSERT(key->lct_init);
1287         LASSERT(key->lct_fini);
1288         LASSERT(key->lct_tags != 0);
1289
1290         result = -ENFILE;
1291         spin_lock(&lu_keys_guard);
1292         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1293                 if (!lu_keys[i]) {
1294                         key->lct_index = i;
1295                         atomic_set(&key->lct_used, 1);
1296                         lu_keys[i] = key;
1297                         lu_ref_init(&key->lct_reference);
1298                         result = 0;
1299                         ++key_set_version;
1300                         break;
1301                 }
1302         }
1303         spin_unlock(&lu_keys_guard);
1304         return result;
1305 }
1306 EXPORT_SYMBOL(lu_context_key_register);
1307
1308 static void key_fini(struct lu_context *ctx, int index)
1309 {
1310         if (ctx->lc_value && ctx->lc_value[index]) {
1311                 struct lu_context_key *key;
1312
1313                 key = lu_keys[index];
1314                 LASSERT(atomic_read(&key->lct_used) > 1);
1315
1316                 key->lct_fini(ctx, key, ctx->lc_value[index]);
1317                 lu_ref_del(&key->lct_reference, "ctx", ctx);
1318                 atomic_dec(&key->lct_used);
1319
1320                 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1321 #ifdef CONFIG_MODULE_UNLOAD
1322                         LINVRNT(module_refcount(key->lct_owner) > 0);
1323 #endif
1324                         module_put(key->lct_owner);
1325                 }
1326                 ctx->lc_value[index] = NULL;
1327         }
1328 }
1329
1330 /**
1331  * Deregister key.
1332  */
1333 void lu_context_key_degister(struct lu_context_key *key)
1334 {
1335         LASSERT(atomic_read(&key->lct_used) >= 1);
1336         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1337
1338         lu_context_key_quiesce(key);
1339
1340         ++key_set_version;
1341         spin_lock(&lu_keys_guard);
1342         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1343         if (lu_keys[key->lct_index]) {
1344                 lu_keys[key->lct_index] = NULL;
1345                 lu_ref_fini(&key->lct_reference);
1346         }
1347         spin_unlock(&lu_keys_guard);
1348
1349         LASSERTF(atomic_read(&key->lct_used) == 1,
1350                  "key has instances: %d\n",
1351                  atomic_read(&key->lct_used));
1352 }
1353 EXPORT_SYMBOL(lu_context_key_degister);
1354
1355 /**
1356  * Register a number of keys. This has to be called after all keys have been
1357  * initialized by a call to LU_CONTEXT_KEY_INIT().
1358  */
1359 int lu_context_key_register_many(struct lu_context_key *k, ...)
1360 {
1361         struct lu_context_key *key = k;
1362         va_list args;
1363         int result;
1364
1365         va_start(args, k);
1366         do {
1367                 result = lu_context_key_register(key);
1368                 if (result)
1369                         break;
1370                 key = va_arg(args, struct lu_context_key *);
1371         } while (key);
1372         va_end(args);
1373
1374         if (result != 0) {
1375                 va_start(args, k);
1376                 while (k != key) {
1377                         lu_context_key_degister(k);
1378                         k = va_arg(args, struct lu_context_key *);
1379                 }
1380                 va_end(args);
1381         }
1382
1383         return result;
1384 }
1385 EXPORT_SYMBOL(lu_context_key_register_many);
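/*
 * Example call (key names are hypothetical, for illustration only): the
 * variadic list must be NULL-terminated, and on failure the keys registered
 * so far are de-registered again by the error path above.
 *
 *	rc = lu_context_key_register_many(&foo_thread_key,
 *					  &bar_thread_key, NULL);
 */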
1386
1387 /**
1388  * De-register a number of keys. This is a dual to
1389  * lu_context_key_register_many().
1390  */
1391 void lu_context_key_degister_many(struct lu_context_key *k, ...)
1392 {
1393         va_list args;
1394
1395         va_start(args, k);
1396         do {
1397                 lu_context_key_degister(k);
1398                 k = va_arg(args, struct lu_context_key *);
1399         } while (k);
1400         va_end(args);
1401 }
1402 EXPORT_SYMBOL(lu_context_key_degister_many);
1403
1404 /**
1405  * Revive a number of keys.
1406  */
1407 void lu_context_key_revive_many(struct lu_context_key *k, ...)
1408 {
1409         va_list args;
1410
1411         va_start(args, k);
1412         do {
1413                 lu_context_key_revive(k);
1414                 k = va_arg(args, struct lu_context_key *);
1415         } while (k);
1416         va_end(args);
1417 }
1418 EXPORT_SYMBOL(lu_context_key_revive_many);
1419
1420 /**
1421  * Quiesce a number of keys.
1422  */
1423 void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1424 {
1425         va_list args;
1426
1427         va_start(args, k);
1428         do {
1429                 lu_context_key_quiesce(k);
1430                 k = va_arg(args, struct lu_context_key *);
1431         } while (k);
1432         va_end(args);
1433 }
1434 EXPORT_SYMBOL(lu_context_key_quiesce_many);
1435
1436 /**
1437  * Return value associated with key \a key in context \a ctx.
1438  */
1439 void *lu_context_key_get(const struct lu_context *ctx,
1440                          const struct lu_context_key *key)
1441 {
1442         LINVRNT(ctx->lc_state == LCS_ENTERED);
1443         LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1444         LASSERT(lu_keys[key->lct_index] == key);
1445         return ctx->lc_value[key->lct_index];
1446 }
1447 EXPORT_SYMBOL(lu_context_key_get);
1448
1449 /**
1450  * List of remembered contexts. XXX document me.
1451  */
1452 static LIST_HEAD(lu_context_remembered);
1453
1454 /**
1455  * Destroy \a key in all remembered contexts. This is used to destroy key
1456  * values in "shared" contexts (like service threads), when a module owning
1457  * the key is about to be unloaded.
1458  */
1459 void lu_context_key_quiesce(struct lu_context_key *key)
1460 {
1461         struct lu_context *ctx;
1462
1463         if (!(key->lct_tags & LCT_QUIESCENT)) {
1464                 /*
1465                  * XXX layering violation.
1466                  */
1467                 cl_env_cache_purge(~0);
1468                 key->lct_tags |= LCT_QUIESCENT;
1469                 /*
1470                  * XXX memory barrier has to go here.
1471                  */
1472                 spin_lock(&lu_keys_guard);
1473                 list_for_each_entry(ctx, &lu_context_remembered, lc_remember)
1474                         key_fini(ctx, key->lct_index);
1475                 spin_unlock(&lu_keys_guard);
1476                 ++key_set_version;
1477         }
1478 }
1479 EXPORT_SYMBOL(lu_context_key_quiesce);
1480
1481 void lu_context_key_revive(struct lu_context_key *key)
1482 {
1483         key->lct_tags &= ~LCT_QUIESCENT;
1484         ++key_set_version;
1485 }
1486 EXPORT_SYMBOL(lu_context_key_revive);
1487
1488 static void keys_fini(struct lu_context *ctx)
1489 {
1490         int     i;
1491
1492         if (!ctx->lc_value)
1493                 return;
1494
1495         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1496                 key_fini(ctx, i);
1497
1498         kfree(ctx->lc_value);
1499         ctx->lc_value = NULL;
1500 }
1501
1502 static int keys_fill(struct lu_context *ctx)
1503 {
1504         int i;
1505
1506         LINVRNT(ctx->lc_value);
1507         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1508                 struct lu_context_key *key;
1509
1510                 key = lu_keys[i];
1511                 if (!ctx->lc_value[i] && key &&
1512                     (key->lct_tags & ctx->lc_tags) &&
1513                     /*
1514                      * Don't create values for an LCT_QUIESCENT key, as this
1515                      * would pin the module owning the key.
1516                      */
1517                     !(key->lct_tags & LCT_QUIESCENT)) {
1518                         void *value;
1519
1520                         LINVRNT(key->lct_init);
1521                         LINVRNT(key->lct_index == i);
1522
1523                         value = key->lct_init(ctx, key);
1524                         if (IS_ERR(value))
1525                                 return PTR_ERR(value);
1526
1527                         if (!(ctx->lc_tags & LCT_NOREF))
1528                                 try_module_get(key->lct_owner);
1529                         lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1530                         atomic_inc(&key->lct_used);
1531                         /*
1532                          * This is the only place in the code where an
1533                          * element of the ctx->lc_value[] array is set to a
1534                          * non-NULL value.
1535                          */
1536                         ctx->lc_value[i] = value;
1537                         if (key->lct_exit)
1538                                 ctx->lc_tags |= LCT_HAS_EXIT;
1539                 }
1540                 ctx->lc_version = key_set_version;
1541         }
1542         return 0;
1543 }
1544
1545 static int keys_init(struct lu_context *ctx)
1546 {
1547         ctx->lc_value = kcalloc(ARRAY_SIZE(lu_keys), sizeof(ctx->lc_value[0]),
1548                                 GFP_NOFS);
1549         if (likely(ctx->lc_value))
1550                 return keys_fill(ctx);
1551
1552         return -ENOMEM;
1553 }
1554
1555 /**
1556  * Initialize context data-structure. Create values for all keys.
1557  */
1558 int lu_context_init(struct lu_context *ctx, __u32 tags)
1559 {
1560         int     rc;
1561
1562         memset(ctx, 0, sizeof(*ctx));
1563         ctx->lc_state = LCS_INITIALIZED;
1564         ctx->lc_tags = tags;
1565         if (tags & LCT_REMEMBER) {
1566                 spin_lock(&lu_keys_guard);
1567                 list_add(&ctx->lc_remember, &lu_context_remembered);
1568                 spin_unlock(&lu_keys_guard);
1569         } else {
1570                 INIT_LIST_HEAD(&ctx->lc_remember);
1571         }
1572
1573         rc = keys_init(ctx);
1574         if (rc != 0)
1575                 lu_context_fini(ctx);
1576
1577         return rc;
1578 }
1579 EXPORT_SYMBOL(lu_context_init);
1580
1581 /**
1582  * Finalize context data-structure. Destroy key values.
1583  */
1584 void lu_context_fini(struct lu_context *ctx)
1585 {
1586         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1587         ctx->lc_state = LCS_FINALIZED;
1588
1589         if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1590                 LASSERT(list_empty(&ctx->lc_remember));
1591                 keys_fini(ctx);
1592
1593         } else { /* could race with lu_context_key_degister() */
1594                 spin_lock(&lu_keys_guard);
1595                 keys_fini(ctx);
1596                 list_del_init(&ctx->lc_remember);
1597                 spin_unlock(&lu_keys_guard);
1598         }
1599 }
1600 EXPORT_SYMBOL(lu_context_fini);
1601
1602 /**
1603  * Called before entering context.
1604  */
1605 void lu_context_enter(struct lu_context *ctx)
1606 {
1607         LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1608         ctx->lc_state = LCS_ENTERED;
1609 }
1610 EXPORT_SYMBOL(lu_context_enter);
1611
1612 /**
1613  * Called after exiting from \a ctx.
1614  */
1615 void lu_context_exit(struct lu_context *ctx)
1616 {
1617         int i;
1618
1619         LINVRNT(ctx->lc_state == LCS_ENTERED);
1620         ctx->lc_state = LCS_LEFT;
1621         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
1622                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1623                         if (ctx->lc_value[i]) {
1624                                 struct lu_context_key *key;
1625
1626                                 key = lu_keys[i];
1627                                 if (key->lct_exit)
1628                                         key->lct_exit(ctx,
1629                                                       key, ctx->lc_value[i]);
1630                         }
1631                 }
1632         }
1633 }
1634 EXPORT_SYMBOL(lu_context_exit);
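
/*
 * Editor's illustrative sketch (not part of the original file, disabled with
 * #if 0): the expected enter/exit discipline for a long-lived, service-thread
 * style context. foo_thread_main() and the exact tag mix are hypothetical;
 * kthread_should_stop() would need <linux/kthread.h>.
 */
#if 0
static int foo_thread_main(void *arg)
{
        struct lu_context ctx;
        int rc;

        rc = lu_context_init(&ctx, LCT_MD_THREAD | LCT_REMEMBER | LCT_NOREF);
        if (rc)
                return rc;

        while (!kthread_should_stop()) {
                lu_context_enter(&ctx);
                /* handle one request; key values are reachable here via
                 * lu_context_key_get(&ctx, ...), and lct_exit hooks run on
                 * lu_context_exit()
                 */
                lu_context_exit(&ctx);
        }

        lu_context_fini(&ctx);
        return 0;
}
#endif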
1635
1636 /**
1637  * Allocate values in \a ctx for any keys that were registered after the
1638  * context was created. key_set_version only changes in the rare cases when
1639  * modules are loaded or removed.
1640  */
1641 int lu_context_refill(struct lu_context *ctx)
1642 {
1643         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1644 }
1645 EXPORT_SYMBOL(lu_context_refill);
1646
1647 /**
1648  * lu_context_tags_default/lu_session_tags_default are updated when new
1649  * types of obd are added. Currently this is only used on the client side,
1650  * specifically for the echo device client; for other stacks (like ptlrpc
1651  * threads), the contexts are predefined when the lu_device types are
1652  * registered, during the module probe phase.
1653  */
1654 __u32 lu_context_tags_default;
1655 __u32 lu_session_tags_default;
1656
1657 int lu_env_init(struct lu_env *env, __u32 tags)
1658 {
1659         int result;
1660
1661         env->le_ses = NULL;
1662         result = lu_context_init(&env->le_ctx, tags);
1663         if (likely(result == 0))
1664                 lu_context_enter(&env->le_ctx);
1665         return result;
1666 }
1667 EXPORT_SYMBOL(lu_env_init);
1668
1669 void lu_env_fini(struct lu_env *env)
1670 {
1671         lu_context_exit(&env->le_ctx);
1672         lu_context_fini(&env->le_ctx);
1673         env->le_ses = NULL;
1674 }
1675 EXPORT_SYMBOL(lu_env_fini);
1676
1677 int lu_env_refill(struct lu_env *env)
1678 {
1679         int result;
1680
1681         result = lu_context_refill(&env->le_ctx);
1682         if (result == 0 && env->le_ses)
1683                 result = lu_context_refill(env->le_ses);
1684         return result;
1685 }
1686 EXPORT_SYMBOL(lu_env_refill);
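
/*
 * Editor's illustrative sketch (not part of the original file, disabled with
 * #if 0): typical short-lived use of a lu_env; the tag and the work done with
 * the environment are hypothetical. A cached, long-lived environment would
 * instead call lu_env_refill() before each reuse, to pick up keys registered
 * after it was created.
 */
#if 0
static int foo_do_work(void)
{
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                return rc;

        /* ... use env.le_ctx (and env.le_ses if a session was attached) ... */

        lu_env_fini(&env);
        return rc;
}
#endif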
1687
1688 struct lu_site_stats {
1689         unsigned        lss_populated;
1690         unsigned        lss_max_search;
1691         unsigned        lss_total;
1692         unsigned        lss_busy;
1693 };
1694
1695 static void lu_site_stats_get(struct cfs_hash *hs,
1696                               struct lu_site_stats *stats, int populated)
1697 {
1698         struct cfs_hash_bd bd;
1699         int        i;
1700
1701         cfs_hash_for_each_bucket(hs, &bd, i) {
1702                 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1703                 struct hlist_head       *hhead;
1704
1705                 cfs_hash_bd_lock(hs, &bd, 1);
1706                 stats->lss_busy  +=
1707                         cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
1708                 stats->lss_total += cfs_hash_bd_count_get(&bd);
1709                 stats->lss_max_search = max((int)stats->lss_max_search,
1710                                             cfs_hash_bd_depmax_get(&bd));
1711                 if (!populated) {
1712                         cfs_hash_bd_unlock(hs, &bd, 1);
1713                         continue;
1714                 }
1715
1716                 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1717                         if (!hlist_empty(hhead))
1718                                 stats->lss_populated++;
1719                 }
1720                 cfs_hash_bd_unlock(hs, &bd, 1);
1721         }
1722 }
1723
1724 /*
1725  * lu_cache_shrink_count returns the number of cached objects that are
1726  * candidates to be freed by shrink_slab(). A counter, which tracks
1727  * the number of items in the site's lru, is maintained in the per cpu
1728  * stats of each site. The counter is incremented when an object is added
1729  * to a site's lru and decremented when one is removed. The number of
1730  * free-able objects is the sum of all per cpu counters for all sites.
1731  *
1732  * Using a per cpu counter is a compromise solution to concurrent access:
1733  * lu_object_put() can update the counter without locking the site and
1734  * lu_cache_shrink_count can sum the counters without locking each
1735  * ls_obj_hash bucket.
1736  */
1737 static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1738                                            struct shrink_control *sc)
1739 {
1740         struct lu_site *s;
1741         struct lu_site *tmp;
1742         unsigned long cached = 0;
1743
1744         if (!(sc->gfp_mask & __GFP_FS))
1745                 return 0;
1746
1747         mutex_lock(&lu_sites_guard);
1748         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1749                 cached += ls_stats_read(s->ls_stats, LU_SS_LRU_LEN);
1750         }
1751         mutex_unlock(&lu_sites_guard);
1752
1753         cached = (cached / 100) * sysctl_vfs_cache_pressure;
1754         CDEBUG(D_INODE, "%lu objects cached, cache pressure %d\n",
1755                cached, sysctl_vfs_cache_pressure);
1756
1757         return cached;
1758 }
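
/*
 * Editor's illustrative sketch (not part of the original file, disabled with
 * #if 0): how the LU_SS_LRU_LEN counter consulted above is assumed to be
 * maintained elsewhere in this file, next to the per-bucket lsb_lru_len, when
 * an unreferenced object is parked on or removed from a bucket LRU list.
 */
#if 0
static void foo_lru_add(struct lu_site *site, struct lu_site_bkt_data *bkt,
                        struct lu_object_header *top)
{
        list_add_tail(&top->loh_lru, &bkt->lsb_lru);
        bkt->lsb_lru_len++;
        lprocfs_counter_incr(site->ls_stats, LU_SS_LRU_LEN);
}

static void foo_lru_del(struct lu_site *site, struct lu_site_bkt_data *bkt,
                        struct lu_object_header *top)
{
        list_del_init(&top->loh_lru);
        bkt->lsb_lru_len--;
        lprocfs_counter_decr(site->ls_stats, LU_SS_LRU_LEN);
}
#endif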
1759
1760 static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1761                                           struct shrink_control *sc)
1762 {
1763         struct lu_site *s;
1764         struct lu_site *tmp;
1765         unsigned long remain = sc->nr_to_scan, freed = 0;
1766         LIST_HEAD(splice);
1767
1768         if (!(sc->gfp_mask & __GFP_FS))
1769                 /* We must not take the lu_sites_guard lock when
1770                  * __GFP_FS is *not* set because of the deadlock
1771                  * possibility detailed above. Additionally,
1772                  * since we cannot determine the number of
1773                  * objects in the cache without taking this
1774                  * lock, we're in a particularly tough spot. As
1775                  * a result, we'll just lie and say our cache is
1776                  * empty. This _should_ be ok, as we can't
1777                  * reclaim objects when __GFP_FS is *not* set
1778                  * anyways.
1779                  */
1780                 return SHRINK_STOP;
1781
1782         mutex_lock(&lu_sites_guard);
1783         list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1784                 freed = lu_site_purge(&lu_shrink_env, s, remain);
1785                 remain -= freed;
1786                 /*
1787                  * Move just shrunk site to the tail of site list to
1788                  * assure shrinking fairness.
1789                  */
1790                 list_move_tail(&s->ls_linkage, &splice);
1791         }
1792         list_splice(&splice, lu_sites.prev);
1793         mutex_unlock(&lu_sites_guard);
1794
1795         return sc->nr_to_scan - remain;
1796 }
1797
1798 /**
1799  * Shrinker for the lu_object caches; registered in lu_global_init() below.
1800  */
1801 static struct shrinker lu_site_shrinker = {
1802         .count_objects  = lu_cache_shrink_count,
1803         .scan_objects   = lu_cache_shrink_scan,
1804         .seeks          = DEFAULT_SEEKS,
1805 };
1806
1807 /**
1808  * Initialization of global lu_* data.
1809  */
1810 int lu_global_init(void)
1811 {
1812         int result;
1813
1814         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1815
1816         result = lu_ref_global_init();
1817         if (result != 0)
1818                 return result;
1819
1820         LU_CONTEXT_KEY_INIT(&lu_global_key);
1821         result = lu_context_key_register(&lu_global_key);
1822         if (result != 0)
1823                 return result;
1824
1825         /*
1826          * At this level, we don't know what tags are needed, so allocate them
1827          * conservatively. This should not be too bad, because this
1828          * environment is global.
1829          */
1830         mutex_lock(&lu_sites_guard);
1831         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1832         mutex_unlock(&lu_sites_guard);
1833         if (result != 0)
1834                 return result;
1835
1836         /*
1837          * Seeks estimation: 3 seeks to read a record from the OI, one to read
1838          * the inode, one for the EA. Unfortunately, setting such a high value
1839          * makes the lu_object/inode cache consume all of the memory.
1840          */
1841         register_shrinker(&lu_site_shrinker);
1842
1843         return result;
1844 }
1845
1846 /**
1847  * Dual to lu_global_init().
1848  */
1849 void lu_global_fini(void)
1850 {
1851         unregister_shrinker(&lu_site_shrinker);
1852         lu_context_key_degister(&lu_global_key);
1853
1854         /*
1855          * Tear shrinker environment down _after_ de-registering
1856          * lu_global_key, because the latter has a value in the former.
1857          */
1858         mutex_lock(&lu_sites_guard);
1859         lu_env_fini(&lu_shrink_env);
1860         mutex_unlock(&lu_sites_guard);
1861
1862         lu_ref_global_fini();
1863 }
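
/*
 * Editor's illustrative sketch (not part of the original file, disabled with
 * #if 0): lu_global_init() and lu_global_fini() are expected to bracket the
 * obdclass module's lifetime; the function names here are hypothetical and the
 * real module init/exit paths do additional work.
 */
#if 0
static int __init foo_obdclass_init(void)
{
        return lu_global_init();
}

static void __exit foo_obdclass_exit(void)
{
        lu_global_fini();
}
#endif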
1864
1865 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1866 {
1867         struct lprocfs_counter ret;
1868
1869         lprocfs_stats_collect(stats, idx, &ret);
1870         if (idx == LU_SS_LRU_LEN)
1871                 /*
1872                  * The sum can be transiently negative if a decrement on one
1873                  * CPU is seen before the matching increment on another; clamp it.
1874                  */
1875                 return (__u32)((ret.lc_sum > 0) ? ret.lc_sum : 0);
1876
1877         return (__u32)ret.lc_count;
1878 }
1879
1880 /**
1881  * Output site statistical counters into a buffer. Suitable for
1882  * lprocfs_rd_*()-style functions.
1883  */
1884 int lu_site_stats_print(const struct lu_site *s, struct seq_file *m)
1885 {
1886         struct lu_site_stats stats;
1887
1888         memset(&stats, 0, sizeof(stats));
1889         lu_site_stats_get(s->ls_obj_hash, &stats, 1);
1890
1891         seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d %d\n",
1892                    stats.lss_busy,
1893                    stats.lss_total,
1894                    stats.lss_populated,
1895                    CFS_HASH_NHLIST(s->ls_obj_hash),
1896                    stats.lss_max_search,
1897                    ls_stats_read(s->ls_stats, LU_SS_CREATED),
1898                    ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
1899                    ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
1900                    ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
1901                    ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
1902                    ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED),
1903                    ls_stats_read(s->ls_stats, LU_SS_LRU_LEN));
1904         return 0;
1905 }
1906 EXPORT_SYMBOL(lu_site_stats_print);
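
/*
 * Editor's illustrative sketch (not part of the original file, disabled with
 * #if 0): a seq_file "show" callback for a procfs/debugfs entry can delegate
 * directly to lu_site_stats_print(); how the lu_site ends up in m->private is
 * up to the caller and is assumed here.
 */
#if 0
static int foo_site_stats_seq_show(struct seq_file *m, void *v)
{
        struct lu_site *site = m->private;

        return lu_site_stats_print(site, m);
}
#endif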
1907
1908 /**
1909  * Helper function to initialize a number of kmem slab caches at once.
1910  */
1911 int lu_kmem_init(struct lu_kmem_descr *caches)
1912 {
1913         int result;
1914         struct lu_kmem_descr *iter = caches;
1915
1916         for (result = 0; iter->ckd_cache; ++iter) {
1917                 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
1918                                                         iter->ckd_size,
1919                                                         0, 0, NULL);
1920                 if (!*iter->ckd_cache) {
1921                         result = -ENOMEM;
1922                         /* free all previously allocated caches */
1923                         lu_kmem_fini(caches);
1924                         break;
1925                 }
1926         }
1927         return result;
1928 }
1929 EXPORT_SYMBOL(lu_kmem_init);
1930
1931 /**
1932  * Helper function to finalize a number of kmem slab caches at once. Dual to
1933  * lu_kmem_init().
1934  */
1935 void lu_kmem_fini(struct lu_kmem_descr *caches)
1936 {
1937         for (; caches->ckd_cache; ++caches) {
1938                 kmem_cache_destroy(*caches->ckd_cache);
1939                 *caches->ckd_cache = NULL;
1940         }
1941 }
1942 EXPORT_SYMBOL(lu_kmem_fini);
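
/*
 * Editor's illustrative sketch (not part of the original file, disabled with
 * #if 0): callers describe their caches in a lu_kmem_descr array terminated by
 * an entry with a NULL ckd_cache, and pair lu_kmem_init() with lu_kmem_fini().
 * The "foo" names are hypothetical.
 */
#if 0
struct foo_object {
        struct lu_object fo_obj;
};

static struct kmem_cache *foo_object_kmem;

static struct lu_kmem_descr foo_caches[] = {
        {
                .ckd_cache = &foo_object_kmem,
                .ckd_name  = "foo_object_kmem",
                .ckd_size  = sizeof(struct foo_object)
        },
        {
                .ckd_cache = NULL
        }
};

static int __init foo_init(void)
{
        return lu_kmem_init(foo_caches);
}

static void __exit foo_exit(void)
{
        lu_kmem_fini(foo_caches);
}
#endif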