Merge branch 'parisc-4.8-1' of git://git.kernel.org/pub/scm/linux/kernel/git/deller...
[cascardo/linux.git] / drivers / staging / lustre / lustre / lov / lov_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lov/lov_obd.c
33  *
34  * Author: Phil Schwan <phil@clusterfs.com>
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Mike Shaver <shaver@clusterfs.com>
37  * Author: Nathan Rutman <nathan@clusterfs.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_LOV
41 #include "../../include/linux/libcfs/libcfs.h"
42
43 #include "../include/obd_support.h"
44 #include "../include/lustre_lib.h"
45 #include "../include/lustre_net.h"
46 #include "../include/lustre/lustre_idl.h"
47 #include "../include/lustre_dlm.h"
48 #include "../include/lustre_mds.h"
49 #include "../include/obd_class.h"
50 #include "../include/lprocfs_status.h"
51 #include "../include/lustre_param.h"
52 #include "../include/cl_object.h"
53 #include "../include/lustre/ll_fiemap.h"
54 #include "../include/lustre_fid.h"
55
56 #include "lov_internal.h"
57
58 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
59  * Any function that expects lov_tgts to remain stationary must take a ref.
60  */
61 static void lov_getref(struct obd_device *obd)
62 {
63         struct lov_obd *lov = &obd->u.lov;
64
65         /* nobody gets through here until lov_putref is done */
66         mutex_lock(&lov->lov_lock);
67         atomic_inc(&lov->lov_refcount);
68         mutex_unlock(&lov->lov_lock);
69         return;
70 }
71
72 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt);
73
74 static void lov_putref(struct obd_device *obd)
75 {
76         struct lov_obd *lov = &obd->u.lov;
77
78         mutex_lock(&lov->lov_lock);
79         /* ok to dec to 0 more than once -- ltd_exp's will be null */
80         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
81                 LIST_HEAD(kill);
82                 int i;
83                 struct lov_tgt_desc *tgt, *n;
84
85                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
86                        lov->lov_death_row);
87                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
88                         tgt = lov->lov_tgts[i];
89
90                         if (!tgt || !tgt->ltd_reap)
91                                 continue;
92                         list_add(&tgt->ltd_kill, &kill);
93                         /* XXX - right now there is a dependency on ld_tgt_count
94                          * being the maximum tgt index for computing the
95                          * mds_max_easize. So we can't shrink it.
96                          */
97                         lov_ost_pool_remove(&lov->lov_packed, i);
98                         lov->lov_tgts[i] = NULL;
99                         lov->lov_death_row--;
100                 }
101                 mutex_unlock(&lov->lov_lock);
102
103                 list_for_each_entry_safe(tgt, n, &kill, ltd_kill) {
104                         list_del(&tgt->ltd_kill);
105                         /* Disconnect */
106                         __lov_del_obd(obd, tgt);
107                 }
108
109                 if (lov->lov_tgts_kobj)
110                         kobject_put(lov->lov_tgts_kobj);
111
112         } else {
113                 mutex_unlock(&lov->lov_lock);
114         }
115 }
116
117 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
118                               enum obd_notify_event ev);
119 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
120                       enum obd_notify_event ev, void *data);
121
122 int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
123                     struct obd_connect_data *data)
124 {
125         struct lov_obd *lov = &obd->u.lov;
126         struct obd_uuid *tgt_uuid;
127         struct obd_device *tgt_obd;
128         static struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
129         struct obd_import *imp;
130         int rc;
131
132         if (!lov->lov_tgts[index])
133                 return -EINVAL;
134
135         tgt_uuid = &lov->lov_tgts[index]->ltd_uuid;
136         tgt_obd = lov->lov_tgts[index]->ltd_obd;
137
138         if (!tgt_obd->obd_set_up) {
139                 CERROR("Target %s not set up\n", obd_uuid2str(tgt_uuid));
140                 return -EINVAL;
141         }
142
143         /* override the sp_me from lov */
144         tgt_obd->u.cli.cl_sp_me = lov->lov_sp_me;
145
146         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
147                 data->ocd_index = index;
148
149         /*
150          * Divine LOV knows that OBDs under it are OSCs.
151          */
152         imp = tgt_obd->u.cli.cl_import;
153
154         if (activate) {
155                 tgt_obd->obd_no_recov = 0;
156                 /* FIXME this is probably supposed to be
157                  * ptlrpc_set_import_active.  Horrible naming.
158                  */
159                 ptlrpc_activate_import(imp);
160         }
161
162         rc = obd_register_observer(tgt_obd, obd);
163         if (rc) {
164                 CERROR("Target %s register_observer error %d\n",
165                        obd_uuid2str(tgt_uuid), rc);
166                 return rc;
167         }
168
169         if (imp->imp_invalid) {
170                 CDEBUG(D_CONFIG, "not connecting OSC %s; administratively disabled\n",
171                        obd_uuid2str(tgt_uuid));
172                 return 0;
173         }
174
175         rc = obd_connect(NULL, &lov->lov_tgts[index]->ltd_exp, tgt_obd,
176                          &lov_osc_uuid, data, NULL);
177         if (rc || !lov->lov_tgts[index]->ltd_exp) {
178                 CERROR("Target %s connect error %d\n",
179                        obd_uuid2str(tgt_uuid), rc);
180                 return -ENODEV;
181         }
182
183         lov->lov_tgts[index]->ltd_reap = 0;
184
185         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
186                obd_uuid2str(tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
187
188         if (lov->lov_tgts_kobj)
189                 /* Even if we failed, that's ok */
190                 rc = sysfs_create_link(lov->lov_tgts_kobj, &tgt_obd->obd_kobj,
191                                        tgt_obd->obd_name);
192
193         return 0;
194 }
195
196 static int lov_connect(const struct lu_env *env,
197                        struct obd_export **exp, struct obd_device *obd,
198                        struct obd_uuid *cluuid, struct obd_connect_data *data,
199                        void *localdata)
200 {
201         struct lov_obd *lov = &obd->u.lov;
202         struct lov_tgt_desc *tgt;
203         struct lustre_handle conn;
204         int i, rc;
205
206         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
207
208         rc = class_connect(&conn, obd, cluuid);
209         if (rc)
210                 return rc;
211
212         *exp = class_conn2export(&conn);
213
214         /* Why should there ever be more than 1 connect? */
215         lov->lov_connects++;
216         LASSERT(lov->lov_connects == 1);
217
218         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
219         if (data)
220                 lov->lov_ocd = *data;
221
222         obd_getref(obd);
223
224         lov->lov_tgts_kobj = kobject_create_and_add("target_obds",
225                                                     &obd->obd_kobj);
226
227         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
228                 tgt = lov->lov_tgts[i];
229                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
230                         continue;
231                 /* Flags will be lowest common denominator */
232                 rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
233                 if (rc) {
234                         CERROR("%s: lov connect tgt %d failed: %d\n",
235                                obd->obd_name, i, rc);
236                         continue;
237                 }
238                 /* connect to administrative disabled ost */
239                 if (!lov->lov_tgts[i]->ltd_exp)
240                         continue;
241
242                 rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
243                                 OBD_NOTIFY_CONNECT, (void *)&i);
244                 if (rc) {
245                         CERROR("%s error sending notify %d\n",
246                                obd->obd_name, rc);
247                 }
248         }
249         obd_putref(obd);
250
251         return 0;
252 }
253
254 static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
255 {
256         struct lov_obd *lov = &obd->u.lov;
257         struct obd_device *osc_obd;
258         int rc;
259
260         osc_obd = class_exp2obd(tgt->ltd_exp);
261         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
262                obd->obd_name, osc_obd ? osc_obd->obd_name : "NULL");
263
264         if (tgt->ltd_active) {
265                 tgt->ltd_active = 0;
266                 lov->desc.ld_active_tgt_count--;
267                 tgt->ltd_exp->exp_obd->obd_inactive = 1;
268         }
269
270         if (osc_obd) {
271                 if (lov->lov_tgts_kobj)
272                         sysfs_remove_link(lov->lov_tgts_kobj,
273                                           osc_obd->obd_name);
274
275                 /* Pass it on to our clients.
276                  * XXX This should be an argument to disconnect,
277                  * XXX not a back-door flag on the OBD.  Ah well.
278                  */
279                 osc_obd->obd_force = obd->obd_force;
280                 osc_obd->obd_fail = obd->obd_fail;
281                 osc_obd->obd_no_recov = obd->obd_no_recov;
282         }
283
284         obd_register_observer(osc_obd, NULL);
285
286         rc = obd_disconnect(tgt->ltd_exp);
287         if (rc) {
288                 CERROR("Target %s disconnect error %d\n",
289                        tgt->ltd_uuid.uuid, rc);
290                 rc = 0;
291         }
292
293         tgt->ltd_exp = NULL;
294         return 0;
295 }
296
297 static int lov_disconnect(struct obd_export *exp)
298 {
299         struct obd_device *obd = class_exp2obd(exp);
300         struct lov_obd *lov = &obd->u.lov;
301         int i, rc;
302
303         if (!lov->lov_tgts)
304                 goto out;
305
306         /* Only disconnect the underlying layers on the final disconnect. */
307         lov->lov_connects--;
308         if (lov->lov_connects != 0) {
309                 /* why should there be more than 1 connect? */
310                 CERROR("disconnect #%d\n", lov->lov_connects);
311                 goto out;
312         }
313
314         /* Let's hold another reference so lov_del_obd doesn't spin through
315          * putref every time
316          */
317         obd_getref(obd);
318
319         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
320                 if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
321                         /* Disconnection is the last we know about an obd */
322                         lov_del_target(obd, i, NULL, lov->lov_tgts[i]->ltd_gen);
323                 }
324         }
325
326         obd_putref(obd);
327
328 out:
329         rc = class_disconnect(exp); /* bz 9811 */
330         return rc;
331 }
332
333 /* Error codes:
334  *
335  *  -EINVAL  : UUID can't be found in the LOV's target list
336  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
337  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
338  *  any >= 0 : is log target index
339  */
340 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
341                               enum obd_notify_event ev)
342 {
343         struct lov_obd *lov = &obd->u.lov;
344         struct lov_tgt_desc *tgt;
345         int index, activate, active;
346
347         CDEBUG(D_INFO, "Searching in lov %p for uuid %s event(%d)\n",
348                lov, uuid->uuid, ev);
349
350         obd_getref(obd);
351         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
352                 tgt = lov->lov_tgts[index];
353                 if (!tgt)
354                         continue;
355                 /*
356                  * LU-642, initially inactive OSC could miss the obd_connect,
357                  * we make up for it here.
358                  */
359                 if (ev == OBD_NOTIFY_ACTIVATE && !tgt->ltd_exp &&
360                     obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
361                         struct obd_uuid lov_osc_uuid = {"LOV_OSC_UUID"};
362
363                         obd_connect(NULL, &tgt->ltd_exp, tgt->ltd_obd,
364                                     &lov_osc_uuid, &lov->lov_ocd, NULL);
365                 }
366                 if (!tgt->ltd_exp)
367                         continue;
368
369                 CDEBUG(D_INFO, "lov idx %d is %s conn %#llx\n",
370                        index, obd_uuid2str(&tgt->ltd_uuid),
371                        tgt->ltd_exp->exp_handle.h_cookie);
372                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
373                         break;
374         }
375
376         if (index == lov->desc.ld_tgt_count) {
377                 index = -EINVAL;
378                 goto out;
379         }
380
381         if (ev == OBD_NOTIFY_DEACTIVATE || ev == OBD_NOTIFY_ACTIVATE) {
382                 activate = (ev == OBD_NOTIFY_ACTIVATE) ? 1 : 0;
383
384                 if (lov->lov_tgts[index]->ltd_activate == activate) {
385                         CDEBUG(D_INFO, "OSC %s already %sactivate!\n",
386                                uuid->uuid, activate ? "" : "de");
387                 } else {
388                         lov->lov_tgts[index]->ltd_activate = activate;
389                         CDEBUG(D_CONFIG, "%sactivate OSC %s\n",
390                                activate ? "" : "de", obd_uuid2str(uuid));
391                 }
392
393         } else if (ev == OBD_NOTIFY_INACTIVE || ev == OBD_NOTIFY_ACTIVE) {
394                 active = (ev == OBD_NOTIFY_ACTIVE) ? 1 : 0;
395
396                 if (lov->lov_tgts[index]->ltd_active == active) {
397                         CDEBUG(D_INFO, "OSC %s already %sactive!\n",
398                                uuid->uuid, active ? "" : "in");
399                         goto out;
400                 }
401                 CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n",
402                        obd_uuid2str(uuid), active ? "" : "in");
403
404                 lov->lov_tgts[index]->ltd_active = active;
405                 if (active) {
406                         lov->desc.ld_active_tgt_count++;
407                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
408                 } else {
409                         lov->desc.ld_active_tgt_count--;
410                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
411                 }
412         } else {
413                 CERROR("Unknown event(%d) for uuid %s", ev, uuid->uuid);
414         }
415
416  out:
417         obd_putref(obd);
418         return index;
419 }
420
421 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
422                       enum obd_notify_event ev, void *data)
423 {
424         int rc = 0;
425         struct lov_obd *lov = &obd->u.lov;
426
427         down_read(&lov->lov_notify_lock);
428         if (!lov->lov_connects) {
429                 up_read(&lov->lov_notify_lock);
430                 return rc;
431         }
432
433         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE ||
434             ev == OBD_NOTIFY_ACTIVATE || ev == OBD_NOTIFY_DEACTIVATE) {
435                 struct obd_uuid *uuid;
436
437                 LASSERT(watched);
438
439                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
440                         up_read(&lov->lov_notify_lock);
441                         CERROR("unexpected notification of %s %s!\n",
442                                watched->obd_type->typ_name,
443                                watched->obd_name);
444                         return -EINVAL;
445                 }
446                 uuid = &watched->u.cli.cl_target_uuid;
447
448                 /* Set OSC as active before notifying the observer, so the
449                  * observer can use the OSC normally.
450                  */
451                 rc = lov_set_osc_active(obd, uuid, ev);
452                 if (rc < 0) {
453                         up_read(&lov->lov_notify_lock);
454                         CERROR("event(%d) of %s failed: %d\n", ev,
455                                obd_uuid2str(uuid), rc);
456                         return rc;
457                 }
458                 /* active event should be pass lov target index as data */
459                 data = &rc;
460         }
461
462         /* Pass the notification up the chain. */
463         if (watched) {
464                 rc = obd_notify_observer(obd, watched, ev, data);
465         } else {
466                 /* NULL watched means all osc's in the lov (only for syncs) */
467                 /* sync event should be send lov idx as data */
468                 struct lov_obd *lov = &obd->u.lov;
469                 int i, is_sync;
470
471                 data = &i;
472                 is_sync = (ev == OBD_NOTIFY_SYNC) ||
473                           (ev == OBD_NOTIFY_SYNC_NONBLOCK);
474
475                 obd_getref(obd);
476                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
477                         if (!lov->lov_tgts[i])
478                                 continue;
479
480                         /* don't send sync event if target not
481                          * connected/activated
482                          */
483                         if (is_sync &&  !lov->lov_tgts[i]->ltd_active)
484                                 continue;
485
486                         rc = obd_notify_observer(obd, lov->lov_tgts[i]->ltd_obd,
487                                                  ev, data);
488                         if (rc) {
489                                 CERROR("%s: notify %s of %s failed %d\n",
490                                        obd->obd_name,
491                                        obd->obd_observer->obd_name,
492                                        lov->lov_tgts[i]->ltd_obd->obd_name,
493                                        rc);
494                         }
495                 }
496                 obd_putref(obd);
497         }
498
499         up_read(&lov->lov_notify_lock);
500         return rc;
501 }
502
503 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
504                           __u32 index, int gen, int active)
505 {
506         struct lov_obd *lov = &obd->u.lov;
507         struct lov_tgt_desc *tgt;
508         struct obd_device *tgt_obd;
509         int rc;
510
511         CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
512                uuidp->uuid, index, gen, active);
513
514         if (gen <= 0) {
515                 CERROR("request to add OBD %s with invalid generation: %d\n",
516                        uuidp->uuid, gen);
517                 return -EINVAL;
518         }
519
520         tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME,
521                                         &obd->obd_uuid);
522         if (!tgt_obd)
523                 return -EINVAL;
524
525         mutex_lock(&lov->lov_lock);
526
527         if ((index < lov->lov_tgt_size) && lov->lov_tgts[index]) {
528                 tgt = lov->lov_tgts[index];
529                 CERROR("UUID %s already assigned at LOV target index %d\n",
530                        obd_uuid2str(&tgt->ltd_uuid), index);
531                 mutex_unlock(&lov->lov_lock);
532                 return -EEXIST;
533         }
534
535         if (index >= lov->lov_tgt_size) {
536                 /* We need to reallocate the lov target array. */
537                 struct lov_tgt_desc **newtgts, **old = NULL;
538                 __u32 newsize, oldsize = 0;
539
540                 newsize = max_t(__u32, lov->lov_tgt_size, 2);
541                 while (newsize < index + 1)
542                         newsize <<= 1;
543                 newtgts = kcalloc(newsize, sizeof(*newtgts), GFP_NOFS);
544                 if (!newtgts) {
545                         mutex_unlock(&lov->lov_lock);
546                         return -ENOMEM;
547                 }
548
549                 if (lov->lov_tgt_size) {
550                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
551                                lov->lov_tgt_size);
552                         old = lov->lov_tgts;
553                         oldsize = lov->lov_tgt_size;
554                 }
555
556                 lov->lov_tgts = newtgts;
557                 lov->lov_tgt_size = newsize;
558                 smp_rmb();
559                 kfree(old);
560
561                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
562                        lov->lov_tgts, lov->lov_tgt_size);
563         }
564
565         tgt = kzalloc(sizeof(*tgt), GFP_NOFS);
566         if (!tgt) {
567                 mutex_unlock(&lov->lov_lock);
568                 return -ENOMEM;
569         }
570
571         rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size);
572         if (rc) {
573                 mutex_unlock(&lov->lov_lock);
574                 kfree(tgt);
575                 return rc;
576         }
577
578         tgt->ltd_uuid = *uuidp;
579         tgt->ltd_obd = tgt_obd;
580         /* XXX - add a sanity check on the generation number. */
581         tgt->ltd_gen = gen;
582         tgt->ltd_index = index;
583         tgt->ltd_activate = active;
584         lov->lov_tgts[index] = tgt;
585         if (index >= lov->desc.ld_tgt_count)
586                 lov->desc.ld_tgt_count = index + 1;
587
588         mutex_unlock(&lov->lov_lock);
589
590         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
591                index, tgt->ltd_gen, lov->desc.ld_tgt_count);
592
593         rc = obd_notify(obd, tgt_obd, OBD_NOTIFY_CREATE, &index);
594
595         if (lov->lov_connects == 0) {
596                 /* lov_connect hasn't been called yet. We'll do the
597                  * lov_connect_obd on this target when that fn first runs,
598                  * because we don't know the connect flags yet.
599                  */
600                 return 0;
601         }
602
603         obd_getref(obd);
604
605         rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
606         if (rc)
607                 goto out;
608
609         /* connect to administrative disabled ost */
610         if (!tgt->ltd_exp) {
611                 rc = 0;
612                 goto out;
613         }
614
615         if (lov->lov_cache) {
616                 rc = obd_set_info_async(NULL, tgt->ltd_exp,
617                                         sizeof(KEY_CACHE_SET), KEY_CACHE_SET,
618                                         sizeof(struct cl_client_cache),
619                                         lov->lov_cache, NULL);
620                 if (rc < 0)
621                         goto out;
622         }
623
624         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
625                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
626                         (void *)&index);
627
628 out:
629         if (rc) {
630                 CERROR("add failed (%d), deleting %s\n", rc,
631                        obd_uuid2str(&tgt->ltd_uuid));
632                 lov_del_target(obd, index, NULL, 0);
633         }
634         obd_putref(obd);
635         return rc;
636 }
637
638 /* Schedule a target for deletion */
639 int lov_del_target(struct obd_device *obd, __u32 index,
640                    struct obd_uuid *uuidp, int gen)
641 {
642         struct lov_obd *lov = &obd->u.lov;
643         int count = lov->desc.ld_tgt_count;
644         int rc = 0;
645
646         if (index >= count) {
647                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
648                        index, count);
649                 return -EINVAL;
650         }
651
652         /* to make sure there's no ongoing lov_notify() now */
653         down_write(&lov->lov_notify_lock);
654         obd_getref(obd);
655
656         if (!lov->lov_tgts[index]) {
657                 CERROR("LOV target at index %d is not setup.\n", index);
658                 rc = -EINVAL;
659                 goto out;
660         }
661
662         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
663                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
664                        lov_uuid2str(lov, index), index,
665                        obd_uuid2str(uuidp));
666                 rc = -EINVAL;
667                 goto out;
668         }
669
670         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
671                lov_uuid2str(lov, index), index,
672                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
673                lov->lov_tgts[index]->ltd_active);
674
675         lov->lov_tgts[index]->ltd_reap = 1;
676         lov->lov_death_row++;
677         /* we really delete it from obd_putref */
678 out:
679         obd_putref(obd);
680         up_write(&lov->lov_notify_lock);
681
682         return rc;
683 }
684
685 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
686 {
687         struct obd_device *osc_obd;
688
689         LASSERT(tgt);
690         LASSERT(tgt->ltd_reap);
691
692         osc_obd = class_exp2obd(tgt->ltd_exp);
693
694         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
695                tgt->ltd_uuid.uuid,
696                osc_obd ? osc_obd->obd_name : "<no obd>");
697
698         if (tgt->ltd_exp)
699                 lov_disconnect_obd(obd, tgt);
700
701         kfree(tgt);
702
703         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
704          * do it ourselves. And we can't do it from lov_cleanup,
705          * because we just lost our only reference to it.
706          */
707         if (osc_obd)
708                 class_manual_cleanup(osc_obd);
709 }
710
711 void lov_fix_desc_stripe_size(__u64 *val)
712 {
713         if (*val < LOV_MIN_STRIPE_SIZE) {
714                 if (*val != 0)
715                         LCONSOLE_INFO("Increasing default stripe size to minimum %u\n",
716                                       LOV_DESC_STRIPE_SIZE_DEFAULT);
717                 *val = LOV_DESC_STRIPE_SIZE_DEFAULT;
718         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
719                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
720                 LCONSOLE_WARN("Changing default stripe size to %llu (a multiple of %u)\n",
721                               *val, LOV_MIN_STRIPE_SIZE);
722         }
723 }
724
/* Sanitise a default stripe count in place: 0 becomes 1, any other value
 * is left alone.
 */
void lov_fix_desc_stripe_count(__u32 *val)
{
	if (!*val)
		*val = 1;
}
730
731 void lov_fix_desc_pattern(__u32 *val)
732 {
733         /* from lov_setstripe */
734         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
735                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
736                 *val = 0;
737         }
738 }
739
740 void lov_fix_desc_qos_maxage(__u32 *val)
741 {
742         if (*val == 0)
743                 *val = LOV_DESC_QOS_MAXAGE_DEFAULT;
744 }
745
746 void lov_fix_desc(struct lov_desc *desc)
747 {
748         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
749         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
750         lov_fix_desc_pattern(&desc->ld_pattern);
751         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
752 }
753
754 int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
755 {
756         struct lprocfs_static_vars lvars = { NULL };
757         struct lov_desc *desc;
758         struct lov_obd *lov = &obd->u.lov;
759         int rc;
760
761         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
762                 CERROR("LOV setup requires a descriptor\n");
763                 return -EINVAL;
764         }
765
766         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
767
768         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
769                 CERROR("descriptor size wrong: %d > %d\n",
770                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
771                 return -EINVAL;
772         }
773
774         if (desc->ld_magic != LOV_DESC_MAGIC) {
775                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
776                         CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
777                                obd->obd_name, desc);
778                         lustre_swab_lov_desc(desc);
779                 } else {
780                         CERROR("%s: Bad lov desc magic: %#x\n",
781                                obd->obd_name, desc->ld_magic);
782                         return -EINVAL;
783                 }
784         }
785
786         lov_fix_desc(desc);
787
788         desc->ld_active_tgt_count = 0;
789         lov->desc = *desc;
790         lov->lov_tgt_size = 0;
791
792         mutex_init(&lov->lov_lock);
793         atomic_set(&lov->lov_refcount, 0);
794         lov->lov_sp_me = LUSTRE_SP_CLI;
795
796         init_rwsem(&lov->lov_notify_lock);
797
798         lov->lov_pools_hash_body = cfs_hash_create("POOLS", HASH_POOLS_CUR_BITS,
799                                                    HASH_POOLS_MAX_BITS,
800                                                    HASH_POOLS_BKT_BITS, 0,
801                                                    CFS_HASH_MIN_THETA,
802                                                    CFS_HASH_MAX_THETA,
803                                                    &pool_hash_operations,
804                                                    CFS_HASH_DEFAULT);
805         INIT_LIST_HEAD(&lov->lov_pool_list);
806         lov->lov_pool_count = 0;
807         rc = lov_ost_pool_init(&lov->lov_packed, 0);
808         if (rc)
809                 goto out;
810
811         lprocfs_lov_init_vars(&lvars);
812         lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
813
814         rc = ldebugfs_seq_create(obd->obd_debugfs_entry, "target_obd",
815                                  0444, &lov_proc_target_fops, obd);
816         if (rc)
817                 CWARN("Error adding the target_obd file\n");
818
819         lov->lov_pool_debugfs_entry = ldebugfs_register("pools",
820                                                      obd->obd_debugfs_entry,
821                                                      NULL, NULL);
822         return 0;
823
824 out:
825         return rc;
826 }
827
828 static int lov_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
829 {
830         struct lov_obd *lov = &obd->u.lov;
831
832         switch (stage) {
833         case OBD_CLEANUP_EARLY: {
834                 int i;
835
836                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
837                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
838                                 continue;
839                         obd_precleanup(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
840                                        OBD_CLEANUP_EARLY);
841                 }
842                 break;
843         }
844         default:
845                 break;
846         }
847
848         return 0;
849 }
850
851 static int lov_cleanup(struct obd_device *obd)
852 {
853         struct lov_obd *lov = &obd->u.lov;
854         struct list_head *pos, *tmp;
855         struct pool_desc *pool;
856
857         list_for_each_safe(pos, tmp, &lov->lov_pool_list) {
858                 pool = list_entry(pos, struct pool_desc, pool_list);
859                 /* free pool structs */
860                 CDEBUG(D_INFO, "delete pool %p\n", pool);
861                 /* In the function below, .hs_keycmp resolves to
862                  * pool_hashkey_keycmp()
863                  */
864                 /* coverity[overrun-buffer-val] */
865                 lov_pool_del(obd, pool->pool_name);
866         }
867         cfs_hash_putref(lov->lov_pools_hash_body);
868         lov_ost_pool_free(&lov->lov_packed);
869
870         lprocfs_obd_cleanup(obd);
871         if (lov->lov_tgts) {
872                 int i;
873
874                 obd_getref(obd);
875                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
876                         if (!lov->lov_tgts[i])
877                                 continue;
878
879                         /* Inactive targets may never have connected */
880                         if (lov->lov_tgts[i]->ltd_active ||
881                             atomic_read(&lov->lov_refcount))
882                             /* We should never get here - these
883                              * should have been removed in the
884                              * disconnect.
885                              */
886                                 CERROR("lov tgt %d not cleaned! deathrow=%d, lovrc=%d\n",
887                                        i, lov->lov_death_row,
888                                        atomic_read(&lov->lov_refcount));
889                         lov_del_target(obd, i, NULL, 0);
890                 }
891                 obd_putref(obd);
892                 kfree(lov->lov_tgts);
893                 lov->lov_tgt_size = 0;
894         }
895
896         if (lov->lov_cache) {
897                 cl_cache_decref(lov->lov_cache);
898                 lov->lov_cache = NULL;
899         }
900
901         return 0;
902 }
903
904 int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
905                             __u32 *indexp, int *genp)
906 {
907         struct obd_uuid obd_uuid;
908         int cmd;
909         int rc = 0;
910
911         switch (cmd = lcfg->lcfg_command) {
912         case LCFG_LOV_ADD_OBD:
913         case LCFG_LOV_ADD_INA:
914         case LCFG_LOV_DEL_OBD: {
915                 __u32 index;
916                 int gen;
917                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
918                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
919                         rc = -EINVAL;
920                         goto out;
921                 }
922
923                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
924
925                 rc = kstrtoint(lustre_cfg_buf(lcfg, 2), 10, indexp);
926                 if (rc < 0)
927                         goto out;
928                 rc = kstrtoint(lustre_cfg_buf(lcfg, 3), 10, genp);
929                 if (rc < 0)
930                         goto out;
931                 index = *indexp;
932                 gen = *genp;
933                 if (cmd == LCFG_LOV_ADD_OBD)
934                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
935                 else if (cmd == LCFG_LOV_ADD_INA)
936                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
937                 else
938                         rc = lov_del_target(obd, index, &obd_uuid, gen);
939                 goto out;
940         }
941         case LCFG_PARAM: {
942                 struct lprocfs_static_vars lvars = { NULL };
943                 struct lov_desc *desc = &(obd->u.lov.desc);
944
945                 if (!desc) {
946                         rc = -EINVAL;
947                         goto out;
948                 }
949
950                 lprocfs_lov_init_vars(&lvars);
951
952                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
953                                               lcfg, obd);
954                 if (rc > 0)
955                         rc = 0;
956                 goto out;
957         }
958         case LCFG_POOL_NEW:
959         case LCFG_POOL_ADD:
960         case LCFG_POOL_DEL:
961         case LCFG_POOL_REM:
962                 goto out;
963
964         default: {
965                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
966                 rc = -EINVAL;
967                 goto out;
968         }
969         }
970 out:
971         return rc;
972 }
973
974 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
975                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
976 {
977         struct lov_stripe_md *obj_mdp, *lsm;
978         struct lov_obd *lov = &exp->exp_obd->u.lov;
979         unsigned ost_idx;
980         int rc, i;
981
982         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
983                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
984
985         obj_mdp = kzalloc(sizeof(*obj_mdp), GFP_NOFS);
986         if (!obj_mdp)
987                 return -ENOMEM;
988
989         ost_idx = src_oa->o_nlink;
990         lsm = *ea;
991         if (!lsm) {
992                 rc = -EINVAL;
993                 goto out;
994         }
995         if (ost_idx >= lov->desc.ld_tgt_count ||
996             !lov->lov_tgts[ost_idx]) {
997                 rc = -EINVAL;
998                 goto out;
999         }
1000
1001         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1002                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1003
1004                 if (lov_oinfo_is_dummy(loi))
1005                         continue;
1006
1007                 if (loi->loi_ost_idx == ost_idx) {
1008                         if (ostid_id(&loi->loi_oi) != ostid_id(&src_oa->o_oi)) {
1009                                 rc = -EINVAL;
1010                                 goto out;
1011                         }
1012                         break;
1013                 }
1014         }
1015         if (i == lsm->lsm_stripe_count) {
1016                 rc = -EINVAL;
1017                 goto out;
1018         }
1019
1020         rc = obd_create(NULL, lov->lov_tgts[ost_idx]->ltd_exp,
1021                         src_oa, &obj_mdp, oti);
1022 out:
1023         kfree(obj_mdp);
1024         return rc;
1025 }
1026
1027 /* the LOV expects oa->o_id to be set to the LOV object id */
1028 static int lov_create(const struct lu_env *env, struct obd_export *exp,
1029                       struct obdo *src_oa, struct lov_stripe_md **ea,
1030                       struct obd_trans_info *oti)
1031 {
1032         struct lov_obd *lov;
1033         int rc = 0;
1034
1035         LASSERT(ea);
1036         if (!exp)
1037                 return -EINVAL;
1038
1039         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1040             src_oa->o_flags == OBD_FL_DELORPHAN) {
1041                 /* should be used with LOV anymore */
1042                 LBUG();
1043         }
1044
1045         lov = &exp->exp_obd->u.lov;
1046         if (!lov->desc.ld_active_tgt_count)
1047                 return -EIO;
1048
1049         obd_getref(exp->exp_obd);
1050         /* Recreate a specific object id at the given OST index */
1051         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1052             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1053                 rc = lov_recreate(exp, src_oa, ea, oti);
1054         }
1055
1056         obd_putref(exp->exp_obd);
1057         return rc;
1058 }
1059
/*
 * Assert that @lsmp is a non-NULL striping descriptor whose lsm_magic is one
 * of the two values accepted here (LOV_MAGIC_V1 or LOV_MAGIC_V3).  On a magic
 * mismatch the pointer and the offending magic are passed to LASSERTF.
 * Note that @lsmp is evaluated several times.
 */
#define ASSERT_LSM_MAGIC(lsmp)                                            \
do {                                                                        \
        LASSERT((lsmp));                                                \
        LASSERTF(((lsmp)->lsm_magic == LOV_MAGIC_V1 ||                    \
                 (lsmp)->lsm_magic == LOV_MAGIC_V3),                        \
                 "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic);            \
} while (0)
1067
1068 static int lov_destroy(const struct lu_env *env, struct obd_export *exp,
1069                        struct obdo *oa, struct lov_stripe_md *lsm,
1070                        struct obd_trans_info *oti, struct obd_export *md_exp)
1071 {
1072         struct lov_request_set *set;
1073         struct obd_info oinfo;
1074         struct lov_request *req;
1075         struct lov_obd *lov;
1076         int rc = 0, err = 0;
1077
1078         ASSERT_LSM_MAGIC(lsm);
1079
1080         if (!exp || !exp->exp_obd)
1081                 return -ENODEV;
1082
1083         if (oa->o_valid & OBD_MD_FLCOOKIE) {
1084                 LASSERT(oti);
1085                 LASSERT(oti->oti_logcookies);
1086         }
1087
1088         lov = &exp->exp_obd->u.lov;
1089         obd_getref(exp->exp_obd);
1090         rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);
1091         if (rc)
1092                 goto out;
1093
1094         list_for_each_entry(req, &set->set_list, rq_link) {
1095                 if (oa->o_valid & OBD_MD_FLCOOKIE)
1096                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1097
1098                 err = obd_destroy(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1099                                   req->rq_oi.oi_oa, NULL, oti, NULL);
1100                 err = lov_update_common_set(set, req, err);
1101                 if (err) {
1102                         CERROR("%s: destroying objid "DOSTID" subobj "
1103                                DOSTID" on OST idx %d: rc = %d\n",
1104                                exp->exp_obd->obd_name, POSTID(&oa->o_oi),
1105                                POSTID(&req->rq_oi.oi_oa->o_oi),
1106                                req->rq_idx, err);
1107                         if (!rc)
1108                                 rc = err;
1109                 }
1110         }
1111
1112         if (rc == 0)
1113                 rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);
1114
1115         err = lov_fini_destroy_set(set);
1116 out:
1117         obd_putref(exp->exp_obd);
1118         return rc ? rc : err;
1119 }
1120
1121 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
1122                                  void *data, int rc)
1123 {
1124         struct lov_request_set *lovset = (struct lov_request_set *)data;
1125         int err;
1126
1127         /* don't do attribute merge if this async op failed */
1128         if (rc)
1129                 atomic_set(&lovset->set_completes, 0);
1130         err = lov_fini_getattr_set(lovset);
1131         return rc ? rc : err;
1132 }
1133
1134 static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
1135                              struct ptlrpc_request_set *rqset)
1136 {
1137         struct lov_request_set *lovset;
1138         struct lov_obd *lov;
1139         struct lov_request *req;
1140         int rc = 0, err;
1141
1142         LASSERT(oinfo);
1143         ASSERT_LSM_MAGIC(oinfo->oi_md);
1144
1145         if (!exp || !exp->exp_obd)
1146                 return -ENODEV;
1147
1148         lov = &exp->exp_obd->u.lov;
1149
1150         rc = lov_prep_getattr_set(exp, oinfo, &lovset);
1151         if (rc)
1152                 return rc;
1153
1154         CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes\n",
1155                POSTID(&oinfo->oi_md->lsm_oi), oinfo->oi_md->lsm_stripe_count,
1156                oinfo->oi_md->lsm_stripe_size);
1157
1158         list_for_each_entry(req, &lovset->set_list, rq_link) {
1159                 CDEBUG(D_INFO, "objid " DOSTID "[%d] has subobj " DOSTID " at idx%u\n",
1160                        POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1161                        POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1162                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1163                                        &req->rq_oi, rqset);
1164                 if (rc) {
1165                         CERROR("%s: getattr objid "DOSTID" subobj"
1166                                DOSTID" on OST idx %d: rc = %d\n",
1167                                exp->exp_obd->obd_name,
1168                                POSTID(&oinfo->oi_oa->o_oi),
1169                                POSTID(&req->rq_oi.oi_oa->o_oi),
1170                                req->rq_idx, rc);
1171                         goto out;
1172                 }
1173         }
1174
1175         if (!list_empty(&rqset->set_requests)) {
1176                 LASSERT(rc == 0);
1177                 LASSERT(!rqset->set_interpret);
1178                 rqset->set_interpret = lov_getattr_interpret;
1179                 rqset->set_arg = (void *)lovset;
1180                 return rc;
1181         }
1182 out:
1183         if (rc)
1184                 atomic_set(&lovset->set_completes, 0);
1185         err = lov_fini_getattr_set(lovset);
1186         return rc ? rc : err;
1187 }
1188
1189 static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
1190                                  void *data, int rc)
1191 {
1192         struct lov_request_set *lovset = (struct lov_request_set *)data;
1193         int err;
1194
1195         if (rc)
1196                 atomic_set(&lovset->set_completes, 0);
1197         err = lov_fini_setattr_set(lovset);
1198         return rc ? rc : err;
1199 }
1200
1201 /* If @oti is given, the request goes from MDS and responses from OSTs are not
1202  * needed. Otherwise, a client is waiting for responses.
1203  */
1204 static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
1205                              struct obd_trans_info *oti,
1206                              struct ptlrpc_request_set *rqset)
1207 {
1208         struct lov_request_set *set;
1209         struct lov_request *req;
1210         struct lov_obd *lov;
1211         int rc = 0;
1212
1213         LASSERT(oinfo);
1214         ASSERT_LSM_MAGIC(oinfo->oi_md);
1215         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
1216                 LASSERT(oti);
1217                 LASSERT(oti->oti_logcookies);
1218         }
1219
1220         if (!exp || !exp->exp_obd)
1221                 return -ENODEV;
1222
1223         lov = &exp->exp_obd->u.lov;
1224         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1225         if (rc)
1226                 return rc;
1227
1228         CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes\n",
1229                POSTID(&oinfo->oi_md->lsm_oi),
1230                oinfo->oi_md->lsm_stripe_count,
1231                oinfo->oi_md->lsm_stripe_size);
1232
1233         list_for_each_entry(req, &set->set_list, rq_link) {
1234                 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1235                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1236
1237                 CDEBUG(D_INFO, "objid " DOSTID "[%d] has subobj " DOSTID " at idx%u\n",
1238                        POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1239                        POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1240
1241                 rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1242                                        &req->rq_oi, oti, rqset);
1243                 if (rc) {
1244                         CERROR("error: setattr objid "DOSTID" subobj"
1245                                DOSTID" on OST idx %d: rc = %d\n",
1246                                POSTID(&set->set_oi->oi_oa->o_oi),
1247                                POSTID(&req->rq_oi.oi_oa->o_oi),
1248                                req->rq_idx, rc);
1249                         break;
1250                 }
1251         }
1252
1253         /* If we are not waiting for responses on async requests, return. */
1254         if (rc || !rqset || list_empty(&rqset->set_requests)) {
1255                 int err;
1256
1257                 if (rc)
1258                         atomic_set(&set->set_completes, 0);
1259                 err = lov_fini_setattr_set(set);
1260                 return rc ? rc : err;
1261         }
1262
1263         LASSERT(!rqset->set_interpret);
1264         rqset->set_interpret = lov_setattr_interpret;
1265         rqset->set_arg = (void *)set;
1266
1267         return 0;
1268 }
1269
1270 /* find any ldlm lock of the inode in lov
1271  * return 0    not find
1272  *      1    find one
1273  *      < 0    error
1274  */
1275 static int lov_find_cbdata(struct obd_export *exp,
1276                            struct lov_stripe_md *lsm, ldlm_iterator_t it,
1277                            void *data)
1278 {
1279         struct lov_obd *lov;
1280         int rc = 0, i;
1281
1282         ASSERT_LSM_MAGIC(lsm);
1283
1284         if (!exp || !exp->exp_obd)
1285                 return -ENODEV;
1286
1287         lov = &exp->exp_obd->u.lov;
1288         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1289                 struct lov_stripe_md submd;
1290                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1291
1292                 if (lov_oinfo_is_dummy(loi))
1293                         continue;
1294
1295                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
1296                         CDEBUG(D_HA, "lov idx %d NULL\n", loi->loi_ost_idx);
1297                         continue;
1298                 }
1299
1300                 submd.lsm_oi = loi->loi_oi;
1301                 submd.lsm_stripe_count = 0;
1302                 rc = obd_find_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1303                                      &submd, it, data);
1304                 if (rc != 0)
1305                         return rc;
1306         }
1307         return rc;
1308 }
1309
1310 int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
1311 {
1312         struct lov_request_set *lovset = (struct lov_request_set *)data;
1313         int err;
1314
1315         if (rc)
1316                 atomic_set(&lovset->set_completes, 0);
1317
1318         err = lov_fini_statfs_set(lovset);
1319         return rc ? rc : err;
1320 }
1321
1322 static int lov_statfs_async(struct obd_export *exp, struct obd_info *oinfo,
1323                             __u64 max_age, struct ptlrpc_request_set *rqset)
1324 {
1325         struct obd_device      *obd = class_exp2obd(exp);
1326         struct lov_request_set *set;
1327         struct lov_request *req;
1328         struct lov_obd *lov;
1329         int rc = 0;
1330
1331         LASSERT(oinfo->oi_osfs);
1332
1333         lov = &obd->u.lov;
1334         rc = lov_prep_statfs_set(obd, oinfo, &set);
1335         if (rc)
1336                 return rc;
1337
1338         list_for_each_entry(req, &set->set_list, rq_link) {
1339                 rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1340                                       &req->rq_oi, max_age, rqset);
1341                 if (rc)
1342                         break;
1343         }
1344
1345         if (rc || list_empty(&rqset->set_requests)) {
1346                 int err;
1347
1348                 if (rc)
1349                         atomic_set(&set->set_completes, 0);
1350                 err = lov_fini_statfs_set(set);
1351                 return rc ? rc : err;
1352         }
1353
1354         LASSERT(!rqset->set_interpret);
1355         rqset->set_interpret = lov_statfs_interpret;
1356         rqset->set_arg = (void *)set;
1357         return 0;
1358 }
1359
1360 static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
1361                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1362 {
1363         struct ptlrpc_request_set *set = NULL;
1364         struct obd_info oinfo = { };
1365         int rc = 0;
1366
1367         /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
1368          * statfs requests
1369          */
1370         set = ptlrpc_prep_set();
1371         if (!set)
1372                 return -ENOMEM;
1373
1374         oinfo.oi_osfs = osfs;
1375         oinfo.oi_flags = flags;
1376         rc = lov_statfs_async(exp, &oinfo, max_age, set);
1377         if (rc == 0)
1378                 rc = ptlrpc_set_wait(set);
1379         ptlrpc_set_destroy(set);
1380
1381         return rc;
1382 }
1383
1384 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1385                          void *karg, void __user *uarg)
1386 {
1387         struct obd_device *obddev = class_exp2obd(exp);
1388         struct lov_obd *lov = &obddev->u.lov;
1389         int i = 0, rc = 0, count = lov->desc.ld_tgt_count;
1390         struct obd_uuid *uuidp;
1391
1392         switch (cmd) {
1393         case IOC_OBD_STATFS: {
1394                 struct obd_ioctl_data *data = karg;
1395                 struct obd_device *osc_obd;
1396                 struct obd_statfs stat_buf = {0};
1397                 __u32 index;
1398                 __u32 flags;
1399
1400                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
1401                 if (index >= count)
1402                         return -ENODEV;
1403
1404                 if (!lov->lov_tgts[index])
1405                         /* Try again with the next index */
1406                         return -EAGAIN;
1407                 if (!lov->lov_tgts[index]->ltd_active)
1408                         return -ENODATA;
1409
1410                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
1411                 if (!osc_obd)
1412                         return -EINVAL;
1413
1414                 /* copy UUID */
1415                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
1416                                  min((int)data->ioc_plen2,
1417                                      (int)sizeof(struct obd_uuid))))
1418                         return -EFAULT;
1419
1420                 memcpy(&flags, data->ioc_inlbuf1, sizeof(__u32));
1421                 flags = flags & LL_STATFS_NODELAY ? OBD_STATFS_NODELAY : 0;
1422
1423                 /* got statfs data */
1424                 rc = obd_statfs(NULL, lov->lov_tgts[index]->ltd_exp, &stat_buf,
1425                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1426                                 flags);
1427                 if (rc)
1428                         return rc;
1429                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1430                                  min((int)data->ioc_plen1,
1431                                      (int)sizeof(stat_buf))))
1432                         return -EFAULT;
1433                 break;
1434         }
1435         case OBD_IOC_LOV_GET_CONFIG: {
1436                 struct obd_ioctl_data *data;
1437                 struct lov_desc *desc;
1438                 char *buf = NULL;
1439                 __u32 *genp;
1440
1441                 len = 0;
1442                 if (obd_ioctl_getdata(&buf, &len, uarg))
1443                         return -EINVAL;
1444
1445                 data = (struct obd_ioctl_data *)buf;
1446
1447                 if (sizeof(*desc) > data->ioc_inllen1) {
1448                         obd_ioctl_freedata(buf, len);
1449                         return -EINVAL;
1450                 }
1451
1452                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1453                         obd_ioctl_freedata(buf, len);
1454                         return -EINVAL;
1455                 }
1456
1457                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1458                         obd_ioctl_freedata(buf, len);
1459                         return -EINVAL;
1460                 }
1461
1462                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1463                 memcpy(desc, &(lov->desc), sizeof(*desc));
1464
1465                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1466                 genp = (__u32 *)data->ioc_inlbuf3;
1467                 /* the uuid will be empty for deleted OSTs */
1468                 for (i = 0; i < count; i++, uuidp++, genp++) {
1469                         if (!lov->lov_tgts[i])
1470                                 continue;
1471                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
1472                         *genp = lov->lov_tgts[i]->ltd_gen;
1473                 }
1474
1475                 if (copy_to_user(uarg, buf, len))
1476                         rc = -EFAULT;
1477                 obd_ioctl_freedata(buf, len);
1478                 break;
1479         }
1480         case LL_IOC_LOV_GETSTRIPE:
1481                 rc = lov_getstripe(exp, karg, uarg);
1482                 break;
1483         case OBD_IOC_QUOTACTL: {
1484                 struct if_quotactl *qctl = karg;
1485                 struct lov_tgt_desc *tgt = NULL;
1486                 struct obd_quotactl *oqctl;
1487
1488                 if (qctl->qc_valid == QC_OSTIDX) {
1489                         if (count <= qctl->qc_idx)
1490                                 return -EINVAL;
1491
1492                         tgt = lov->lov_tgts[qctl->qc_idx];
1493                         if (!tgt || !tgt->ltd_exp)
1494                                 return -EINVAL;
1495                 } else if (qctl->qc_valid == QC_UUID) {
1496                         for (i = 0; i < count; i++) {
1497                                 tgt = lov->lov_tgts[i];
1498                                 if (!tgt ||
1499                                     !obd_uuid_equals(&tgt->ltd_uuid,
1500                                                      &qctl->obd_uuid))
1501                                         continue;
1502
1503                                 if (!tgt->ltd_exp)
1504                                         return -EINVAL;
1505
1506                                 break;
1507                         }
1508                 } else {
1509                         return -EINVAL;
1510                 }
1511
1512                 if (i >= count)
1513                         return -EAGAIN;
1514
1515                 LASSERT(tgt && tgt->ltd_exp);
1516                 oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
1517                 if (!oqctl)
1518                         return -ENOMEM;
1519
1520                 QCTL_COPY(oqctl, qctl);
1521                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
1522                 if (rc == 0) {
1523                         QCTL_COPY(qctl, oqctl);
1524                         qctl->qc_valid = QC_OSTIDX;
1525                         qctl->obd_uuid = tgt->ltd_uuid;
1526                 }
1527                 kfree(oqctl);
1528                 break;
1529         }
1530         default: {
1531                 int set = 0;
1532
1533                 if (count == 0)
1534                         return -ENOTTY;
1535
1536                 for (i = 0; i < count; i++) {
1537                         int err;
1538                         struct obd_device *osc_obd;
1539
1540                         /* OST was disconnected */
1541                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
1542                                 continue;
1543
1544                         /* ll_umount_begin() sets force flag but for lov, not
1545                          * osc. Let's pass it through
1546                          */
1547                         osc_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
1548                         osc_obd->obd_force = obddev->obd_force;
1549                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
1550                                             len, karg, uarg);
1551                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK)
1552                                 return err;
1553                         if (err) {
1554                                 if (lov->lov_tgts[i]->ltd_active) {
1555                                         CDEBUG(err == -ENOTTY ?
1556                                                D_IOCTL : D_WARNING,
1557                                                "iocontrol OSC %s on OST idx %d cmd %x: err = %d\n",
1558                                                lov_uuid2str(lov, i),
1559                                                i, cmd, err);
1560                                         if (!rc)
1561                                                 rc = err;
1562                                 }
1563                         } else {
1564                                 set = 1;
1565                         }
1566                 }
1567                 if (!set && !rc)
1568                         rc = -EIO;
1569         }
1570         }
1571
1572         return rc;
1573 }
1574
1575 #define FIEMAP_BUFFER_SIZE 4096
1576
1577 /**
1578  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1579  * call. The local end offset and the device are sent in the first
1580  * fm_extent. This function calculates the stripe number from the index.
1581  * This function returns a stripe_no on which mapping is to be restarted.
1582  *
1583  * This function returns fm_end_offset which is the in-OST offset at which
1584  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1585  * will re-calculate proper offset in next stripe.
1586  * Note that the first extent is passed to lov_get_info via the value field.
1587  *
1588  * \param fiemap fiemap request header
1589  * \param lsm striping information for the file
1590  * \param fm_start logical start of mapping
1591  * \param fm_end logical end of mapping
1592  * \param start_stripe starting stripe will be returned in this
1593  */
1594 static u64 fiemap_calc_fm_end_offset(struct ll_user_fiemap *fiemap,
1595                                      struct lov_stripe_md *lsm, u64 fm_start,
1596                                      u64 fm_end, int *start_stripe)
1597 {
1598         u64 local_end = fiemap->fm_extents[0].fe_logical;
1599         u64 lun_start, lun_end;
1600         u64 fm_end_offset;
1601         int stripe_no = -1, i;
1602
1603         if (fiemap->fm_extent_count == 0 ||
1604             fiemap->fm_extents[0].fe_logical == 0)
1605                 return 0;
1606
1607         /* Find out stripe_no from ost_index saved in the fe_device */
1608         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1609                 struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
1610
1611                 if (lov_oinfo_is_dummy(oinfo))
1612                         continue;
1613
1614                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1615                         stripe_no = i;
1616                         break;
1617                 }
1618         }
1619         if (stripe_no == -1)
1620                 return -EINVAL;
1621
1622         /* If we have finished mapping on previous device, shift logical
1623          * offset to start of next device
1624          */
1625         if ((lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1626                                    &lun_start, &lun_end)) != 0 &&
1627                                    local_end < lun_end) {
1628                 fm_end_offset = local_end;
1629                 *start_stripe = stripe_no;
1630         } else {
1631                 /* This is a special value to indicate that caller should
1632                  * calculate offset in next stripe.
1633                  */
1634                 fm_end_offset = 0;
1635                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
1636         }
1637
1638         return fm_end_offset;
1639 }
1640
1641 /**
1642  * We calculate on which OST the mapping will end. If the length of mapping
1643  * is greater than (stripe_size * stripe_count) then the last_stripe will
1644  * will be one just before start_stripe. Else we check if the mapping
1645  * intersects each OST and find last_stripe.
1646  * This function returns the last_stripe and also sets the stripe_count
1647  * over which the mapping is spread
1648  *
1649  * \param lsm striping information for the file
1650  * \param fm_start logical start of mapping
1651  * \param fm_end logical end of mapping
1652  * \param start_stripe starting stripe of the mapping
1653  * \param stripe_count the number of stripes across which to map is returned
1654  *
1655  * \retval last_stripe return the last stripe of the mapping
1656  */
1657 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, u64 fm_start,
1658                                    u64 fm_end, int start_stripe,
1659                                    int *stripe_count)
1660 {
1661         int last_stripe;
1662         u64 obd_start, obd_end;
1663         int i, j;
1664
1665         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
1666                 last_stripe = start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
1667                                                               start_stripe - 1;
1668                 *stripe_count = lsm->lsm_stripe_count;
1669         } else {
1670                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
1671                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
1672                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1673                                                    &obd_start, &obd_end)) == 0)
1674                                 break;
1675                 }
1676                 *stripe_count = j;
1677                 last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count;
1678         }
1679
1680         return last_stripe;
1681 }
1682
1683 /**
1684  * Set fe_device and copy extents from local buffer into main return buffer.
1685  *
1686  * \param fiemap fiemap request header
1687  * \param lcl_fm_ext array of local fiemap extents to be copied
1688  * \param ost_index OST index to be written into the fm_device field for each
1689                     extent
1690  * \param ext_count number of extents to be copied
1691  * \param current_extent where to start copying in main extent array
1692  */
1693 static void fiemap_prepare_and_copy_exts(struct ll_user_fiemap *fiemap,
1694                                          struct ll_fiemap_extent *lcl_fm_ext,
1695                                          int ost_index, unsigned int ext_count,
1696                                          int current_extent)
1697 {
1698         char *to;
1699         int ext;
1700
1701         for (ext = 0; ext < ext_count; ext++) {
1702                 lcl_fm_ext[ext].fe_device = ost_index;
1703                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1704         }
1705
1706         /* Copy fm_extent's from fm_local to return buffer */
1707         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1708         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent));
1709 }
1710
1711 /**
1712  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1713  * This also handles the restarting of FIEMAP calls in case mapping overflows
1714  * the available number of extents in single call.
1715  */
1716 static int lov_fiemap(struct lov_obd *lov, __u32 keylen, void *key,
1717                       __u32 *vallen, void *val, struct lov_stripe_md *lsm)
1718 {
1719         struct ll_fiemap_info_key *fm_key = key;
1720         struct ll_user_fiemap *fiemap = val;
1721         struct ll_user_fiemap *fm_local = NULL;
1722         struct ll_fiemap_extent *lcl_fm_ext;
1723         int count_local;
1724         unsigned int get_num_extents = 0;
1725         int ost_index = 0, actual_start_stripe, start_stripe;
1726         u64 fm_start, fm_end, fm_length, fm_end_offset;
1727         u64 curr_loc;
1728         int current_extent = 0, rc = 0, i;
1729         int ost_eof = 0; /* EOF for object */
1730         int ost_done = 0; /* done with required mapping for this OST? */
1731         int last_stripe;
1732         int cur_stripe = 0, cur_stripe_wrap = 0, stripe_count;
1733         unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
1734
1735         if (!lsm_has_objects(lsm)) {
1736                 if (lsm && lsm_is_released(lsm) && (fm_key->fiemap.fm_start <
1737                     fm_key->oa.o_size)) {
1738                         /*
1739                          * released file, return a minimal FIEMAP if
1740                          * request fits in file-size.
1741                          */
1742                         fiemap->fm_mapped_extents = 1;
1743                         fiemap->fm_extents[0].fe_logical =
1744                                         fm_key->fiemap.fm_start;
1745                         if (fm_key->fiemap.fm_start + fm_key->fiemap.fm_length <
1746                             fm_key->oa.o_size) {
1747                                 fiemap->fm_extents[0].fe_length =
1748                                         fm_key->fiemap.fm_length;
1749                         } else {
1750                                 fiemap->fm_extents[0].fe_length =
1751                                         fm_key->oa.o_size - fm_key->fiemap.fm_start;
1752                                 fiemap->fm_extents[0].fe_flags |=
1753                                                 (FIEMAP_EXTENT_UNKNOWN |
1754                                                  FIEMAP_EXTENT_LAST);
1755                         }
1756                 }
1757                 rc = 0;
1758                 goto out;
1759         }
1760
1761         if (fiemap_count_to_size(fm_key->fiemap.fm_extent_count) < buffer_size)
1762                 buffer_size = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
1763
1764         fm_local = libcfs_kvzalloc(buffer_size, GFP_NOFS);
1765         if (!fm_local) {
1766                 rc = -ENOMEM;
1767                 goto out;
1768         }
1769         lcl_fm_ext = &fm_local->fm_extents[0];
1770
1771         count_local = fiemap_size_to_count(buffer_size);
1772
1773         memcpy(fiemap, &fm_key->fiemap, sizeof(*fiemap));
1774         fm_start = fiemap->fm_start;
1775         fm_length = fiemap->fm_length;
1776         /* Calculate start stripe, last stripe and length of mapping */
1777         start_stripe = lov_stripe_number(lsm, fm_start);
1778         actual_start_stripe = start_stripe;
1779         fm_end = (fm_length == ~0ULL ? fm_key->oa.o_size :
1780                                                 fm_start + fm_length - 1);
1781         /* If fm_length != ~0ULL but fm_start+fm_length-1 exceeds file size */
1782         if (fm_end > fm_key->oa.o_size)
1783                 fm_end = fm_key->oa.o_size;
1784
1785         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
1786                                               actual_start_stripe,
1787                                               &stripe_count);
1788
1789         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start,
1790                                                   fm_end, &start_stripe);
1791         if (fm_end_offset == -EINVAL) {
1792                 rc = -EINVAL;
1793                 goto out;
1794         }
1795
1796         if (fiemap_count_to_size(fiemap->fm_extent_count) > *vallen)
1797                 fiemap->fm_extent_count = fiemap_size_to_count(*vallen);
1798         if (fiemap->fm_extent_count == 0) {
1799                 get_num_extents = 1;
1800                 count_local = 0;
1801         }
1802         /* Check each stripe */
1803         for (cur_stripe = start_stripe, i = 0; i < stripe_count;
1804              i++, cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
1805                 u64 req_fm_len; /* Stores length of required mapping */
1806                 u64 len_mapped_single_call;
1807                 u64 lun_start, lun_end, obd_object_end;
1808                 unsigned int ext_count;
1809
1810                 cur_stripe_wrap = cur_stripe;
1811
1812                 /* Find out range of mapping on this stripe */
1813                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
1814                                            &lun_start, &obd_object_end)) == 0)
1815                         continue;
1816
1817                 if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe])) {
1818                         rc = -EIO;
1819                         goto out;
1820                 }
1821
1822                 /* If this is a continuation FIEMAP call and we are on
1823                  * starting stripe then lun_start needs to be set to
1824                  * fm_end_offset
1825                  */
1826                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
1827                         lun_start = fm_end_offset;
1828
1829                 if (fm_length != ~0ULL) {
1830                         /* Handle fm_start + fm_length overflow */
1831                         if (fm_start + fm_length < fm_start)
1832                                 fm_length = ~0ULL - fm_start;
1833                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
1834                                                      cur_stripe);
1835                 } else {
1836                         lun_end = ~0ULL;
1837                 }
1838
1839                 if (lun_start == lun_end)
1840                         continue;
1841
1842                 req_fm_len = obd_object_end - lun_start;
1843                 fm_local->fm_length = 0;
1844                 len_mapped_single_call = 0;
1845
1846                 /* If the output buffer is very large and the objects have many
1847                  * extents we may need to loop on a single OST repeatedly
1848                  */
1849                 ost_eof = 0;
1850                 ost_done = 0;
1851                 do {
1852                         if (get_num_extents == 0) {
1853                                 /* Don't get too many extents. */
1854                                 if (current_extent + count_local >
1855                                     fiemap->fm_extent_count)
1856                                         count_local = fiemap->fm_extent_count -
1857                                                                  current_extent;
1858                         }
1859
1860                         lun_start += len_mapped_single_call;
1861                         fm_local->fm_length = req_fm_len - len_mapped_single_call;
1862                         req_fm_len = fm_local->fm_length;
1863                         fm_local->fm_extent_count = count_local;
1864                         fm_local->fm_mapped_extents = 0;
1865                         fm_local->fm_flags = fiemap->fm_flags;
1866
1867                         fm_key->oa.o_oi = lsm->lsm_oinfo[cur_stripe]->loi_oi;
1868                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
1869
1870                         if (ost_index < 0 ||
1871                             ost_index >= lov->desc.ld_tgt_count) {
1872                                 rc = -EINVAL;
1873                                 goto out;
1874                         }
1875
1876                         /* If OST is inactive, return extent with UNKNOWN flag */
1877                         if (!lov->lov_tgts[ost_index]->ltd_active) {
1878                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
1879                                 fm_local->fm_mapped_extents = 1;
1880
1881                                 lcl_fm_ext[0].fe_logical = lun_start;
1882                                 lcl_fm_ext[0].fe_length = obd_object_end -
1883                                                                       lun_start;
1884                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1885
1886                                 goto inactive_tgt;
1887                         }
1888
1889                         fm_local->fm_start = lun_start;
1890                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1891                         memcpy(&fm_key->fiemap, fm_local, sizeof(*fm_local));
1892                         *vallen = fiemap_count_to_size(fm_local->fm_extent_count);
1893                         rc = obd_get_info(NULL,
1894                                           lov->lov_tgts[ost_index]->ltd_exp,
1895                                           keylen, key, vallen, fm_local, lsm);
1896                         if (rc != 0)
1897                                 goto out;
1898
1899 inactive_tgt:
1900                         ext_count = fm_local->fm_mapped_extents;
1901                         if (ext_count == 0) {
1902                                 ost_done = 1;
1903                                 /* If last stripe has hole at the end,
1904                                  * then we need to return
1905                                  */
1906                                 if (cur_stripe_wrap == last_stripe) {
1907                                         fiemap->fm_mapped_extents = 0;
1908                                         goto finish;
1909                                 }
1910                                 break;
1911                         }
1912
1913                         /* If we just need num of extents then go to next device */
1914                         if (get_num_extents) {
1915                                 current_extent += ext_count;
1916                                 break;
1917                         }
1918
1919                         len_mapped_single_call = lcl_fm_ext[ext_count-1].fe_logical -
1920                                   lun_start + lcl_fm_ext[ext_count - 1].fe_length;
1921
1922                         /* Have we finished mapping on this device? */
1923                         if (req_fm_len <= len_mapped_single_call)
1924                                 ost_done = 1;
1925
1926                         /* Clear the EXTENT_LAST flag which can be present on
1927                          * last extent
1928                          */
1929                         if (lcl_fm_ext[ext_count-1].fe_flags & FIEMAP_EXTENT_LAST)
1930                                 lcl_fm_ext[ext_count - 1].fe_flags &=
1931                                                             ~FIEMAP_EXTENT_LAST;
1932
1933                         curr_loc = lov_stripe_size(lsm,
1934                                            lcl_fm_ext[ext_count - 1].fe_logical+
1935                                            lcl_fm_ext[ext_count - 1].fe_length,
1936                                            cur_stripe);
1937                         if (curr_loc >= fm_key->oa.o_size)
1938                                 ost_eof = 1;
1939
1940                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
1941                                                      ost_index, ext_count,
1942                                                      current_extent);
1943
1944                         current_extent += ext_count;
1945
1946                         /* Ran out of available extents? */
1947                         if (current_extent >= fiemap->fm_extent_count)
1948                                 goto finish;
1949                 } while (ost_done == 0 && ost_eof == 0);
1950
1951                 if (cur_stripe_wrap == last_stripe)
1952                         goto finish;
1953         }
1954
1955 finish:
1956         /* Indicate that we are returning device offsets unless file just has
1957          * single stripe
1958          */
1959         if (lsm->lsm_stripe_count > 1)
1960                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1961
1962         if (get_num_extents)
1963                 goto skip_last_device_calc;
1964
1965         /* Check if we have reached the last stripe and whether mapping for that
1966          * stripe is done.
1967          */
1968         if (cur_stripe_wrap == last_stripe) {
1969                 if (ost_done || ost_eof)
1970                         fiemap->fm_extents[current_extent - 1].fe_flags |=
1971                                                              FIEMAP_EXTENT_LAST;
1972         }
1973
1974 skip_last_device_calc:
1975         fiemap->fm_mapped_extents = current_extent;
1976
1977 out:
1978         kvfree(fm_local);
1979         return rc;
1980 }
1981
1982 static int lov_get_info(const struct lu_env *env, struct obd_export *exp,
1983                         __u32 keylen, void *key, __u32 *vallen, void *val,
1984                         struct lov_stripe_md *lsm)
1985 {
1986         struct obd_device *obddev = class_exp2obd(exp);
1987         struct lov_obd *lov = &obddev->u.lov;
1988         int i, rc;
1989
1990         if (!vallen || !val)
1991                 return -EFAULT;
1992
1993         obd_getref(obddev);
1994
1995         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
1996                 struct {
1997                         char name[16];
1998                         struct ldlm_lock *lock;
1999                 } *data = key;
2000                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2001                 struct lov_oinfo *loi;
2002                 __u32 *stripe = val;
2003
2004                 if (*vallen < sizeof(*stripe)) {
2005                         rc = -EFAULT;
2006                         goto out;
2007                 }
2008                 *vallen = sizeof(*stripe);
2009
2010                 /* XXX This is another one of those bits that will need to
2011                  * change if we ever actually support nested LOVs.  It uses
2012                  * the lock's export to find out which stripe it is.
2013                  */
2014                 /* XXX - it's assumed all the locks for deleted OSTs have
2015                  * been cancelled. Also, the export for deleted OSTs will
2016                  * be NULL and won't match the lock's export.
2017                  */
2018                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2019                         loi = lsm->lsm_oinfo[i];
2020                         if (lov_oinfo_is_dummy(loi))
2021                                 continue;
2022
2023                         if (!lov->lov_tgts[loi->loi_ost_idx])
2024                                 continue;
2025                         if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
2026                             data->lock->l_conn_export &&
2027                             ostid_res_name_eq(&loi->loi_oi, res_id)) {
2028                                 *stripe = i;
2029                                 rc = 0;
2030                                 goto out;
2031                         }
2032                 }
2033                 LDLM_ERROR(data->lock, "lock on inode without such object");
2034                 dump_lsm(D_ERROR, lsm);
2035                 rc = -ENXIO;
2036                 goto out;
2037         } else if (KEY_IS(KEY_LAST_ID)) {
2038                 struct obd_id_info *info = val;
2039                 __u32 size = sizeof(u64);
2040                 struct lov_tgt_desc *tgt;
2041
2042                 LASSERT(*vallen == sizeof(struct obd_id_info));
2043                 tgt = lov->lov_tgts[info->idx];
2044
2045                 if (!tgt || !tgt->ltd_active) {
2046                         rc = -ESRCH;
2047                         goto out;
2048                 }
2049
2050                 rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
2051                                   &size, info->data, NULL);
2052                 rc = 0;
2053                 goto out;
2054         } else if (KEY_IS(KEY_LOVDESC)) {
2055                 struct lov_desc *desc_ret = val;
2056                 *desc_ret = lov->desc;
2057
2058                 rc = 0;
2059                 goto out;
2060         } else if (KEY_IS(KEY_FIEMAP)) {
2061                 rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
2062                 goto out;
2063         } else if (KEY_IS(KEY_CONNECT_FLAG)) {
2064                 struct lov_tgt_desc *tgt;
2065                 __u64 ost_idx = *((__u64 *)val);
2066
2067                 LASSERT(*vallen == sizeof(__u64));
2068                 LASSERT(ost_idx < lov->desc.ld_tgt_count);
2069                 tgt = lov->lov_tgts[ost_idx];
2070
2071                 if (!tgt || !tgt->ltd_exp) {
2072                         rc = -ESRCH;
2073                         goto out;
2074                 }
2075
2076                 *((__u64 *)val) = exp_connect_flags(tgt->ltd_exp);
2077                 rc = 0;
2078                 goto out;
2079         } else if (KEY_IS(KEY_TGT_COUNT)) {
2080                 *((int *)val) = lov->desc.ld_tgt_count;
2081                 rc = 0;
2082                 goto out;
2083         }
2084
2085         rc = -EINVAL;
2086
2087 out:
2088         obd_putref(obddev);
2089         return rc;
2090 }
2091
2092 static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
2093                               u32 keylen, void *key, u32 vallen,
2094                               void *val, struct ptlrpc_request_set *set)
2095 {
2096         struct obd_device *obddev = class_exp2obd(exp);
2097         struct lov_obd *lov = &obddev->u.lov;
2098         u32 count;
2099         int i, rc = 0, err;
2100         struct lov_tgt_desc *tgt;
2101         unsigned int incr = 0, check_uuid = 0, do_inactive = 0, no_set = 0;
2102         unsigned int next_id = 0, mds_con = 0;
2103
2104         if (!set) {
2105                 no_set = 1;
2106                 set = ptlrpc_prep_set();
2107                 if (!set)
2108                         return -ENOMEM;
2109         }
2110
2111         obd_getref(obddev);
2112         count = lov->desc.ld_tgt_count;
2113
2114         if (KEY_IS(KEY_NEXT_ID)) {
2115                 count = vallen / sizeof(struct obd_id_info);
2116                 vallen = sizeof(u64);
2117                 incr = sizeof(struct obd_id_info);
2118                 do_inactive = 1;
2119                 next_id = 1;
2120         } else if (KEY_IS(KEY_CHECKSUM)) {
2121                 do_inactive = 1;
2122         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2123                 /* use defaults:  do_inactive = incr = 0; */
2124         } else if (KEY_IS(KEY_MDS_CONN)) {
2125                 mds_con = 1;
2126         } else if (KEY_IS(KEY_CACHE_SET)) {
2127                 LASSERT(!lov->lov_cache);
2128                 lov->lov_cache = val;
2129                 do_inactive = 1;
2130                 cl_cache_incref(lov->lov_cache);
2131         }
2132
2133         for (i = 0; i < count; i++, val = (char *)val + incr) {
2134                 if (next_id)
2135                         tgt = lov->lov_tgts[((struct obd_id_info *)val)->idx];
2136                 else
2137                         tgt = lov->lov_tgts[i];
2138                 /* OST was disconnected */
2139                 if (!tgt || !tgt->ltd_exp)
2140                         continue;
2141
2142                 /* OST is inactive and we don't want inactive OSCs */
2143                 if (!tgt->ltd_active && !do_inactive)
2144                         continue;
2145
2146                 if (mds_con) {
2147                         struct mds_group_info *mgi;
2148
2149                         LASSERT(vallen == sizeof(*mgi));
2150                         mgi = (struct mds_group_info *)val;
2151
2152                         /* Only want a specific OSC */
2153                         if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
2154                                                           &tgt->ltd_uuid))
2155                                 continue;
2156
2157                         err = obd_set_info_async(env, tgt->ltd_exp,
2158                                                  keylen, key, sizeof(int),
2159                                                  &mgi->group, set);
2160                 } else if (next_id) {
2161                         err = obd_set_info_async(env, tgt->ltd_exp,
2162                                          keylen, key, vallen,
2163                                          ((struct obd_id_info *)val)->data, set);
2164                 } else {
2165                         /* Only want a specific OSC */
2166                         if (check_uuid &&
2167                             !obd_uuid_equals(val, &tgt->ltd_uuid))
2168                                 continue;
2169
2170                         err = obd_set_info_async(env, tgt->ltd_exp,
2171                                                  keylen, key, vallen, val, set);
2172                 }
2173
2174                 if (!rc)
2175                         rc = err;
2176         }
2177
2178         obd_putref(obddev);
2179         if (no_set) {
2180                 err = ptlrpc_set_wait(set);
2181                 if (!rc)
2182                         rc = err;
2183                 ptlrpc_set_destroy(set);
2184         }
2185         return rc;
2186 }
2187
/**
 * Take md->lsm_lock and record the calling pid as its owner.
 *
 * Asserts before locking that the recorded owner is not the current pid,
 * and after locking that no owner is recorded.
 *
 * \param md striping information whose lsm_lock is taken
 */
2188 void lov_stripe_lock(struct lov_stripe_md *md)
2189                 __acquires(&md->lsm_lock)
2190 {
/* Checked before locking: the caller must not already be the owner */
2191         LASSERT(md->lsm_lock_owner != current_pid());
2192         spin_lock(&md->lsm_lock);
/* With the lock held, no owner may still be recorded */
2193         LASSERT(md->lsm_lock_owner == 0);
2194         md->lsm_lock_owner = current_pid();
2195 }
2196
/**
 * Clear the recorded owner of md->lsm_lock and release the lock.
 *
 * Asserts that the recorded owner is the current pid.
 *
 * \param md striping information whose lsm_lock is released
 */
2197 void lov_stripe_unlock(struct lov_stripe_md *md)
2198                 __releases(&md->lsm_lock)
2199 {
2200         LASSERT(md->lsm_lock_owner == current_pid());
/* The owner field is reset while the lock is still held */
2201         md->lsm_lock_owner = 0;
2202         spin_unlock(&md->lsm_lock);
2203 }
2204
2205 static int lov_quotactl(struct obd_device *obd, struct obd_export *exp,
2206                         struct obd_quotactl *oqctl)
2207 {
2208         struct lov_obd      *lov = &obd->u.lov;
2209         struct lov_tgt_desc *tgt;
2210         __u64           curspace = 0;
2211         __u64           bhardlimit = 0;
2212         int               i, rc = 0;
2213
2214         if (oqctl->qc_cmd != LUSTRE_Q_QUOTAON &&
2215             oqctl->qc_cmd != LUSTRE_Q_QUOTAOFF &&
2216             oqctl->qc_cmd != Q_GETOQUOTA &&
2217             oqctl->qc_cmd != Q_INITQUOTA &&
2218             oqctl->qc_cmd != LUSTRE_Q_SETQUOTA &&
2219             oqctl->qc_cmd != Q_FINVALIDATE) {
2220                 CERROR("bad quota opc %x for lov obd\n", oqctl->qc_cmd);
2221                 return -EFAULT;
2222         }
2223
2224         /* for lov tgt */
2225         obd_getref(obd);
2226         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2227                 int err;
2228
2229                 tgt = lov->lov_tgts[i];
2230
2231                 if (!tgt)
2232                         continue;
2233
2234                 if (!tgt->ltd_active || tgt->ltd_reap) {
2235                         if (oqctl->qc_cmd == Q_GETOQUOTA &&
2236                             lov->lov_tgts[i]->ltd_activate) {
2237                                 rc = -EREMOTEIO;
2238                                 CERROR("ost %d is inactive\n", i);
2239                         } else {
2240                                 CDEBUG(D_HA, "ost %d is inactive\n", i);
2241                         }
2242                         continue;
2243                 }
2244
2245                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2246                 if (err) {
2247                         if (tgt->ltd_active && !rc)
2248                                 rc = err;
2249                         continue;
2250                 }
2251
2252                 if (oqctl->qc_cmd == Q_GETOQUOTA) {
2253                         curspace += oqctl->qc_dqblk.dqb_curspace;
2254                         bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit;
2255                 }
2256         }
2257         obd_putref(obd);
2258
2259         if (oqctl->qc_cmd == Q_GETOQUOTA) {
2260                 oqctl->qc_dqblk.dqb_curspace = curspace;
2261                 oqctl->qc_dqblk.dqb_bhardlimit = bhardlimit;
2262         }
2263         return rc;
2264 }
2265
2266 static int lov_quotacheck(struct obd_device *obd, struct obd_export *exp,
2267                           struct obd_quotactl *oqctl)
2268 {
2269         struct lov_obd *lov = &obd->u.lov;
2270         int          i, rc = 0;
2271
2272         obd_getref(obd);
2273
2274         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2275                 if (!lov->lov_tgts[i])
2276                         continue;
2277
2278                 /* Skip quota check on the administratively disabled OSTs. */
2279                 if (!lov->lov_tgts[i]->ltd_activate) {
2280                         CWARN("lov idx %d was administratively disabled, skip quotacheck on it.\n",
2281                               i);
2282                         continue;
2283                 }
2284
2285                 if (!lov->lov_tgts[i]->ltd_active) {
2286                         CERROR("lov idx %d inactive\n", i);
2287                         rc = -EIO;
2288                         goto out;
2289                 }
2290         }
2291
2292         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2293                 int err;
2294
2295                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_activate)
2296                         continue;
2297
2298                 err = obd_quotacheck(lov->lov_tgts[i]->ltd_exp, oqctl);
2299                 if (err && !rc)
2300                         rc = err;
2301         }
2302
2303 out:
2304         obd_putref(obd);
2305
2306         return rc;
2307 }
2308
/*
 * Method table of the LOV obd type. lov_init() hands it to
 * class_register_type() together with LUSTRE_LOV_NAME.
 */
2309 static struct obd_ops lov_obd_ops = {
2310         .owner          = THIS_MODULE,
2311         .setup          = lov_setup,
2312         .precleanup     = lov_precleanup,
2313         .cleanup        = lov_cleanup,
2314         /*.process_config       = lov_process_config,*/
2315         .connect        = lov_connect,
2316         .disconnect     = lov_disconnect,
2317         .statfs         = lov_statfs,
2318         .statfs_async   = lov_statfs_async,
2319         .packmd         = lov_packmd,
2320         .unpackmd       = lov_unpackmd,
2321         .create         = lov_create,
2322         .destroy        = lov_destroy,
2323         .getattr_async  = lov_getattr_async,
2324         .setattr_async  = lov_setattr_async,
2325         .adjust_kms     = lov_adjust_kms,
2326         .find_cbdata    = lov_find_cbdata,
2327         .iocontrol      = lov_iocontrol,
2328         .get_info       = lov_get_info,
2329         .set_info_async = lov_set_info_async,
2330         .notify         = lov_notify,
2331         .pool_new       = lov_pool_new,
2332         .pool_rem       = lov_pool_remove,
2333         .pool_add       = lov_pool_add,
2334         .pool_del       = lov_pool_del,
2335         .getref         = lov_getref,
2336         .putref         = lov_putref,
2337         .quotactl       = lov_quotactl,
2338         .quotacheck     = lov_quotacheck,
2339 };
2340
2341 struct kmem_cache *lov_oinfo_slab;
2342
2343 static int __init lov_init(void)
2344 {
2345         struct lprocfs_static_vars lvars = { NULL };
2346         int rc;
2347
2348         /* print an address of _any_ initialized kernel symbol from this
2349          * module, to allow debugging with gdb that doesn't support data
2350          * symbols from modules.
2351          */
2352         CDEBUG(D_INFO, "Lustre LOV module (%p).\n", &lov_caches);
2353
2354         rc = lu_kmem_init(lov_caches);
2355         if (rc)
2356                 return rc;
2357
2358         lov_oinfo_slab = kmem_cache_create("lov_oinfo",
2359                                            sizeof(struct lov_oinfo),
2360                                            0, SLAB_HWCACHE_ALIGN, NULL);
2361         if (!lov_oinfo_slab) {
2362                 lu_kmem_fini(lov_caches);
2363                 return -ENOMEM;
2364         }
2365         lprocfs_lov_init_vars(&lvars);
2366
2367         rc = class_register_type(&lov_obd_ops, NULL,
2368                                  LUSTRE_LOV_NAME, &lov_device_type);
2369
2370         if (rc) {
2371                 kmem_cache_destroy(lov_oinfo_slab);
2372                 lu_kmem_fini(lov_caches);
2373         }
2374
2375         return rc;
2376 }
2377
/*
 * Module exit: unregister the LUSTRE_LOV_NAME obd type, then destroy the
 * lov_oinfo slab and release the caches listed in lov_caches.
 */
2378 static void /*__exit*/ lov_exit(void)
2379 {
2380         class_unregister_type(LUSTRE_LOV_NAME);
2381         kmem_cache_destroy(lov_oinfo_slab);
2382 
2383         lu_kmem_fini(lov_caches);
2384 }
2385
2386 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2387 MODULE_DESCRIPTION("Lustre Logical Object Volume");
2388 MODULE_LICENSE("GPL");
2389 MODULE_VERSION(LUSTRE_VERSION_STRING);
2390
2391 module_init(lov_init);
2392 module_exit(lov_exit);