staging: lustre: use CONFIG_PROC_FS
[cascardo/linux.git] / drivers / staging / lustre / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/pagemap.h>
42 #include <linux/mm.h>
43 #include <asm/div64.h>
44 #include <linux/seq_file.h>
45 #include <linux/namei.h>
46 #include <asm/uaccess.h>
47
48 #include "../include/lustre/lustre_idl.h"
49 #include "../include/obd_support.h"
50 #include "../include/lustre_lib.h"
51 #include "../include/lustre_net.h"
52 #include "../include/obd_class.h"
53 #include "../include/lprocfs_status.h"
54 #include "../include/lustre_lite.h"
55 #include "../include/lustre_fid.h"
56 #include "lmv_internal.h"
57
58 static void lmv_activate_target(struct lmv_obd *lmv,
59                                 struct lmv_tgt_desc *tgt,
60                                 int activate)
61 {
62         if (tgt->ltd_active == activate)
63                 return;
64
65         tgt->ltd_active = activate;
66         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
67 }
68
69 /**
70  * Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77                               int activate)
78 {
79         struct lmv_tgt_desc    *uninitialized_var(tgt);
80         struct obd_device      *obd;
81         int                  i;
82         int                  rc = 0;
83
84         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85                lmv, uuid->uuid, activate);
86
87         spin_lock(&lmv->lmv_lock);
88         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
89                 tgt = lmv->tgts[i];
90                 if (tgt == NULL || tgt->ltd_exp == NULL)
91                         continue;
92
93                 CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
94                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
95
96                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
97                         break;
98         }
99
100         if (i == lmv->desc.ld_tgt_count)
101                 GOTO(out_lmv_lock, rc = -EINVAL);
102
103         obd = class_exp2obd(tgt->ltd_exp);
104         if (obd == NULL)
105                 GOTO(out_lmv_lock, rc = -ENOTCONN);
106
107         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
108                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
109                obd->obd_type->typ_name, i);
110         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
111
112         if (tgt->ltd_active == activate) {
113                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
114                        activate ? "" : "in");
115                 GOTO(out_lmv_lock, rc);
116         }
117
118         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
119                activate ? "" : "in");
120         lmv_activate_target(lmv, tgt, activate);
121
122  out_lmv_lock:
123         spin_unlock(&lmv->lmv_lock);
124         return rc;
125 }
126
127 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
128 {
129         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
130
131         return obd_get_uuid(lmv->tgts[0]->ltd_exp);
132 }
133
134 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
135                       enum obd_notify_event ev, void *data)
136 {
137         struct obd_connect_data *conn_data;
138         struct lmv_obd    *lmv = &obd->u.lmv;
139         struct obd_uuid  *uuid;
140         int                   rc = 0;
141
142         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
143                 CERROR("unexpected notification of %s %s!\n",
144                        watched->obd_type->typ_name,
145                        watched->obd_name);
146                 return -EINVAL;
147         }
148
149         uuid = &watched->u.cli.cl_target_uuid;
150         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
151                 /*
152                  * Set MDC as active before notifying the observer, so the
153                  * observer can use the MDC normally.
154                  */
155                 rc = lmv_set_mdc_active(lmv, uuid,
156                                         ev == OBD_NOTIFY_ACTIVE);
157                 if (rc) {
158                         CERROR("%sactivation of %s failed: %d\n",
159                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
160                                uuid->uuid, rc);
161                         return rc;
162                 }
163         } else if (ev == OBD_NOTIFY_OCD) {
164                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
165                 /*
166                  * XXX: Make sure that ocd_connect_flags from all targets are
167                  * the same. Otherwise one of MDTs runs wrong version or
168                  * something like this.  --umka
169                  */
170                 obd->obd_self_export->exp_connect_data = *conn_data;
171         }
172 #if 0
173         else if (ev == OBD_NOTIFY_DISCON) {
174                 /*
175                  * For disconnect event, flush fld cache for failout MDS case.
176                  */
177                 fld_client_flush(&lmv->lmv_fld);
178         }
179 #endif
180         /*
181          * Pass the notification up the chain.
182          */
183         if (obd->obd_observer)
184                 rc = obd_notify(obd->obd_observer, watched, ev, data);
185
186         return rc;
187 }
188
189 /**
190  * This is fake connect function. Its purpose is to initialize lmv and say
191  * caller that everything is okay. Real connection will be performed later.
192  */
193 static int lmv_connect(const struct lu_env *env,
194                        struct obd_export **exp, struct obd_device *obd,
195                        struct obd_uuid *cluuid, struct obd_connect_data *data,
196                        void *localdata)
197 {
198         struct proc_dir_entry *lmv_proc_dir;
199         struct lmv_obd  *lmv = &obd->u.lmv;
200         struct lustre_handle  conn = { 0 };
201         int                 rc = 0;
202
203         /*
204          * We don't want to actually do the underlying connections more than
205          * once, so keep track.
206          */
207         lmv->refcount++;
208         if (lmv->refcount > 1) {
209                 *exp = NULL;
210                 return 0;
211         }
212
213         rc = class_connect(&conn, obd, cluuid);
214         if (rc) {
215                 CERROR("class_connection() returned %d\n", rc);
216                 return rc;
217         }
218
219         *exp = class_conn2export(&conn);
220         class_export_get(*exp);
221
222         lmv->exp = *exp;
223         lmv->connected = 0;
224         lmv->cluuid = *cluuid;
225
226         if (data)
227                 lmv->conn_data = *data;
228
229         if (obd->obd_proc_private != NULL) {
230                 lmv_proc_dir = obd->obd_proc_private;
231         } else {
232                 lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
233                                                 NULL, NULL);
234                 if (IS_ERR(lmv_proc_dir)) {
235                         CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
236                                obd->obd_type->typ_name, obd->obd_name);
237                         lmv_proc_dir = NULL;
238                 }
239                 obd->obd_proc_private = lmv_proc_dir;
240         }
241
242         /*
243          * All real clients should perform actual connection right away, because
244          * it is possible, that LMV will not have opportunity to connect targets
245          * and MDC stuff will be called directly, for instance while reading
246          * ../mdc/../kbytesfree procfs file, etc.
247          */
248         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
249                 rc = lmv_check_connect(obd);
250
251         if (rc && lmv_proc_dir) {
252                 lprocfs_remove(&lmv_proc_dir);
253                 obd->obd_proc_private = NULL;
254         }
255
256         return rc;
257 }
258
259 static void lmv_set_timeouts(struct obd_device *obd)
260 {
261         struct lmv_tgt_desc   *tgt;
262         struct lmv_obd  *lmv;
263         int                 i;
264
265         lmv = &obd->u.lmv;
266         if (lmv->server_timeout == 0)
267                 return;
268
269         if (lmv->connected == 0)
270                 return;
271
272         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
273                 tgt = lmv->tgts[i];
274                 if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
275                         continue;
276
277                 obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
278                                    KEY_INTERMDS, 0, NULL, NULL);
279         }
280 }
281
282 static int lmv_init_ea_size(struct obd_export *exp, int easize,
283                             int def_easize, int cookiesize, int def_cookiesize)
284 {
285         struct obd_device   *obd = exp->exp_obd;
286         struct lmv_obd      *lmv = &obd->u.lmv;
287         int               i;
288         int               rc = 0;
289         int               change = 0;
290
291         if (lmv->max_easize < easize) {
292                 lmv->max_easize = easize;
293                 change = 1;
294         }
295         if (lmv->max_def_easize < def_easize) {
296                 lmv->max_def_easize = def_easize;
297                 change = 1;
298         }
299         if (lmv->max_cookiesize < cookiesize) {
300                 lmv->max_cookiesize = cookiesize;
301                 change = 1;
302         }
303         if (lmv->max_def_cookiesize < def_cookiesize) {
304                 lmv->max_def_cookiesize = def_cookiesize;
305                 change = 1;
306         }
307         if (change == 0)
308                 return 0;
309
310         if (lmv->connected == 0)
311                 return 0;
312
313         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
314                 if (lmv->tgts[i] == NULL ||
315                     lmv->tgts[i]->ltd_exp == NULL ||
316                     lmv->tgts[i]->ltd_active == 0) {
317                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
318                         continue;
319                 }
320
321                 rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
322                                      cookiesize, def_cookiesize);
323                 if (rc) {
324                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
325                                " rc = %d.\n", obd->obd_name, i, rc);
326                         break;
327                 }
328         }
329         return rc;
330 }
331
332 #define MAX_STRING_SIZE 128
333
334 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
335 {
336         struct proc_dir_entry   *lmv_proc_dir;
337         struct lmv_obd    *lmv = &obd->u.lmv;
338         struct obd_uuid  *cluuid = &lmv->cluuid;
339         struct obd_uuid   lmv_mdc_uuid = { "LMV_MDC_UUID" };
340         struct obd_device       *mdc_obd;
341         struct obd_export       *mdc_exp;
342         struct lu_fld_target     target;
343         int                   rc;
344
345         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
346                                         &obd->obd_uuid);
347         if (!mdc_obd) {
348                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
349                 return -EINVAL;
350         }
351
352         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
353                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
354                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
355                 cluuid->uuid);
356
357         if (!mdc_obd->obd_set_up) {
358                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
359                 return -EINVAL;
360         }
361
362         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
363                          &lmv->conn_data, NULL);
364         if (rc) {
365                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
366                 return rc;
367         }
368
369         /*
370          * Init fid sequence client for this mdc and add new fld target.
371          */
372         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
373         if (rc)
374                 return rc;
375
376         target.ft_srv = NULL;
377         target.ft_exp = mdc_exp;
378         target.ft_idx = tgt->ltd_idx;
379
380         fld_client_add_target(&lmv->lmv_fld, &target);
381
382         rc = obd_register_observer(mdc_obd, obd);
383         if (rc) {
384                 obd_disconnect(mdc_exp);
385                 CERROR("target %s register_observer error %d\n",
386                        tgt->ltd_uuid.uuid, rc);
387                 return rc;
388         }
389
390         if (obd->obd_observer) {
391                 /*
392                  * Tell the observer about the new target.
393                  */
394                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
395                                 OBD_NOTIFY_ACTIVE,
396                                 (void *)(tgt - lmv->tgts[0]));
397                 if (rc) {
398                         obd_disconnect(mdc_exp);
399                         return rc;
400                 }
401         }
402
403         tgt->ltd_active = 1;
404         tgt->ltd_exp = mdc_exp;
405         lmv->desc.ld_active_tgt_count++;
406
407         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize,
408                         lmv->max_cookiesize, lmv->max_def_cookiesize);
409
410         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
411                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
412                 atomic_read(&obd->obd_refcount));
413
414         lmv_proc_dir = obd->obd_proc_private;
415         if (lmv_proc_dir) {
416                 struct proc_dir_entry *mdc_symlink;
417
418                 LASSERT(mdc_obd->obd_type != NULL);
419                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
420                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
421                                                   lmv_proc_dir,
422                                                   "../../../%s/%s",
423                                                   mdc_obd->obd_type->typ_name,
424                                                   mdc_obd->obd_name);
425                 if (mdc_symlink == NULL) {
426                         CERROR("Could not register LMV target "
427                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
428                                obd->obd_type->typ_name, obd->obd_name,
429                                mdc_obd->obd_name);
430                         lprocfs_remove(&lmv_proc_dir);
431                         obd->obd_proc_private = NULL;
432                 }
433         }
434         return 0;
435 }
436
437 static void lmv_del_target(struct lmv_obd *lmv, int index)
438 {
439         if (lmv->tgts[index] == NULL)
440                 return;
441
442         OBD_FREE_PTR(lmv->tgts[index]);
443         lmv->tgts[index] = NULL;
444         return;
445 }
446
447 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
448                            __u32 index, int gen)
449 {
450         struct lmv_obd      *lmv = &obd->u.lmv;
451         struct lmv_tgt_desc *tgt;
452         int               rc = 0;
453
454         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
455
456         lmv_init_lock(lmv);
457
458         if (lmv->desc.ld_tgt_count == 0) {
459                 struct obd_device *mdc_obd;
460
461                 mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
462                                                 &obd->obd_uuid);
463                 if (!mdc_obd) {
464                         lmv_init_unlock(lmv);
465                         CERROR("%s: Target %s not attached: rc = %d\n",
466                                obd->obd_name, uuidp->uuid, -EINVAL);
467                         return -EINVAL;
468                 }
469         }
470
471         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
472                 tgt = lmv->tgts[index];
473                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
474                        " rc = %d\n", obd->obd_name,
475                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
476                 lmv_init_unlock(lmv);
477                 return -EEXIST;
478         }
479
480         if (index >= lmv->tgts_size) {
481                 /* We need to reallocate the lmv target array. */
482                 struct lmv_tgt_desc **newtgts, **old = NULL;
483                 __u32 newsize = 1;
484                 __u32 oldsize = 0;
485
486                 while (newsize < index + 1)
487                         newsize = newsize << 1;
488                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
489                 if (newtgts == NULL) {
490                         lmv_init_unlock(lmv);
491                         return -ENOMEM;
492                 }
493
494                 if (lmv->tgts_size) {
495                         memcpy(newtgts, lmv->tgts,
496                                sizeof(*newtgts) * lmv->tgts_size);
497                         old = lmv->tgts;
498                         oldsize = lmv->tgts_size;
499                 }
500
501                 lmv->tgts = newtgts;
502                 lmv->tgts_size = newsize;
503                 smp_rmb();
504                 if (old)
505                         OBD_FREE(old, sizeof(*old) * oldsize);
506
507                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
508                        lmv->tgts_size);
509         }
510
511         OBD_ALLOC_PTR(tgt);
512         if (!tgt) {
513                 lmv_init_unlock(lmv);
514                 return -ENOMEM;
515         }
516
517         mutex_init(&tgt->ltd_fid_mutex);
518         tgt->ltd_idx = index;
519         tgt->ltd_uuid = *uuidp;
520         tgt->ltd_active = 0;
521         lmv->tgts[index] = tgt;
522         if (index >= lmv->desc.ld_tgt_count)
523                 lmv->desc.ld_tgt_count = index + 1;
524
525         if (lmv->connected) {
526                 rc = lmv_connect_mdc(obd, tgt);
527                 if (rc) {
528                         spin_lock(&lmv->lmv_lock);
529                         lmv->desc.ld_tgt_count--;
530                         memset(tgt, 0, sizeof(*tgt));
531                         spin_unlock(&lmv->lmv_lock);
532                 } else {
533                         int easize = sizeof(struct lmv_stripe_md) +
534                                 lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
535                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
536                 }
537         }
538
539         lmv_init_unlock(lmv);
540         return rc;
541 }
542
543 int lmv_check_connect(struct obd_device *obd)
544 {
545         struct lmv_obd       *lmv = &obd->u.lmv;
546         struct lmv_tgt_desc  *tgt;
547         int                i;
548         int                rc;
549         int                easize;
550
551         if (lmv->connected)
552                 return 0;
553
554         lmv_init_lock(lmv);
555         if (lmv->connected) {
556                 lmv_init_unlock(lmv);
557                 return 0;
558         }
559
560         if (lmv->desc.ld_tgt_count == 0) {
561                 lmv_init_unlock(lmv);
562                 CERROR("%s: no targets configured.\n", obd->obd_name);
563                 return -EINVAL;
564         }
565
566         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
567                lmv->cluuid.uuid, obd->obd_name);
568
569         LASSERT(lmv->tgts != NULL);
570
571         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
572                 tgt = lmv->tgts[i];
573                 if (tgt == NULL)
574                         continue;
575                 rc = lmv_connect_mdc(obd, tgt);
576                 if (rc)
577                         GOTO(out_disc, rc);
578         }
579
580         lmv_set_timeouts(obd);
581         class_export_put(lmv->exp);
582         lmv->connected = 1;
583         easize = lmv_get_easize(lmv);
584         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
585         lmv_init_unlock(lmv);
586         return 0;
587
588  out_disc:
589         while (i-- > 0) {
590                 int rc2;
591                 tgt = lmv->tgts[i];
592                 if (tgt == NULL)
593                         continue;
594                 tgt->ltd_active = 0;
595                 if (tgt->ltd_exp) {
596                         --lmv->desc.ld_active_tgt_count;
597                         rc2 = obd_disconnect(tgt->ltd_exp);
598                         if (rc2) {
599                                 CERROR("LMV target %s disconnect on "
600                                        "MDC idx %d: error %d\n",
601                                        tgt->ltd_uuid.uuid, i, rc2);
602                         }
603                 }
604         }
605         class_disconnect(lmv->exp);
606         lmv_init_unlock(lmv);
607         return rc;
608 }
609
610 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
611 {
612         struct proc_dir_entry  *lmv_proc_dir;
613         struct lmv_obd   *lmv = &obd->u.lmv;
614         struct obd_device      *mdc_obd;
615         int                  rc;
616
617         LASSERT(tgt != NULL);
618         LASSERT(obd != NULL);
619
620         mdc_obd = class_exp2obd(tgt->ltd_exp);
621
622         if (mdc_obd) {
623                 mdc_obd->obd_force = obd->obd_force;
624                 mdc_obd->obd_fail = obd->obd_fail;
625                 mdc_obd->obd_no_recov = obd->obd_no_recov;
626         }
627
628         lmv_proc_dir = obd->obd_proc_private;
629         if (lmv_proc_dir)
630                 lprocfs_remove_proc_entry(mdc_obd->obd_name, lmv_proc_dir);
631
632         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
633         if (rc)
634                 CERROR("Can't finalize fids factory\n");
635
636         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
637                tgt->ltd_exp->exp_obd->obd_name,
638                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
639
640         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
641         rc = obd_disconnect(tgt->ltd_exp);
642         if (rc) {
643                 if (tgt->ltd_active) {
644                         CERROR("Target %s disconnect error %d\n",
645                                tgt->ltd_uuid.uuid, rc);
646                 }
647         }
648
649         lmv_activate_target(lmv, tgt, 0);
650         tgt->ltd_exp = NULL;
651         return 0;
652 }
653
654 static int lmv_disconnect(struct obd_export *exp)
655 {
656         struct obd_device     *obd = class_exp2obd(exp);
657         struct lmv_obd  *lmv = &obd->u.lmv;
658         int                 rc;
659         int                 i;
660
661         if (!lmv->tgts)
662                 goto out_local;
663
664         /*
665          * Only disconnect the underlying layers on the final disconnect.
666          */
667         lmv->refcount--;
668         if (lmv->refcount != 0)
669                 goto out_local;
670
671         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
672                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
673                         continue;
674
675                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
676         }
677
678         if (obd->obd_proc_private)
679                 lprocfs_remove((struct proc_dir_entry **)&obd->obd_proc_private);
680         else
681                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
682                        obd->obd_type->typ_name, obd->obd_name);
683
684 out_local:
685         /*
686          * This is the case when no real connection is established by
687          * lmv_check_connect().
688          */
689         if (!lmv->connected)
690                 class_export_put(exp);
691         rc = class_disconnect(exp);
692         if (lmv->refcount == 0)
693                 lmv->connected = 0;
694         return rc;
695 }
696
697 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
698 {
699         struct obd_device       *obddev = class_exp2obd(exp);
700         struct lmv_obd          *lmv = &obddev->u.lmv;
701         struct getinfo_fid2path *gf;
702         struct lmv_tgt_desc     *tgt;
703         struct getinfo_fid2path *remote_gf = NULL;
704         int                     remote_gf_size = 0;
705         int                     rc;
706
707         gf = (struct getinfo_fid2path *)karg;
708         tgt = lmv_find_target(lmv, &gf->gf_fid);
709         if (IS_ERR(tgt))
710                 return PTR_ERR(tgt);
711
712 repeat_fid2path:
713         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
714         if (rc != 0 && rc != -EREMOTE)
715                 GOTO(out_fid2path, rc);
716
717         /* If remote_gf != NULL, it means just building the
718          * path on the remote MDT, copy this path segment to gf */
719         if (remote_gf != NULL) {
720                 struct getinfo_fid2path *ori_gf;
721                 char *ptr;
722
723                 ori_gf = (struct getinfo_fid2path *)karg;
724                 if (strlen(ori_gf->gf_path) +
725                     strlen(gf->gf_path) > ori_gf->gf_pathlen)
726                         GOTO(out_fid2path, rc = -EOVERFLOW);
727
728                 ptr = ori_gf->gf_path;
729
730                 memmove(ptr + strlen(gf->gf_path) + 1, ptr,
731                         strlen(ori_gf->gf_path));
732
733                 strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
734                 ptr += strlen(gf->gf_path);
735                 *ptr = '/';
736         }
737
738         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
739                tgt->ltd_exp->exp_obd->obd_name,
740                gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
741                gf->gf_linkno);
742
743         if (rc == 0)
744                 GOTO(out_fid2path, rc);
745
746         /* sigh, has to go to another MDT to do path building further */
747         if (remote_gf == NULL) {
748                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
749                 OBD_ALLOC(remote_gf, remote_gf_size);
750                 if (remote_gf == NULL)
751                         GOTO(out_fid2path, rc = -ENOMEM);
752                 remote_gf->gf_pathlen = PATH_MAX;
753         }
754
755         if (!fid_is_sane(&gf->gf_fid)) {
756                 CERROR("%s: invalid FID "DFID": rc = %d\n",
757                        tgt->ltd_exp->exp_obd->obd_name,
758                        PFID(&gf->gf_fid), -EINVAL);
759                 GOTO(out_fid2path, rc = -EINVAL);
760         }
761
762         tgt = lmv_find_target(lmv, &gf->gf_fid);
763         if (IS_ERR(tgt))
764                 GOTO(out_fid2path, rc = -EINVAL);
765
766         remote_gf->gf_fid = gf->gf_fid;
767         remote_gf->gf_recno = -1;
768         remote_gf->gf_linkno = -1;
769         memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
770         gf = remote_gf;
771         goto repeat_fid2path;
772
773 out_fid2path:
774         if (remote_gf != NULL)
775                 OBD_FREE(remote_gf, remote_gf_size);
776         return rc;
777 }
778
779 static int lmv_hsm_req_count(struct lmv_obd *lmv,
780                              const struct hsm_user_request *hur,
781                              const struct lmv_tgt_desc *tgt_mds)
782 {
783         int                     i, nr = 0;
784         struct lmv_tgt_desc    *curr_tgt;
785
786         /* count how many requests must be sent to the given target */
787         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
788                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
789                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
790                         nr++;
791         }
792         return nr;
793 }
794
795 static void lmv_hsm_req_build(struct lmv_obd *lmv,
796                               struct hsm_user_request *hur_in,
797                               const struct lmv_tgt_desc *tgt_mds,
798                               struct hsm_user_request *hur_out)
799 {
800         int                     i, nr_out;
801         struct lmv_tgt_desc    *curr_tgt;
802
803         /* build the hsm_user_request for the given target */
804         hur_out->hur_request = hur_in->hur_request;
805         nr_out = 0;
806         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
807                 curr_tgt = lmv_find_target(lmv,
808                                         &hur_in->hur_user_item[i].hui_fid);
809                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
810                         hur_out->hur_user_item[nr_out] =
811                                 hur_in->hur_user_item[i];
812                         nr_out++;
813                 }
814         }
815         hur_out->hur_request.hr_itemcount = nr_out;
816         memcpy(hur_data(hur_out), hur_data(hur_in),
817                hur_in->hur_request.hr_data_len);
818 }
819
820 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
821                                  struct lustre_kernelcomm *lk, void *uarg)
822 {
823         int     i, rc = 0;
824
825         /* unregister request (call from llapi_hsm_copytool_fini) */
826         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
827                 /* best effort: try to clean as much as possible
828                  * (continue on error) */
829                 obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
830         }
831
832         /* Whatever the result, remove copytool from kuc groups.
833          * Unreached coordinators will get EPIPE on next requests
834          * and will unregister automatically.
835          */
836         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
837         return rc;
838 }
839
840 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
841                                struct lustre_kernelcomm *lk, void *uarg)
842 {
843         struct file     *filp;
844         int              i, j, err;
845         int              rc = 0;
846         bool             any_set = false;
847
848         /* All or nothing: try to register to all MDS.
849          * In case of failure, unregister from previous MDS,
850          * except if it because of inactive target. */
851         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
852                 err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
853                                    len, lk, uarg);
854                 if (err) {
855                         if (lmv->tgts[i]->ltd_active) {
856                                 /* permanent error */
857                                 CERROR("error: iocontrol MDC %s on MDT"
858                                        "idx %d cmd %x: err = %d\n",
859                                         lmv->tgts[i]->ltd_uuid.uuid,
860                                         i, cmd, err);
861                                 rc = err;
862                                 lk->lk_flags |= LK_FLG_STOP;
863                                 /* unregister from previous MDS */
864                                 for (j = 0; j < i; j++)
865                                         obd_iocontrol(cmd,
866                                                   lmv->tgts[j]->ltd_exp,
867                                                   len, lk, uarg);
868                                 return rc;
869                         }
870                         /* else: transient error.
871                          * kuc will register to the missing MDT
872                          * when it is back */
873                 } else {
874                         any_set = true;
875                 }
876         }
877
878         if (!any_set)
879                 /* no registration done: return error */
880                 return -ENOTCONN;
881
882         /* at least one registration done, with no failure */
883         filp = fget(lk->lk_wfd);
884         if (filp == NULL) {
885                 return -EBADF;
886         }
887         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, lk->lk_data);
888         if (rc != 0 && filp != NULL)
889                 fput(filp);
890         return rc;
891 }
892
893
894
895
896 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
897                          int len, void *karg, void *uarg)
898 {
899         struct obd_device    *obddev = class_exp2obd(exp);
900         struct lmv_obd       *lmv = &obddev->u.lmv;
901         int                i = 0;
902         int                rc = 0;
903         int                set = 0;
904         int                count = lmv->desc.ld_tgt_count;
905
906         if (count == 0)
907                 return -ENOTTY;
908
909         switch (cmd) {
910         case IOC_OBD_STATFS: {
911                 struct obd_ioctl_data *data = karg;
912                 struct obd_device *mdc_obd;
913                 struct obd_statfs stat_buf = {0};
914                 __u32 index;
915
916                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
917                 if ((index >= count))
918                         return -ENODEV;
919
920                 if (lmv->tgts[index] == NULL ||
921                     lmv->tgts[index]->ltd_active == 0)
922                         return -ENODATA;
923
924                 mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
925                 if (!mdc_obd)
926                         return -EINVAL;
927
928                 /* copy UUID */
929                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
930                                      min((int) data->ioc_plen2,
931                                          (int) sizeof(struct obd_uuid))))
932                         return -EFAULT;
933
934                 rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
935                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
936                                 0);
937                 if (rc)
938                         return rc;
939                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
940                                      min((int) data->ioc_plen1,
941                                          (int) sizeof(stat_buf))))
942                         return -EFAULT;
943                 break;
944         }
945         case OBD_IOC_QUOTACTL: {
946                 struct if_quotactl *qctl = karg;
947                 struct lmv_tgt_desc *tgt = NULL;
948                 struct obd_quotactl *oqctl;
949
950                 if (qctl->qc_valid == QC_MDTIDX) {
951                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
952                                 return -EINVAL;
953
954                         tgt = lmv->tgts[qctl->qc_idx];
955                         if (tgt == NULL || tgt->ltd_exp == NULL)
956                                 return -EINVAL;
957                 } else if (qctl->qc_valid == QC_UUID) {
958                         for (i = 0; i < count; i++) {
959                                 tgt = lmv->tgts[i];
960                                 if (tgt == NULL)
961                                         continue;
962                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
963                                                      &qctl->obd_uuid))
964                                         continue;
965
966                                 if (tgt->ltd_exp == NULL)
967                                         return -EINVAL;
968
969                                 break;
970                         }
971                 } else {
972                         return -EINVAL;
973                 }
974
975                 if (i >= count)
976                         return -EAGAIN;
977
978                 LASSERT(tgt && tgt->ltd_exp);
979                 OBD_ALLOC_PTR(oqctl);
980                 if (!oqctl)
981                         return -ENOMEM;
982
983                 QCTL_COPY(oqctl, qctl);
984                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
985                 if (rc == 0) {
986                         QCTL_COPY(qctl, oqctl);
987                         qctl->qc_valid = QC_MDTIDX;
988                         qctl->obd_uuid = tgt->ltd_uuid;
989                 }
990                 OBD_FREE_PTR(oqctl);
991                 break;
992         }
993         case OBD_IOC_CHANGELOG_SEND:
994         case OBD_IOC_CHANGELOG_CLEAR: {
995                 struct ioc_changelog *icc = karg;
996
997                 if (icc->icc_mdtindex >= count)
998                         return -ENODEV;
999
1000                 if (lmv->tgts[icc->icc_mdtindex] == NULL ||
1001                     lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
1002                     lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
1003                         return -ENODEV;
1004                 rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
1005                                    sizeof(*icc), icc, NULL);
1006                 break;
1007         }
1008         case LL_IOC_GET_CONNECT_FLAGS: {
1009                 if (lmv->tgts[0] == NULL)
1010                         return -ENODATA;
1011                 rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
1012                 break;
1013         }
1014         case OBD_IOC_FID2PATH: {
1015                 rc = lmv_fid2path(exp, len, karg, uarg);
1016                 break;
1017         }
1018         case LL_IOC_HSM_STATE_GET:
1019         case LL_IOC_HSM_STATE_SET:
1020         case LL_IOC_HSM_ACTION: {
1021                 struct md_op_data       *op_data = karg;
1022                 struct lmv_tgt_desc     *tgt;
1023
1024                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1025                 if (IS_ERR(tgt))
1026                                 return PTR_ERR(tgt);
1027
1028                 if (tgt->ltd_exp == NULL)
1029                                 return -EINVAL;
1030
1031                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1032                 break;
1033         }
1034         case LL_IOC_HSM_PROGRESS: {
1035                 const struct hsm_progress_kernel *hpk = karg;
1036                 struct lmv_tgt_desc     *tgt;
1037
1038                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1039                 if (IS_ERR(tgt))
1040                         return PTR_ERR(tgt);
1041                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1042                 break;
1043         }
1044         case LL_IOC_HSM_REQUEST: {
1045                 struct hsm_user_request *hur = karg;
1046                 struct lmv_tgt_desc     *tgt;
1047                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1048
1049                 if (reqcount == 0)
1050                         return 0;
1051
1052                 /* if the request is about a single fid
1053                  * or if there is a single MDS, no need to split
1054                  * the request. */
1055                 if (reqcount == 1 || count == 1) {
1056                         tgt = lmv_find_target(lmv,
1057                                               &hur->hur_user_item[0].hui_fid);
1058                         if (IS_ERR(tgt))
1059                                 return PTR_ERR(tgt);
1060                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1061                 } else {
1062                         /* split fid list to their respective MDS */
1063                         for (i = 0; i < count; i++) {
1064                                 unsigned int            nr, reqlen;
1065                                 int                     rc1;
1066                                 struct hsm_user_request *req;
1067
1068                                 nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]);
1069                                 if (nr == 0) /* nothing for this MDS */
1070                                         continue;
1071
1072                                 /* build a request with fids for this MDS */
1073                                 reqlen = offsetof(typeof(*hur),
1074                                                   hur_user_item[nr])
1075                                          + hur->hur_request.hr_data_len;
1076                                 OBD_ALLOC_LARGE(req, reqlen);
1077                                 if (req == NULL)
1078                                         return -ENOMEM;
1079
1080                                 lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req);
1081
1082                                 rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
1083                                                     reqlen, req, uarg);
1084                                 if (rc1 != 0 && rc == 0)
1085                                         rc = rc1;
1086                                 OBD_FREE_LARGE(req, reqlen);
1087                         }
1088                 }
1089                 break;
1090         }
1091         case LL_IOC_LOV_SWAP_LAYOUTS: {
1092                 struct md_op_data       *op_data = karg;
1093                 struct lmv_tgt_desc     *tgt1, *tgt2;
1094
1095                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1096                 if (IS_ERR(tgt1))
1097                         return PTR_ERR(tgt1);
1098
1099                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1100                 if (IS_ERR(tgt2))
1101                         return PTR_ERR(tgt2);
1102
1103                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1104                         return -EINVAL;
1105
1106                 /* only files on same MDT can have their layouts swapped */
1107                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1108                         return -EPERM;
1109
1110                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1111                 break;
1112         }
1113         case LL_IOC_HSM_CT_START: {
1114                 struct lustre_kernelcomm *lk = karg;
1115                 if (lk->lk_flags & LK_FLG_STOP)
1116                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1117                 else
1118                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1119                 break;
1120         }
1121         default:
1122                 for (i = 0; i < count; i++) {
1123                         struct obd_device *mdc_obd;
1124                         int err;
1125
1126                         if (lmv->tgts[i] == NULL ||
1127                             lmv->tgts[i]->ltd_exp == NULL)
1128                                 continue;
1129                         /* ll_umount_begin() sets force flag but for lmv, not
1130                          * mdc. Let's pass it through */
1131                         mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
1132                         mdc_obd->obd_force = obddev->obd_force;
1133                         err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
1134                                             karg, uarg);
1135                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1136                                 return err;
1137                         } else if (err) {
1138                                 if (lmv->tgts[i]->ltd_active) {
1139                                         CERROR("error: iocontrol MDC %s on MDT"
1140                                                "idx %d cmd %x: err = %d\n",
1141                                                 lmv->tgts[i]->ltd_uuid.uuid,
1142                                                 i, cmd, err);
1143                                         if (!rc)
1144                                                 rc = err;
1145                                 }
1146                         } else
1147                                 set = 1;
1148                 }
1149                 if (!set && !rc)
1150                         rc = -EIO;
1151         }
1152         return rc;
1153 }
1154
1155 #if 0
1156 static int lmv_all_chars_policy(int count, const char *name,
1157                                 int len)
1158 {
1159         unsigned int c = 0;
1160
1161         while (len > 0)
1162                 c += name[--len];
1163         c = c % count;
1164         return c;
1165 }
1166
1167 static int lmv_nid_policy(struct lmv_obd *lmv)
1168 {
1169         struct obd_import *imp;
1170         __u32         id;
1171
1172         /*
1173          * XXX: To get nid we assume that underlying obd device is mdc.
1174          */
1175         imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
1176         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
1177         return id % lmv->desc.ld_tgt_count;
1178 }
1179
1180 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1181                           placement_policy_t placement)
1182 {
1183         switch (placement) {
1184         case PLACEMENT_CHAR_POLICY:
1185                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
1186                                             op_data->op_name,
1187                                             op_data->op_namelen);
1188         case PLACEMENT_NID_POLICY:
1189                 return lmv_nid_policy(lmv);
1190
1191         default:
1192                 break;
1193         }
1194
1195         CERROR("Unsupported placement policy %x\n", placement);
1196         return -EINVAL;
1197 }
1198 #endif
1199
1200 /**
1201  * This is _inode_ placement policy function (not name).
1202  */
1203 static int lmv_placement_policy(struct obd_device *obd,
1204                                 struct md_op_data *op_data,
1205                                 mdsno_t *mds)
1206 {
1207         struct lmv_obd    *lmv = &obd->u.lmv;
1208
1209         LASSERT(mds != NULL);
1210
1211         if (lmv->desc.ld_tgt_count == 1) {
1212                 *mds = 0;
1213                 return 0;
1214         }
1215
1216         /**
1217          * If stripe_offset is provided during setdirstripe
1218          * (setdirstripe -i xx), xx MDS will be chosen.
1219          */
1220         if (op_data->op_cli_flags & CLI_SET_MEA) {
1221                 struct lmv_user_md *lum;
1222
1223                 lum = (struct lmv_user_md *)op_data->op_data;
1224                 if (lum->lum_type == LMV_STRIPE_TYPE &&
1225                     lum->lum_stripe_offset != -1) {
1226                         if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1227                                 CERROR("%s: Stripe_offset %d > MDT count %d:"
1228                                        " rc = %d\n", obd->obd_name,
1229                                        lum->lum_stripe_offset,
1230                                        lmv->desc.ld_tgt_count, -ERANGE);
1231                                 return -ERANGE;
1232                         }
1233                         *mds = lum->lum_stripe_offset;
1234                         return 0;
1235                 }
1236         }
1237
1238         /* Allocate new fid on target according to operation type and parent
1239          * home mds. */
1240         *mds = op_data->op_mds;
1241         return 0;
1242 }
1243
1244 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
1245                     mdsno_t mds)
1246 {
1247         struct lmv_tgt_desc     *tgt;
1248         int                      rc;
1249
1250         tgt = lmv_get_target(lmv, mds);
1251         if (IS_ERR(tgt))
1252                 return PTR_ERR(tgt);
1253
1254         /*
1255          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1256          * on server that seq in new allocated fid is not yet known.
1257          */
1258         mutex_lock(&tgt->ltd_fid_mutex);
1259
1260         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1261                 GOTO(out, rc = -ENODEV);
1262
1263         /*
1264          * Asking underlaying tgt layer to allocate new fid.
1265          */
1266         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
1267         if (rc > 0) {
1268                 LASSERT(fid_is_sane(fid));
1269                 rc = 0;
1270         }
1271
1272 out:
1273         mutex_unlock(&tgt->ltd_fid_mutex);
1274         return rc;
1275 }
1276
1277 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1278                   struct md_op_data *op_data)
1279 {
1280         struct obd_device     *obd = class_exp2obd(exp);
1281         struct lmv_obd  *lmv = &obd->u.lmv;
1282         mdsno_t         mds = 0;
1283         int                 rc;
1284
1285         LASSERT(op_data != NULL);
1286         LASSERT(fid != NULL);
1287
1288         rc = lmv_placement_policy(obd, op_data, &mds);
1289         if (rc) {
1290                 CERROR("Can't get target for allocating fid, "
1291                        "rc %d\n", rc);
1292                 return rc;
1293         }
1294
1295         rc = __lmv_fid_alloc(lmv, fid, mds);
1296         if (rc) {
1297                 CERROR("Can't alloc new fid, rc %d\n", rc);
1298                 return rc;
1299         }
1300
1301         return rc;
1302 }
1303
1304 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1305 {
1306         struct lmv_obd       *lmv = &obd->u.lmv;
1307         struct lprocfs_static_vars  lvars;
1308         struct lmv_desc     *desc;
1309         int                      rc;
1310
1311         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1312                 CERROR("LMV setup requires a descriptor\n");
1313                 return -EINVAL;
1314         }
1315
1316         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1317         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1318                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1319                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1320                 return -EINVAL;
1321         }
1322
1323         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
1324         if (lmv->tgts == NULL)
1325                 return -ENOMEM;
1326         lmv->tgts_size = 32;
1327
1328         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1329         lmv->desc.ld_tgt_count = 0;
1330         lmv->desc.ld_active_tgt_count = 0;
1331         lmv->max_cookiesize = 0;
1332         lmv->max_def_easize = 0;
1333         lmv->max_easize = 0;
1334         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1335
1336         spin_lock_init(&lmv->lmv_lock);
1337         mutex_init(&lmv->init_mutex);
1338
1339         lprocfs_lmv_init_vars(&lvars);
1340
1341         lprocfs_obd_setup(obd, lvars.obd_vars);
1342 #if defined (CONFIG_PROC_FS)
1343         {
1344                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1345                                         0444, &lmv_proc_target_fops, obd);
1346                 if (rc)
1347                         CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1348                                obd->obd_name, rc);
1349        }
1350 #endif
1351         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1352                              LUSTRE_CLI_FLD_HASH_DHT);
1353         if (rc) {
1354                 CERROR("Can't init FLD, err %d\n", rc);
1355                 GOTO(out, rc);
1356         }
1357
1358         return 0;
1359
1360 out:
1361         return rc;
1362 }
1363
1364 static int lmv_cleanup(struct obd_device *obd)
1365 {
1366         struct lmv_obd   *lmv = &obd->u.lmv;
1367
1368         fld_client_fini(&lmv->lmv_fld);
1369         if (lmv->tgts != NULL) {
1370                 int i;
1371                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1372                         if (lmv->tgts[i] == NULL)
1373                                 continue;
1374                         lmv_del_target(lmv, i);
1375                 }
1376                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1377                 lmv->tgts_size = 0;
1378         }
1379         return 0;
1380 }
1381
1382 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1383 {
1384         struct lustre_cfg       *lcfg = buf;
1385         struct obd_uuid         obd_uuid;
1386         int                     gen;
1387         __u32                   index;
1388         int                     rc;
1389
1390         switch (lcfg->lcfg_command) {
1391         case LCFG_ADD_MDC:
1392                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1393                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1394                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1395                         GOTO(out, rc = -EINVAL);
1396
1397                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1398
1399                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1400                         GOTO(out, rc = -EINVAL);
1401                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1402                         GOTO(out, rc = -EINVAL);
1403                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1404                 GOTO(out, rc);
1405         default:
1406                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1407                 GOTO(out, rc = -EINVAL);
1408         }
1409 out:
1410         return rc;
1411 }
1412
1413 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1414                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1415 {
1416         struct obd_device     *obd = class_exp2obd(exp);
1417         struct lmv_obd  *lmv = &obd->u.lmv;
1418         struct obd_statfs     *temp;
1419         int                 rc = 0;
1420         int                 i;
1421
1422         rc = lmv_check_connect(obd);
1423         if (rc)
1424                 return rc;
1425
1426         OBD_ALLOC(temp, sizeof(*temp));
1427         if (temp == NULL)
1428                 return -ENOMEM;
1429
1430         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1431                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1432                         continue;
1433
1434                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1435                                 max_age, flags);
1436                 if (rc) {
1437                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1438                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1439                                rc);
1440                         GOTO(out_free_temp, rc);
1441                 }
1442
1443                 if (i == 0) {
1444                         *osfs = *temp;
1445                         /* If the statfs is from mount, it will needs
1446                          * retrieve necessary information from MDT0.
1447                          * i.e. mount does not need the merged osfs
1448                          * from all of MDT.
1449                          * And also clients can be mounted as long as
1450                          * MDT0 is in service*/
1451                         if (flags & OBD_STATFS_FOR_MDT0)
1452                                 GOTO(out_free_temp, rc);
1453                 } else {
1454                         osfs->os_bavail += temp->os_bavail;
1455                         osfs->os_blocks += temp->os_blocks;
1456                         osfs->os_ffree += temp->os_ffree;
1457                         osfs->os_files += temp->os_files;
1458                 }
1459         }
1460
1461 out_free_temp:
1462         OBD_FREE(temp, sizeof(*temp));
1463         return rc;
1464 }
1465
1466 static int lmv_getstatus(struct obd_export *exp,
1467                          struct lu_fid *fid,
1468                          struct obd_capa **pc)
1469 {
1470         struct obd_device    *obd = exp->exp_obd;
1471         struct lmv_obd       *lmv = &obd->u.lmv;
1472         int                rc;
1473
1474         rc = lmv_check_connect(obd);
1475         if (rc)
1476                 return rc;
1477
1478         rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
1479         return rc;
1480 }
1481
1482 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1483                         struct obd_capa *oc, obd_valid valid, const char *name,
1484                         const char *input, int input_size, int output_size,
1485                         int flags, struct ptlrpc_request **request)
1486 {
1487         struct obd_device      *obd = exp->exp_obd;
1488         struct lmv_obd   *lmv = &obd->u.lmv;
1489         struct lmv_tgt_desc    *tgt;
1490         int                  rc;
1491
1492         rc = lmv_check_connect(obd);
1493         if (rc)
1494                 return rc;
1495
1496         tgt = lmv_find_target(lmv, fid);
1497         if (IS_ERR(tgt))
1498                 return PTR_ERR(tgt);
1499
1500         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1501                          input_size, output_size, flags, request);
1502
1503         return rc;
1504 }
1505
1506 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1507                         struct obd_capa *oc, obd_valid valid, const char *name,
1508                         const char *input, int input_size, int output_size,
1509                         int flags, __u32 suppgid,
1510                         struct ptlrpc_request **request)
1511 {
1512         struct obd_device      *obd = exp->exp_obd;
1513         struct lmv_obd   *lmv = &obd->u.lmv;
1514         struct lmv_tgt_desc    *tgt;
1515         int                  rc;
1516
1517         rc = lmv_check_connect(obd);
1518         if (rc)
1519                 return rc;
1520
1521         tgt = lmv_find_target(lmv, fid);
1522         if (IS_ERR(tgt))
1523                 return PTR_ERR(tgt);
1524
1525         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1526                          input_size, output_size, flags, suppgid,
1527                          request);
1528
1529         return rc;
1530 }
1531
1532 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1533                        struct ptlrpc_request **request)
1534 {
1535         struct obd_device       *obd = exp->exp_obd;
1536         struct lmv_obd    *lmv = &obd->u.lmv;
1537         struct lmv_tgt_desc     *tgt;
1538         int                   rc;
1539
1540         rc = lmv_check_connect(obd);
1541         if (rc)
1542                 return rc;
1543
1544         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1545         if (IS_ERR(tgt))
1546                 return PTR_ERR(tgt);
1547
1548         if (op_data->op_flags & MF_GET_MDT_IDX) {
1549                 op_data->op_mds = tgt->ltd_idx;
1550                 return 0;
1551         }
1552
1553         rc = md_getattr(tgt->ltd_exp, op_data, request);
1554
1555         return rc;
1556 }
1557
1558 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1559 {
1560         struct obd_device   *obd = exp->exp_obd;
1561         struct lmv_obd      *lmv = &obd->u.lmv;
1562         int               i;
1563         int               rc;
1564
1565         rc = lmv_check_connect(obd);
1566         if (rc)
1567                 return rc;
1568
1569         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1570
1571         /*
1572          * With DNE every object can have two locks in different namespaces:
1573          * lookup lock in space of MDT storing direntry and update/open lock in
1574          * space of MDT storing inode.
1575          */
1576         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1577                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1578                         continue;
1579                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1580         }
1581
1582         return 0;
1583 }
1584
1585 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1586                            ldlm_iterator_t it, void *data)
1587 {
1588         struct obd_device   *obd = exp->exp_obd;
1589         struct lmv_obd      *lmv = &obd->u.lmv;
1590         int               i;
1591         int               rc;
1592
1593         rc = lmv_check_connect(obd);
1594         if (rc)
1595                 return rc;
1596
1597         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1598
1599         /*
1600          * With DNE every object can have two locks in different namespaces:
1601          * lookup lock in space of MDT storing direntry and update/open lock in
1602          * space of MDT storing inode.
1603          */
1604         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1605                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1606                         continue;
1607                 rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
1608                 if (rc)
1609                         return rc;
1610         }
1611
1612         return rc;
1613 }
1614
1615
1616 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1617                      struct md_open_data *mod, struct ptlrpc_request **request)
1618 {
1619         struct obd_device     *obd = exp->exp_obd;
1620         struct lmv_obd  *lmv = &obd->u.lmv;
1621         struct lmv_tgt_desc   *tgt;
1622         int                 rc;
1623
1624         rc = lmv_check_connect(obd);
1625         if (rc)
1626                 return rc;
1627
1628         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1629         if (IS_ERR(tgt))
1630                 return PTR_ERR(tgt);
1631
1632         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1633         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1634         return rc;
1635 }
1636
1637 struct lmv_tgt_desc
1638 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1639                 struct lu_fid *fid)
1640 {
1641         struct lmv_tgt_desc *tgt;
1642
1643         tgt = lmv_find_target(lmv, fid);
1644         if (IS_ERR(tgt))
1645                 return tgt;
1646
1647         op_data->op_mds = tgt->ltd_idx;
1648
1649         return tgt;
1650 }
1651
1652 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1653                const void *data, int datalen, int mode, __u32 uid,
1654                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1655                struct ptlrpc_request **request)
1656 {
1657         struct obd_device       *obd = exp->exp_obd;
1658         struct lmv_obd    *lmv = &obd->u.lmv;
1659         struct lmv_tgt_desc     *tgt;
1660         int                   rc;
1661
1662         rc = lmv_check_connect(obd);
1663         if (rc)
1664                 return rc;
1665
1666         if (!lmv->desc.ld_active_tgt_count)
1667                 return -EIO;
1668
1669         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1670         if (IS_ERR(tgt))
1671                 return PTR_ERR(tgt);
1672
1673         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1674         if (rc)
1675                 return rc;
1676
1677         CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1678                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1679                op_data->op_mds);
1680
1681         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1682         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1683                        cap_effective, rdev, request);
1684
1685         if (rc == 0) {
1686                 if (*request == NULL)
1687                         return rc;
1688                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1689         }
1690         return rc;
1691 }
1692
1693 static int lmv_done_writing(struct obd_export *exp,
1694                             struct md_op_data *op_data,
1695                             struct md_open_data *mod)
1696 {
1697         struct obd_device     *obd = exp->exp_obd;
1698         struct lmv_obd  *lmv = &obd->u.lmv;
1699         struct lmv_tgt_desc   *tgt;
1700         int                 rc;
1701
1702         rc = lmv_check_connect(obd);
1703         if (rc)
1704                 return rc;
1705
1706         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1707         if (IS_ERR(tgt))
1708                 return PTR_ERR(tgt);
1709
1710         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1711         return rc;
1712 }
1713
1714 static int
1715 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1716                    struct lookup_intent *it, struct md_op_data *op_data,
1717                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1718                    __u64 extra_lock_flags)
1719 {
1720         struct ptlrpc_request      *req = it->d.lustre.it_data;
1721         struct obd_device         *obd = exp->exp_obd;
1722         struct lmv_obd       *lmv = &obd->u.lmv;
1723         struct lustre_handle    plock;
1724         struct lmv_tgt_desc     *tgt;
1725         struct md_op_data         *rdata;
1726         struct lu_fid          fid1;
1727         struct mdt_body     *body;
1728         int                      rc = 0;
1729         int                      pmode;
1730
1731         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1732         LASSERT(body != NULL);
1733
1734         if (!(body->valid & OBD_MD_MDS))
1735                 return 0;
1736
1737         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1738                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1739
1740         /*
1741          * We got LOOKUP lock, but we really need attrs.
1742          */
1743         pmode = it->d.lustre.it_lock_mode;
1744         LASSERT(pmode != 0);
1745         memcpy(&plock, lockh, sizeof(plock));
1746         it->d.lustre.it_lock_mode = 0;
1747         it->d.lustre.it_data = NULL;
1748         fid1 = body->fid1;
1749
1750         ptlrpc_req_finished(req);
1751
1752         tgt = lmv_find_target(lmv, &fid1);
1753         if (IS_ERR(tgt))
1754                 GOTO(out, rc = PTR_ERR(tgt));
1755
1756         OBD_ALLOC_PTR(rdata);
1757         if (rdata == NULL)
1758                 GOTO(out, rc = -ENOMEM);
1759
1760         rdata->op_fid1 = fid1;
1761         rdata->op_bias = MDS_CROSS_REF;
1762
1763         rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1764                         lmm, lmmsize, NULL, extra_lock_flags);
1765         OBD_FREE_PTR(rdata);
1766 out:
1767         ldlm_lock_decref(&plock, pmode);
1768         return rc;
1769 }
1770
1771 static int
1772 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1773             struct lookup_intent *it, struct md_op_data *op_data,
1774             struct lustre_handle *lockh, void *lmm, int lmmsize,
1775             struct ptlrpc_request **req, __u64 extra_lock_flags)
1776 {
1777         struct obd_device       *obd = exp->exp_obd;
1778         struct lmv_obd     *lmv = &obd->u.lmv;
1779         struct lmv_tgt_desc      *tgt;
1780         int                    rc;
1781
1782         rc = lmv_check_connect(obd);
1783         if (rc)
1784                 return rc;
1785
1786         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1787                LL_IT2STR(it), PFID(&op_data->op_fid1));
1788
1789         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1790         if (IS_ERR(tgt))
1791                 return PTR_ERR(tgt);
1792
1793         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1794                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1795
1796         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1797                         lmm, lmmsize, req, extra_lock_flags);
1798
1799         if (rc == 0 && it && it->it_op == IT_OPEN) {
1800                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1801                                         lmm, lmmsize, extra_lock_flags);
1802         }
1803         return rc;
1804 }
1805
1806 static int
1807 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1808                  struct ptlrpc_request **request)
1809 {
1810         struct ptlrpc_request   *req = NULL;
1811         struct obd_device       *obd = exp->exp_obd;
1812         struct lmv_obd    *lmv = &obd->u.lmv;
1813         struct lmv_tgt_desc     *tgt;
1814         struct mdt_body  *body;
1815         int                   rc;
1816
1817         rc = lmv_check_connect(obd);
1818         if (rc)
1819                 return rc;
1820
1821         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1822         if (IS_ERR(tgt))
1823                 return PTR_ERR(tgt);
1824
1825         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1826                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1827                tgt->ltd_idx);
1828
1829         rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1830         if (rc != 0)
1831                 return rc;
1832
1833         body = req_capsule_server_get(&(*request)->rq_pill,
1834                                       &RMF_MDT_BODY);
1835         LASSERT(body != NULL);
1836
1837         if (body->valid & OBD_MD_MDS) {
1838                 struct lu_fid rid = body->fid1;
1839                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1840                        PFID(&rid));
1841
1842                 tgt = lmv_find_target(lmv, &rid);
1843                 if (IS_ERR(tgt)) {
1844                         ptlrpc_req_finished(*request);
1845                         return PTR_ERR(tgt);
1846                 }
1847
1848                 op_data->op_fid1 = rid;
1849                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1850                 op_data->op_namelen = 0;
1851                 op_data->op_name = NULL;
1852                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1853                 ptlrpc_req_finished(*request);
1854                 *request = req;
1855         }
1856
1857         return rc;
1858 }
1859
1860 #define md_op_data_fid(op_data, fl)                  \
1861         (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
1862          fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
1863          fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
1864          fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
1865          NULL)
1866
1867 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1868                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
1869 {
1870         struct lu_fid     *fid = md_op_data_fid(op_data, flag);
1871         struct obd_device      *obd = exp->exp_obd;
1872         struct lmv_obd   *lmv = &obd->u.lmv;
1873         struct lmv_tgt_desc    *tgt;
1874         ldlm_policy_data_t      policy = {{0}};
1875         int                  rc = 0;
1876
1877         if (!fid_is_sane(fid))
1878                 return 0;
1879
1880         tgt = lmv_find_target(lmv, fid);
1881         if (IS_ERR(tgt))
1882                 return PTR_ERR(tgt);
1883
1884         if (tgt->ltd_idx != op_tgt) {
1885                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1886                 policy.l_inodebits.bits = bits;
1887                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1888                                       mode, LCF_ASYNC, NULL);
1889         } else {
1890                 CDEBUG(D_INODE,
1891                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1892                        op_tgt, PFID(fid));
1893                 op_data->op_flags |= flag;
1894                 rc = 0;
1895         }
1896
1897         return rc;
1898 }
1899
1900 /*
1901  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1902  * op_data->op_fid2
1903  */
1904 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1905                     struct ptlrpc_request **request)
1906 {
1907         struct obd_device       *obd = exp->exp_obd;
1908         struct lmv_obd    *lmv = &obd->u.lmv;
1909         struct lmv_tgt_desc     *tgt;
1910         int                   rc;
1911
1912         rc = lmv_check_connect(obd);
1913         if (rc)
1914                 return rc;
1915
1916         LASSERT(op_data->op_namelen != 0);
1917
1918         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1919                PFID(&op_data->op_fid2), op_data->op_namelen,
1920                op_data->op_name, PFID(&op_data->op_fid1));
1921
1922         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1923         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1924         op_data->op_cap = cfs_curproc_cap_pack();
1925         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1926         if (IS_ERR(tgt))
1927                 return PTR_ERR(tgt);
1928
1929         /*
1930          * Cancel UPDATE lock on child (fid1).
1931          */
1932         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1933         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1934                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1935         if (rc != 0)
1936                 return rc;
1937
1938         rc = md_link(tgt->ltd_exp, op_data, request);
1939
1940         return rc;
1941 }
1942
1943 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1944                       const char *old, int oldlen, const char *new, int newlen,
1945                       struct ptlrpc_request **request)
1946 {
1947         struct obd_device       *obd = exp->exp_obd;
1948         struct lmv_obd    *lmv = &obd->u.lmv;
1949         struct lmv_tgt_desc     *src_tgt;
1950         struct lmv_tgt_desc     *tgt_tgt;
1951         int                     rc;
1952
1953         LASSERT(oldlen != 0);
1954
1955         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1956                oldlen, old, PFID(&op_data->op_fid1),
1957                newlen, new, PFID(&op_data->op_fid2));
1958
1959         rc = lmv_check_connect(obd);
1960         if (rc)
1961                 return rc;
1962
1963         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1964         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1965         op_data->op_cap = cfs_curproc_cap_pack();
1966         src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1967         if (IS_ERR(src_tgt))
1968                 return PTR_ERR(src_tgt);
1969
1970         tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1971         if (IS_ERR(tgt_tgt))
1972                 return PTR_ERR(tgt_tgt);
1973         /*
1974          * LOOKUP lock on src child (fid3) should also be cancelled for
1975          * src_tgt in mdc_rename.
1976          */
1977         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1978
1979         /*
1980          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1981          * own target.
1982          */
1983         rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1984                               LCK_EX, MDS_INODELOCK_UPDATE,
1985                               MF_MDC_CANCEL_FID2);
1986
1987         /*
1988          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1989          */
1990         if (rc == 0) {
1991                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1992                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1993                                       MF_MDC_CANCEL_FID4);
1994         }
1995
1996         /*
1997          * Cancel all the locks on tgt child (fid4).
1998          */
1999         if (rc == 0)
2000                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
2001                                       LCK_EX, MDS_INODELOCK_FULL,
2002                                       MF_MDC_CANCEL_FID4);
2003
2004         if (rc == 0)
2005                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
2006                                new, newlen, request);
2007         return rc;
2008 }
2009
2010 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2011                        void *ea, int ealen, void *ea2, int ea2len,
2012                        struct ptlrpc_request **request,
2013                        struct md_open_data **mod)
2014 {
2015         struct obd_device       *obd = exp->exp_obd;
2016         struct lmv_obd    *lmv = &obd->u.lmv;
2017         struct lmv_tgt_desc     *tgt;
2018         int                   rc = 0;
2019
2020         rc = lmv_check_connect(obd);
2021         if (rc)
2022                 return rc;
2023
2024         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2025                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2026
2027         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2028         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2029         if (IS_ERR(tgt))
2030                 return PTR_ERR(tgt);
2031
2032         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2033                         ea2len, request, mod);
2034
2035         return rc;
2036 }
2037
2038 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2039                     struct obd_capa *oc, struct ptlrpc_request **request)
2040 {
2041         struct obd_device        *obd = exp->exp_obd;
2042         struct lmv_obd      *lmv = &obd->u.lmv;
2043         struct lmv_tgt_desc       *tgt;
2044         int                     rc;
2045
2046         rc = lmv_check_connect(obd);
2047         if (rc)
2048                 return rc;
2049
2050         tgt = lmv_find_target(lmv, fid);
2051         if (IS_ERR(tgt))
2052                 return PTR_ERR(tgt);
2053
2054         rc = md_sync(tgt->ltd_exp, fid, oc, request);
2055         return rc;
2056 }
2057
2058 /*
2059  * Adjust a set of pages, each page containing an array of lu_dirpages,
2060  * so that each page can be used as a single logical lu_dirpage.
2061  *
2062  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2063  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2064  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2065  * value is used as a cookie to request the next lu_dirpage in a
2066  * directory listing that spans multiple pages (two in this example):
2067  *   ________
2068  *  |   |
2069  * .|--------v-------   -----.
2070  * |s|e|f|p|ent|ent| ... |ent|
2071  * '--|--------------   -----'   Each CFS_PAGE contains a single
2072  *    '------.             lu_dirpage.
2073  * .---------v-------   -----.
2074  * |s|e|f|p|ent| 0 | ... | 0 |
2075  * '-----------------   -----'
2076  *
2077  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2078  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2079  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2080  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2081  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2082  * in general e0==s1, e1==s2, etc.):
2083  *
2084  * .--------------------   -----.
2085  * |s0|e0|f0|p|ent|ent| ... |ent|
2086  * |---v----------------   -----|
2087  * |s1|e1|f1|p|ent|ent| ... |ent|
2088  * |---v----------------   -----|  Here, each CFS_PAGE contains
2089  *           ...                 multiple lu_dirpages.
2090  * |---v----------------   -----|
2091  * |s'|e'|f'|p|ent|ent| ... |ent|
2092  * '---|----------------   -----'
2093  *     v
2094  * .----------------------------.
2095  * |    next CFS_PAGE       |
2096  *
2097  * This structure is transformed into a single logical lu_dirpage as follows:
2098  *
2099  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2100  *   labeled 'next CFS_PAGE'.
2101  *
2102  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2103  *   a hash collision with the next page exists.
2104  *
2105  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2106  *   to the first entry of the next lu_dirpage.
2107  */
2108 #if PAGE_CACHE_SIZE > LU_PAGE_SIZE
2109 static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
2110 {
2111         int i;
2112
2113         for (i = 0; i < ncfspgs; i++) {
2114                 struct lu_dirpage       *dp = kmap(pages[i]);
2115                 struct lu_dirpage       *first = dp;
2116                 struct lu_dirent        *end_dirent = NULL;
2117                 struct lu_dirent        *ent;
2118                 __u64                   hash_end = dp->ldp_hash_end;
2119                 __u32                   flags = dp->ldp_flags;
2120
2121                 while (--nlupgs > 0) {
2122                         ent = lu_dirent_start(dp);
2123                         for (end_dirent = ent; ent != NULL;
2124                              end_dirent = ent, ent = lu_dirent_next(ent));
2125
2126                         /* Advance dp to next lu_dirpage. */
2127                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2128
2129                         /* Check if we've reached the end of the CFS_PAGE. */
2130                         if (!((unsigned long)dp & ~CFS_PAGE_MASK))
2131                                 break;
2132
2133                         /* Save the hash and flags of this lu_dirpage. */
2134                         hash_end = dp->ldp_hash_end;
2135                         flags = dp->ldp_flags;
2136
2137                         /* Check if lu_dirpage contains no entries. */
2138                         if (!end_dirent)
2139                                 break;
2140
2141                         /* Enlarge the end entry lde_reclen from 0 to
2142                          * first entry of next lu_dirpage. */
2143                         LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
2144                         end_dirent->lde_reclen =
2145                                 cpu_to_le16((char *)(dp->ldp_entries) -
2146                                             (char *)end_dirent);
2147                 }
2148
2149                 first->ldp_hash_end = hash_end;
2150                 first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
2151                 first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
2152
2153                 kunmap(pages[i]);
2154         }
2155         LASSERTF(nlupgs == 0, "left = %d", nlupgs);
2156 }
2157 #else
2158 #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
2159 #endif  /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2160
2161 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
2162                         struct page **pages, struct ptlrpc_request **request)
2163 {
2164         struct obd_device       *obd = exp->exp_obd;
2165         struct lmv_obd          *lmv = &obd->u.lmv;
2166         __u64                   offset = op_data->op_offset;
2167         int                     rc;
2168         int                     ncfspgs; /* pages read in PAGE_CACHE_SIZE */
2169         int                     nlupgs; /* pages read in LU_PAGE_SIZE */
2170         struct lmv_tgt_desc     *tgt;
2171
2172         rc = lmv_check_connect(obd);
2173         if (rc)
2174                 return rc;
2175
2176         CDEBUG(D_INODE, "READPAGE at %#llx from "DFID"\n",
2177                offset, PFID(&op_data->op_fid1));
2178
2179         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2180         if (IS_ERR(tgt))
2181                 return PTR_ERR(tgt);
2182
2183         rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
2184         if (rc != 0)
2185                 return rc;
2186
2187         ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_CACHE_SIZE - 1)
2188                  >> PAGE_CACHE_SHIFT;
2189         nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
2190         LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
2191         LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
2192
2193         CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
2194                op_data->op_npages);
2195
2196         lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
2197
2198         return rc;
2199 }
2200
2201 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2202                       struct ptlrpc_request **request)
2203 {
2204         struct obd_device       *obd = exp->exp_obd;
2205         struct lmv_obd    *lmv = &obd->u.lmv;
2206         struct lmv_tgt_desc     *tgt = NULL;
2207         struct mdt_body         *body;
2208         int                  rc;
2209
2210         rc = lmv_check_connect(obd);
2211         if (rc)
2212                 return rc;
2213 retry:
2214         /* Send unlink requests to the MDT where the child is located */
2215         if (likely(!fid_is_zero(&op_data->op_fid2)))
2216                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2217         else
2218                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2219         if (IS_ERR(tgt))
2220                 return PTR_ERR(tgt);
2221
2222         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2223         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2224         op_data->op_cap = cfs_curproc_cap_pack();
2225
2226         /*
2227          * If child's fid is given, cancel unused locks for it if it is from
2228          * another export than parent.
2229          *
2230          * LOOKUP lock for child (fid3) should also be cancelled on parent
2231          * tgt_tgt in mdc_unlink().
2232          */
2233         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2234
2235         /*
2236          * Cancel FULL locks on child (fid3).
2237          */
2238         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2239                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2240
2241         if (rc != 0)
2242                 return rc;
2243
2244         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2245                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2246
2247         rc = md_unlink(tgt->ltd_exp, op_data, request);
2248         if (rc != 0 && rc != -EREMOTE)
2249                 return rc;
2250
2251         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2252         if (body == NULL)
2253                 return -EPROTO;
2254
2255         /* Not cross-ref case, just get out of here. */
2256         if (likely(!(body->valid & OBD_MD_MDS)))
2257                 return 0;
2258
2259         CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2260                exp->exp_obd->obd_name, PFID(&body->fid1));
2261
2262         /* This is a remote object, try remote MDT, Note: it may
2263          * try more than 1 time here, Considering following case
2264          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2265          * 1. Initially A does not know where remote1 is, it send
2266          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2267          *    resend unlink RPC to MDT1 (retry 1st time).
2268          *
2269          * 2. During the unlink RPC in flight,
2270          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2271          *    and create new remote1, but on MDT0
2272          *
2273          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2274          *    /mnt/lustre, then lookup get fid of remote1, and find
2275          *    it is remote dir again, and replay -EREMOTE again.
2276          *
2277          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2278          *
2279          * In theory, it might try unlimited time here, but it should
2280          * be very rare case.  */
2281         op_data->op_fid2 = body->fid1;
2282         ptlrpc_req_finished(*request);
2283         *request = NULL;
2284
2285         goto retry;
2286 }
2287
2288 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2289 {
2290         struct lmv_obd *lmv = &obd->u.lmv;
2291         int rc = 0;
2292
2293         switch (stage) {
2294         case OBD_CLEANUP_EARLY:
2295                 /* XXX: here should be calling obd_precleanup() down to
2296                  * stack. */
2297                 break;
2298         case OBD_CLEANUP_EXPORTS:
2299                 fld_client_proc_fini(&lmv->lmv_fld);
2300                 lprocfs_obd_cleanup(obd);
2301                 break;
2302         default:
2303                 break;
2304         }
2305         return rc;
2306 }
2307
2308 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2309                         __u32 keylen, void *key, __u32 *vallen, void *val,
2310                         struct lov_stripe_md *lsm)
2311 {
2312         struct obd_device       *obd;
2313         struct lmv_obd    *lmv;
2314         int                   rc = 0;
2315
2316         obd = class_exp2obd(exp);
2317         if (obd == NULL) {
2318                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2319                        exp->exp_handle.h_cookie);
2320                 return -EINVAL;
2321         }
2322
2323         lmv = &obd->u.lmv;
2324         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2325                 struct lmv_tgt_desc *tgt;
2326                 int i;
2327
2328                 rc = lmv_check_connect(obd);
2329                 if (rc)
2330                         return rc;
2331
2332                 LASSERT(*vallen == sizeof(__u32));
2333                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2334                         tgt = lmv->tgts[i];
2335                         /*
2336                          * All tgts should be connected when this gets called.
2337                          */
2338                         if (tgt == NULL || tgt->ltd_exp == NULL)
2339                                 continue;
2340
2341                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2342                                           vallen, val, NULL))
2343                                 return 0;
2344                 }
2345                 return -EINVAL;
2346         } else if (KEY_IS(KEY_MAX_EASIZE) ||
2347                    KEY_IS(KEY_DEFAULT_EASIZE) ||
2348                    KEY_IS(KEY_MAX_COOKIESIZE) ||
2349                    KEY_IS(KEY_DEFAULT_COOKIESIZE) ||
2350                    KEY_IS(KEY_CONN_DATA)) {
2351                 rc = lmv_check_connect(obd);
2352                 if (rc)
2353                         return rc;
2354
2355                 /*
2356                  * Forwarding this request to first MDS, it should know LOV
2357                  * desc.
2358                  */
2359                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2360                                   vallen, val, NULL);
2361                 if (!rc && KEY_IS(KEY_CONN_DATA))
2362                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2363                 return rc;
2364         } else if (KEY_IS(KEY_TGT_COUNT)) {
2365                 *((int *)val) = lmv->desc.ld_tgt_count;
2366                 return 0;
2367         }
2368
2369         CDEBUG(D_IOCTL, "Invalid key\n");
2370         return -EINVAL;
2371 }
2372
2373 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2374                        obd_count keylen, void *key, obd_count vallen,
2375                        void *val, struct ptlrpc_request_set *set)
2376 {
2377         struct lmv_tgt_desc    *tgt;
2378         struct obd_device      *obd;
2379         struct lmv_obd   *lmv;
2380         int rc = 0;
2381
2382         obd = class_exp2obd(exp);
2383         if (obd == NULL) {
2384                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2385                        exp->exp_handle.h_cookie);
2386                 return -EINVAL;
2387         }
2388         lmv = &obd->u.lmv;
2389
2390         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2391                 int i, err = 0;
2392
2393                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2394                         tgt = lmv->tgts[i];
2395
2396                         if (tgt == NULL || tgt->ltd_exp == NULL)
2397                                 continue;
2398
2399                         err = obd_set_info_async(env, tgt->ltd_exp,
2400                                                  keylen, key, vallen, val, set);
2401                         if (err && rc == 0)
2402                                 rc = err;
2403                 }
2404
2405                 return rc;
2406         }
2407
2408         return -EINVAL;
2409 }
2410
2411 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2412                struct lov_stripe_md *lsm)
2413 {
2414         struct obd_device        *obd = class_exp2obd(exp);
2415         struct lmv_obd      *lmv = &obd->u.lmv;
2416         struct lmv_stripe_md      *meap;
2417         struct lmv_stripe_md      *lsmp;
2418         int                     mea_size;
2419         int                     i;
2420
2421         mea_size = lmv_get_easize(lmv);
2422         if (!lmmp)
2423                 return mea_size;
2424
2425         if (*lmmp && !lsm) {
2426                 OBD_FREE_LARGE(*lmmp, mea_size);
2427                 *lmmp = NULL;
2428                 return 0;
2429         }
2430
2431         if (*lmmp == NULL) {
2432                 OBD_ALLOC_LARGE(*lmmp, mea_size);
2433                 if (*lmmp == NULL)
2434                         return -ENOMEM;
2435         }
2436
2437         if (!lsm)
2438                 return mea_size;
2439
2440         lsmp = (struct lmv_stripe_md *)lsm;
2441         meap = (struct lmv_stripe_md *)*lmmp;
2442
2443         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2444             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2445                 return -EINVAL;
2446
2447         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2448         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2449         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2450
2451         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2452                 meap->mea_ids[i] = lsmp->mea_ids[i];
2453                 fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
2454         }
2455
2456         return mea_size;
2457 }
2458
2459 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2460                  struct lov_mds_md *lmm, int lmm_size)
2461 {
2462         struct obd_device         *obd = class_exp2obd(exp);
2463         struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2464         struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2465         struct lmv_obd       *lmv = &obd->u.lmv;
2466         int                      mea_size;
2467         int                      i;
2468         __u32                  magic;
2469
2470         mea_size = lmv_get_easize(lmv);
2471         if (lsmp == NULL)
2472                 return mea_size;
2473
2474         if (*lsmp != NULL && lmm == NULL) {
2475                 OBD_FREE_LARGE(*tmea, mea_size);
2476                 *lsmp = NULL;
2477                 return 0;
2478         }
2479
2480         LASSERT(mea_size == lmm_size);
2481
2482         OBD_ALLOC_LARGE(*tmea, mea_size);
2483         if (*tmea == NULL)
2484                 return -ENOMEM;
2485
2486         if (!lmm)
2487                 return mea_size;
2488
2489         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2490             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2491             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2492         {
2493                 magic = le32_to_cpu(mea->mea_magic);
2494         } else {
2495                 /*
2496                  * Old mea is not handled here.
2497                  */
2498                 CERROR("Old not supportable EA is found\n");
2499                 LBUG();
2500         }
2501
2502         (*tmea)->mea_magic = magic;
2503         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2504         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2505
2506         for (i = 0; i < (*tmea)->mea_count; i++) {
2507                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2508                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2509         }
2510         return mea_size;
2511 }
2512
2513 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2514                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
2515                              ldlm_cancel_flags_t flags, void *opaque)
2516 {
2517         struct obd_device       *obd = exp->exp_obd;
2518         struct lmv_obd    *lmv = &obd->u.lmv;
2519         int                   rc = 0;
2520         int                   err;
2521         int                   i;
2522
2523         LASSERT(fid != NULL);
2524
2525         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2526                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL ||
2527                     lmv->tgts[i]->ltd_active == 0)
2528                         continue;
2529
2530                 err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
2531                                        policy, mode, flags, opaque);
2532                 if (!rc)
2533                         rc = err;
2534         }
2535         return rc;
2536 }
2537
2538 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2539                       __u64 *bits)
2540 {
2541         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2542         int                   rc;
2543
2544         rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
2545         return rc;
2546 }
2547
2548 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2549                            const struct lu_fid *fid, ldlm_type_t type,
2550                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2551                            struct lustre_handle *lockh)
2552 {
2553         struct obd_device       *obd = exp->exp_obd;
2554         struct lmv_obd    *lmv = &obd->u.lmv;
2555         ldlm_mode_t           rc;
2556         int                   i;
2557
2558         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2559
2560         /*
2561          * With CMD every object can have two locks in different namespaces:
2562          * lookup lock in space of mds storing direntry and update/open lock in
2563          * space of mds storing inode. Thus we check all targets, not only that
2564          * one fid was created in.
2565          */
2566         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2567                 if (lmv->tgts[i] == NULL ||
2568                     lmv->tgts[i]->ltd_exp == NULL ||
2569                     lmv->tgts[i]->ltd_active == 0)
2570                         continue;
2571
2572                 rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
2573                                    type, policy, mode, lockh);
2574                 if (rc)
2575                         return rc;
2576         }
2577
2578         return 0;
2579 }
2580
2581 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2582                       struct obd_export *dt_exp, struct obd_export *md_exp,
2583                       struct lustre_md *md)
2584 {
2585         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2586
2587         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2588 }
2589
2590 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2591 {
2592         struct obd_device       *obd = exp->exp_obd;
2593         struct lmv_obd    *lmv = &obd->u.lmv;
2594
2595         if (md->mea)
2596                 obd_free_memmd(exp, (void *)&md->mea);
2597         return md_free_lustre_md(lmv->tgts[0]->ltd_exp, md);
2598 }
2599
2600 int lmv_set_open_replay_data(struct obd_export *exp,
2601                              struct obd_client_handle *och,
2602                              struct lookup_intent *it)
2603 {
2604         struct obd_device       *obd = exp->exp_obd;
2605         struct lmv_obd    *lmv = &obd->u.lmv;
2606         struct lmv_tgt_desc     *tgt;
2607
2608         tgt = lmv_find_target(lmv, &och->och_fid);
2609         if (IS_ERR(tgt))
2610                 return PTR_ERR(tgt);
2611
2612         return md_set_open_replay_data(tgt->ltd_exp, och, it);
2613 }
2614
2615 int lmv_clear_open_replay_data(struct obd_export *exp,
2616                                struct obd_client_handle *och)
2617 {
2618         struct obd_device       *obd = exp->exp_obd;
2619         struct lmv_obd    *lmv = &obd->u.lmv;
2620         struct lmv_tgt_desc     *tgt;
2621
2622         tgt = lmv_find_target(lmv, &och->och_fid);
2623         if (IS_ERR(tgt))
2624                 return PTR_ERR(tgt);
2625
2626         return md_clear_open_replay_data(tgt->ltd_exp, och);
2627 }
2628
2629 static int lmv_get_remote_perm(struct obd_export *exp,
2630                                const struct lu_fid *fid,
2631                                struct obd_capa *oc, __u32 suppgid,
2632                                struct ptlrpc_request **request)
2633 {
2634         struct obd_device       *obd = exp->exp_obd;
2635         struct lmv_obd    *lmv = &obd->u.lmv;
2636         struct lmv_tgt_desc     *tgt;
2637         int                   rc;
2638
2639         rc = lmv_check_connect(obd);
2640         if (rc)
2641                 return rc;
2642
2643         tgt = lmv_find_target(lmv, fid);
2644         if (IS_ERR(tgt))
2645                 return PTR_ERR(tgt);
2646
2647         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
2648         return rc;
2649 }
2650
2651 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2652                           renew_capa_cb_t cb)
2653 {
2654         struct obd_device       *obd = exp->exp_obd;
2655         struct lmv_obd    *lmv = &obd->u.lmv;
2656         struct lmv_tgt_desc     *tgt;
2657         int                   rc;
2658
2659         rc = lmv_check_connect(obd);
2660         if (rc)
2661                 return rc;
2662
2663         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
2664         if (IS_ERR(tgt))
2665                 return PTR_ERR(tgt);
2666
2667         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
2668         return rc;
2669 }
2670
2671 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2672                     const struct req_msg_field *field, struct obd_capa **oc)
2673 {
2674         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2675
2676         return md_unpack_capa(lmv->tgts[0]->ltd_exp, req, field, oc);
2677 }
2678
2679 int lmv_intent_getattr_async(struct obd_export *exp,
2680                              struct md_enqueue_info *minfo,
2681                              struct ldlm_enqueue_info *einfo)
2682 {
2683         struct md_op_data       *op_data = &minfo->mi_data;
2684         struct obd_device       *obd = exp->exp_obd;
2685         struct lmv_obd    *lmv = &obd->u.lmv;
2686         struct lmv_tgt_desc     *tgt = NULL;
2687         int                   rc;
2688
2689         rc = lmv_check_connect(obd);
2690         if (rc)
2691                 return rc;
2692
2693         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2694         if (IS_ERR(tgt))
2695                 return PTR_ERR(tgt);
2696
2697         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2698         return rc;
2699 }
2700
2701 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2702                         struct lu_fid *fid, __u64 *bits)
2703 {
2704         struct obd_device       *obd = exp->exp_obd;
2705         struct lmv_obd    *lmv = &obd->u.lmv;
2706         struct lmv_tgt_desc     *tgt;
2707         int                   rc;
2708
2709         rc = lmv_check_connect(obd);
2710         if (rc)
2711                 return rc;
2712
2713         tgt = lmv_find_target(lmv, fid);
2714         if (IS_ERR(tgt))
2715                 return PTR_ERR(tgt);
2716
2717         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2718         return rc;
2719 }
2720
2721 /**
2722  * For lmv, only need to send request to master MDT, and the master MDT will
2723  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2724  * we directly fetch data from the slave MDTs.
2725  */
2726 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2727                  struct obd_quotactl *oqctl)
2728 {
2729         struct obd_device   *obd = class_exp2obd(exp);
2730         struct lmv_obd      *lmv = &obd->u.lmv;
2731         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2732         int               rc = 0, i;
2733         __u64           curspace, curinodes;
2734
2735         if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2736                 CERROR("master lmv inactive\n");
2737                 return -EIO;
2738         }
2739
2740         if (oqctl->qc_cmd != Q_GETOQUOTA) {
2741                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
2742                 return rc;
2743         }
2744
2745         curspace = curinodes = 0;
2746         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2747                 int err;
2748                 tgt = lmv->tgts[i];
2749
2750                 if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
2751                         continue;
2752                 if (!tgt->ltd_active) {
2753                         CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2754                         continue;
2755                 }
2756
2757                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2758                 if (err) {
2759                         CERROR("getquota on mdt %d failed. %d\n", i, err);
2760                         if (!rc)
2761                                 rc = err;
2762                 } else {
2763                         curspace += oqctl->qc_dqblk.dqb_curspace;
2764                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
2765                 }
2766         }
2767         oqctl->qc_dqblk.dqb_curspace = curspace;
2768         oqctl->qc_dqblk.dqb_curinodes = curinodes;
2769
2770         return rc;
2771 }
2772
2773 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2774                    struct obd_quotactl *oqctl)
2775 {
2776         struct obd_device   *obd = class_exp2obd(exp);
2777         struct lmv_obd      *lmv = &obd->u.lmv;
2778         struct lmv_tgt_desc *tgt;
2779         int               i, rc = 0;
2780
2781         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2782                 int err;
2783                 tgt = lmv->tgts[i];
2784                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
2785                         CERROR("lmv idx %d inactive\n", i);
2786                         return -EIO;
2787                 }
2788
2789                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
2790                 if (err && !rc)
2791                         rc = err;
2792         }
2793
2794         return rc;
2795 }
2796
2797 struct obd_ops lmv_obd_ops = {
2798         .o_owner                = THIS_MODULE,
2799         .o_setup                = lmv_setup,
2800         .o_cleanup            = lmv_cleanup,
2801         .o_precleanup      = lmv_precleanup,
2802         .o_process_config       = lmv_process_config,
2803         .o_connect            = lmv_connect,
2804         .o_disconnect      = lmv_disconnect,
2805         .o_statfs              = lmv_statfs,
2806         .o_get_info          = lmv_get_info,
2807         .o_set_info_async       = lmv_set_info_async,
2808         .o_packmd              = lmv_packmd,
2809         .o_unpackmd          = lmv_unpackmd,
2810         .o_notify              = lmv_notify,
2811         .o_get_uuid          = lmv_get_uuid,
2812         .o_iocontrol        = lmv_iocontrol,
2813         .o_quotacheck      = lmv_quotacheck,
2814         .o_quotactl          = lmv_quotactl
2815 };
2816
2817 struct md_ops lmv_md_ops = {
2818         .m_getstatus        = lmv_getstatus,
2819         .m_null_inode           = lmv_null_inode,
2820         .m_find_cbdata    = lmv_find_cbdata,
2821         .m_close                = lmv_close,
2822         .m_create              = lmv_create,
2823         .m_done_writing  = lmv_done_writing,
2824         .m_enqueue            = lmv_enqueue,
2825         .m_getattr            = lmv_getattr,
2826         .m_getxattr          = lmv_getxattr,
2827         .m_getattr_name  = lmv_getattr_name,
2828         .m_intent_lock    = lmv_intent_lock,
2829         .m_link          = lmv_link,
2830         .m_rename              = lmv_rename,
2831         .m_setattr            = lmv_setattr,
2832         .m_setxattr          = lmv_setxattr,
2833         .m_sync          = lmv_sync,
2834         .m_readpage          = lmv_readpage,
2835         .m_unlink              = lmv_unlink,
2836         .m_init_ea_size  = lmv_init_ea_size,
2837         .m_cancel_unused        = lmv_cancel_unused,
2838         .m_set_lock_data        = lmv_set_lock_data,
2839         .m_lock_match      = lmv_lock_match,
2840         .m_get_lustre_md        = lmv_get_lustre_md,
2841         .m_free_lustre_md       = lmv_free_lustre_md,
2842         .m_set_open_replay_data = lmv_set_open_replay_data,
2843         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2844         .m_renew_capa      = lmv_renew_capa,
2845         .m_unpack_capa    = lmv_unpack_capa,
2846         .m_get_remote_perm      = lmv_get_remote_perm,
2847         .m_intent_getattr_async = lmv_intent_getattr_async,
2848         .m_revalidate_lock      = lmv_revalidate_lock
2849 };
2850
2851 int __init lmv_init(void)
2852 {
2853         struct lprocfs_static_vars lvars;
2854         int                     rc;
2855
2856         lprocfs_lmv_init_vars(&lvars);
2857
2858         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2859                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2860         return rc;
2861 }
2862
2863 static void lmv_exit(void)
2864 {
2865         class_unregister_type(LUSTRE_LMV_NAME);
2866 }
2867
2868 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2869 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2870 MODULE_LICENSE("GPL");
2871
2872 module_init(lmv_init);
2873 module_exit(lmv_exit);