4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_LOV
39 #include <linux/libcfs/libcfs.h>
41 #include <obd_class.h>
42 #include <lustre/lustre_idl.h>
44 #include "lov_internal.h"
/*
 * Reset the bookkeeping of a request set: zero the completion, success and
 * finish-checked counters, clear the cookie pointer, start with an empty
 * request list, hold one reference (refcount = 1), and prepare the wait
 * queue and the spinlock.
 */
46 static void lov_init_set(struct lov_request_set *set)
49 atomic_set(&set->set_completes, 0);
50 atomic_set(&set->set_success, 0);
51 atomic_set(&set->set_finish_checked, 0);
52 set->set_cookies = NULL;
53 INIT_LIST_HEAD(&set->set_list);
54 atomic_set(&set->set_refcount, 1);
55 init_waitqueue_head(&set->set_waitq);
56 spin_lock_init(&set->set_lock);
/*
 * Tear down a request set and everything it owns.
 *
 * Walks set->set_list with the deletion-safe iterator, unlinking each
 * request and releasing its per-request obdo (oi_oa), its stripe metadata
 * buffer (oi_md, rq_buflen bytes), its statfs buffer (oi_osfs, only when
 * set) and finally the request itself.  Afterwards the page array
 * (set_pga, set_oabufs entries) is freed, lov_llh_put() is called on the
 * lock-handle block, and the set itself is freed.
 *
 * NOTE(review): the frees of oi_oa, oi_md and set_pga and the
 * lov_llh_put() call are presumably guarded by the pointer being set --
 * confirm.  `len` is an int, so a very large set_oabufs could truncate the
 * byte count -- TODO confirm bounds.
 */
59 void lov_finish_set(struct lov_request_set *set)
61 struct list_head *pos, *n;
64 list_for_each_safe(pos, n, &set->set_list) {
65 struct lov_request *req = list_entry(pos,
68 list_del_init(&req->rq_link);
71 OBDO_FREE(req->rq_oi.oi_oa);
73 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
74 if (req->rq_oi.oi_osfs)
75 OBD_FREE(req->rq_oi.oi_osfs,
76 sizeof(*req->rq_oi.oi_osfs));
77 OBD_FREE(req, sizeof(*req));
81 int len = set->set_oabufs * sizeof(*set->set_pga);
82 OBD_FREE_LARGE(set->set_pga, len);
85 lov_llh_put(set->set_lockh);
87 OBD_FREE(set, sizeof(*set));
/*
 * Report whether every request in the set has completed.
 *
 * Reads the set_completes counter, traces it against set_count, and once
 * the two match bumps set_finish_checked atomically and tests for the
 * value 1, which tells the first checker apart from later ones.
 *
 * NOTE(review): how @idempotent influences the result is not evident from
 * the statements here -- confirm before relying on it.
 */
90 int lov_set_finished(struct lov_request_set *set, int idempotent)
92 int completes = atomic_read(&set->set_completes);
94 CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
96 if (completes == set->set_count) {
99 if (atomic_inc_return(&set->set_finish_checked) == 1)
/*
 * Record the outcome of one request in its set: flag the request as
 * complete, count it in set_completes and in set_success, then wake
 * anyone sleeping on set_waitq.
 *
 * NOTE(review): set_success is presumably only incremented when @rc is 0
 * -- confirm the guard.
 */
105 void lov_update_set(struct lov_request_set *set,
106 struct lov_request *req, int rc)
108 req->rq_complete = 1;
111 atomic_inc(&set->set_completes);
113 atomic_inc(&set->set_success);
115 wake_up(&set->set_waitq);
/*
 * Common completion handler for a stripe request.
 *
 * Accounts the request through lov_update_set(), then singles out the
 * case of a non-zero @rc on a target (lov_tgts[req->rq_idx]) that is
 * missing or not active.
 *
 * NOTE(review): presumably the error is forgiven (rc cleared) in that
 * case and the resulting rc is returned -- confirm.
 */
118 int lov_update_common_set(struct lov_request_set *set,
119 struct lov_request *req, int rc)
121 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123 lov_update_set(set, req, rc);
125 /* grace error on inactive ost */
126 if (rc && !(lov->lov_tgts[req->rq_idx] &&
127 lov->lov_tgts[req->rq_idx]->ltd_active))
130 /* FIXME in raid1 regime, should return 0 */
/* Queue @req at the tail of @set's request list (set_list). */
134 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
136 list_add_tail(&req->rq_link, &set->set_list);
/*
 * Wait condition used by lov_check_and_wait_active().
 *
 * With lov_lock held, tests whether target @idx is absent, already active,
 * or has an export whose client import has recorded a connection attempt
 * (imp_connect_tried).
 *
 * NOTE(review): presumably returns non-zero when any of these holds so
 * that the waiter stops sleeping -- confirm.
 */
141 static int lov_check_set(struct lov_obd *lov, int idx)
144 mutex_lock(&lov->lov_lock);
146 if (lov->lov_tgts[idx] == NULL ||
147 lov->lov_tgts[idx]->ltd_active ||
148 (lov->lov_tgts[idx]->ltd_exp != NULL &&
149 class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
152 mutex_unlock(&lov->lov_lock);
156 /* Check if the OSC connection exists and is active.
157 * If the OSC has not yet had a chance to connect to the OST the first time,
158 * wait once for it to connect instead of returning an error.
 */
/*
 * Decide whether the target at @ost_idx can be used.
 *
 * Under lov_lock the target descriptor is looked up and three quick
 * checks are made: no descriptor, descriptor already active, or a
 * connection attempt already recorded on its import.  Otherwise the lock
 * is dropped and the caller sleeps on a private wait queue, with
 * lov_check_set() as the wake-up condition and a wait descriptor built
 * from obd_timeout seconds and a one-second interval; the active flag is
 * then checked once more.
 *
 * Callers in this file treat a zero return as "target inactive".
 *
 * NOTE(review): `tgt` is read before lov_lock is released and is
 * dereferenced again after the wait without being re-read from
 * lov->lov_tgts -- confirm the descriptor cannot go away in between.
 */
160 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
162 wait_queue_head_t waitq;
163 struct l_wait_info lwi;
164 struct lov_tgt_desc *tgt;
167 mutex_lock(&lov->lov_lock);
169 tgt = lov->lov_tgts[ost_idx];
171 if (unlikely(tgt == NULL))
174 if (likely(tgt->ltd_active))
177 if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
180 mutex_unlock(&lov->lov_lock);
182 init_waitqueue_head(&waitq);
183 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
184 cfs_time_seconds(1), NULL, NULL);
186 rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
187 if (tgt != NULL && tgt->ltd_active)
193 mutex_unlock(&lov->lov_lock);
/*
 * Declared extern here rather than pulled in from a header.  Called from
 * lov_update_enqueue_set() with the stripe's lock handle, its lov_oinfo,
 * the enqueue flags, an LVB, the lock mode and the enqueue result.
 * NOTE(review): presumably implemented by the OSC layer, going by the
 * name -- confirm.
 */
197 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
198 struct lov_oinfo *loi, int flags,
199 struct ost_lvb *lvb, __u32 mode, int rc);
/*
 * LOV-level handling of a per-stripe enqueue result.
 *
 * A result counts as failed when @rc is neither ELDLM_OK nor
 * ELDLM_LOCK_ABORTED on an intent request (LDLM_FL_HAS_INTENT in @flags).
 * On failure the stripe's lock handle is zeroed; if target @idx is present
 * and active the error is logged, except for -EINTR and -EUSERS (the
 * latter is how an OST reports file contention).
 *
 * NOTE(review): what becomes of rc for a missing or inactive target, and
 * the value returned, are not evident here -- confirm.
 */
201 static int lov_update_enqueue_lov(struct obd_export *exp,
202 struct lustre_handle *lov_lockhp,
203 struct lov_oinfo *loi, int flags, int idx,
204 struct ost_id *oi, int rc)
206 struct lov_obd *lov = &exp->exp_obd->u.lov;
208 if (rc != ELDLM_OK &&
209 !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
210 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
211 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
212 /* -EUSERS used by OST to report file contention */
213 if (rc != -EINTR && rc != -EUSERS)
214 CERROR("%s: enqueue objid "DOSTID" subobj"
215 DOSTID" on OST idx %d: rc %d\n",
216 exp->exp_obd->obd_name,
217 POSTID(oi), POSTID(&loi->loi_oi),
218 loi->loi_ost_idx, rc);
/*
 * Completion handler for one stripe's enqueue.
 *
 * Locates the stripe's lock-handle slot and lov_oinfo from req->rq_stripe.
 * With the stripe lock held it passes the result and the request's private
 * LVB copy to osc_update_enqueue(), clears the lock handle when an intent
 * enqueue was aborted, and lets lov_update_enqueue_lov() post-process rc.
 * After unlocking, the request is accounted in the set with that rc.
 */
225 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
227 struct lov_request_set *set = req->rq_rqset;
228 struct lustre_handle *lov_lockhp;
229 struct obd_info *oi = set->set_oi;
230 struct lov_oinfo *loi;
234 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
235 loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
237 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
238 * and that copy can be arbitrarily out of date.
240 * The LOV API is due for a serious rewriting anyways, and this
241 * can be addressed then. */
243 lov_stripe_lock(oi->oi_md);
244 osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
245 &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
246 if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
247 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
248 rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
249 req->rq_idx, &oi->oi_md->lsm_oi, rc);
250 lov_stripe_unlock(oi->oi_md);
251 lov_update_set(set, req, rc);
255 /* The callback for osc_enqueue that updates lov info for every OSC request. */
/*
 * @cookie is the obd_info embedded in a lov_request (member rq_oi); the
 * enclosing request is recovered with container_of() and the lock mode is
 * taken from the owning set's enqueue info (set_ei).  Returns whatever
 * lov_update_enqueue_set() returns.
 */
256 static int cb_update_enqueue(void *cookie, int rc)
258 struct obd_info *oinfo = cookie;
259 struct ldlm_enqueue_info *einfo;
260 struct lov_request *lovreq;
262 lovreq = container_of(oinfo, struct lov_request, rq_oi);
263 einfo = lovreq->rq_rqset->set_ei;
264 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
/*
 * Finish an enqueue/match attempt for the whole set.
 *
 * If at least one request completed and every completed request
 * succeeded, nothing needs undoing.  Otherwise each request that completed
 * without error and still has a used lock handle gets its lock cancelled
 * through obd_cancel() on the owning target; cancel failures are logged
 * when the target is present and active.  lov_llh_put() is called on the
 * set's lock-handle block.
 *
 * NOTE(review): lov_tgts[req->rq_idx]->ltd_exp is dereferenced for the
 * obd_cancel() call before lov_tgts[req->rq_idx] is NULL-checked for the
 * log message -- confirm the entry cannot be NULL here.
 * NOTE(review): the adjacent literals "on OST" "idx %d" concatenate
 * without a space, and "obdjid" looks like a typo for "objid"; both are
 * runtime log text and are left untouched here.
 */
267 static int enqueue_done(struct lov_request_set *set, __u32 mode)
269 struct lov_request *req;
270 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
271 int completes = atomic_read(&set->set_completes);
274 /* enqueue/match success, just return */
275 if (completes && completes == atomic_read(&set->set_success))
278 /* cancel enqueued/matched locks */
279 list_for_each_entry(req, &set->set_list, rq_link) {
280 struct lustre_handle *lov_lockhp;
282 if (!req->rq_complete || req->rq_rc)
285 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
287 if (!lustre_handle_is_used(lov_lockhp))
290 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
291 req->rq_oi.oi_md, mode, lov_lockhp);
292 if (rc && lov->lov_tgts[req->rq_idx] &&
293 lov->lov_tgts[req->rq_idx]->ltd_active)
294 CERROR("%s: cancelling obdjid "DOSTID" on OST"
295 "idx %d error: rc = %d\n",
296 set->set_exp->exp_obd->obd_name,
297 POSTID(&req->rq_oi.oi_md->lsm_oi),
301 lov_llh_put(set->set_lockh);
/*
 * Finalise an enqueue set.
 *
 * Asserts that the set has an export.  One branch runs enqueue_done()
 * (set_completes may be reset to 0 first); the other branch drops the
 * lock-handle reference when a lock-handle block exists.  The caller's
 * @rc takes precedence over the result of enqueue_done() in the return
 * value.
 *
 * NOTE(review): the condition selecting the first branch, the guard on
 * the counter reset and the role of @rqset are not evident here --
 * confirm.
 */
305 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
306 struct ptlrpc_request_set *rqset)
312 LASSERT(set->set_exp);
313 /* Do enqueue_done only for sync requests and if any request
317 atomic_set(&set->set_completes, 0);
318 ret = enqueue_done(set, mode);
319 } else if (set->set_lockh)
320 lov_llh_put(set->set_lockh);
324 return rc ? rc : ret;
/*
 * hop_addref callback (see lov_handle_ops): take one more reference on a
 * lov_lock_handles block and trace the new count.
 */
327 static void lov_llh_addref(void *llhp)
329 struct lov_lock_handles *llh = llhp;
331 atomic_inc(&llh->llh_refcount);
332 CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
333 atomic_read(&llh->llh_refcount));
/*
 * Handle operations supplied to class_handle_hash() when lov_llh_new()
 * registers a lock-handle block.
 */
336 static struct portals_handle_ops lov_handle_ops = {
337 .hop_addref = lov_llh_addref,
/*
 * Allocate a lock-handle block sized for every stripe of @lsm (the header
 * plus lsm_stripe_count per-stripe handles), start it with a reference
 * count of 2, remember the stripe count and register it in the handle hash
 * with lov_handle_ops.
 *
 * Callers in this file treat a NULL result as -ENOMEM.
 *
 * NOTE(review): the reason for the initial count of 2 is not stated --
 * presumably one reference for the hash and one for the caller; confirm.
 */
341 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
343 struct lov_lock_handles *llh;
345 OBD_ALLOC(llh, sizeof(*llh) +
346 sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
350 atomic_set(&llh->llh_refcount, 2);
351 llh->llh_stripe_count = lsm->lsm_stripe_count;
352 INIT_LIST_HEAD(&llh->llh_handle.h_link);
353 class_handle_hash(&llh->llh_handle, &lov_handle_ops);
/*
 * Build the request set for an extent-lock enqueue across the stripes of
 * oinfo->oi_md.
 *
 * A lock-handle block is created and its cookie published through
 * oinfo->oi_lockh.  Each stripe is tested against the requested extent
 * with lov_stripe_intersects() and its target is probed with
 * lov_check_and_wait_active().  For a stripe that gets a request, a
 * lov_request is allocated together with a one-stripe lov_stripe_md buffer
 * (header, one lov_oinfo pointer, one lov_oinfo) and filled with the
 * stripe-local extent, the caller's group id and flags, the per-stripe
 * lock-handle slot, the completion callback and a copy of the stripe's
 * kms/LVB state.  The out_set path hands the set to
 * lov_fini_enqueue_set().
 *
 * Errors visible here: -ENOMEM on allocation failure, and -EIO.
 * NOTE(review): -EIO is presumably returned when no request could be
 * queued -- confirm.
 */
358 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
359 struct ldlm_enqueue_info *einfo,
360 struct lov_request_set **reqset)
362 struct lov_obd *lov = &exp->exp_obd->u.lov;
363 struct lov_request_set *set;
366 OBD_ALLOC(set, sizeof(*set));
374 set->set_lockh = lov_llh_new(oinfo->oi_md);
375 if (set->set_lockh == NULL)
376 GOTO(out_set, rc = -ENOMEM);
377 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
379 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
380 struct lov_oinfo *loi;
381 struct lov_request *req;
384 loi = oinfo->oi_md->lsm_oinfo[i];
385 if (!lov_stripe_intersects(oinfo->oi_md, i,
386 oinfo->oi_policy.l_extent.start,
387 oinfo->oi_policy.l_extent.end,
391 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
392 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
396 OBD_ALLOC(req, sizeof(*req));
398 GOTO(out_set, rc = -ENOMEM);
400 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
401 sizeof(struct lov_oinfo *) +
402 sizeof(struct lov_oinfo);
403 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
404 if (req->rq_oi.oi_md == NULL) {
405 OBD_FREE(req, sizeof(*req));
406 GOTO(out_set, rc = -ENOMEM);
/* Point lsm_oinfo[0] at the lov_oinfo that sits in the same buffer, right
 * after the md header and the single pointer slot. */
408 req->rq_oi.oi_md->lsm_oinfo[0] =
409 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
410 sizeof(struct lov_oinfo *);
412 /* Set lov request specific parameters. */
413 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
414 req->rq_oi.oi_cb_up = cb_update_enqueue;
415 req->rq_oi.oi_flags = oinfo->oi_flags;
417 LASSERT(req->rq_oi.oi_lockh);
419 req->rq_oi.oi_policy.l_extent.gid =
420 oinfo->oi_policy.l_extent.gid;
421 req->rq_oi.oi_policy.l_extent.start = start;
422 req->rq_oi.oi_policy.l_extent.end = end;
424 req->rq_idx = loi->loi_ost_idx;
427 /* XXX LOV STACKING: submd should be from the subobj */
428 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
429 req->rq_oi.oi_md->lsm_stripe_count = 0;
430 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
432 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
433 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
435 lov_set_add_req(req, set);
438 GOTO(out_set, rc = -EIO);
442 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
/*
 * Finalise a match set: assert the export is set, run enqueue_done(), and
 * drop a further lock-handle reference when every request in the set
 * succeeded and the caller was only testing for a lock
 * (LDLM_FL_TEST_LOCK in @flags).
 */
446 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
452 LASSERT(set->set_exp);
453 rc = enqueue_done(set, mode);
454 if ((set->set_count == atomic_read(&set->set_success)) &&
455 (flags & LDLM_FL_TEST_LOCK))
456 lov_llh_put(set->set_lockh);
/*
 * Build the request set for a lock match over the stripes of @lsm,
 * testing each stripe against @policy's extent with
 * lov_stripe_intersects().
 *
 * A new lock-handle block is created and its cookie stored in @lockh.  A
 * target that lov_check_and_wait_active() reports as unusable aborts the
 * operation with -EIO (see the raid1 FIXME).  Each request carries the
 * stripe-local extent, the caller's group id, the target index and a bare
 * lov_stripe_md holding the stripe's object id with a stripe count of 0.
 * The out_set path hands the set to lov_fini_match_set().
 *
 * Errors visible here: -ENOMEM on allocation failure, -EIO.
 */
463 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
464 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
465 __u32 mode, struct lustre_handle *lockh,
466 struct lov_request_set **reqset)
468 struct lov_obd *lov = &exp->exp_obd->u.lov;
469 struct lov_request_set *set;
472 OBD_ALLOC(set, sizeof(*set));
479 set->set_oi->oi_md = lsm;
480 set->set_lockh = lov_llh_new(lsm);
481 if (set->set_lockh == NULL)
482 GOTO(out_set, rc = -ENOMEM);
483 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
485 for (i = 0; i < lsm->lsm_stripe_count; i++){
486 struct lov_oinfo *loi;
487 struct lov_request *req;
490 loi = lsm->lsm_oinfo[i];
491 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
492 policy->l_extent.end, &start, &end))
495 /* FIXME raid1 should grace this error */
496 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
497 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
498 GOTO(out_set, rc = -EIO);
501 OBD_ALLOC(req, sizeof(*req));
503 GOTO(out_set, rc = -ENOMEM);
505 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
506 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
507 if (req->rq_oi.oi_md == NULL) {
508 OBD_FREE(req, sizeof(*req));
509 GOTO(out_set, rc = -ENOMEM);
512 req->rq_oi.oi_policy.l_extent.start = start;
513 req->rq_oi.oi_policy.l_extent.end = end;
514 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
516 req->rq_idx = loi->loi_ost_idx;
519 /* XXX LOV STACKING: submd should be from the subobj */
520 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
521 req->rq_oi.oi_md->lsm_stripe_count = 0;
523 lov_set_add_req(req, set);
526 GOTO(out_set, rc = -EIO);
530 lov_fini_match_set(set, mode, 0);
/*
 * Finalise a cancel set: assert the export is set and drop a reference on
 * the lock-handle block.
 * NOTE(review): the put is presumably guarded by set_lockh being non-NULL
 * (lov_prep_cancel_set() reaches this with set_lockh == NULL on its
 * -EINVAL path) -- confirm.
 */
534 int lov_fini_cancel_set(struct lov_request_set *set)
541 LASSERT(set->set_exp);
543 lov_llh_put(set->set_lockh);
/*
 * Build the request set for cancelling the per-stripe locks behind a LOV
 * lock handle.
 *
 * @lockh is resolved to its lock-handle block with lov_handle2llh(); an
 * unknown handle is logged and fails with -EINVAL.  Each stripe's handle
 * slot is tested with lustre_handle_is_used(), and a stripe without a
 * lock is traced.  A request records the target index and a bare
 * lov_stripe_md holding the stripe's object id with a stripe count of 0.
 * The out_set path hands the set to lov_fini_cancel_set().
 *
 * Errors visible here: -EINVAL, -ENOMEM, -EIO.
 */
550 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
551 struct lov_stripe_md *lsm, __u32 mode,
552 struct lustre_handle *lockh,
553 struct lov_request_set **reqset)
555 struct lov_request_set *set;
558 OBD_ALLOC(set, sizeof(*set));
565 set->set_oi->oi_md = lsm;
566 set->set_lockh = lov_handle2llh(lockh);
567 if (set->set_lockh == NULL) {
568 CERROR("LOV: invalid lov lock handle %p\n", lockh);
569 GOTO(out_set, rc = -EINVAL);
571 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
573 for (i = 0; i < lsm->lsm_stripe_count; i++){
574 struct lov_request *req;
575 struct lustre_handle *lov_lockhp;
576 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
578 lov_lockhp = set->set_lockh->llh_handles + i;
579 if (!lustre_handle_is_used(lov_lockhp)) {
580 CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
581 loi->loi_ost_idx, POSTID(&loi->loi_oi));
585 OBD_ALLOC(req, sizeof(*req));
587 GOTO(out_set, rc = -ENOMEM);
589 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
590 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
591 if (req->rq_oi.oi_md == NULL) {
592 OBD_FREE(req, sizeof(*req));
593 GOTO(out_set, rc = -ENOMEM);
596 req->rq_idx = loi->loi_ost_idx;
599 /* XXX LOV STACKING: submd should be from the subobj */
600 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
601 req->rq_oi.oi_md->lsm_stripe_count = 0;
603 lov_set_add_req(req, set);
606 GOTO(out_set, rc = -EIO);
610 lov_fini_cancel_set(set);
/*
 * Merge the attributes returned by the individual stripe requests into
 * the caller's obdo (set->set_oi->oi_oa).
 *
 * Requests that did not complete, failed, or came back with o_valid == 0
 * (inactive stripe) are skipped; the rest are folded into a temporary obdo
 * by lov_merge_attrs(), which is also handed &attrset.  When the caller
 * asked for OBD_MD_FLEPOCH, attrset must equal the stripe count, otherwise
 * the result is -EIO.  On success the temporary obdo, with the caller's
 * object id restored, is copied over the caller's obdo.
 *
 * Errors visible here: -ENOMEM, -EIO.
 * NOTE(review): attrset presumably counts the stripes that contributed
 * valid attributes -- confirm against lov_merge_attrs().
 */
613 static int common_attr_done(struct lov_request_set *set)
615 struct list_head *pos;
616 struct lov_request *req;
618 int rc = 0, attrset = 0;
620 LASSERT(set->set_oi != NULL);
622 if (set->set_oi->oi_oa == NULL)
625 if (!atomic_read(&set->set_success))
630 GOTO(out, rc = -ENOMEM);
632 list_for_each(pos, &set->set_list) {
633 req = list_entry(pos, struct lov_request, rq_link);
635 if (!req->rq_complete || req->rq_rc)
637 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
639 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
640 req->rq_oi.oi_oa->o_valid,
641 set->set_oi->oi_md, req->rq_stripe, &attrset);
644 CERROR("No stripes had valid attrs\n");
647 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
648 (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
649 /* When we take attributes of some epoch, we require all the
650 * ost to be active. */
651 CERROR("Not all the stripes had valid attrs\n");
652 GOTO(out, rc = -EIO);
655 tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
656 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
/*
 * After a bulk read/write, walk the set's requests and, for each one that
 * completed without error and reports a valid block count
 * (OBD_MD_FLBLOCKS), store that count in the LVB of the stripe the request
 * belongs to (req->rq_stripe).
 */
664 static int brw_done(struct lov_request_set *set)
666 struct lov_stripe_md *lsm = set->set_oi->oi_md;
667 struct lov_oinfo *loi = NULL;
668 struct list_head *pos;
669 struct lov_request *req;
671 list_for_each(pos, &set->set_list) {
672 req = list_entry(pos, struct lov_request, rq_link);
674 if (!req->rq_complete || req->rq_rc)
677 loi = lsm->lsm_oinfo[req->rq_stripe];
679 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
680 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
/*
 * Finalise a bulk read/write set: assert the export is set and do the
 * post-processing only when at least one request completed.
 * NOTE(review): the post-processing is presumably brw_done() followed by
 * releasing the set -- confirm.
 */
686 int lov_fini_brw_set(struct lov_request_set *set)
692 LASSERT(set->set_exp);
693 if (atomic_read(&set->set_completes)) {
695 /* FIXME update qos data here */
/*
 * Build the request set for a bulk read/write of @oa_bufs pages (@pga).
 *
 * Three passes:
 *  1. count the pages falling on each stripe (lov_stripe_number() on each
 *     page offset) in a temporary per-stripe `info` array;
 *  2. for every stripe with a non-zero page count, check the target with
 *     lov_check_and_wait_active() (failing with -EIO if it is unusable),
 *     then allocate a request with its own obdo and lov_stripe_md.  The
 *     obdo is copied from the caller's and given the stripe's object id and
 *     stripe index.  rq_pgaidx/rq_oabufs describe the request's slice of
 *     the sorted page array, with `shift` as the running start index;
 *  3. copy the caller's pages into set->set_pga grouped by stripe, and
 *     convert each file offset into a stripe-local offset with
 *     lov_stripe_offset().
 *
 * The tail of the function references sizeof(*info) * stripe count and
 * calls lov_fini_brw_set().
 *
 * Errors visible here: -ENOMEM, -EIO.
 * NOTE(review): pass 3 indexes with info[stripe].index + info[stripe].off;
 * presumably `off` is advanced per page so pages of one stripe land in
 * consecutive slots -- confirm.  The sizeof expression is presumably the
 * size argument freeing `info` -- confirm.
 */
702 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
703 obd_count oa_bufs, struct brw_page *pga,
704 struct obd_trans_info *oti,
705 struct lov_request_set **reqset)
712 struct lov_request_set *set;
713 struct lov_obd *lov = &exp->exp_obd->u.lov;
714 int rc = 0, i, shift;
716 OBD_ALLOC(set, sizeof(*set));
724 set->set_oabufs = oa_bufs;
725 OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
727 GOTO(out, rc = -ENOMEM);
729 OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
731 GOTO(out, rc = -ENOMEM);
733 /* calculate the page count for each stripe */
734 for (i = 0; i < oa_bufs; i++) {
735 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
736 info[stripe].count++;
739 /* alloc and initialize lov request */
741 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
742 struct lov_oinfo *loi = NULL;
743 struct lov_request *req;
745 if (info[i].count == 0)
748 loi = oinfo->oi_md->lsm_oinfo[i];
749 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
750 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
751 GOTO(out, rc = -EIO);
754 OBD_ALLOC(req, sizeof(*req));
756 GOTO(out, rc = -ENOMEM);
758 OBDO_ALLOC(req->rq_oi.oi_oa);
759 if (req->rq_oi.oi_oa == NULL) {
760 OBD_FREE(req, sizeof(*req));
761 GOTO(out, rc = -ENOMEM);
765 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
766 sizeof(*req->rq_oi.oi_oa));
768 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
769 req->rq_oi.oi_oa->o_stripe_idx = i;
771 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
772 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
773 if (req->rq_oi.oi_md == NULL) {
774 OBDO_FREE(req->rq_oi.oi_oa);
775 OBD_FREE(req, sizeof(*req));
776 GOTO(out, rc = -ENOMEM);
779 req->rq_idx = loi->loi_ost_idx;
782 /* XXX LOV STACKING */
783 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
784 req->rq_oabufs = info[i].count;
785 req->rq_pgaidx = shift;
786 shift += req->rq_oabufs;
788 /* remember the index for sort brw_page array */
789 info[i].index = req->rq_pgaidx;
791 req->rq_oi.oi_capa = oinfo->oi_capa;
793 lov_set_add_req(req, set);
796 GOTO(out, rc = -EIO);
798 /* rotate & sort the brw_page array */
799 for (i = 0; i < oa_bufs; i++) {
800 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
802 shift = info[stripe].index + info[stripe].off;
803 LASSERT(shift < oa_bufs);
804 set->set_pga[shift] = pga[i];
805 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
806 &set->set_pga[shift].off);
812 sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
817 lov_fini_brw_set(set);
/*
 * Finalise a getattr set: assert the export is set and, if at least one
 * request completed, merge the per-stripe attributes with
 * common_attr_done().
 */
822 int lov_fini_getattr_set(struct lov_request_set *set)
828 LASSERT(set->set_exp);
829 if (atomic_read(&set->set_completes))
830 rc = common_attr_done(set);
837 /* The callback for osc_getattr_async that finalizes a request info when a
838 * response is received. */
/*
 * @cookie is the obd_info embedded in a lov_request (member rq_oi); the
 * request is recovered with container_of() and handed, together with its
 * owning set and @rc, to lov_update_common_set().
 */
839 static int cb_getattr_update(void *cookie, int rc)
841 struct obd_info *oinfo = cookie;
842 struct lov_request *lovreq;
843 lovreq = container_of(oinfo, struct lov_request, rq_oi);
844 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a getattr across the stripes of oinfo->oi_md.
 *
 * Each stripe's target is probed with lov_check_and_wait_active().  An
 * unusable target fails the whole operation with -EIO when the caller
 * asked for OBD_MD_FLEPOCH (SOM needs every OST).  Each request gets its
 * own obdo copied from the caller's, with the stripe's object id
 * substituted, plus the cb_getattr_update callback and the caller's
 * capability.  The out_set path hands the set to lov_fini_getattr_set().
 *
 * Errors visible here: -ENOMEM, -EIO.
 */
847 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
848 struct lov_request_set **reqset)
850 struct lov_request_set *set;
851 struct lov_obd *lov = &exp->exp_obd->u.lov;
854 OBD_ALLOC(set, sizeof(*set));
862 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
863 struct lov_oinfo *loi;
864 struct lov_request *req;
866 loi = oinfo->oi_md->lsm_oinfo[i];
867 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
868 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
869 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
870 /* SOM requires all the OSTs to be active. */
871 GOTO(out_set, rc = -EIO);
875 OBD_ALLOC(req, sizeof(*req));
877 GOTO(out_set, rc = -ENOMEM);
880 req->rq_idx = loi->loi_ost_idx;
882 OBDO_ALLOC(req->rq_oi.oi_oa);
883 if (req->rq_oi.oi_oa == NULL) {
884 OBD_FREE(req, sizeof(*req));
885 GOTO(out_set, rc = -ENOMEM);
887 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
888 sizeof(*req->rq_oi.oi_oa));
889 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
890 req->rq_oi.oi_cb_up = cb_getattr_update;
891 req->rq_oi.oi_capa = oinfo->oi_capa;
893 lov_set_add_req(req, set);
896 GOTO(out_set, rc = -EIO);
900 lov_fini_getattr_set(set);
/*
 * Finalise a destroy set: assert the export is set.  The branch taken when
 * at least one request completed currently holds only a FIXME about QoS
 * data.
 * NOTE(review): presumably the set is released afterwards -- confirm.
 */
904 int lov_fini_destroy_set(struct lov_request_set *set)
908 LASSERT(set->set_exp);
909 if (atomic_read(&set->set_completes)) {
910 /* FIXME update qos data here */
/*
 * Build the request set for destroying the stripe objects of @lsm.
 *
 * The set records @lsm and @src_oa; when @oti is given and @src_oa carries
 * OBD_MD_FLCOOKIE, the set also picks up oti->oti_logcookies.  Each
 * stripe's target is probed with lov_check_and_wait_active().  A request
 * gets its own obdo, copied from @src_oa with the stripe's object id
 * substituted.  The out_set path hands the set to lov_fini_destroy_set().
 *
 * Errors visible here: -ENOMEM, -EIO.
 */
918 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
919 struct obdo *src_oa, struct lov_stripe_md *lsm,
920 struct obd_trans_info *oti,
921 struct lov_request_set **reqset)
923 struct lov_request_set *set;
924 struct lov_obd *lov = &exp->exp_obd->u.lov;
927 OBD_ALLOC(set, sizeof(*set));
934 set->set_oi->oi_md = lsm;
935 set->set_oi->oi_oa = src_oa;
937 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
938 set->set_cookies = oti->oti_logcookies;
940 for (i = 0; i < lsm->lsm_stripe_count; i++) {
941 struct lov_oinfo *loi;
942 struct lov_request *req;
944 loi = lsm->lsm_oinfo[i];
945 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
946 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
950 OBD_ALLOC(req, sizeof(*req));
952 GOTO(out_set, rc = -ENOMEM);
955 req->rq_idx = loi->loi_ost_idx;
957 OBDO_ALLOC(req->rq_oi.oi_oa);
958 if (req->rq_oi.oi_oa == NULL) {
959 OBD_FREE(req, sizeof(*req));
960 GOTO(out_set, rc = -ENOMEM);
962 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
963 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
964 lov_set_add_req(req, set);
967 GOTO(out_set, rc = -EIO);
971 lov_fini_destroy_set(set);
/*
 * Finalise a setattr set: assert the export is set and, if at least one
 * request completed, merge the per-stripe attributes with
 * common_attr_done().
 */
975 int lov_fini_setattr_set(struct lov_request_set *set)
981 LASSERT(set->set_exp);
982 if (atomic_read(&set->set_completes)) {
983 rc = common_attr_done(set);
984 /* FIXME update qos data here */
/*
 * Completion handler for one stripe's setattr.
 *
 * Accounts the request through lov_update_set() and singles out an error
 * on a missing or inactive target ("grace error on inactive ost").  The
 * ctime, mtime and atime the request's obdo marks as valid are copied into
 * the LVB of the stripe the request belongs to.
 *
 * NOTE(review): the timestamp copy presumably only happens when rc is 0
 * after the grace check -- confirm.
 */
991 int lov_update_setattr_set(struct lov_request_set *set,
992 struct lov_request *req, int rc)
994 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
995 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
997 lov_update_set(set, req, rc);
999 /* grace error on inactive ost */
1000 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1001 lov->lov_tgts[req->rq_idx]->ltd_active))
1005 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1006 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1007 req->rq_oi.oi_oa->o_ctime;
1008 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1009 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1010 req->rq_oi.oi_oa->o_mtime;
1011 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1012 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1013 req->rq_oi.oi_oa->o_atime;
1019 /* The callback for osc_setattr_async that finalizes a request info when a
1020 * response is received. */
/*
 * @cookie is the obd_info embedded in a lov_request (member rq_oi); the
 * request is recovered with container_of() and handed, together with its
 * owning set and @rc, to lov_update_setattr_set().
 */
1021 static int cb_setattr_update(void *cookie, int rc)
1023 struct obd_info *oinfo = cookie;
1024 struct lov_request *lovreq;
1025 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1026 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a setattr across the stripes of oinfo->oi_md.
 *
 * When @oti is given and the caller's obdo carries OBD_MD_FLCOOKIE, the
 * set picks up oti->oti_logcookies.  Each stripe's target is probed with
 * lov_check_and_wait_active().  A request gets its own obdo copied from
 * the caller's, with the stripe's object id and stripe index substituted,
 * plus the cb_setattr_update callback and the caller's capability.
 *
 * If the caller sets a size (OBD_MD_FLSIZE), the file size is translated
 * to a stripe-local size with lov_stripe_offset(); a negative return
 * combined with a non-zero stripe size decrements that size by one, and
 * both sizes are traced.
 *
 * An empty set fails with -EIO; the out_set path hands the set to
 * lov_fini_setattr_set().  Other errors visible here: -ENOMEM.
 */
1029 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1030 struct obd_trans_info *oti,
1031 struct lov_request_set **reqset)
1033 struct lov_request_set *set;
1034 struct lov_obd *lov = &exp->exp_obd->u.lov;
1037 OBD_ALLOC(set, sizeof(*set));
1044 set->set_oi = oinfo;
1045 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1046 set->set_cookies = oti->oti_logcookies;
1048 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1049 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1050 struct lov_request *req;
1052 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1053 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1057 OBD_ALLOC(req, sizeof(*req));
1059 GOTO(out_set, rc = -ENOMEM);
1061 req->rq_idx = loi->loi_ost_idx;
1063 OBDO_ALLOC(req->rq_oi.oi_oa);
1064 if (req->rq_oi.oi_oa == NULL) {
1065 OBD_FREE(req, sizeof(*req));
1066 GOTO(out_set, rc = -ENOMEM);
1068 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1069 sizeof(*req->rq_oi.oi_oa));
1070 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1071 req->rq_oi.oi_oa->o_stripe_idx = i;
1072 req->rq_oi.oi_cb_up = cb_setattr_update;
1073 req->rq_oi.oi_capa = oinfo->oi_capa;
1075 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1076 int off = lov_stripe_offset(oinfo->oi_md,
1077 oinfo->oi_oa->o_size, i,
1078 &req->rq_oi.oi_oa->o_size);
1080 if (off < 0 && req->rq_oi.oi_oa->o_size)
1081 req->rq_oi.oi_oa->o_size--;
1083 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1084 i, req->rq_oi.oi_oa->o_size,
1085 oinfo->oi_oa->o_size);
1087 lov_set_add_req(req, set);
1089 if (!set->set_count)
1090 GOTO(out_set, rc = -EIO);
1094 lov_fini_setattr_set(set);
/*
 * Finalise a punch (truncate) set: assert the export is set; if at least
 * one request completed and at least one succeeded, merge the per-stripe
 * attributes with common_attr_done(); then drop the set reference with
 * lov_put_reqset().
 */
1098 int lov_fini_punch_set(struct lov_request_set *set)
1104 LASSERT(set->set_exp);
1105 if (atomic_read(&set->set_completes)) {
1107 /* FIXME update qos data here */
1108 if (atomic_read(&set->set_success))
1109 rc = common_attr_done(set);
1112 lov_put_reqset(set);
/*
 * Completion handler for one stripe's punch.
 *
 * Accounts the request through lov_update_set() and singles out an error
 * on an inactive target.  With the stripe lock held, a block count the
 * request's obdo marks as valid (OBD_MD_FLBLOCKS) is copied into the LVB
 * of the stripe the request belongs to.
 *
 * NOTE(review): unlike lov_update_common_set() and
 * lov_update_setattr_set(), the grace check dereferences
 * lov_tgts[req->rq_idx] without first testing it for NULL -- confirm the
 * entry cannot be NULL here, or align the check with the other handlers.
 * NOTE(review): the LVB update presumably only runs when rc is 0 after
 * the grace check -- confirm.
 */
1117 int lov_update_punch_set(struct lov_request_set *set,
1118 struct lov_request *req, int rc)
1120 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1121 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1123 lov_update_set(set, req, rc);
1125 /* grace error on inactive ost */
1126 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1130 lov_stripe_lock(lsm);
1131 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1132 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1133 req->rq_oi.oi_oa->o_blocks;
1136 lov_stripe_unlock(lsm);
1142 /* The callback for osc_punch that finalizes a request info when a response
 * is received. */
/*
 * @cookie is the obd_info embedded in a lov_request (member rq_oi); the
 * request is recovered with container_of() and handed, together with its
 * owning set and @rc, to lov_update_punch_set().
 */
1144 static int cb_update_punch(void *cookie, int rc)
1146 struct obd_info *oinfo = cookie;
1147 struct lov_request *lovreq;
1148 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1149 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a punch over the extent in oinfo->oi_policy.
 *
 * Each stripe is tested against the extent with lov_stripe_intersects().
 * A target that lov_check_and_wait_active() reports as unusable aborts the
 * operation with -EIO.  A request gets its own obdo copied from the
 * caller's, with the stripe's object id and stripe index substituted and
 * OBD_MD_FLGROUP added to o_valid.  It also gets the cb_update_punch
 * callback, the stripe-local extent (rs..re) with gid -1, and the caller's
 * capability.
 *
 * An empty set fails with -EIO; the out_set path hands the set to
 * lov_fini_punch_set().  Other errors visible here: -ENOMEM.
 */
1152 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1153 struct obd_trans_info *oti,
1154 struct lov_request_set **reqset)
1156 struct lov_request_set *set;
1157 struct lov_obd *lov = &exp->exp_obd->u.lov;
1160 OBD_ALLOC(set, sizeof(*set));
1165 set->set_oi = oinfo;
1168 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1169 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1170 struct lov_request *req;
1173 if (!lov_stripe_intersects(oinfo->oi_md, i,
1174 oinfo->oi_policy.l_extent.start,
1175 oinfo->oi_policy.l_extent.end,
1179 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1180 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1181 GOTO(out_set, rc = -EIO);
1184 OBD_ALLOC(req, sizeof(*req));
1186 GOTO(out_set, rc = -ENOMEM);
1188 req->rq_idx = loi->loi_ost_idx;
1190 OBDO_ALLOC(req->rq_oi.oi_oa);
1191 if (req->rq_oi.oi_oa == NULL) {
1192 OBD_FREE(req, sizeof(*req));
1193 GOTO(out_set, rc = -ENOMEM);
1195 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1196 sizeof(*req->rq_oi.oi_oa));
1197 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1198 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1200 req->rq_oi.oi_oa->o_stripe_idx = i;
1201 req->rq_oi.oi_cb_up = cb_update_punch;
1203 req->rq_oi.oi_policy.l_extent.start = rs;
1204 req->rq_oi.oi_policy.l_extent.end = re;
1205 req->rq_oi.oi_policy.l_extent.gid = -1;
1207 req->rq_oi.oi_capa = oinfo->oi_capa;
1209 lov_set_add_req(req, set);
1211 if (!set->set_count)
1212 GOTO(out_set, rc = -EIO);
1216 lov_fini_punch_set(set);
/*
 * Finalise a sync set: assert the export is set, inspect the success
 * counter when at least one request completed, then drop the set reference
 * with lov_put_reqset().
 * NOTE(review): presumably an error is reported when requests completed
 * but none succeeded -- confirm.
 */
1220 int lov_fini_sync_set(struct lov_request_set *set)
1226 LASSERT(set->set_exp);
1227 if (atomic_read(&set->set_completes)) {
1228 if (!atomic_read(&set->set_success))
1230 /* FIXME update qos data here */
1233 lov_put_reqset(set);
1238 /* The callback for osc_sync that finalizes a request info when a
1239 * response is received. */
/*
 * @cookie is the obd_info embedded in a lov_request (member rq_oi); the
 * request is recovered with container_of() and handed, together with its
 * owning set and @rc, to lov_update_common_set().
 */
1240 static int cb_sync_update(void *cookie, int rc)
1242 struct obd_info *oinfo = cookie;
1243 struct lov_request *lovreq;
1245 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1246 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for syncing the byte range @start..@end.
 *
 * Each stripe's target is probed with lov_check_and_wait_active() and the
 * stripe is tested against the range with lov_stripe_intersects().  A
 * request gets its own obdo copied from the caller's, with the stripe's
 * object id and stripe index substituted, plus the stripe-local extent
 * (rs..re) with gid -1 and the cb_sync_update callback.
 *
 * An empty set fails with -EIO; the out_set path hands the set to
 * lov_fini_sync_set().  Other errors visible here: -ENOMEM.
 */
1249 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1250 obd_off start, obd_off end,
1251 struct lov_request_set **reqset)
1253 struct lov_request_set *set;
1254 struct lov_obd *lov = &exp->exp_obd->u.lov;
1263 set->set_oi = oinfo;
1265 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1266 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1267 struct lov_request *req;
1270 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1271 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1275 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1281 GOTO(out_set, rc = -ENOMEM);
1283 req->rq_idx = loi->loi_ost_idx;
1285 OBDO_ALLOC(req->rq_oi.oi_oa);
1286 if (req->rq_oi.oi_oa == NULL) {
1287 OBD_FREE(req, sizeof(*req));
1288 GOTO(out_set, rc = -ENOMEM);
1290 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1291 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1292 req->rq_oi.oi_oa->o_stripe_idx = i;
1294 req->rq_oi.oi_policy.l_extent.start = rs;
1295 req->rq_oi.oi_policy.l_extent.end = re;
1296 req->rq_oi.oi_policy.l_extent.gid = -1;
1297 req->rq_oi.oi_cb_up = cb_sync_update;
1299 lov_set_add_req(req, set);
1301 if (!set->set_count)
1302 GOTO(out_set, rc = -EIO);
1306 lov_fini_sync_set(set);
/*
 * LOV_U64_MAX is the all-ones 64-bit value.  LOV_SUM_MAX(tot, add) detects
 * unsigned wrap-around of tot + add and pins tot at LOV_U64_MAX in that
 * case.  @tot is expanded several times, so it must be free of side
 * effects.
 * NOTE(review): presumably tot is increased by add when no wrap-around
 * occurs -- confirm.
 */
1310 #define LOV_U64_MAX ((__u64)~0ULL)
1311 #define LOV_SUM_MAX(tot, add) \
1313 if ((tot) + (add) < (tot)) \
1314 (tot) = LOV_U64_MAX; \
/*
 * Post-process an aggregated statfs result and cache it on @obd.
 *
 * The file counts (os_files, os_ffree) are divided by the expected stripe
 * count from lov_get_stripecnt(), unless they are pinned at LOV_U64_MAX.
 * The result is then copied into obd->obd_osfs and its age stamped with
 * the current time, both under obd_osfs_lock.
 *
 * NOTE(review): how @success gates this work and what is returned are not
 * evident here -- confirm.
 */
1319 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1322 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1324 if (osfs->os_files != LOV_U64_MAX)
1325 lov_do_div64(osfs->os_files, expected_stripes);
1326 if (osfs->os_ffree != LOV_U64_MAX)
1327 lov_do_div64(osfs->os_ffree, expected_stripes);
1329 spin_lock(&obd->obd_osfs_lock);
1330 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1331 obd->obd_osfs_age = cfs_time_current_64();
1332 spin_unlock(&obd->obd_osfs_lock);
/*
 * Finalise a statfs set: if at least one request completed, run
 * lov_fini_statfs() on the set's obd device and aggregate buffer with the
 * current success count, then drop the set reference with
 * lov_put_reqset().
 */
1339 int lov_fini_statfs_set(struct lov_request_set *set)
1346 if (atomic_read(&set->set_completes)) {
1347 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1348 atomic_read(&set->set_success));
1350 lov_put_reqset(set);
/*
 * Fold one target's statfs result (@lov_sfs) into the aggregate (@osfs).
 *
 * One path simply copies @lov_sfs into @osfs.  The other path first
 * reconciles differing block sizes.  They are assumed to be powers of two,
 * so the bit distance between them is derived from their OR.  The
 * aggregate adopts the larger block size, and whichever side used the
 * smaller size has its bfree/bavail/blocks shifted right by that distance.
 * Block counters are then summed, and the file counters are summed with
 * LOV_SUM_MAX so they saturate rather than wrap.
 *
 * NOTE(review): the copy path is presumably taken for the first
 * successful reply -- confirm the guard and the name of the third
 * parameter.  Note that @lov_sfs itself is modified when its counters are
 * rescaled.
 */
1354 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1357 int shift = 0, quit = 0;
1361 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1363 if (osfs->os_bsize != lov_sfs->os_bsize) {
1364 /* assume all block sizes are always powers of 2 */
1365 /* get the bits difference */
1366 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1367 for (shift = 0; shift <= 64; ++shift) {
1379 if (osfs->os_bsize < lov_sfs->os_bsize) {
1380 osfs->os_bsize = lov_sfs->os_bsize;
1382 osfs->os_bfree >>= shift;
1383 osfs->os_bavail >>= shift;
1384 osfs->os_blocks >>= shift;
1385 } else if (shift != 0) {
1386 lov_sfs->os_bfree >>= shift;
1387 lov_sfs->os_bavail >>= shift;
1388 lov_sfs->os_blocks >>= shift;
1390 osfs->os_bfree += lov_sfs->os_bfree;
1391 osfs->os_bavail += lov_sfs->os_bavail;
1392 osfs->os_blocks += lov_sfs->os_blocks;
1393 /* XXX not sure about this one - depends on policy.
1394 * - could be minimum if we always stripe on all OBDs
1395 * (but that would be wrong for any other policy,
1396 * if one of the OBDs has no more objects left)
1397 * - could be sum if we stripe whole objects
1398 * - could be average, just to give a nice number
1400 * To give a "reasonable" (if not wholly accurate)
1401 * number, we divide the total number of free objects
1402 * by expected stripe count (watch out for overflow).
1404 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1405 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1409 /* The callback for osc_statfs_async that finalizes a request info when a
1410 * response is received. */
/*
 * @cookie is the obd_info embedded in a lov_request (member rq_oi).
 *
 * The success count is sampled before the request is accounted with
 * lov_update_set(), so `success` reflects the replies seen before this
 * one.  If the request's target is still present and active, the target
 * device's cached statfs (obd_osfs) is refreshed under its obd_osfs_lock,
 * and its age is re-stamped unless the reply came from cache
 * (OBD_STATFS_FROM_CACHE).  The reply is then merged into the set's
 * aggregate with lov_update_statfs().
 *
 * For requests issued with OBD_STATFS_PTLRPCD, once lov_set_finished()
 * reports the set complete, lov_statfs_interpret() is invoked with a
 * non-zero last argument when not every request succeeded.
 *
 * NOTE(review): a missing or inactive target jumps to out_update;
 * presumably a failed reply (rc != 0) skips the merge as well -- confirm.
 */
1411 static int cb_statfs_update(void *cookie, int rc)
1413 struct obd_info *oinfo = cookie;
1414 struct lov_request *lovreq;
1415 struct lov_request_set *set;
1416 struct obd_statfs *osfs, *lov_sfs;
1417 struct lov_obd *lov;
1418 struct lov_tgt_desc *tgt;
1419 struct obd_device *lovobd, *tgtobd;
1422 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1423 set = lovreq->rq_rqset;
1424 lovobd = set->set_obd;
1425 lov = &lovobd->u.lov;
1426 osfs = set->set_oi->oi_osfs;
1427 lov_sfs = oinfo->oi_osfs;
1428 success = atomic_read(&set->set_success);
1429 /* XXX: the same is done in lov_update_common_set, however
1430 lovset->set_exp is not initialized. */
1431 lov_update_set(set, lovreq, rc);
1436 tgt = lov->lov_tgts[lovreq->rq_idx];
1437 if (!tgt || !tgt->ltd_active)
1438 GOTO(out_update, rc);
1440 tgtobd = class_exp2obd(tgt->ltd_exp);
1441 spin_lock(&tgtobd->obd_osfs_lock);
1442 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1443 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1444 tgtobd->obd_osfs_age = cfs_time_current_64();
1445 spin_unlock(&tgtobd->obd_osfs_lock);
1448 lov_update_statfs(osfs, lov_sfs, success);
1452 if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1453 lov_set_finished(set, 0)) {
1454 lov_statfs_interpret(NULL, set, set->set_count !=
1455 atomic_read(&set->set_success));
/*
 * Build the request set for a statfs across all configured targets
 * (lov->desc.ld_tgt_count entries, not the stripes of one file).
 *
 * A target slot is reported as inactive when it is empty, or when
 * lov_check_and_wait_active() fails and the caller asked for
 * OBD_STATFS_NODELAY.  A target without an export is reported as
 * administratively disabled.  A request gets its own obd_statfs buffer,
 * the cb_statfs_update callback and the caller's flags.
 *
 * An empty set fails with -EIO; the out_set path hands the set to
 * lov_fini_statfs_set().  Other errors visible here: -ENOMEM.
 *
 * NOTE(review): with short-circuit evaluation,
 * lov_check_and_wait_active() -- which may sleep -- runs for every
 * non-NULL target before the NODELAY flag is tested; confirm this is
 * intended.
 */
1461 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1462 struct lov_request_set **reqset)
1464 struct lov_request_set *set;
1465 struct lov_obd *lov = &obd->u.lov;
1468 OBD_ALLOC(set, sizeof(*set));
1474 set->set_oi = oinfo;
1476 /* We only get block data from the OBD */
1477 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1478 struct lov_request *req;
1480 if (lov->lov_tgts[i] == NULL ||
1481 (!lov_check_and_wait_active(lov, i) &&
1482 (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1483 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1487 /* skip targets that have been explicitly disabled by the
1489 if (!lov->lov_tgts[i]->ltd_exp) {
1490 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1494 OBD_ALLOC(req, sizeof(*req));
1496 GOTO(out_set, rc = -ENOMEM);
1498 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1499 if (req->rq_oi.oi_osfs == NULL) {
1500 OBD_FREE(req, sizeof(*req));
1501 GOTO(out_set, rc = -ENOMEM);
1505 req->rq_oi.oi_cb_up = cb_statfs_update;
1506 req->rq_oi.oi_flags = oinfo->oi_flags;
1508 lov_set_add_req(req, set);
1510 if (!set->set_count)
1511 GOTO(out_set, rc = -EIO);
1515 lov_fini_statfs_set(set);