Merge branch 'linus' of git://git.kernel.org/pub/scm/linux/kernel/git/herbert/crypto-2.6
[cascardo/linux.git] / drivers / staging / lustre / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_LOV
34
35 #include "../../include/linux/libcfs/libcfs.h"
36
37 #include "../include/obd_class.h"
38 #include "../include/lustre/lustre_idl.h"
39 #include "lov_internal.h"
40
41 static void lov_init_set(struct lov_request_set *set)
42 {
43         set->set_count = 0;
44         atomic_set(&set->set_completes, 0);
45         atomic_set(&set->set_success, 0);
46         atomic_set(&set->set_finish_checked, 0);
47         set->set_cookies = NULL;
48         INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50         init_waitqueue_head(&set->set_waitq);
51 }
52
53 void lov_finish_set(struct lov_request_set *set)
54 {
55         struct list_head *pos, *n;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos,
60                                                          struct lov_request,
61                                                          rq_link);
62                 list_del_init(&req->rq_link);
63
64                 if (req->rq_oi.oi_oa)
65                         kmem_cache_free(obdo_cachep, req->rq_oi.oi_oa);
66                 kfree(req->rq_oi.oi_osfs);
67                 kfree(req);
68         }
69         kfree(set);
70 }
71
72 static int lov_set_finished(struct lov_request_set *set, int idempotent)
73 {
74         int completes = atomic_read(&set->set_completes);
75
76         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
77
78         if (completes == set->set_count) {
79                 if (idempotent)
80                         return 1;
81                 if (atomic_inc_return(&set->set_finish_checked) == 1)
82                         return 1;
83         }
84         return 0;
85 }
86
87 static void lov_update_set(struct lov_request_set *set,
88                            struct lov_request *req, int rc)
89 {
90         req->rq_complete = 1;
91         req->rq_rc = rc;
92
93         atomic_inc(&set->set_completes);
94         if (rc == 0)
95                 atomic_inc(&set->set_success);
96
97         wake_up(&set->set_waitq);
98 }
99
100 int lov_update_common_set(struct lov_request_set *set,
101                           struct lov_request *req, int rc)
102 {
103         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
104
105         lov_update_set(set, req, rc);
106
107         /* grace error on inactive ost */
108         if (rc && !(lov->lov_tgts[req->rq_idx] &&
109                     lov->lov_tgts[req->rq_idx]->ltd_active))
110                 rc = 0;
111
112         /* FIXME in raid1 regime, should return 0 */
113         return rc;
114 }
115
116 static void lov_set_add_req(struct lov_request *req,
117                             struct lov_request_set *set)
118 {
119         list_add_tail(&req->rq_link, &set->set_list);
120         set->set_count++;
121         req->rq_rqset = set;
122 }
123
124 static int lov_check_set(struct lov_obd *lov, int idx)
125 {
126         int rc;
127         struct lov_tgt_desc *tgt;
128
129         mutex_lock(&lov->lov_lock);
130         tgt = lov->lov_tgts[idx];
131         rc = !tgt || tgt->ltd_active ||
132                 (tgt->ltd_exp &&
133                  class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
134         mutex_unlock(&lov->lov_lock);
135
136         return rc;
137 }
138
139 /* Check if the OSC connection exists and is active.
140  * If the OSC has not yet had a chance to connect to the OST the first time,
141  * wait once for it to connect instead of returning an error.
142  */
143 static int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
144 {
145         wait_queue_head_t waitq;
146         struct l_wait_info lwi;
147         struct lov_tgt_desc *tgt;
148         int rc = 0;
149
150         mutex_lock(&lov->lov_lock);
151
152         tgt = lov->lov_tgts[ost_idx];
153
154         if (unlikely(!tgt)) {
155                 rc = 0;
156                 goto out;
157         }
158
159         if (likely(tgt->ltd_active)) {
160                 rc = 1;
161                 goto out;
162         }
163
164         if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
165                 rc = 0;
166                 goto out;
167         }
168
169         mutex_unlock(&lov->lov_lock);
170
171         init_waitqueue_head(&waitq);
172         lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
173                                    cfs_time_seconds(1), NULL, NULL);
174
175         rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
176         if (tgt->ltd_active)
177                 return 1;
178
179         return 0;
180
181 out:
182         mutex_unlock(&lov->lov_lock);
183         return rc;
184 }
185
186 static int common_attr_done(struct lov_request_set *set)
187 {
188         struct lov_request *req;
189         struct obdo *tmp_oa;
190         int rc = 0, attrset = 0;
191
192         if (!set->set_oi->oi_oa)
193                 return 0;
194
195         if (!atomic_read(&set->set_success))
196                 return -EIO;
197
198         tmp_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
199         if (!tmp_oa) {
200                 rc = -ENOMEM;
201                 goto out;
202         }
203
204         list_for_each_entry(req, &set->set_list, rq_link) {
205                 if (!req->rq_complete || req->rq_rc)
206                         continue;
207                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
208                         continue;
209                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
210                                 req->rq_oi.oi_oa->o_valid,
211                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
212         }
213         if (!attrset) {
214                 CERROR("No stripes had valid attrs\n");
215                 rc = -EIO;
216         }
217         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
218             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
219                 /* When we take attributes of some epoch, we require all the
220                  * ost to be active.
221                  */
222                 CERROR("Not all the stripes had valid attrs\n");
223                 rc = -EIO;
224                 goto out;
225         }
226
227         tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
228         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
229 out:
230         if (tmp_oa)
231                 kmem_cache_free(obdo_cachep, tmp_oa);
232         return rc;
233 }
234
235 int lov_fini_getattr_set(struct lov_request_set *set)
236 {
237         int rc = 0;
238
239         if (!set)
240                 return 0;
241         LASSERT(set->set_exp);
242         if (atomic_read(&set->set_completes))
243                 rc = common_attr_done(set);
244
245         lov_put_reqset(set);
246
247         return rc;
248 }
249
250 /* The callback for osc_getattr_async that finalizes a request info when a
251  * response is received.
252  */
253 static int cb_getattr_update(void *cookie, int rc)
254 {
255         struct obd_info *oinfo = cookie;
256         struct lov_request *lovreq;
257
258         lovreq = container_of(oinfo, struct lov_request, rq_oi);
259         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
260 }
261
262 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
263                          struct lov_request_set **reqset)
264 {
265         struct lov_request_set *set;
266         struct lov_obd *lov = &exp->exp_obd->u.lov;
267         int rc = 0, i;
268
269         set = kzalloc(sizeof(*set), GFP_NOFS);
270         if (!set)
271                 return -ENOMEM;
272         lov_init_set(set);
273
274         set->set_exp = exp;
275         set->set_oi = oinfo;
276
277         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
278                 struct lov_oinfo *loi;
279                 struct lov_request *req;
280
281                 loi = oinfo->oi_md->lsm_oinfo[i];
282                 if (lov_oinfo_is_dummy(loi))
283                         continue;
284
285                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
286                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
287                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
288                                 /* SOM requires all the OSTs to be active. */
289                                 rc = -EIO;
290                                 goto out_set;
291                         }
292                         continue;
293                 }
294
295                 req = kzalloc(sizeof(*req), GFP_NOFS);
296                 if (!req) {
297                         rc = -ENOMEM;
298                         goto out_set;
299                 }
300
301                 req->rq_stripe = i;
302                 req->rq_idx = loi->loi_ost_idx;
303
304                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
305                 if (!req->rq_oi.oi_oa) {
306                         kfree(req);
307                         rc = -ENOMEM;
308                         goto out_set;
309                 }
310                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
311                        sizeof(*req->rq_oi.oi_oa));
312                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
313                 req->rq_oi.oi_cb_up = cb_getattr_update;
314
315                 lov_set_add_req(req, set);
316         }
317         if (!set->set_count) {
318                 rc = -EIO;
319                 goto out_set;
320         }
321         *reqset = set;
322         return rc;
323 out_set:
324         lov_fini_getattr_set(set);
325         return rc;
326 }
327
328 int lov_fini_destroy_set(struct lov_request_set *set)
329 {
330         if (!set)
331                 return 0;
332         LASSERT(set->set_exp);
333         if (atomic_read(&set->set_completes)) {
334                 /* FIXME update qos data here */
335         }
336
337         lov_put_reqset(set);
338
339         return 0;
340 }
341
342 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
343                          struct obdo *src_oa, struct lov_stripe_md *lsm,
344                          struct obd_trans_info *oti,
345                          struct lov_request_set **reqset)
346 {
347         struct lov_request_set *set;
348         struct lov_obd *lov = &exp->exp_obd->u.lov;
349         int rc = 0, i;
350
351         set = kzalloc(sizeof(*set), GFP_NOFS);
352         if (!set)
353                 return -ENOMEM;
354         lov_init_set(set);
355
356         set->set_exp = exp;
357         set->set_oi = oinfo;
358         set->set_oi->oi_md = lsm;
359         set->set_oi->oi_oa = src_oa;
360         if (oti && src_oa->o_valid & OBD_MD_FLCOOKIE)
361                 set->set_cookies = oti->oti_logcookies;
362
363         for (i = 0; i < lsm->lsm_stripe_count; i++) {
364                 struct lov_oinfo *loi;
365                 struct lov_request *req;
366
367                 loi = lsm->lsm_oinfo[i];
368                 if (lov_oinfo_is_dummy(loi))
369                         continue;
370
371                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
372                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
373                         continue;
374                 }
375
376                 req = kzalloc(sizeof(*req), GFP_NOFS);
377                 if (!req) {
378                         rc = -ENOMEM;
379                         goto out_set;
380                 }
381
382                 req->rq_stripe = i;
383                 req->rq_idx = loi->loi_ost_idx;
384
385                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
386                 if (!req->rq_oi.oi_oa) {
387                         kfree(req);
388                         rc = -ENOMEM;
389                         goto out_set;
390                 }
391                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
392                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
393                 lov_set_add_req(req, set);
394         }
395         if (!set->set_count) {
396                 rc = -EIO;
397                 goto out_set;
398         }
399         *reqset = set;
400         return rc;
401 out_set:
402         lov_fini_destroy_set(set);
403         return rc;
404 }
405
406 int lov_fini_setattr_set(struct lov_request_set *set)
407 {
408         int rc = 0;
409
410         if (!set)
411                 return 0;
412         LASSERT(set->set_exp);
413         if (atomic_read(&set->set_completes)) {
414                 rc = common_attr_done(set);
415                 /* FIXME update qos data here */
416         }
417
418         lov_put_reqset(set);
419         return rc;
420 }
421
422 int lov_update_setattr_set(struct lov_request_set *set,
423                            struct lov_request *req, int rc)
424 {
425         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
426         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
427
428         lov_update_set(set, req, rc);
429
430         /* grace error on inactive ost */
431         if (rc && !(lov->lov_tgts[req->rq_idx] &&
432                     lov->lov_tgts[req->rq_idx]->ltd_active))
433                 rc = 0;
434
435         if (rc == 0) {
436                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
437                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
438                                 req->rq_oi.oi_oa->o_ctime;
439                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
440                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
441                                 req->rq_oi.oi_oa->o_mtime;
442                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
443                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
444                                 req->rq_oi.oi_oa->o_atime;
445         }
446
447         return rc;
448 }
449
450 /* The callback for osc_setattr_async that finalizes a request info when a
451  * response is received.
452  */
453 static int cb_setattr_update(void *cookie, int rc)
454 {
455         struct obd_info *oinfo = cookie;
456         struct lov_request *lovreq;
457
458         lovreq = container_of(oinfo, struct lov_request, rq_oi);
459         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
460 }
461
462 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
463                          struct obd_trans_info *oti,
464                          struct lov_request_set **reqset)
465 {
466         struct lov_request_set *set;
467         struct lov_obd *lov = &exp->exp_obd->u.lov;
468         int rc = 0, i;
469
470         set = kzalloc(sizeof(*set), GFP_NOFS);
471         if (!set)
472                 return -ENOMEM;
473         lov_init_set(set);
474
475         set->set_exp = exp;
476         set->set_oi = oinfo;
477         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
478                 set->set_cookies = oti->oti_logcookies;
479
480         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
481                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
482                 struct lov_request *req;
483
484                 if (lov_oinfo_is_dummy(loi))
485                         continue;
486
487                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
488                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
489                         continue;
490                 }
491
492                 req = kzalloc(sizeof(*req), GFP_NOFS);
493                 if (!req) {
494                         rc = -ENOMEM;
495                         goto out_set;
496                 }
497                 req->rq_stripe = i;
498                 req->rq_idx = loi->loi_ost_idx;
499
500                 req->rq_oi.oi_oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
501                 if (!req->rq_oi.oi_oa) {
502                         kfree(req);
503                         rc = -ENOMEM;
504                         goto out_set;
505                 }
506                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
507                        sizeof(*req->rq_oi.oi_oa));
508                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
509                 req->rq_oi.oi_oa->o_stripe_idx = i;
510                 req->rq_oi.oi_cb_up = cb_setattr_update;
511
512                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
513                         int off = lov_stripe_offset(oinfo->oi_md,
514                                                     oinfo->oi_oa->o_size, i,
515                                                     &req->rq_oi.oi_oa->o_size);
516
517                         if (off < 0 && req->rq_oi.oi_oa->o_size)
518                                 req->rq_oi.oi_oa->o_size--;
519
520                         CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
521                                i, req->rq_oi.oi_oa->o_size,
522                                oinfo->oi_oa->o_size);
523                 }
524                 lov_set_add_req(req, set);
525         }
526         if (!set->set_count) {
527                 rc = -EIO;
528                 goto out_set;
529         }
530         *reqset = set;
531         return rc;
532 out_set:
533         lov_fini_setattr_set(set);
534         return rc;
535 }
536
537 #define LOV_U64_MAX ((__u64)~0ULL)
538 #define LOV_SUM_MAX(tot, add)                                      \
539         do {                                                        \
540                 if ((tot) + (add) < (tot))                            \
541                         (tot) = LOV_U64_MAX;                        \
542                 else                                                \
543                         (tot) += (add);                          \
544         } while (0)
545
546 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
547                     int success)
548 {
549         if (success) {
550                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
551                                                            LOV_MAGIC, 0);
552                 if (osfs->os_files != LOV_U64_MAX)
553                         lov_do_div64(osfs->os_files, expected_stripes);
554                 if (osfs->os_ffree != LOV_U64_MAX)
555                         lov_do_div64(osfs->os_ffree, expected_stripes);
556
557                 spin_lock(&obd->obd_osfs_lock);
558                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
559                 obd->obd_osfs_age = cfs_time_current_64();
560                 spin_unlock(&obd->obd_osfs_lock);
561                 return 0;
562         }
563
564         return -EIO;
565 }
566
567 int lov_fini_statfs_set(struct lov_request_set *set)
568 {
569         int rc = 0;
570
571         if (!set)
572                 return 0;
573
574         if (atomic_read(&set->set_completes)) {
575                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
576                                      atomic_read(&set->set_success));
577         }
578         lov_put_reqset(set);
579         return rc;
580 }
581
582 static void lov_update_statfs(struct obd_statfs *osfs,
583                               struct obd_statfs *lov_sfs,
584                               int success)
585 {
586         int shift = 0, quit = 0;
587         __u64 tmp;
588
589         if (success == 0) {
590                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
591         } else {
592                 if (osfs->os_bsize != lov_sfs->os_bsize) {
593                         /* assume all block sizes are always powers of 2 */
594                         /* get the bits difference */
595                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
596                         for (shift = 0; shift <= 64; ++shift) {
597                                 if (tmp & 1) {
598                                         if (quit)
599                                                 break;
600                                         quit = 1;
601                                         shift = 0;
602                                 }
603                                 tmp >>= 1;
604                         }
605                 }
606
607                 if (osfs->os_bsize < lov_sfs->os_bsize) {
608                         osfs->os_bsize = lov_sfs->os_bsize;
609
610                         osfs->os_bfree  >>= shift;
611                         osfs->os_bavail >>= shift;
612                         osfs->os_blocks >>= shift;
613                 } else if (shift != 0) {
614                         lov_sfs->os_bfree  >>= shift;
615                         lov_sfs->os_bavail >>= shift;
616                         lov_sfs->os_blocks >>= shift;
617                 }
618                 osfs->os_bfree += lov_sfs->os_bfree;
619                 osfs->os_bavail += lov_sfs->os_bavail;
620                 osfs->os_blocks += lov_sfs->os_blocks;
621                 /* XXX not sure about this one - depends on policy.
622                  *   - could be minimum if we always stripe on all OBDs
623                  *     (but that would be wrong for any other policy,
624                  *     if one of the OBDs has no more objects left)
625                  *   - could be sum if we stripe whole objects
626                  *   - could be average, just to give a nice number
627                  *
628                  * To give a "reasonable" (if not wholly accurate)
629                  * number, we divide the total number of free objects
630                  * by expected stripe count (watch out for overflow).
631                  */
632                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
633                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
634         }
635 }
636
637 /* The callback for osc_statfs_async that finalizes a request info when a
638  * response is received.
639  */
640 static int cb_statfs_update(void *cookie, int rc)
641 {
642         struct obd_info *oinfo = cookie;
643         struct lov_request *lovreq;
644         struct lov_request_set *set;
645         struct obd_statfs *osfs, *lov_sfs;
646         struct lov_obd *lov;
647         struct lov_tgt_desc *tgt;
648         struct obd_device *lovobd, *tgtobd;
649         int success;
650
651         lovreq = container_of(oinfo, struct lov_request, rq_oi);
652         set = lovreq->rq_rqset;
653         lovobd = set->set_obd;
654         lov = &lovobd->u.lov;
655         osfs = set->set_oi->oi_osfs;
656         lov_sfs = oinfo->oi_osfs;
657         success = atomic_read(&set->set_success);
658         /* XXX: the same is done in lov_update_common_set, however
659          * lovset->set_exp is not initialized.
660          */
661         lov_update_set(set, lovreq, rc);
662         if (rc)
663                 goto out;
664
665         obd_getref(lovobd);
666         tgt = lov->lov_tgts[lovreq->rq_idx];
667         if (!tgt || !tgt->ltd_active)
668                 goto out_update;
669
670         tgtobd = class_exp2obd(tgt->ltd_exp);
671         spin_lock(&tgtobd->obd_osfs_lock);
672         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
673         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
674                 tgtobd->obd_osfs_age = cfs_time_current_64();
675         spin_unlock(&tgtobd->obd_osfs_lock);
676
677 out_update:
678         lov_update_statfs(osfs, lov_sfs, success);
679         obd_putref(lovobd);
680
681 out:
682         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
683             lov_set_finished(set, 0)) {
684                 lov_statfs_interpret(NULL, set, set->set_count !=
685                                      atomic_read(&set->set_success));
686         }
687
688         return 0;
689 }
690
691 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
692                         struct lov_request_set **reqset)
693 {
694         struct lov_request_set *set;
695         struct lov_obd *lov = &obd->u.lov;
696         int rc = 0, i;
697
698         set = kzalloc(sizeof(*set), GFP_NOFS);
699         if (!set)
700                 return -ENOMEM;
701         lov_init_set(set);
702
703         set->set_obd = obd;
704         set->set_oi = oinfo;
705
706         /* We only get block data from the OBD */
707         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
708                 struct lov_request *req;
709
710                 if (!lov->lov_tgts[i] ||
711                     (oinfo->oi_flags & OBD_STATFS_NODELAY &&
712                      !lov->lov_tgts[i]->ltd_active)) {
713                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
714                         continue;
715                 }
716
717                 if (!lov->lov_tgts[i]->ltd_active)
718                         lov_check_and_wait_active(lov, i);
719
720                 /* skip targets that have been explicitly disabled by the
721                  * administrator
722                  */
723                 if (!lov->lov_tgts[i]->ltd_exp) {
724                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
725                         continue;
726                 }
727
728                 req = kzalloc(sizeof(*req), GFP_NOFS);
729                 if (!req) {
730                         rc = -ENOMEM;
731                         goto out_set;
732                 }
733
734                 req->rq_oi.oi_osfs = kzalloc(sizeof(*req->rq_oi.oi_osfs),
735                                              GFP_NOFS);
736                 if (!req->rq_oi.oi_osfs) {
737                         kfree(req);
738                         rc = -ENOMEM;
739                         goto out_set;
740                 }
741
742                 req->rq_idx = i;
743                 req->rq_oi.oi_cb_up = cb_statfs_update;
744                 req->rq_oi.oi_flags = oinfo->oi_flags;
745
746                 lov_set_add_req(req, set);
747         }
748         if (!set->set_count) {
749                 rc = -EIO;
750                 goto out_set;
751         }
752         *reqset = set;
753         return rc;
754 out_set:
755         lov_fini_statfs_set(set);
756         return rc;
757 }