bd6490d0129cc7f5a56bba734f7d496e67ab4a30
[cascardo/linux.git] / drivers / staging / lustre / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #include <linux/libcfs/libcfs.h>
40
41 #include <obd_class.h>
42 #include <lustre/lustre_idl.h>
43
44 #include "lov_internal.h"
45
46 static void lov_init_set(struct lov_request_set *set)
47 {
48         set->set_count = 0;
49         atomic_set(&set->set_completes, 0);
50         atomic_set(&set->set_success, 0);
51         atomic_set(&set->set_finish_checked, 0);
52         set->set_cookies = NULL;
53         INIT_LIST_HEAD(&set->set_list);
54         atomic_set(&set->set_refcount, 1);
55         init_waitqueue_head(&set->set_waitq);
56         spin_lock_init(&set->set_lock);
57 }
58
59 void lov_finish_set(struct lov_request_set *set)
60 {
61         struct list_head *pos, *n;
62
63         LASSERT(set);
64         list_for_each_safe(pos, n, &set->set_list) {
65                 struct lov_request *req = list_entry(pos,
66                                                          struct lov_request,
67                                                          rq_link);
68                 list_del_init(&req->rq_link);
69
70                 if (req->rq_oi.oi_oa)
71                         OBDO_FREE(req->rq_oi.oi_oa);
72                 if (req->rq_oi.oi_md)
73                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
74                 if (req->rq_oi.oi_osfs)
75                         OBD_FREE(req->rq_oi.oi_osfs,
76                                  sizeof(*req->rq_oi.oi_osfs));
77                 OBD_FREE(req, sizeof(*req));
78         }
79
80         if (set->set_pga) {
81                 int len = set->set_oabufs * sizeof(*set->set_pga);
82                 OBD_FREE_LARGE(set->set_pga, len);
83         }
84         if (set->set_lockh)
85                 lov_llh_put(set->set_lockh);
86
87         OBD_FREE(set, sizeof(*set));
88 }
89
90 int lov_set_finished(struct lov_request_set *set, int idempotent)
91 {
92         int completes = atomic_read(&set->set_completes);
93
94         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
95
96         if (completes == set->set_count) {
97                 if (idempotent)
98                         return 1;
99                 if (atomic_inc_return(&set->set_finish_checked) == 1)
100                         return 1;
101         }
102         return 0;
103 }
104
105 void lov_update_set(struct lov_request_set *set,
106                     struct lov_request *req, int rc)
107 {
108         req->rq_complete = 1;
109         req->rq_rc = rc;
110
111         atomic_inc(&set->set_completes);
112         if (rc == 0)
113                 atomic_inc(&set->set_success);
114
115         wake_up(&set->set_waitq);
116 }
117
118 int lov_update_common_set(struct lov_request_set *set,
119                           struct lov_request *req, int rc)
120 {
121         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
122
123         lov_update_set(set, req, rc);
124
125         /* grace error on inactive ost */
126         if (rc && !(lov->lov_tgts[req->rq_idx] &&
127                     lov->lov_tgts[req->rq_idx]->ltd_active))
128                 rc = 0;
129
130         /* FIXME in raid1 regime, should return 0 */
131         return rc;
132 }
133
134 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
135 {
136         list_add_tail(&req->rq_link, &set->set_list);
137         set->set_count++;
138         req->rq_rqset = set;
139 }
140
141 static int lov_check_set(struct lov_obd *lov, int idx)
142 {
143         int rc = 0;
144         mutex_lock(&lov->lov_lock);
145
146         if (lov->lov_tgts[idx] == NULL ||
147             lov->lov_tgts[idx]->ltd_active ||
148             (lov->lov_tgts[idx]->ltd_exp != NULL &&
149              class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
150                 rc = 1;
151
152         mutex_unlock(&lov->lov_lock);
153         return rc;
154 }
155
156 /* Check if the OSC connection exists and is active.
157  * If the OSC has not yet had a chance to connect to the OST the first time,
158  * wait once for it to connect instead of returning an error.
159  */
160 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
161 {
162         wait_queue_head_t waitq;
163         struct l_wait_info lwi;
164         struct lov_tgt_desc *tgt;
165         int rc = 0;
166
167         mutex_lock(&lov->lov_lock);
168
169         tgt = lov->lov_tgts[ost_idx];
170
171         if (unlikely(tgt == NULL))
172                 GOTO(out, rc = 0);
173
174         if (likely(tgt->ltd_active))
175                 GOTO(out, rc = 1);
176
177         if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
178                 GOTO(out, rc = 0);
179
180         mutex_unlock(&lov->lov_lock);
181
182         init_waitqueue_head(&waitq);
183         lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
184                                    cfs_time_seconds(1), NULL, NULL);
185
186         rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
187         if (tgt != NULL && tgt->ltd_active)
188                 return 1;
189
190         return 0;
191
192 out:
193         mutex_unlock(&lov->lov_lock);
194         return rc;
195 }
196
197 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
198                                struct lov_oinfo *loi, int flags,
199                                struct ost_lvb *lvb, __u32 mode, int rc);
200
201 static int lov_update_enqueue_lov(struct obd_export *exp,
202                                   struct lustre_handle *lov_lockhp,
203                                   struct lov_oinfo *loi, int flags, int idx,
204                                   struct ost_id *oi, int rc)
205 {
206         struct lov_obd *lov = &exp->exp_obd->u.lov;
207
208         if (rc != ELDLM_OK &&
209             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
210                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
211                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
212                         /* -EUSERS used by OST to report file contention */
213                         if (rc != -EINTR && rc != -EUSERS)
214                                 CERROR("%s: enqueue objid "DOSTID" subobj"
215                                        DOSTID" on OST idx %d: rc %d\n",
216                                        exp->exp_obd->obd_name,
217                                        POSTID(oi), POSTID(&loi->loi_oi),
218                                        loi->loi_ost_idx, rc);
219                 } else
220                         rc = ELDLM_OK;
221         }
222         return rc;
223 }
224
225 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
226 {
227         struct lov_request_set *set = req->rq_rqset;
228         struct lustre_handle *lov_lockhp;
229         struct obd_info *oi = set->set_oi;
230         struct lov_oinfo *loi;
231
232         LASSERT(oi != NULL);
233
234         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
235         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
236
237         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
238          * and that copy can be arbitrarily out of date.
239          *
240          * The LOV API is due for a serious rewriting anyways, and this
241          * can be addressed then. */
242
243         lov_stripe_lock(oi->oi_md);
244         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
245                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
246         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
247                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
248         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
249                                     req->rq_idx, &oi->oi_md->lsm_oi, rc);
250         lov_stripe_unlock(oi->oi_md);
251         lov_update_set(set, req, rc);
252         return rc;
253 }
254
255 /* The callback for osc_enqueue that updates lov info for every OSC request. */
256 static int cb_update_enqueue(void *cookie, int rc)
257 {
258         struct obd_info *oinfo = cookie;
259         struct ldlm_enqueue_info *einfo;
260         struct lov_request *lovreq;
261
262         lovreq = container_of(oinfo, struct lov_request, rq_oi);
263         einfo = lovreq->rq_rqset->set_ei;
264         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
265 }
266
267 static int enqueue_done(struct lov_request_set *set, __u32 mode)
268 {
269         struct lov_request *req;
270         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
271         int completes = atomic_read(&set->set_completes);
272         int rc = 0;
273
274         /* enqueue/match success, just return */
275         if (completes && completes == atomic_read(&set->set_success))
276                 return 0;
277
278         /* cancel enqueued/matched locks */
279         list_for_each_entry(req, &set->set_list, rq_link) {
280                 struct lustre_handle *lov_lockhp;
281
282                 if (!req->rq_complete || req->rq_rc)
283                         continue;
284
285                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
286                 LASSERT(lov_lockhp);
287                 if (!lustre_handle_is_used(lov_lockhp))
288                         continue;
289
290                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
291                                 req->rq_oi.oi_md, mode, lov_lockhp);
292                 if (rc && lov->lov_tgts[req->rq_idx] &&
293                     lov->lov_tgts[req->rq_idx]->ltd_active)
294                         CERROR("%s: cancelling obdjid "DOSTID" on OST"
295                                "idx %d error: rc = %d\n",
296                                set->set_exp->exp_obd->obd_name,
297                                POSTID(&req->rq_oi.oi_md->lsm_oi),
298                                req->rq_idx, rc);
299         }
300         if (set->set_lockh)
301                 lov_llh_put(set->set_lockh);
302         return rc;
303 }
304
305 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
306                          struct ptlrpc_request_set *rqset)
307 {
308         int ret = 0;
309
310         if (set == NULL)
311                 return 0;
312         LASSERT(set->set_exp);
313         /* Do enqueue_done only for sync requests and if any request
314          * succeeded. */
315         if (!rqset) {
316                 if (rc)
317                         atomic_set(&set->set_completes, 0);
318                 ret = enqueue_done(set, mode);
319         } else if (set->set_lockh)
320                 lov_llh_put(set->set_lockh);
321
322         lov_put_reqset(set);
323
324         return rc ? rc : ret;
325 }
326
327 static void lov_llh_addref(void *llhp)
328 {
329         struct lov_lock_handles *llh = llhp;
330
331         atomic_inc(&llh->llh_refcount);
332         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
333                atomic_read(&llh->llh_refcount));
334 }
335
336 static struct portals_handle_ops lov_handle_ops = {
337         .hop_addref = lov_llh_addref,
338         .hop_free   = NULL,
339 };
340
341 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
342 {
343         struct lov_lock_handles *llh;
344
345         OBD_ALLOC(llh, sizeof(*llh) +
346                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
347         if (llh == NULL)
348                 return NULL;
349
350         atomic_set(&llh->llh_refcount, 2);
351         llh->llh_stripe_count = lsm->lsm_stripe_count;
352         INIT_LIST_HEAD(&llh->llh_handle.h_link);
353         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
354
355         return llh;
356 }
357
358 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
359                          struct ldlm_enqueue_info *einfo,
360                          struct lov_request_set **reqset)
361 {
362         struct lov_obd *lov = &exp->exp_obd->u.lov;
363         struct lov_request_set *set;
364         int i, rc = 0;
365
366         OBD_ALLOC(set, sizeof(*set));
367         if (set == NULL)
368                 return -ENOMEM;
369         lov_init_set(set);
370
371         set->set_exp = exp;
372         set->set_oi = oinfo;
373         set->set_ei = einfo;
374         set->set_lockh = lov_llh_new(oinfo->oi_md);
375         if (set->set_lockh == NULL)
376                 GOTO(out_set, rc = -ENOMEM);
377         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
378
379         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
380                 struct lov_oinfo *loi;
381                 struct lov_request *req;
382                 obd_off start, end;
383
384                 loi = oinfo->oi_md->lsm_oinfo[i];
385                 if (!lov_stripe_intersects(oinfo->oi_md, i,
386                                            oinfo->oi_policy.l_extent.start,
387                                            oinfo->oi_policy.l_extent.end,
388                                            &start, &end))
389                         continue;
390
391                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
392                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
393                         continue;
394                 }
395
396                 OBD_ALLOC(req, sizeof(*req));
397                 if (req == NULL)
398                         GOTO(out_set, rc = -ENOMEM);
399
400                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
401                         sizeof(struct lov_oinfo *) +
402                         sizeof(struct lov_oinfo);
403                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
404                 if (req->rq_oi.oi_md == NULL) {
405                         OBD_FREE(req, sizeof(*req));
406                         GOTO(out_set, rc = -ENOMEM);
407                 }
408                 req->rq_oi.oi_md->lsm_oinfo[0] =
409                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
410                         sizeof(struct lov_oinfo *);
411
412                 /* Set lov request specific parameters. */
413                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
414                 req->rq_oi.oi_cb_up = cb_update_enqueue;
415                 req->rq_oi.oi_flags = oinfo->oi_flags;
416
417                 LASSERT(req->rq_oi.oi_lockh);
418
419                 req->rq_oi.oi_policy.l_extent.gid =
420                         oinfo->oi_policy.l_extent.gid;
421                 req->rq_oi.oi_policy.l_extent.start = start;
422                 req->rq_oi.oi_policy.l_extent.end = end;
423
424                 req->rq_idx = loi->loi_ost_idx;
425                 req->rq_stripe = i;
426
427                 /* XXX LOV STACKING: submd should be from the subobj */
428                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
429                 req->rq_oi.oi_md->lsm_stripe_count = 0;
430                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
431                         loi->loi_kms_valid;
432                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
433                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
434
435                 lov_set_add_req(req, set);
436         }
437         if (!set->set_count)
438                 GOTO(out_set, rc = -EIO);
439         *reqset = set;
440         return 0;
441 out_set:
442         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
443         return rc;
444 }
445
446 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
447 {
448         int rc = 0;
449
450         if (set == NULL)
451                 return 0;
452         LASSERT(set->set_exp);
453         rc = enqueue_done(set, mode);
454         if ((set->set_count == atomic_read(&set->set_success)) &&
455             (flags & LDLM_FL_TEST_LOCK))
456                 lov_llh_put(set->set_lockh);
457
458         lov_put_reqset(set);
459
460         return rc;
461 }
462
463 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
464                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
465                        __u32 mode, struct lustre_handle *lockh,
466                        struct lov_request_set **reqset)
467 {
468         struct lov_obd *lov = &exp->exp_obd->u.lov;
469         struct lov_request_set *set;
470         int i, rc = 0;
471
472         OBD_ALLOC(set, sizeof(*set));
473         if (set == NULL)
474                 return -ENOMEM;
475         lov_init_set(set);
476
477         set->set_exp = exp;
478         set->set_oi = oinfo;
479         set->set_oi->oi_md = lsm;
480         set->set_lockh = lov_llh_new(lsm);
481         if (set->set_lockh == NULL)
482                 GOTO(out_set, rc = -ENOMEM);
483         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
484
485         for (i = 0; i < lsm->lsm_stripe_count; i++){
486                 struct lov_oinfo *loi;
487                 struct lov_request *req;
488                 obd_off start, end;
489
490                 loi = lsm->lsm_oinfo[i];
491                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
492                                            policy->l_extent.end, &start, &end))
493                         continue;
494
495                 /* FIXME raid1 should grace this error */
496                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
497                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
498                         GOTO(out_set, rc = -EIO);
499                 }
500
501                 OBD_ALLOC(req, sizeof(*req));
502                 if (req == NULL)
503                         GOTO(out_set, rc = -ENOMEM);
504
505                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
506                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
507                 if (req->rq_oi.oi_md == NULL) {
508                         OBD_FREE(req, sizeof(*req));
509                         GOTO(out_set, rc = -ENOMEM);
510                 }
511
512                 req->rq_oi.oi_policy.l_extent.start = start;
513                 req->rq_oi.oi_policy.l_extent.end = end;
514                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
515
516                 req->rq_idx = loi->loi_ost_idx;
517                 req->rq_stripe = i;
518
519                 /* XXX LOV STACKING: submd should be from the subobj */
520                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
521                 req->rq_oi.oi_md->lsm_stripe_count = 0;
522
523                 lov_set_add_req(req, set);
524         }
525         if (!set->set_count)
526                 GOTO(out_set, rc = -EIO);
527         *reqset = set;
528         return rc;
529 out_set:
530         lov_fini_match_set(set, mode, 0);
531         return rc;
532 }
533
534 int lov_fini_cancel_set(struct lov_request_set *set)
535 {
536         int rc = 0;
537
538         if (set == NULL)
539                 return 0;
540
541         LASSERT(set->set_exp);
542         if (set->set_lockh)
543                 lov_llh_put(set->set_lockh);
544
545         lov_put_reqset(set);
546
547         return rc;
548 }
549
550 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
551                         struct lov_stripe_md *lsm, __u32 mode,
552                         struct lustre_handle *lockh,
553                         struct lov_request_set **reqset)
554 {
555         struct lov_request_set *set;
556         int i, rc = 0;
557
558         OBD_ALLOC(set, sizeof(*set));
559         if (set == NULL)
560                 return -ENOMEM;
561         lov_init_set(set);
562
563         set->set_exp = exp;
564         set->set_oi = oinfo;
565         set->set_oi->oi_md = lsm;
566         set->set_lockh = lov_handle2llh(lockh);
567         if (set->set_lockh == NULL) {
568                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
569                 GOTO(out_set, rc = -EINVAL);
570         }
571         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
572
573         for (i = 0; i < lsm->lsm_stripe_count; i++){
574                 struct lov_request *req;
575                 struct lustre_handle *lov_lockhp;
576                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
577
578                 lov_lockhp = set->set_lockh->llh_handles + i;
579                 if (!lustre_handle_is_used(lov_lockhp)) {
580                         CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
581                                loi->loi_ost_idx, POSTID(&loi->loi_oi));
582                         continue;
583                 }
584
585                 OBD_ALLOC(req, sizeof(*req));
586                 if (req == NULL)
587                         GOTO(out_set, rc = -ENOMEM);
588
589                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
590                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
591                 if (req->rq_oi.oi_md == NULL) {
592                         OBD_FREE(req, sizeof(*req));
593                         GOTO(out_set, rc = -ENOMEM);
594                 }
595
596                 req->rq_idx = loi->loi_ost_idx;
597                 req->rq_stripe = i;
598
599                 /* XXX LOV STACKING: submd should be from the subobj */
600                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
601                 req->rq_oi.oi_md->lsm_stripe_count = 0;
602
603                 lov_set_add_req(req, set);
604         }
605         if (!set->set_count)
606                 GOTO(out_set, rc = -EIO);
607         *reqset = set;
608         return rc;
609 out_set:
610         lov_fini_cancel_set(set);
611         return rc;
612 }
613 static int common_attr_done(struct lov_request_set *set)
614 {
615         struct list_head *pos;
616         struct lov_request *req;
617         struct obdo *tmp_oa;
618         int rc = 0, attrset = 0;
619
620         LASSERT(set->set_oi != NULL);
621
622         if (set->set_oi->oi_oa == NULL)
623                 return 0;
624
625         if (!atomic_read(&set->set_success))
626                 return -EIO;
627
628         OBDO_ALLOC(tmp_oa);
629         if (tmp_oa == NULL)
630                 GOTO(out, rc = -ENOMEM);
631
632         list_for_each(pos, &set->set_list) {
633                 req = list_entry(pos, struct lov_request, rq_link);
634
635                 if (!req->rq_complete || req->rq_rc)
636                         continue;
637                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
638                         continue;
639                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
640                                 req->rq_oi.oi_oa->o_valid,
641                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
642         }
643         if (!attrset) {
644                 CERROR("No stripes had valid attrs\n");
645                 rc = -EIO;
646         }
647         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
648             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
649                 /* When we take attributes of some epoch, we require all the
650                  * ost to be active. */
651                 CERROR("Not all the stripes had valid attrs\n");
652                 GOTO(out, rc = -EIO);
653         }
654
655         tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
656         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
657 out:
658         if (tmp_oa)
659                 OBDO_FREE(tmp_oa);
660         return rc;
661
662 }
663
664 static int brw_done(struct lov_request_set *set)
665 {
666         struct lov_stripe_md *lsm = set->set_oi->oi_md;
667         struct lov_oinfo     *loi = NULL;
668         struct list_head *pos;
669         struct lov_request *req;
670
671         list_for_each(pos, &set->set_list) {
672                 req = list_entry(pos, struct lov_request, rq_link);
673
674                 if (!req->rq_complete || req->rq_rc)
675                         continue;
676
677                 loi = lsm->lsm_oinfo[req->rq_stripe];
678
679                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
680                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
681         }
682
683         return 0;
684 }
685
686 int lov_fini_brw_set(struct lov_request_set *set)
687 {
688         int rc = 0;
689
690         if (set == NULL)
691                 return 0;
692         LASSERT(set->set_exp);
693         if (atomic_read(&set->set_completes)) {
694                 rc = brw_done(set);
695                 /* FIXME update qos data here */
696         }
697         lov_put_reqset(set);
698
699         return rc;
700 }
701
702 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
703                      obd_count oa_bufs, struct brw_page *pga,
704                      struct obd_trans_info *oti,
705                      struct lov_request_set **reqset)
706 {
707         struct {
708                 obd_count       index;
709                 obd_count       count;
710                 obd_count       off;
711         } *info = NULL;
712         struct lov_request_set *set;
713         struct lov_obd *lov = &exp->exp_obd->u.lov;
714         int rc = 0, i, shift;
715
716         OBD_ALLOC(set, sizeof(*set));
717         if (set == NULL)
718                 return -ENOMEM;
719         lov_init_set(set);
720
721         set->set_exp = exp;
722         set->set_oti = oti;
723         set->set_oi = oinfo;
724         set->set_oabufs = oa_bufs;
725         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
726         if (!set->set_pga)
727                 GOTO(out, rc = -ENOMEM);
728
729         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
730         if (!info)
731                 GOTO(out, rc = -ENOMEM);
732
733         /* calculate the page count for each stripe */
734         for (i = 0; i < oa_bufs; i++) {
735                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
736                 info[stripe].count++;
737         }
738
739         /* alloc and initialize lov request */
740         shift = 0;
741         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
742                 struct lov_oinfo *loi = NULL;
743                 struct lov_request *req;
744
745                 if (info[i].count == 0)
746                         continue;
747
748                 loi = oinfo->oi_md->lsm_oinfo[i];
749                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
750                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
751                         GOTO(out, rc = -EIO);
752                 }
753
754                 OBD_ALLOC(req, sizeof(*req));
755                 if (req == NULL)
756                         GOTO(out, rc = -ENOMEM);
757
758                 OBDO_ALLOC(req->rq_oi.oi_oa);
759                 if (req->rq_oi.oi_oa == NULL) {
760                         OBD_FREE(req, sizeof(*req));
761                         GOTO(out, rc = -ENOMEM);
762                 }
763
764                 if (oinfo->oi_oa) {
765                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
766                                sizeof(*req->rq_oi.oi_oa));
767                 }
768                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
769                 req->rq_oi.oi_oa->o_stripe_idx = i;
770
771                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
772                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
773                 if (req->rq_oi.oi_md == NULL) {
774                         OBDO_FREE(req->rq_oi.oi_oa);
775                         OBD_FREE(req, sizeof(*req));
776                         GOTO(out, rc = -ENOMEM);
777                 }
778
779                 req->rq_idx = loi->loi_ost_idx;
780                 req->rq_stripe = i;
781
782                 /* XXX LOV STACKING */
783                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
784                 req->rq_oabufs = info[i].count;
785                 req->rq_pgaidx = shift;
786                 shift += req->rq_oabufs;
787
788                 /* remember the index for sort brw_page array */
789                 info[i].index = req->rq_pgaidx;
790
791                 req->rq_oi.oi_capa = oinfo->oi_capa;
792
793                 lov_set_add_req(req, set);
794         }
795         if (!set->set_count)
796                 GOTO(out, rc = -EIO);
797
798         /* rotate & sort the brw_page array */
799         for (i = 0; i < oa_bufs; i++) {
800                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
801
802                 shift = info[stripe].index + info[stripe].off;
803                 LASSERT(shift < oa_bufs);
804                 set->set_pga[shift] = pga[i];
805                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
806                                   &set->set_pga[shift].off);
807                 info[stripe].off++;
808         }
809 out:
810         if (info)
811                 OBD_FREE_LARGE(info,
812                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
813
814         if (rc == 0)
815                 *reqset = set;
816         else
817                 lov_fini_brw_set(set);
818
819         return rc;
820 }
821
822 int lov_fini_getattr_set(struct lov_request_set *set)
823 {
824         int rc = 0;
825
826         if (set == NULL)
827                 return 0;
828         LASSERT(set->set_exp);
829         if (atomic_read(&set->set_completes))
830                 rc = common_attr_done(set);
831
832         lov_put_reqset(set);
833
834         return rc;
835 }
836
837 /* The callback for osc_getattr_async that finalizes a request info when a
838  * response is received. */
839 static int cb_getattr_update(void *cookie, int rc)
840 {
841         struct obd_info *oinfo = cookie;
842         struct lov_request *lovreq;
843         lovreq = container_of(oinfo, struct lov_request, rq_oi);
844         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
845 }
846
847 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
848                          struct lov_request_set **reqset)
849 {
850         struct lov_request_set *set;
851         struct lov_obd *lov = &exp->exp_obd->u.lov;
852         int rc = 0, i;
853
854         OBD_ALLOC(set, sizeof(*set));
855         if (set == NULL)
856                 return -ENOMEM;
857         lov_init_set(set);
858
859         set->set_exp = exp;
860         set->set_oi = oinfo;
861
862         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
863                 struct lov_oinfo *loi;
864                 struct lov_request *req;
865
866                 loi = oinfo->oi_md->lsm_oinfo[i];
867                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
868                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
869                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
870                                 /* SOM requires all the OSTs to be active. */
871                                 GOTO(out_set, rc = -EIO);
872                         continue;
873                 }
874
875                 OBD_ALLOC(req, sizeof(*req));
876                 if (req == NULL)
877                         GOTO(out_set, rc = -ENOMEM);
878
879                 req->rq_stripe = i;
880                 req->rq_idx = loi->loi_ost_idx;
881
882                 OBDO_ALLOC(req->rq_oi.oi_oa);
883                 if (req->rq_oi.oi_oa == NULL) {
884                         OBD_FREE(req, sizeof(*req));
885                         GOTO(out_set, rc = -ENOMEM);
886                 }
887                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
888                        sizeof(*req->rq_oi.oi_oa));
889                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
890                 req->rq_oi.oi_cb_up = cb_getattr_update;
891                 req->rq_oi.oi_capa = oinfo->oi_capa;
892
893                 lov_set_add_req(req, set);
894         }
895         if (!set->set_count)
896                 GOTO(out_set, rc = -EIO);
897         *reqset = set;
898         return rc;
899 out_set:
900         lov_fini_getattr_set(set);
901         return rc;
902 }
903
904 int lov_fini_destroy_set(struct lov_request_set *set)
905 {
906         if (set == NULL)
907                 return 0;
908         LASSERT(set->set_exp);
909         if (atomic_read(&set->set_completes)) {
910                 /* FIXME update qos data here */
911         }
912
913         lov_put_reqset(set);
914
915         return 0;
916 }
917
918 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
919                          struct obdo *src_oa, struct lov_stripe_md *lsm,
920                          struct obd_trans_info *oti,
921                          struct lov_request_set **reqset)
922 {
923         struct lov_request_set *set;
924         struct lov_obd *lov = &exp->exp_obd->u.lov;
925         int rc = 0, i;
926
927         OBD_ALLOC(set, sizeof(*set));
928         if (set == NULL)
929                 return -ENOMEM;
930         lov_init_set(set);
931
932         set->set_exp = exp;
933         set->set_oi = oinfo;
934         set->set_oi->oi_md = lsm;
935         set->set_oi->oi_oa = src_oa;
936         set->set_oti = oti;
937         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
938                 set->set_cookies = oti->oti_logcookies;
939
940         for (i = 0; i < lsm->lsm_stripe_count; i++) {
941                 struct lov_oinfo *loi;
942                 struct lov_request *req;
943
944                 loi = lsm->lsm_oinfo[i];
945                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
946                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
947                         continue;
948                 }
949
950                 OBD_ALLOC(req, sizeof(*req));
951                 if (req == NULL)
952                         GOTO(out_set, rc = -ENOMEM);
953
954                 req->rq_stripe = i;
955                 req->rq_idx = loi->loi_ost_idx;
956
957                 OBDO_ALLOC(req->rq_oi.oi_oa);
958                 if (req->rq_oi.oi_oa == NULL) {
959                         OBD_FREE(req, sizeof(*req));
960                         GOTO(out_set, rc = -ENOMEM);
961                 }
962                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
963                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
964                 lov_set_add_req(req, set);
965         }
966         if (!set->set_count)
967                 GOTO(out_set, rc = -EIO);
968         *reqset = set;
969         return rc;
970 out_set:
971         lov_fini_destroy_set(set);
972         return rc;
973 }
974
975 int lov_fini_setattr_set(struct lov_request_set *set)
976 {
977         int rc = 0;
978
979         if (set == NULL)
980                 return 0;
981         LASSERT(set->set_exp);
982         if (atomic_read(&set->set_completes)) {
983                 rc = common_attr_done(set);
984                 /* FIXME update qos data here */
985         }
986
987         lov_put_reqset(set);
988         return rc;
989 }
990
991 int lov_update_setattr_set(struct lov_request_set *set,
992                            struct lov_request *req, int rc)
993 {
994         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
995         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
996
997         lov_update_set(set, req, rc);
998
999         /* grace error on inactive ost */
1000         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1001                     lov->lov_tgts[req->rq_idx]->ltd_active))
1002                 rc = 0;
1003
1004         if (rc == 0) {
1005                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1006                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1007                                 req->rq_oi.oi_oa->o_ctime;
1008                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1009                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1010                                 req->rq_oi.oi_oa->o_mtime;
1011                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1012                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1013                                 req->rq_oi.oi_oa->o_atime;
1014         }
1015
1016         return rc;
1017 }
1018
1019 /* The callback for osc_setattr_async that finalizes a request info when a
1020  * response is received. */
1021 static int cb_setattr_update(void *cookie, int rc)
1022 {
1023         struct obd_info *oinfo = cookie;
1024         struct lov_request *lovreq;
1025         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1026         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1027 }
1028
1029 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1030                          struct obd_trans_info *oti,
1031                          struct lov_request_set **reqset)
1032 {
1033         struct lov_request_set *set;
1034         struct lov_obd *lov = &exp->exp_obd->u.lov;
1035         int rc = 0, i;
1036
1037         OBD_ALLOC(set, sizeof(*set));
1038         if (set == NULL)
1039                 return -ENOMEM;
1040         lov_init_set(set);
1041
1042         set->set_exp = exp;
1043         set->set_oti = oti;
1044         set->set_oi = oinfo;
1045         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1046                 set->set_cookies = oti->oti_logcookies;
1047
1048         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1049                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1050                 struct lov_request *req;
1051
1052                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1053                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1054                         continue;
1055                 }
1056
1057                 OBD_ALLOC(req, sizeof(*req));
1058                 if (req == NULL)
1059                         GOTO(out_set, rc = -ENOMEM);
1060                 req->rq_stripe = i;
1061                 req->rq_idx = loi->loi_ost_idx;
1062
1063                 OBDO_ALLOC(req->rq_oi.oi_oa);
1064                 if (req->rq_oi.oi_oa == NULL) {
1065                         OBD_FREE(req, sizeof(*req));
1066                         GOTO(out_set, rc = -ENOMEM);
1067                 }
1068                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1069                        sizeof(*req->rq_oi.oi_oa));
1070                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1071                 req->rq_oi.oi_oa->o_stripe_idx = i;
1072                 req->rq_oi.oi_cb_up = cb_setattr_update;
1073                 req->rq_oi.oi_capa = oinfo->oi_capa;
1074
1075                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1076                         int off = lov_stripe_offset(oinfo->oi_md,
1077                                                     oinfo->oi_oa->o_size, i,
1078                                                     &req->rq_oi.oi_oa->o_size);
1079
1080                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1081                                 req->rq_oi.oi_oa->o_size--;
1082
1083                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1084                                i, req->rq_oi.oi_oa->o_size,
1085                                oinfo->oi_oa->o_size);
1086                 }
1087                 lov_set_add_req(req, set);
1088         }
1089         if (!set->set_count)
1090                 GOTO(out_set, rc = -EIO);
1091         *reqset = set;
1092         return rc;
1093 out_set:
1094         lov_fini_setattr_set(set);
1095         return rc;
1096 }
1097
1098 int lov_fini_punch_set(struct lov_request_set *set)
1099 {
1100         int rc = 0;
1101
1102         if (set == NULL)
1103                 return 0;
1104         LASSERT(set->set_exp);
1105         if (atomic_read(&set->set_completes)) {
1106                 rc = -EIO;
1107                 /* FIXME update qos data here */
1108                 if (atomic_read(&set->set_success))
1109                         rc = common_attr_done(set);
1110         }
1111
1112         lov_put_reqset(set);
1113
1114         return rc;
1115 }
1116
1117 int lov_update_punch_set(struct lov_request_set *set,
1118                          struct lov_request *req, int rc)
1119 {
1120         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1121         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1122
1123         lov_update_set(set, req, rc);
1124
1125         /* grace error on inactive ost */
1126         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1127                 rc = 0;
1128
1129         if (rc == 0) {
1130                 lov_stripe_lock(lsm);
1131                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1132                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1133                                 req->rq_oi.oi_oa->o_blocks;
1134                 }
1135
1136                 lov_stripe_unlock(lsm);
1137         }
1138
1139         return rc;
1140 }
1141
1142 /* The callback for osc_punch that finalizes a request info when a response
1143  * is received. */
1144 static int cb_update_punch(void *cookie, int rc)
1145 {
1146         struct obd_info *oinfo = cookie;
1147         struct lov_request *lovreq;
1148         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1149         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1150 }
1151
1152 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1153                        struct obd_trans_info *oti,
1154                        struct lov_request_set **reqset)
1155 {
1156         struct lov_request_set *set;
1157         struct lov_obd *lov = &exp->exp_obd->u.lov;
1158         int rc = 0, i;
1159
1160         OBD_ALLOC(set, sizeof(*set));
1161         if (set == NULL)
1162                 return -ENOMEM;
1163         lov_init_set(set);
1164
1165         set->set_oi = oinfo;
1166         set->set_exp = exp;
1167
1168         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1169                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1170                 struct lov_request *req;
1171                 obd_off rs, re;
1172
1173                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1174                                            oinfo->oi_policy.l_extent.start,
1175                                            oinfo->oi_policy.l_extent.end,
1176                                            &rs, &re))
1177                         continue;
1178
1179                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1180                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1181                         GOTO(out_set, rc = -EIO);
1182                 }
1183
1184                 OBD_ALLOC(req, sizeof(*req));
1185                 if (req == NULL)
1186                         GOTO(out_set, rc = -ENOMEM);
1187                 req->rq_stripe = i;
1188                 req->rq_idx = loi->loi_ost_idx;
1189
1190                 OBDO_ALLOC(req->rq_oi.oi_oa);
1191                 if (req->rq_oi.oi_oa == NULL) {
1192                         OBD_FREE(req, sizeof(*req));
1193                         GOTO(out_set, rc = -ENOMEM);
1194                 }
1195                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1196                        sizeof(*req->rq_oi.oi_oa));
1197                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1198                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1199
1200                 req->rq_oi.oi_oa->o_stripe_idx = i;
1201                 req->rq_oi.oi_cb_up = cb_update_punch;
1202
1203                 req->rq_oi.oi_policy.l_extent.start = rs;
1204                 req->rq_oi.oi_policy.l_extent.end = re;
1205                 req->rq_oi.oi_policy.l_extent.gid = -1;
1206
1207                 req->rq_oi.oi_capa = oinfo->oi_capa;
1208
1209                 lov_set_add_req(req, set);
1210         }
1211         if (!set->set_count)
1212                 GOTO(out_set, rc = -EIO);
1213         *reqset = set;
1214         return rc;
1215 out_set:
1216         lov_fini_punch_set(set);
1217         return rc;
1218 }
1219
1220 int lov_fini_sync_set(struct lov_request_set *set)
1221 {
1222         int rc = 0;
1223
1224         if (set == NULL)
1225                 return 0;
1226         LASSERT(set->set_exp);
1227         if (atomic_read(&set->set_completes)) {
1228                 if (!atomic_read(&set->set_success))
1229                         rc = -EIO;
1230                 /* FIXME update qos data here */
1231         }
1232
1233         lov_put_reqset(set);
1234
1235         return rc;
1236 }
1237
1238 /* The callback for osc_sync that finalizes a request info when a
1239  * response is received. */
1240 static int cb_sync_update(void *cookie, int rc)
1241 {
1242         struct obd_info *oinfo = cookie;
1243         struct lov_request *lovreq;
1244
1245         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1246         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1247 }
1248
1249 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1250                       obd_off start, obd_off end,
1251                       struct lov_request_set **reqset)
1252 {
1253         struct lov_request_set *set;
1254         struct lov_obd *lov = &exp->exp_obd->u.lov;
1255         int rc = 0, i;
1256
1257         OBD_ALLOC_PTR(set);
1258         if (set == NULL)
1259                 return -ENOMEM;
1260         lov_init_set(set);
1261
1262         set->set_exp = exp;
1263         set->set_oi = oinfo;
1264
1265         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1266                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1267                 struct lov_request *req;
1268                 obd_off rs, re;
1269
1270                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1271                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1272                         continue;
1273                 }
1274
1275                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1276                                            &re))
1277                         continue;
1278
1279                 OBD_ALLOC_PTR(req);
1280                 if (req == NULL)
1281                         GOTO(out_set, rc = -ENOMEM);
1282                 req->rq_stripe = i;
1283                 req->rq_idx = loi->loi_ost_idx;
1284
1285                 OBDO_ALLOC(req->rq_oi.oi_oa);
1286                 if (req->rq_oi.oi_oa == NULL) {
1287                         OBD_FREE(req, sizeof(*req));
1288                         GOTO(out_set, rc = -ENOMEM);
1289                 }
1290                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1291                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1292                 req->rq_oi.oi_oa->o_stripe_idx = i;
1293
1294                 req->rq_oi.oi_policy.l_extent.start = rs;
1295                 req->rq_oi.oi_policy.l_extent.end = re;
1296                 req->rq_oi.oi_policy.l_extent.gid = -1;
1297                 req->rq_oi.oi_cb_up = cb_sync_update;
1298
1299                 lov_set_add_req(req, set);
1300         }
1301         if (!set->set_count)
1302                 GOTO(out_set, rc = -EIO);
1303         *reqset = set;
1304         return rc;
1305 out_set:
1306         lov_fini_sync_set(set);
1307         return rc;
1308 }
1309
/* All-ones __u64: the value a saturated counter is pinned to. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: add 'add' to 'tot', but if the sum wraps around
 * (comes out smaller than 'tot') store LOV_U64_MAX in 'tot' instead.
 * Both arguments are evaluated more than once, so they must be free of
 * side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1318
1319 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1320 {
1321         if (success) {
1322                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1323                                                            LOV_MAGIC, 0);
1324                 if (osfs->os_files != LOV_U64_MAX)
1325                         lov_do_div64(osfs->os_files, expected_stripes);
1326                 if (osfs->os_ffree != LOV_U64_MAX)
1327                         lov_do_div64(osfs->os_ffree, expected_stripes);
1328
1329                 spin_lock(&obd->obd_osfs_lock);
1330                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1331                 obd->obd_osfs_age = cfs_time_current_64();
1332                 spin_unlock(&obd->obd_osfs_lock);
1333                 return 0;
1334         }
1335
1336         return -EIO;
1337 }
1338
1339 int lov_fini_statfs_set(struct lov_request_set *set)
1340 {
1341         int rc = 0;
1342
1343         if (set == NULL)
1344                 return 0;
1345
1346         if (atomic_read(&set->set_completes)) {
1347                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1348                                      atomic_read(&set->set_success));
1349         }
1350         lov_put_reqset(set);
1351         return rc;
1352 }
1353
1354 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1355                        int success)
1356 {
1357         int shift = 0, quit = 0;
1358         __u64 tmp;
1359
1360         if (success == 0) {
1361                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1362         } else {
1363                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1364                         /* assume all block sizes are always powers of 2 */
1365                         /* get the bits difference */
1366                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1367                         for (shift = 0; shift <= 64; ++shift) {
1368                                 if (tmp & 1) {
1369                                         if (quit)
1370                                                 break;
1371                                         else
1372                                                 quit = 1;
1373                                         shift = 0;
1374                                 }
1375                                 tmp >>= 1;
1376                         }
1377                 }
1378
1379                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1380                         osfs->os_bsize = lov_sfs->os_bsize;
1381
1382                         osfs->os_bfree  >>= shift;
1383                         osfs->os_bavail >>= shift;
1384                         osfs->os_blocks >>= shift;
1385                 } else if (shift != 0) {
1386                         lov_sfs->os_bfree  >>= shift;
1387                         lov_sfs->os_bavail >>= shift;
1388                         lov_sfs->os_blocks >>= shift;
1389                 }
1390                 osfs->os_bfree += lov_sfs->os_bfree;
1391                 osfs->os_bavail += lov_sfs->os_bavail;
1392                 osfs->os_blocks += lov_sfs->os_blocks;
1393                 /* XXX not sure about this one - depends on policy.
1394                  *   - could be minimum if we always stripe on all OBDs
1395                  *     (but that would be wrong for any other policy,
1396                  *     if one of the OBDs has no more objects left)
1397                  *   - could be sum if we stripe whole objects
1398                  *   - could be average, just to give a nice number
1399                  *
1400                  * To give a "reasonable" (if not wholly accurate)
1401                  * number, we divide the total number of free objects
1402                  * by expected stripe count (watch out for overflow).
1403                  */
1404                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1405                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1406         }
1407 }
1408
1409 /* The callback for osc_statfs_async that finalizes a request info when a
1410  * response is received. */
1411 static int cb_statfs_update(void *cookie, int rc)
1412 {
1413         struct obd_info *oinfo = cookie;
1414         struct lov_request *lovreq;
1415         struct lov_request_set *set;
1416         struct obd_statfs *osfs, *lov_sfs;
1417         struct lov_obd *lov;
1418         struct lov_tgt_desc *tgt;
1419         struct obd_device *lovobd, *tgtobd;
1420         int success;
1421
1422         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1423         set = lovreq->rq_rqset;
1424         lovobd = set->set_obd;
1425         lov = &lovobd->u.lov;
1426         osfs = set->set_oi->oi_osfs;
1427         lov_sfs = oinfo->oi_osfs;
1428         success = atomic_read(&set->set_success);
1429         /* XXX: the same is done in lov_update_common_set, however
1430            lovset->set_exp is not initialized. */
1431         lov_update_set(set, lovreq, rc);
1432         if (rc)
1433                 GOTO(out, rc);
1434
1435         obd_getref(lovobd);
1436         tgt = lov->lov_tgts[lovreq->rq_idx];
1437         if (!tgt || !tgt->ltd_active)
1438                 GOTO(out_update, rc);
1439
1440         tgtobd = class_exp2obd(tgt->ltd_exp);
1441         spin_lock(&tgtobd->obd_osfs_lock);
1442         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1443         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1444                 tgtobd->obd_osfs_age = cfs_time_current_64();
1445         spin_unlock(&tgtobd->obd_osfs_lock);
1446
1447 out_update:
1448         lov_update_statfs(osfs, lov_sfs, success);
1449         obd_putref(lovobd);
1450
1451 out:
1452         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1453             lov_set_finished(set, 0)) {
1454                 lov_statfs_interpret(NULL, set, set->set_count !=
1455                                      atomic_read(&set->set_success));
1456         }
1457
1458         return 0;
1459 }
1460
1461 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1462                         struct lov_request_set **reqset)
1463 {
1464         struct lov_request_set *set;
1465         struct lov_obd *lov = &obd->u.lov;
1466         int rc = 0, i;
1467
1468         OBD_ALLOC(set, sizeof(*set));
1469         if (set == NULL)
1470                 return -ENOMEM;
1471         lov_init_set(set);
1472
1473         set->set_obd = obd;
1474         set->set_oi = oinfo;
1475
1476         /* We only get block data from the OBD */
1477         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1478                 struct lov_request *req;
1479
1480                 if (lov->lov_tgts[i] == NULL ||
1481                     (!lov_check_and_wait_active(lov, i) &&
1482                      (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1483                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1484                         continue;
1485                 }
1486
1487                 /* skip targets that have been explicitly disabled by the
1488                  * administrator */
1489                 if (!lov->lov_tgts[i]->ltd_exp) {
1490                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1491                         continue;
1492                 }
1493
1494                 OBD_ALLOC(req, sizeof(*req));
1495                 if (req == NULL)
1496                         GOTO(out_set, rc = -ENOMEM);
1497
1498                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1499                 if (req->rq_oi.oi_osfs == NULL) {
1500                         OBD_FREE(req, sizeof(*req));
1501                         GOTO(out_set, rc = -ENOMEM);
1502                 }
1503
1504                 req->rq_idx = i;
1505                 req->rq_oi.oi_cb_up = cb_statfs_update;
1506                 req->rq_oi.oi_flags = oinfo->oi_flags;
1507
1508                 lov_set_add_req(req, set);
1509         }
1510         if (!set->set_count)
1511                 GOTO(out_set, rc = -EIO);
1512         *reqset = set;
1513         return rc;
1514 out_set:
1515         lov_fini_statfs_set(set);
1516         return rc;
1517 }