Merge remote-tracking branches 'spi/topic/octeon', 'spi/topic/omap2-mcspi', 'spi...
[cascardo/linux.git] / drivers / staging / lustre / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
39
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
49
50 #include "echo_internal.h"
51
52 /** \defgroup echo_client Echo Client
53  * @{
54  */
55
56 struct echo_device {
57         struct cl_device        ed_cl;
58         struct echo_client_obd *ed_ec;
59
60         struct cl_site    ed_site_myself;
61         struct cl_site   *ed_site;
62         struct lu_device       *ed_next;
63 };
64
65 struct echo_object {
66         struct cl_object        eo_cl;
67         struct cl_object_header eo_hdr;
68
69         struct echo_device     *eo_dev;
70         struct list_head              eo_obj_chain;
71         struct lov_stripe_md   *eo_lsm;
72         atomic_t            eo_npages;
73         int                  eo_deleted;
74 };
75
76 struct echo_object_conf {
77         struct cl_object_conf  eoc_cl;
78         struct lov_stripe_md **eoc_md;
79 };
80
81 struct echo_page {
82         struct cl_page_slice   ep_cl;
83         struct mutex            ep_lock;
84         struct page         *ep_vmpage;
85 };
86
87 struct echo_lock {
88         struct cl_lock_slice   el_cl;
89         struct list_head             el_chain;
90         struct echo_object    *el_object;
91         __u64             el_cookie;
92         atomic_t           el_refcount;
93 };
94
/* Defined later in this file; needed by the device alloc/free paths. */
static int echo_client_setup(const struct lu_env *env,
			     struct obd_device *obddev,
			     struct lustre_cfg *lcfg);

static int echo_client_cleanup(struct obd_device *obddev);
99
100 /** \defgroup echo_helpers Helper functions
101  * @{
102  */
103 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
104 {
105         return container_of0(dev, struct echo_device, ed_cl);
106 }
107
108 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
109 {
110         return &d->ed_cl;
111 }
112
113 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
114 {
115         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
116 }
117
118 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
119 {
120         return &eco->eo_cl;
121 }
122
123 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
124 {
125         return container_of(o, struct echo_object, eo_cl);
126 }
127
128 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
129 {
130         return container_of(s, struct echo_page, ep_cl);
131 }
132
133 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
134 {
135         return container_of(s, struct echo_lock, el_cl);
136 }
137
138 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
139 {
140         return ecl->el_cl.cls_lock;
141 }
142
143 static struct lu_context_key echo_thread_key;
144 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
145 {
146         struct echo_thread_info *info;
147
148         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
149         LASSERT(info);
150         return info;
151 }
152
153 static inline
154 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
155 {
156         return container_of(c, struct echo_object_conf, eoc_cl);
157 }
158
159 /** @} echo_helpers */
160
161 static struct echo_object *cl_echo_object_find(struct echo_device *d,
162                                                struct lov_stripe_md **lsm);
163 static int cl_echo_object_put(struct echo_object *eco);
164 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
165                               struct page **pages, int npages, int async);
166
167 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
168
169 struct echo_thread_info {
170         struct echo_object_conf eti_conf;
171         struct lustre_md        eti_md;
172
173         struct cl_2queue        eti_queue;
174         struct cl_io        eti_io;
175         struct cl_lock_descr    eti_descr;
176         struct lu_fid      eti_fid;
177         struct lu_fid           eti_fid2;
178 };
179
/*
 * No session used right now; the structure only carries a placeholder
 * member so that the session cache has a non-zero object size.
 */
struct echo_session_info {
	unsigned long	dummy;
};
184
185 static struct kmem_cache *echo_lock_kmem;
186 static struct kmem_cache *echo_object_kmem;
187 static struct kmem_cache *echo_thread_kmem;
188 static struct kmem_cache *echo_session_kmem;
189
190 static struct lu_kmem_descr echo_caches[] = {
191         {
192                 .ckd_cache = &echo_lock_kmem,
193                 .ckd_name  = "echo_lock_kmem",
194                 .ckd_size  = sizeof(struct echo_lock)
195         },
196         {
197                 .ckd_cache = &echo_object_kmem,
198                 .ckd_name  = "echo_object_kmem",
199                 .ckd_size  = sizeof(struct echo_object)
200         },
201         {
202                 .ckd_cache = &echo_thread_kmem,
203                 .ckd_name  = "echo_thread_kmem",
204                 .ckd_size  = sizeof(struct echo_thread_info)
205         },
206         {
207                 .ckd_cache = &echo_session_kmem,
208                 .ckd_name  = "echo_session_kmem",
209                 .ckd_size  = sizeof(struct echo_session_info)
210         },
211         {
212                 .ckd_cache = NULL
213         }
214 };
215
216 /** \defgroup echo_page Page operations
217  *
218  * Echo page operations.
219  *
220  * @{
221  */
222 static struct page *echo_page_vmpage(const struct lu_env *env,
223                                      const struct cl_page_slice *slice)
224 {
225         return cl2echo_page(slice)->ep_vmpage;
226 }
227
228 static int echo_page_own(const struct lu_env *env,
229                          const struct cl_page_slice *slice,
230                          struct cl_io *io, int nonblock)
231 {
232         struct echo_page *ep = cl2echo_page(slice);
233
234         if (!nonblock)
235                 mutex_lock(&ep->ep_lock);
236         else if (!mutex_trylock(&ep->ep_lock))
237                 return -EAGAIN;
238         return 0;
239 }
240
241 static void echo_page_disown(const struct lu_env *env,
242                              const struct cl_page_slice *slice,
243                              struct cl_io *io)
244 {
245         struct echo_page *ep = cl2echo_page(slice);
246
247         LASSERT(mutex_is_locked(&ep->ep_lock));
248         mutex_unlock(&ep->ep_lock);
249 }
250
251 static void echo_page_discard(const struct lu_env *env,
252                               const struct cl_page_slice *slice,
253                               struct cl_io *unused)
254 {
255         cl_page_delete(env, slice->cpl_page);
256 }
257
258 static int echo_page_is_vmlocked(const struct lu_env *env,
259                                  const struct cl_page_slice *slice)
260 {
261         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
262                 return -EBUSY;
263         return -ENODATA;
264 }
265
266 static void echo_page_completion(const struct lu_env *env,
267                                  const struct cl_page_slice *slice,
268                                  int ioret)
269 {
270         LASSERT(slice->cpl_page->cp_sync_io);
271 }
272
273 static void echo_page_fini(const struct lu_env *env,
274                            struct cl_page_slice *slice)
275 {
276         struct echo_page *ep    = cl2echo_page(slice);
277         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
278         struct page *vmpage      = ep->ep_vmpage;
279
280         atomic_dec(&eco->eo_npages);
281         put_page(vmpage);
282 }
283
/** cpo_prep: echo pages need no preparation; always succeeds. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	/* nothing to prepare for an echo page */
	return 0;
}
290
291 static int echo_page_print(const struct lu_env *env,
292                            const struct cl_page_slice *slice,
293                            void *cookie, lu_printer_t printer)
294 {
295         struct echo_page *ep = cl2echo_page(slice);
296
297         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
298                    ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
299         return 0;
300 }
301
302 static const struct cl_page_operations echo_page_ops = {
303         .cpo_own           = echo_page_own,
304         .cpo_disown     = echo_page_disown,
305         .cpo_discard       = echo_page_discard,
306         .cpo_vmpage     = echo_page_vmpage,
307         .cpo_fini         = echo_page_fini,
308         .cpo_print       = echo_page_print,
309         .cpo_is_vmlocked   = echo_page_is_vmlocked,
310         .io = {
311                 [CRT_READ] = {
312                         .cpo_prep       = echo_page_prep,
313                         .cpo_completion  = echo_page_completion,
314                 },
315                 [CRT_WRITE] = {
316                         .cpo_prep       = echo_page_prep,
317                         .cpo_completion  = echo_page_completion,
318                 }
319         }
320 };
321
322 /** @} echo_page */
323
324 /** \defgroup echo_lock Locking
325  *
326  * echo lock operations
327  *
328  * @{
329  */
330 static void echo_lock_fini(const struct lu_env *env,
331                            struct cl_lock_slice *slice)
332 {
333         struct echo_lock *ecl = cl2echo_lock(slice);
334
335         LASSERT(list_empty(&ecl->el_chain));
336         kmem_cache_free(echo_lock_kmem, ecl);
337 }
338
339 static void echo_lock_delete(const struct lu_env *env,
340                              const struct cl_lock_slice *slice)
341 {
342         struct echo_lock *ecl      = cl2echo_lock(slice);
343
344         LASSERT(list_empty(&ecl->el_chain));
345 }
346
/** clo_fits_into: unconditionally returns 1 for every request. */
static int echo_lock_fits_into(const struct lu_env *env,
			       const struct cl_lock_slice *slice,
			       const struct cl_lock_descr *need,
			       const struct cl_io *unused)
{
	/* no check is made against \a need */
	return 1;
}
354
355 static struct cl_lock_operations echo_lock_ops = {
356         .clo_fini      = echo_lock_fini,
357         .clo_delete    = echo_lock_delete,
358         .clo_fits_into = echo_lock_fits_into
359 };
360
361 /** @} echo_lock */
362
363 /** \defgroup echo_cl_ops cl_object operations
364  *
365  * operations for cl_object
366  *
367  * @{
368  */
369 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
370                           struct cl_page *page, struct page *vmpage)
371 {
372         struct echo_page *ep = cl_object_page_slice(obj, page);
373         struct echo_object *eco = cl2echo_obj(obj);
374
375         ep->ep_vmpage = vmpage;
376         get_page(vmpage);
377         mutex_init(&ep->ep_lock);
378         cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
379         atomic_inc(&eco->eo_npages);
380         return 0;
381 }
382
/** coo_io_init: the echo layer adds nothing to an IO; always returns 0. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	/* no echo-specific io state */
	return 0;
}
388
389 static int echo_lock_init(const struct lu_env *env,
390                           struct cl_object *obj, struct cl_lock *lock,
391                           const struct cl_io *unused)
392 {
393         struct echo_lock *el;
394
395         el = kmem_cache_zalloc(echo_lock_kmem, GFP_NOFS);
396         if (el) {
397                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
398                 el->el_object = cl2echo_obj(obj);
399                 INIT_LIST_HEAD(&el->el_chain);
400                 atomic_set(&el->el_refcount, 0);
401         }
402         return !el ? -ENOMEM : 0;
403 }
404
/** coo_conf_set: configuration changes are ignored; always returns 0. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	/* nothing to reconfigure */
	return 0;
}
410
411 static const struct cl_object_operations echo_cl_obj_ops = {
412         .coo_page_init = echo_page_init,
413         .coo_lock_init = echo_lock_init,
414         .coo_io_init   = echo_io_init,
415         .coo_conf_set  = echo_conf_set
416 };
417
418 /** @} echo_cl_ops */
419
420 /** \defgroup echo_lu_ops lu_object operations
421  *
422  * operations for echo lu object.
423  *
424  * @{
425  */
426 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
427                             const struct lu_object_conf *conf)
428 {
429         struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
430         struct echo_client_obd *ec     = ed->ed_ec;
431         struct echo_object *eco = cl2echo_obj(lu2cl(obj));
432         const struct cl_object_conf *cconf;
433         struct echo_object_conf *econf;
434
435         if (ed->ed_next) {
436                 struct lu_object  *below;
437                 struct lu_device  *under;
438
439                 under = ed->ed_next;
440                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
441                                                         under);
442                 if (!below)
443                         return -ENOMEM;
444                 lu_object_add(obj, below);
445         }
446
447         cconf = lu2cl_conf(conf);
448         econf = cl2echo_conf(cconf);
449
450         LASSERT(econf->eoc_md);
451         eco->eo_lsm = *econf->eoc_md;
452         /* clear the lsm pointer so that it won't get freed. */
453         *econf->eoc_md = NULL;
454
455         eco->eo_dev = ed;
456         atomic_set(&eco->eo_npages, 0);
457         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
458
459         spin_lock(&ec->ec_lock);
460         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
461         spin_unlock(&ec->ec_lock);
462
463         return 0;
464 }
465
466 /* taken from osc_unpackmd() */
467 static int echo_alloc_memmd(struct echo_device *ed,
468                             struct lov_stripe_md **lsmp)
469 {
470         int lsm_size;
471
472         /* If export is lov/osc then use their obd method */
473         if (ed->ed_next)
474                 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
475         /* OFD has no unpackmd method, do everything here */
476         lsm_size = lov_stripe_md_size(1);
477
478         LASSERT(!*lsmp);
479         *lsmp = kzalloc(lsm_size, GFP_NOFS);
480         if (!*lsmp)
481                 return -ENOMEM;
482
483         (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo), GFP_NOFS);
484         if (!(*lsmp)->lsm_oinfo[0]) {
485                 kfree(*lsmp);
486                 return -ENOMEM;
487         }
488
489         loi_init((*lsmp)->lsm_oinfo[0]);
490         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
491         ostid_set_seq_echo(&(*lsmp)->lsm_oi);
492
493         return lsm_size;
494 }
495
496 static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
497 {
498         int lsm_size;
499
500         /* If export is lov/osc then use their obd method */
501         if (ed->ed_next)
502                 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
503         /* OFD has no unpackmd method, do everything here */
504         lsm_size = lov_stripe_md_size(1);
505
506         kfree((*lsmp)->lsm_oinfo[0]);
507         kfree(*lsmp);
508         *lsmp = NULL;
509         return 0;
510 }
511
512 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
513 {
514         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
515         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
516
517         LASSERT(atomic_read(&eco->eo_npages) == 0);
518
519         spin_lock(&ec->ec_lock);
520         list_del_init(&eco->eo_obj_chain);
521         spin_unlock(&ec->ec_lock);
522
523         lu_object_fini(obj);
524         lu_object_header_fini(obj->lo_header);
525
526         if (eco->eo_lsm)
527                 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
528         kmem_cache_free(echo_object_kmem, eco);
529 }
530
531 static int echo_object_print(const struct lu_env *env, void *cookie,
532                              lu_printer_t p, const struct lu_object *o)
533 {
534         struct echo_object *obj = cl2echo_obj(lu2cl(o));
535
536         return (*p)(env, cookie, "echoclient-object@%p", obj);
537 }
538
539 static const struct lu_object_operations echo_lu_obj_ops = {
540         .loo_object_init      = echo_object_init,
541         .loo_object_delete    = NULL,
542         .loo_object_release   = NULL,
543         .loo_object_free      = echo_object_free,
544         .loo_object_print     = echo_object_print,
545         .loo_object_invariant = NULL
546 };
547
548 /** @} echo_lu_ops */
549
550 /** \defgroup echo_lu_dev_ops  lu_device operations
551  *
552  * Operations for echo lu device.
553  *
554  * @{
555  */
556 static struct lu_object *echo_object_alloc(const struct lu_env *env,
557                                            const struct lu_object_header *hdr,
558                                            struct lu_device *dev)
559 {
560         struct echo_object *eco;
561         struct lu_object *obj = NULL;
562
563         /* we're the top dev. */
564         LASSERT(!hdr);
565         eco = kmem_cache_zalloc(echo_object_kmem, GFP_NOFS);
566         if (eco) {
567                 struct cl_object_header *hdr = &eco->eo_hdr;
568
569                 obj = &echo_obj2cl(eco)->co_lu;
570                 cl_object_header_init(hdr);
571                 lu_object_init(obj, &hdr->coh_lu, dev);
572                 lu_object_add_top(&hdr->coh_lu, obj);
573
574                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
575                 obj->lo_ops       = &echo_lu_obj_ops;
576         }
577         return obj;
578 }
579
580 static const struct lu_device_operations echo_device_lu_ops = {
581         .ldo_object_alloc   = echo_object_alloc,
582 };
583
584 /** @} echo_lu_dev_ops */
585
586 static const struct cl_device_operations echo_device_cl_ops = {
587 };
588
589 /** \defgroup echo_init Setup and teardown
590  *
591  * Init and fini functions for echo client.
592  *
593  * @{
594  */
595 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
596 {
597         struct cl_site *site = &ed->ed_site_myself;
598         int rc;
599
600         /* initialize site */
601         rc = cl_site_init(site, &ed->ed_cl);
602         if (rc) {
603                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
604                 return rc;
605         }
606
607         rc = lu_site_init_finish(&site->cs_lu);
608         if (rc)
609                 return rc;
610
611         ed->ed_site = site;
612         return 0;
613 }
614
615 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
616 {
617         if (ed->ed_site) {
618                 cl_site_fini(ed->ed_site);
619                 ed->ed_site = NULL;
620         }
621 }
622
623 static void *echo_thread_key_init(const struct lu_context *ctx,
624                                   struct lu_context_key *key)
625 {
626         struct echo_thread_info *info;
627
628         info = kmem_cache_zalloc(echo_thread_kmem, GFP_NOFS);
629         if (!info)
630                 info = ERR_PTR(-ENOMEM);
631         return info;
632 }
633
634 static void echo_thread_key_fini(const struct lu_context *ctx,
635                                  struct lu_context_key *key, void *data)
636 {
637         struct echo_thread_info *info = data;
638
639         kmem_cache_free(echo_thread_kmem, info);
640 }
641
/** lct_exit for echo_thread_key: intentionally does nothing. */
static void echo_thread_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
	/* no per-exit cleanup */
}
646
647 static struct lu_context_key echo_thread_key = {
648         .lct_tags = LCT_CL_THREAD,
649         .lct_init = echo_thread_key_init,
650         .lct_fini = echo_thread_key_fini,
651         .lct_exit = echo_thread_key_exit
652 };
653
654 static void *echo_session_key_init(const struct lu_context *ctx,
655                                    struct lu_context_key *key)
656 {
657         struct echo_session_info *session;
658
659         session = kmem_cache_zalloc(echo_session_kmem, GFP_NOFS);
660         if (!session)
661                 session = ERR_PTR(-ENOMEM);
662         return session;
663 }
664
665 static void echo_session_key_fini(const struct lu_context *ctx,
666                                   struct lu_context_key *key, void *data)
667 {
668         struct echo_session_info *session = data;
669
670         kmem_cache_free(echo_session_kmem, session);
671 }
672
/** lct_exit for echo_session_key: intentionally does nothing. */
static void echo_session_key_exit(const struct lu_context *ctx,
				  struct lu_context_key *key, void *data)
{
	/* no per-exit cleanup */
}
677
678 static struct lu_context_key echo_session_key = {
679         .lct_tags = LCT_SESSION,
680         .lct_init = echo_session_key_init,
681         .lct_fini = echo_session_key_fini,
682         .lct_exit = echo_session_key_exit
683 };
684
685 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
686
687 static struct lu_device *echo_device_alloc(const struct lu_env *env,
688                                            struct lu_device_type *t,
689                                            struct lustre_cfg *cfg)
690 {
691         struct lu_device   *next;
692         struct echo_device *ed;
693         struct cl_device   *cd;
694         struct obd_device  *obd = NULL; /* to keep compiler happy */
695         struct obd_device  *tgt;
696         const char *tgt_type_name;
697         int rc;
698         int cleanup = 0;
699
700         ed = kzalloc(sizeof(*ed), GFP_NOFS);
701         if (!ed) {
702                 rc = -ENOMEM;
703                 goto out;
704         }
705
706         cleanup = 1;
707         cd = &ed->ed_cl;
708         rc = cl_device_init(cd, t);
709         if (rc)
710                 goto out;
711
712         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
713         cd->cd_ops = &echo_device_cl_ops;
714
715         cleanup = 2;
716         obd = class_name2obd(lustre_cfg_string(cfg, 0));
717         LASSERT(obd);
718         LASSERT(env);
719
720         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
721         if (!tgt) {
722                 CERROR("Can not find tgt device %s\n",
723                        lustre_cfg_string(cfg, 1));
724                 rc = -ENODEV;
725                 goto out;
726         }
727
728         next = tgt->obd_lu_dev;
729         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
730                 CERROR("echo MDT client must be run on server\n");
731                 rc = -EOPNOTSUPP;
732                 goto out;
733         }
734
735         rc = echo_site_init(env, ed);
736         if (rc)
737                 goto out;
738
739         cleanup = 3;
740
741         rc = echo_client_setup(env, obd, cfg);
742         if (rc)
743                 goto out;
744
745         ed->ed_ec = &obd->u.echo_client;
746         cleanup = 4;
747
748         /* if echo client is to be stacked upon ost device, the next is
749          * NULL since ost is not a clio device so far
750          */
751         if (next && !lu_device_is_cl(next))
752                 next = NULL;
753
754         tgt_type_name = tgt->obd_type->typ_name;
755         if (next) {
756                 if (next->ld_site) {
757                         rc = -EBUSY;
758                         goto out;
759                 }
760
761                 next->ld_site = &ed->ed_site->cs_lu;
762                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
763                                                 next->ld_type->ldt_name,
764                                                               NULL);
765                 if (rc)
766                         goto out;
767
768         } else {
769                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
770         }
771
772         ed->ed_next = next;
773         return &cd->cd_lu_dev;
774 out:
775         switch (cleanup) {
776         case 4: {
777                 int rc2;
778
779                 rc2 = echo_client_cleanup(obd);
780                 if (rc2)
781                         CERROR("Cleanup obd device %s error(%d)\n",
782                                obd->obd_name, rc2);
783         }
784
785         case 3:
786                 echo_site_fini(env, ed);
787         case 2:
788                 cl_device_fini(&ed->ed_cl);
789         case 1:
790                 kfree(ed);
791         case 0:
792         default:
793                 break;
794         }
795         return ERR_PTR(rc);
796 }
797
/**
 * ldto_device_init: not expected to be called for the echo device; hitting
 * it triggers LBUG().
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			    const char *name, struct lu_device *next)
{
	LBUG();

	return 0;
}
804
805 static struct lu_device *echo_device_fini(const struct lu_env *env,
806                                           struct lu_device *d)
807 {
808         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
809         struct lu_device *next = ed->ed_next;
810
811         while (next)
812                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
813         return NULL;
814 }
815
816 static void echo_lock_release(const struct lu_env *env,
817                               struct echo_lock *ecl,
818                               int still_used)
819 {
820         struct cl_lock *clk = echo_lock2cl(ecl);
821
822         cl_lock_get(clk);
823         cl_unuse(env, clk);
824         cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
825         if (!still_used) {
826                 cl_lock_mutex_get(env, clk);
827                 cl_lock_cancel(env, clk);
828                 cl_lock_delete(env, clk);
829                 cl_lock_mutex_put(env, clk);
830         }
831         cl_lock_put(env, clk);
832 }
833
834 static struct lu_device *echo_device_free(const struct lu_env *env,
835                                           struct lu_device *d)
836 {
837         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
838         struct echo_client_obd *ec   = ed->ed_ec;
839         struct echo_object     *eco;
840         struct lu_device       *next = ed->ed_next;
841
842         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
843                ed, next);
844
845         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
846
847         /* check if there are objects still alive.
848          * It shouldn't have any object because lu_site_purge would cleanup
849          * all of cached objects. Anyway, probably the echo device is being
850          * parallelly accessed.
851          */
852         spin_lock(&ec->ec_lock);
853         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
854                 eco->eo_deleted = 1;
855         spin_unlock(&ec->ec_lock);
856
857         /* purge again */
858         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
859
860         CDEBUG(D_INFO,
861                "Waiting for the reference of echo object to be dropped\n");
862
863         /* Wait for the last reference to be dropped. */
864         spin_lock(&ec->ec_lock);
865         while (!list_empty(&ec->ec_objects)) {
866                 spin_unlock(&ec->ec_lock);
867                 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
868                 set_current_state(TASK_UNINTERRUPTIBLE);
869                 schedule_timeout(cfs_time_seconds(1));
870                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
871                 spin_lock(&ec->ec_lock);
872         }
873         spin_unlock(&ec->ec_lock);
874
875         LASSERT(list_empty(&ec->ec_locks));
876
877         CDEBUG(D_INFO, "No object exists, exiting...\n");
878
879         echo_client_cleanup(d->ld_obd);
880
881         while (next)
882                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
883
884         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
885         echo_site_fini(env, ed);
886         cl_device_fini(&ed->ed_cl);
887         kfree(ed);
888
889         return NULL;
890 }
891
892 static const struct lu_device_type_operations echo_device_type_ops = {
893         .ldto_init = echo_type_init,
894         .ldto_fini = echo_type_fini,
895
896         .ldto_start = echo_type_start,
897         .ldto_stop  = echo_type_stop,
898
899         .ldto_device_alloc = echo_device_alloc,
900         .ldto_device_free  = echo_device_free,
901         .ldto_device_init  = echo_device_init,
902         .ldto_device_fini  = echo_device_fini
903 };
904
905 static struct lu_device_type echo_device_type = {
906         .ldt_tags     = LU_DEVICE_CL,
907         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
908         .ldt_ops      = &echo_device_type_ops,
909         .ldt_ctx_tags = LCT_CL_THREAD,
910 };
911
912 /** @} echo_init */
913
914 /** \defgroup echo_exports Exported operations
915  *
916  * exporting functions to echo client
917  *
918  * @{
919  */
920
921 /* Interfaces to echo client obd device */
922 static struct echo_object *cl_echo_object_find(struct echo_device *d,
923                                                struct lov_stripe_md **lsmp)
924 {
925         struct lu_env *env;
926         struct echo_thread_info *info;
927         struct echo_object_conf *conf;
928         struct lov_stripe_md    *lsm;
929         struct echo_object *eco;
930         struct cl_object   *obj;
931         struct lu_fid *fid;
932         int refcheck;
933         int rc;
934
935         LASSERT(lsmp);
936         lsm = *lsmp;
937         LASSERT(lsm);
938         LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
939         LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
940                  POSTID(&lsm->lsm_oi));
941
942         /* Never return an object if the obd is to be freed. */
943         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
944                 return ERR_PTR(-ENODEV);
945
946         env = cl_env_get(&refcheck);
947         if (IS_ERR(env))
948                 return (void *)env;
949
950         info = echo_env_info(env);
951         conf = &info->eti_conf;
952         if (d->ed_next) {
953                 struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
954
955                 LASSERT(oinfo);
956                 oinfo->loi_oi = lsm->lsm_oi;
957                 conf->eoc_cl.u.coc_oinfo = oinfo;
958         }
959         conf->eoc_md = lsmp;
960
961         fid  = &info->eti_fid;
962         rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
963         if (rc != 0) {
964                 eco = ERR_PTR(rc);
965                 goto out;
966         }
967
968         /* In the function below, .hs_keycmp resolves to
969          * lu_obj_hop_keycmp()
970          */
971         /* coverity[overrun-buffer-val] */
972         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
973         if (IS_ERR(obj)) {
974                 eco = (void *)obj;
975                 goto out;
976         }
977
978         eco = cl2echo_obj(obj);
979         if (eco->eo_deleted) {
980                 cl_object_put(env, obj);
981                 eco = ERR_PTR(-EAGAIN);
982         }
983
984 out:
985         cl_env_put(env, &refcheck);
986         return eco;
987 }
988
989 static int cl_echo_object_put(struct echo_object *eco)
990 {
991         struct lu_env *env;
992         struct cl_object *obj = echo_obj2cl(eco);
993         int refcheck;
994
995         env = cl_env_get(&refcheck);
996         if (IS_ERR(env))
997                 return PTR_ERR(env);
998
999         /* an external function to kill an object? */
1000         if (eco->eo_deleted) {
1001                 struct lu_object_header *loh = obj->co_lu.lo_header;
1002
1003                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1004                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1005         }
1006
1007         cl_object_put(env, obj);
1008         cl_env_put(env, &refcheck);
1009         return 0;
1010 }
1011
1012 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1013                             u64 start, u64 end, int mode,
1014                             __u64 *cookie, __u32 enqflags)
1015 {
1016         struct cl_io *io;
1017         struct cl_lock *lck;
1018         struct cl_object *obj;
1019         struct cl_lock_descr *descr;
1020         struct echo_thread_info *info;
1021         int rc = -ENOMEM;
1022
1023         info = echo_env_info(env);
1024         io = &info->eti_io;
1025         descr = &info->eti_descr;
1026         obj = echo_obj2cl(eco);
1027
1028         descr->cld_obj   = obj;
1029         descr->cld_start = cl_index(obj, start);
1030         descr->cld_end   = cl_index(obj, end);
1031         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1032         descr->cld_enq_flags = enqflags;
1033         io->ci_obj = obj;
1034
1035         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1036         if (lck) {
1037                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1038                 struct echo_lock *el;
1039
1040                 rc = cl_wait(env, lck);
1041                 if (rc == 0) {
1042                         el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1043                         spin_lock(&ec->ec_lock);
1044                         if (list_empty(&el->el_chain)) {
1045                                 list_add(&el->el_chain, &ec->ec_locks);
1046                                 el->el_cookie = ++ec->ec_unique;
1047                         }
1048                         atomic_inc(&el->el_refcount);
1049                         *cookie = el->el_cookie;
1050                         spin_unlock(&ec->ec_lock);
1051                 } else {
1052                         cl_lock_release(env, lck, "ec enqueue", current);
1053                 }
1054         }
1055         return rc;
1056 }
1057
1058 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1059                            __u64 cookie)
1060 {
1061         struct echo_client_obd *ec = ed->ed_ec;
1062         struct echo_lock       *ecl = NULL;
1063         struct list_head             *el;
1064         int found = 0, still_used = 0;
1065
1066         spin_lock(&ec->ec_lock);
1067         list_for_each(el, &ec->ec_locks) {
1068                 ecl = list_entry(el, struct echo_lock, el_chain);
1069                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1070                 found = (ecl->el_cookie == cookie);
1071                 if (found) {
1072                         if (atomic_dec_and_test(&ecl->el_refcount))
1073                                 list_del_init(&ecl->el_chain);
1074                         else
1075                                 still_used = 1;
1076                         break;
1077                 }
1078         }
1079         spin_unlock(&ec->ec_lock);
1080
1081         if (!found)
1082                 return -ENOENT;
1083
1084         echo_lock_release(env, ecl, still_used);
1085         return 0;
1086 }
1087
1088 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1089                              enum cl_req_type unused, struct cl_2queue *queue)
1090 {
1091         struct cl_page *clp;
1092         struct cl_page *temp;
1093         int result = 0;
1094
1095         cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1096                 int rc;
1097
1098                 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1099                 if (rc == 0)
1100                         continue;
1101                 result = result ?: rc;
1102         }
1103         return result;
1104 }
1105
1106 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1107                               struct page **pages, int npages, int async)
1108 {
1109         struct lu_env      *env;
1110         struct echo_thread_info *info;
1111         struct cl_object        *obj = echo_obj2cl(eco);
1112         struct echo_device      *ed  = eco->eo_dev;
1113         struct cl_2queue        *queue;
1114         struct cl_io        *io;
1115         struct cl_page    *clp;
1116         struct lustre_handle    lh = { 0 };
1117         int page_size = cl_page_size(obj);
1118         int refcheck;
1119         int rc;
1120         int i;
1121
1122         LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1123         LASSERT(ed->ed_next);
1124         env = cl_env_get(&refcheck);
1125         if (IS_ERR(env))
1126                 return PTR_ERR(env);
1127
1128         info    = echo_env_info(env);
1129         io      = &info->eti_io;
1130         queue   = &info->eti_queue;
1131
1132         cl_2queue_init(queue);
1133
1134         io->ci_ignore_layout = 1;
1135         rc = cl_io_init(env, io, CIT_MISC, obj);
1136         if (rc < 0)
1137                 goto out;
1138         LASSERT(rc == 0);
1139
1140         rc = cl_echo_enqueue0(env, eco, offset,
1141                               offset + npages * PAGE_SIZE - 1,
1142                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1143                               CEF_NEVER);
1144         if (rc < 0)
1145                 goto error_lock;
1146
1147         for (i = 0; i < npages; i++) {
1148                 LASSERT(pages[i]);
1149                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1150                                    pages[i], CPT_TRANSIENT);
1151                 if (IS_ERR(clp)) {
1152                         rc = PTR_ERR(clp);
1153                         break;
1154                 }
1155                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1156
1157                 rc = cl_page_own(env, io, clp);
1158                 if (rc) {
1159                         LASSERT(clp->cp_state == CPS_FREEING);
1160                         cl_page_put(env, clp);
1161                         break;
1162                 }
1163                 /*
1164                  * Add a page to the incoming page list of 2-queue.
1165                  */
1166                 cl_page_list_add(&queue->c2_qin, clp);
1167
1168                 /* drop the reference count for cl_page_find, so that the page
1169                  * will be freed in cl_2queue_fini.
1170                  */
1171                 cl_page_put(env, clp);
1172                 cl_page_clip(env, clp, 0, page_size);
1173
1174                 offset += page_size;
1175         }
1176
1177         if (rc == 0) {
1178                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1179
1180                 async = async && (typ == CRT_WRITE);
1181                 if (async)
1182                         rc = cl_echo_async_brw(env, io, typ, queue);
1183                 else
1184                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1185                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1186                        async ? "async" : "sync", rc);
1187         }
1188
1189         cl_echo_cancel0(env, ed, lh.cookie);
1190 error_lock:
1191         cl_2queue_discard(env, io, queue);
1192         cl_2queue_disown(env, io, queue);
1193         cl_2queue_fini(env, queue);
1194         cl_io_fini(env, io);
1195 out:
1196         cl_env_put(env, &refcheck);
1197         return rc;
1198 }
1199
1200 /** @} echo_exports */
1201
1202 static u64 last_object_id;
1203
1204 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1205                               struct obdo *oa, struct obd_trans_info *oti)
1206 {
1207         struct echo_object     *eco;
1208         struct echo_client_obd *ec = ed->ed_ec;
1209         struct lov_stripe_md   *lsm = NULL;
1210         int                  rc;
1211         int                  created = 0;
1212
1213         if (!(oa->o_valid & OBD_MD_FLID) ||
1214             !(oa->o_valid & OBD_MD_FLGROUP) ||
1215             !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1216                 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
1217                 return -EINVAL;
1218         }
1219
1220         rc = echo_alloc_memmd(ed, &lsm);
1221         if (rc < 0) {
1222                 CERROR("Cannot allocate md: rc = %d\n", rc);
1223                 goto failed;
1224         }
1225
1226         /* setup object ID here */
1227         lsm->lsm_oi = oa->o_oi;
1228
1229         if (ostid_id(&lsm->lsm_oi) == 0)
1230                 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1231
1232         rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1233         if (rc != 0) {
1234                 CERROR("Cannot create objects: rc = %d\n", rc);
1235                 goto failed;
1236         }
1237         created = 1;
1238
1239         /* See what object ID we were given */
1240         oa->o_oi = lsm->lsm_oi;
1241         oa->o_valid |= OBD_MD_FLID;
1242
1243         eco = cl_echo_object_find(ed, &lsm);
1244         if (IS_ERR(eco)) {
1245                 rc = PTR_ERR(eco);
1246                 goto failed;
1247         }
1248         cl_echo_object_put(eco);
1249
1250         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1251
1252  failed:
1253         if (created && rc)
1254                 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL);
1255         if (lsm)
1256                 echo_free_memmd(ed, &lsm);
1257         if (rc)
1258                 CERROR("create object failed with: rc = %d\n", rc);
1259         return rc;
1260 }
1261
1262 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1263                            struct obdo *oa)
1264 {
1265         struct lov_stripe_md   *lsm = NULL;
1266         struct echo_object     *eco;
1267         int                  rc;
1268
1269         if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1270                 /* disallow use of object id 0 */
1271                 CERROR("No valid oid\n");
1272                 return -EINVAL;
1273         }
1274
1275         rc = echo_alloc_memmd(ed, &lsm);
1276         if (rc < 0)
1277                 return rc;
1278
1279         lsm->lsm_oi = oa->o_oi;
1280         if (!(oa->o_valid & OBD_MD_FLGROUP))
1281                 ostid_set_seq_echo(&lsm->lsm_oi);
1282
1283         rc = 0;
1284         eco = cl_echo_object_find(ed, &lsm);
1285         if (!IS_ERR(eco))
1286                 *ecop = eco;
1287         else
1288                 rc = PTR_ERR(eco);
1289         if (lsm)
1290                 echo_free_memmd(ed, &lsm);
1291         return rc;
1292 }
1293
1294 static void echo_put_object(struct echo_object *eco)
1295 {
1296         int rc;
1297
1298         rc = cl_echo_object_put(eco);
1299         if (rc)
1300                 CERROR("%s: echo client drop an object failed: rc = %d\n",
1301                        eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
1302 }
1303
1304 static void
1305 echo_client_page_debug_setup(struct page *page, int rw, u64 id,
1306                              u64 offset, u64 count)
1307 {
1308         char    *addr;
1309         u64      stripe_off;
1310         u64      stripe_id;
1311         int      delta;
1312
1313         /* no partial pages on the client */
1314         LASSERT(count == PAGE_SIZE);
1315
1316         addr = kmap(page);
1317
1318         for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1319                 if (rw == OBD_BRW_WRITE) {
1320                         stripe_off = offset + delta;
1321                         stripe_id = id;
1322                 } else {
1323                         stripe_off = 0xdeadbeef00c0ffeeULL;
1324                         stripe_id = 0xdeadbeef00c0ffeeULL;
1325                 }
1326                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1327                                   stripe_off, stripe_id);
1328         }
1329
1330         kunmap(page);
1331 }
1332
1333 static int echo_client_page_debug_check(struct page *page, u64 id,
1334                                         u64 offset, u64 count)
1335 {
1336         u64     stripe_off;
1337         u64     stripe_id;
1338         char   *addr;
1339         int     delta;
1340         int     rc;
1341         int     rc2;
1342
1343         /* no partial pages on the client */
1344         LASSERT(count == PAGE_SIZE);
1345
1346         addr = kmap(page);
1347
1348         for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1349                 stripe_off = offset + delta;
1350                 stripe_id = id;
1351
1352                 rc2 = block_debug_check("test_brw",
1353                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1354                                         stripe_off, stripe_id);
1355                 if (rc2 != 0) {
1356                         CERROR("Error in echo object %#llx\n", id);
1357                         rc = rc2;
1358                 }
1359         }
1360
1361         kunmap(page);
1362         return rc;
1363 }
1364
1365 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1366                             struct echo_object *eco, u64 offset,
1367                             u64 count, int async,
1368                             struct obd_trans_info *oti)
1369 {
1370         u32            npages;
1371         struct brw_page *pga;
1372         struct brw_page *pgp;
1373         struct page         **pages;
1374         u64              off;
1375         int                  i;
1376         int                  rc;
1377         int                  verify;
1378         gfp_t                gfp_mask;
1379         int                  brw_flags = 0;
1380
1381         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1382                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1383                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1384
1385         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1386
1387         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1388
1389         if (count <= 0 ||
1390             (count & (~CFS_PAGE_MASK)) != 0)
1391                 return -EINVAL;
1392
1393         /* XXX think again with misaligned I/O */
1394         npages = count >> PAGE_SHIFT;
1395
1396         if (rw == OBD_BRW_WRITE)
1397                 brw_flags = OBD_BRW_ASYNC;
1398
1399         pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1400         if (!pga)
1401                 return -ENOMEM;
1402
1403         pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1404         if (!pages) {
1405                 kfree(pga);
1406                 return -ENOMEM;
1407         }
1408
1409         for (i = 0, pgp = pga, off = offset;
1410              i < npages;
1411              i++, pgp++, off += PAGE_SIZE) {
1412
1413                 LASSERT(!pgp->pg);      /* for cleanup */
1414
1415                 rc = -ENOMEM;
1416                 pgp->pg = alloc_page(gfp_mask);
1417                 if (!pgp->pg)
1418                         goto out;
1419
1420                 pages[i] = pgp->pg;
1421                 pgp->count = PAGE_SIZE;
1422                 pgp->off = off;
1423                 pgp->flag = brw_flags;
1424
1425                 if (verify)
1426                         echo_client_page_debug_setup(pgp->pg, rw,
1427                                                      ostid_id(&oa->o_oi), off,
1428                                                      pgp->count);
1429         }
1430
1431         /* brw mode can only be used at client */
1432         LASSERT(ed->ed_next);
1433         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1434
1435  out:
1436         if (rc != 0 || rw != OBD_BRW_READ)
1437                 verify = 0;
1438
1439         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1440                 if (!pgp->pg)
1441                         continue;
1442
1443                 if (verify) {
1444                         int vrc;
1445
1446                         vrc = echo_client_page_debug_check(pgp->pg,
1447                                                            ostid_id(&oa->o_oi),
1448                                                            pgp->off, pgp->count);
1449                         if (vrc != 0 && rc == 0)
1450                                 rc = vrc;
1451                 }
1452                 __free_page(pgp->pg);
1453         }
1454         kfree(pga);
1455         kfree(pages);
1456         return rc;
1457 }
1458
1459 static int echo_client_prep_commit(const struct lu_env *env,
1460                                    struct obd_export *exp, int rw,
1461                                    struct obdo *oa, struct echo_object *eco,
1462                                    u64 offset, u64 count,
1463                                    u64 batch, struct obd_trans_info *oti,
1464                                    int async)
1465 {
1466         struct obd_ioobj ioo;
1467         struct niobuf_local *lnb;
1468         struct niobuf_remote *rnb;
1469         u64 off;
1470         u64 npages, tot_pages;
1471         int i, ret = 0, brw_flags = 0;
1472
1473         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0)
1474                 return -EINVAL;
1475
1476         npages = batch >> PAGE_SHIFT;
1477         tot_pages = count >> PAGE_SHIFT;
1478
1479         lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1480         rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);
1481
1482         if (!lnb || !rnb) {
1483                 ret = -ENOMEM;
1484                 goto out;
1485         }
1486
1487         if (rw == OBD_BRW_WRITE && async)
1488                 brw_flags |= OBD_BRW_ASYNC;
1489
1490         obdo_to_ioobj(oa, &ioo);
1491
1492         off = offset;
1493
1494         for (; tot_pages; tot_pages -= npages) {
1495                 int lpages;
1496
1497                 if (tot_pages < npages)
1498                         npages = tot_pages;
1499
1500                 for (i = 0; i < npages; i++, off += PAGE_SIZE) {
1501                         rnb[i].offset = off;
1502                         rnb[i].len = PAGE_SIZE;
1503                         rnb[i].flags = brw_flags;
1504                 }
1505
1506                 ioo.ioo_bufcnt = npages;
1507                 oti->oti_transno = 0;
1508
1509                 lpages = npages;
1510                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1511                                  lnb, oti);
1512                 if (ret != 0)
1513                         goto out;
1514                 LASSERT(lpages == npages);
1515
1516                 for (i = 0; i < lpages; i++) {
1517                         struct page *page = lnb[i].page;
1518
1519                         /* read past eof? */
1520                         if (!page  && lnb[i].rc == 0)
1521                                 continue;
1522
1523                         if (async)
1524                                 lnb[i].flags |= OBD_BRW_ASYNC;
1525
1526                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1527                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1528                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1529                                 continue;
1530
1531                         if (rw == OBD_BRW_WRITE)
1532                                 echo_client_page_debug_setup(page, rw,
1533                                                             ostid_id(&oa->o_oi),
1534                                                              rnb[i].offset,
1535                                                              rnb[i].len);
1536                         else
1537                                 echo_client_page_debug_check(page,
1538                                                             ostid_id(&oa->o_oi),
1539                                                              rnb[i].offset,
1540                                                              rnb[i].len);
1541                 }
1542
1543                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1544                                    rnb, npages, lnb, oti, ret);
1545                 if (ret != 0)
1546                         goto out;
1547
1548                 /* Reset oti otherwise it would confuse ldiskfs. */
1549                 memset(oti, 0, sizeof(*oti));
1550
1551                 /* Reuse env context. */
1552                 lu_context_exit((struct lu_context *)&env->le_ctx);
1553                 lu_context_enter((struct lu_context *)&env->le_ctx);
1554         }
1555
1556 out:
1557         kfree(lnb);
1558         kfree(rnb);
1559         return ret;
1560 }
1561
1562 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1563                                  struct obd_export *exp,
1564                                  struct obd_ioctl_data *data,
1565                                  struct obd_trans_info *dummy_oti)
1566 {
1567         struct obd_device *obd = class_exp2obd(exp);
1568         struct echo_device *ed = obd2echo_dev(obd);
1569         struct echo_client_obd *ec = ed->ed_ec;
1570         struct obdo *oa = &data->ioc_obdo1;
1571         struct echo_object *eco;
1572         int rc;
1573         int async = 1;
1574         long test_mode;
1575
1576         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1577
1578         rc = echo_get_object(&eco, ed, oa);
1579         if (rc)
1580                 return rc;
1581
1582         oa->o_valid &= ~OBD_MD_FLHANDLE;
1583
1584         /* OFD/obdfilter works only via prep/commit */
1585         test_mode = (long)data->ioc_pbuf1;
1586         if (test_mode == 1)
1587                 async = 0;
1588
1589         if (!ed->ed_next && test_mode != 3) {
1590                 test_mode = 3;
1591                 data->ioc_plen1 = data->ioc_count;
1592         }
1593
1594         /* Truncate batch size to maximum */
1595         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1596                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1597
1598         switch (test_mode) {
1599         case 1:
1600                 /* fall through */
1601         case 2:
1602                 rc = echo_client_kbrw(ed, rw, oa,
1603                                       eco, data->ioc_offset,
1604                                       data->ioc_count, async, dummy_oti);
1605                 break;
1606         case 3:
1607                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1608                                              eco, data->ioc_offset,
1609                                              data->ioc_count, data->ioc_plen1,
1610                                              dummy_oti, async);
1611                 break;
1612         default:
1613                 rc = -EINVAL;
1614         }
1615         echo_put_object(eco);
1616         return rc;
1617 }
1618
1619 static int
1620 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1621                       void *karg, void __user *uarg)
1622 {
1623         struct obd_device      *obd = exp->exp_obd;
1624         struct echo_device     *ed = obd2echo_dev(obd);
1625         struct echo_client_obd *ec = ed->ed_ec;
1626         struct echo_object     *eco;
1627         struct obd_ioctl_data  *data = karg;
1628         struct obd_trans_info   dummy_oti;
1629         struct lu_env     *env;
1630         struct oti_req_ack_lock *ack_lock;
1631         struct obdo         *oa;
1632         struct lu_fid      fid;
1633         int                  rw = OBD_BRW_READ;
1634         int                  rc = 0;
1635         int                  i;
1636
1637         memset(&dummy_oti, 0, sizeof(dummy_oti));
1638
1639         oa = &data->ioc_obdo1;
1640         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1641                 oa->o_valid |= OBD_MD_FLGROUP;
1642                 ostid_set_seq_echo(&oa->o_oi);
1643         }
1644
1645         /* This FID is unpacked just for validation at this point */
1646         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1647         if (rc < 0)
1648                 return rc;
1649
1650         env = kzalloc(sizeof(*env), GFP_NOFS);
1651         if (!env)
1652                 return -ENOMEM;
1653
1654         rc = lu_env_init(env, LCT_DT_THREAD);
1655         if (rc) {
1656                 rc = -ENOMEM;
1657                 goto out;
1658         }
1659
1660         switch (cmd) {
1661         case OBD_IOC_CREATE:                /* may create echo object */
1662                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1663                         rc = -EPERM;
1664                         goto out;
1665                 }
1666
1667                 rc = echo_create_object(env, ed, oa, &dummy_oti);
1668                 goto out;
1669
1670         case OBD_IOC_DESTROY:
1671                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1672                         rc = -EPERM;
1673                         goto out;
1674                 }
1675
1676                 rc = echo_get_object(&eco, ed, oa);
1677                 if (rc == 0) {
1678                         rc = obd_destroy(env, ec->ec_exp, oa, NULL,
1679                                          &dummy_oti, NULL);
1680                         if (rc == 0)
1681                                 eco->eo_deleted = 1;
1682                         echo_put_object(eco);
1683                 }
1684                 goto out;
1685
1686         case OBD_IOC_GETATTR:
1687                 rc = echo_get_object(&eco, ed, oa);
1688                 if (rc == 0) {
1689                         struct obd_info oinfo = {
1690                                 .oi_oa = oa,
1691                         };
1692
1693                         rc = obd_getattr(env, ec->ec_exp, &oinfo);
1694                         echo_put_object(eco);
1695                 }
1696                 goto out;
1697
1698         case OBD_IOC_SETATTR:
1699                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1700                         rc = -EPERM;
1701                         goto out;
1702                 }
1703
1704                 rc = echo_get_object(&eco, ed, oa);
1705                 if (rc == 0) {
1706                         struct obd_info oinfo = {
1707                                 .oi_oa = oa,
1708                         };
1709
1710                         rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1711                         echo_put_object(eco);
1712                 }
1713                 goto out;
1714
1715         case OBD_IOC_BRW_WRITE:
1716                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1717                         rc = -EPERM;
1718                         goto out;
1719                 }
1720
1721                 rw = OBD_BRW_WRITE;
1722                 /* fall through */
1723         case OBD_IOC_BRW_READ:
1724                 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1725                 goto out;
1726
1727         default:
1728                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1729                 rc = -ENOTTY;
1730                 goto out;
1731         }
1732
1733 out:
1734         lu_env_fini(env);
1735         kfree(env);
1736
1737         /* XXX this should be in a helper also called by target_send_reply */
1738         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1739              i++, ack_lock++) {
1740                 if (!ack_lock->mode)
1741                         break;
1742                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1743         }
1744
1745         return rc;
1746 }
1747
1748 static int echo_client_setup(const struct lu_env *env,
1749                              struct obd_device *obddev, struct lustre_cfg *lcfg)
1750 {
1751         struct echo_client_obd *ec = &obddev->u.echo_client;
1752         struct obd_device *tgt;
1753         struct obd_uuid echo_uuid = { "ECHO_UUID" };
1754         struct obd_connect_data *ocd = NULL;
1755         int rc;
1756
1757         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1758                 CERROR("requires a TARGET OBD name\n");
1759                 return -EINVAL;
1760         }
1761
1762         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1763         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1764                 CERROR("device not attached or not set up (%s)\n",
1765                        lustre_cfg_string(lcfg, 1));
1766                 return -EINVAL;
1767         }
1768
1769         spin_lock_init(&ec->ec_lock);
1770         INIT_LIST_HEAD(&ec->ec_objects);
1771         INIT_LIST_HEAD(&ec->ec_locks);
1772         ec->ec_unique = 0;
1773
1774         ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
1775         if (!ocd)
1776                 return -ENOMEM;
1777
1778         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1779                                  OBD_CONNECT_BRW_SIZE |
1780                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
1781                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
1782                                  OBD_CONNECT_FID;
1783         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
1784         ocd->ocd_version = LUSTRE_VERSION_CODE;
1785         ocd->ocd_group = FID_SEQ_ECHO;
1786
1787         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
1788
1789         kfree(ocd);
1790
1791         if (rc != 0) {
1792                 CERROR("fail to connect to device %s\n",
1793                        lustre_cfg_string(lcfg, 1));
1794                 return rc;
1795         }
1796
1797         return rc;
1798 }
1799
1800 static int echo_client_cleanup(struct obd_device *obddev)
1801 {
1802         struct echo_client_obd *ec = &obddev->u.echo_client;
1803         int rc;
1804
1805         if (!list_empty(&obddev->obd_exports)) {
1806                 CERROR("still has clients!\n");
1807                 return -EBUSY;
1808         }
1809
1810         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
1811         rc = obd_disconnect(ec->ec_exp);
1812         if (rc != 0)
1813                 CERROR("fail to disconnect device: %d\n", rc);
1814
1815         return rc;
1816 }
1817
1818 static int echo_client_connect(const struct lu_env *env,
1819                                struct obd_export **exp,
1820                                struct obd_device *src, struct obd_uuid *cluuid,
1821                                struct obd_connect_data *data, void *localdata)
1822 {
1823         int             rc;
1824         struct lustre_handle conn = { 0 };
1825
1826         rc = class_connect(&conn, src, cluuid);
1827         if (rc == 0) {
1828                 *exp = class_conn2export(&conn);
1829         }
1830
1831         return rc;
1832 }
1833
/**
 * obd_ops::disconnect handler for the echo client.
 *
 * \param exp  export to disconnect; may be NULL.
 *
 * \retval -EINVAL if \a exp is NULL.
 * \retval otherwise the value returned by class_disconnect().
 */
static int echo_client_disconnect(struct obd_export *exp)
{
	/*
	 * There is nothing to clean up on any path, so return directly
	 * instead of jumping to a label that only returns.
	 */
	if (!exp)
		return -EINVAL;

	return class_disconnect(exp);
}
1848
1849 static struct obd_ops echo_client_obd_ops = {
1850         .owner          = THIS_MODULE,
1851         .iocontrol      = echo_client_iocontrol,
1852         .connect        = echo_client_connect,
1853         .disconnect     = echo_client_disconnect
1854 };
1855
1856 static int echo_client_init(void)
1857 {
1858         int rc;
1859
1860         rc = lu_kmem_init(echo_caches);
1861         if (rc == 0) {
1862                 rc = class_register_type(&echo_client_obd_ops, NULL,
1863                                          LUSTRE_ECHO_CLIENT_NAME,
1864                                          &echo_device_type);
1865                 if (rc)
1866                         lu_kmem_fini(echo_caches);
1867         }
1868         return rc;
1869 }
1870
1871 static void echo_client_exit(void)
1872 {
1873         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
1874         lu_kmem_fini(echo_caches);
1875 }
1876
1877 static int __init obdecho_init(void)
1878 {
1879         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1880
1881         LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
1882
1883         return echo_client_init();
1884 }
1885
/**
 * Module exit point: tears down the echo client.
 *
 * NOTE(review): the __exit annotation is deliberately left commented out
 * on this function; the reason is not visible here -- confirm before
 * re-enabling it.
 */
static void /*__exit*/ obdecho_exit(void)
{
	echo_client_exit();
}
1891
1892 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1893 MODULE_DESCRIPTION("Lustre Echo Client test driver");
1894 MODULE_VERSION(LUSTRE_VERSION_STRING);
1895 MODULE_LICENSE("GPL");
1896
1897 module_init(obdecho_init);
1898 module_exit(obdecho_exit);
1899
1900 /** @} echo_client */