1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <linux/libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <obd_lov.h>
48
49 #ifdef  __CYGWIN__
50 # include <ctype.h>
51 #endif
52
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
61
62 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63 static int brw_interpret(const struct lu_env *env,
64                          struct ptlrpc_request *req, void *data, int rc);
65 int osc_cleanup(struct obd_device *obd);
66
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69                       struct lov_stripe_md *lsm)
70 {
71         int lmm_size;
72
73         lmm_size = sizeof(**lmmp);
74         if (lmmp == NULL)
75                 return lmm_size;
76
77         if (*lmmp != NULL && lsm == NULL) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 return 0;
81         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
82                 return -EBADF;
83         }
84
85         if (*lmmp == NULL) {
86                 OBD_ALLOC(*lmmp, lmm_size);
87                 if (*lmmp == NULL)
88                         return -ENOMEM;
89         }
90
91         if (lsm)
92                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
93
94         return lmm_size;
95 }
96
97 /* Unpack OSC object metadata from disk storage (LE byte order). */
98 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
99                         struct lov_mds_md *lmm, int lmm_bytes)
100 {
101         int lsm_size;
102         struct obd_import *imp = class_exp2cliimp(exp);
103
104         if (lmm != NULL) {
105                 if (lmm_bytes < sizeof(*lmm)) {
106                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
107                                exp->exp_obd->obd_name, lmm_bytes,
108                                (int)sizeof(*lmm));
109                         return -EINVAL;
110                 }
111                 /* XXX LOV_MAGIC etc check? */
112
113                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
114                         CERROR("%s: zero lmm_object_id: rc = %d\n",
115                                exp->exp_obd->obd_name, -EINVAL);
116                         return -EINVAL;
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 return lsm_size;
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 return 0;
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (unlikely(*lsmp == NULL))
134                         return -ENOMEM;
135                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
137                         OBD_FREE(*lsmp, lsm_size);
138                         return -ENOMEM;
139                 }
140                 loi_init((*lsmp)->lsm_oinfo[0]);
141         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
142                 return -EBADF;
143         }
144
145         if (lmm != NULL)
146                 /* XXX zero *lsmp? */
147                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
148
149         if (imp != NULL &&
150             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
151                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
152         else
153                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
154
155         return lsm_size;
156 }
157
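/* Copy the OSS capability, if any, into the CAPA1 buffer of @req and flag
 * its presence in the request body. */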
158 static inline void osc_pack_capa(struct ptlrpc_request *req,
159                                  struct ost_body *body, void *capa)
160 {
161         struct obd_capa *oc = (struct obd_capa *)capa;
162         struct lustre_capa *c;
163
164         if (!capa)
165                 return;
166
167         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
168         LASSERT(c);
169         capa_cpy(c, oc);
170         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
171         DEBUG_CAPA(D_SEC, c, "pack");
172 }
173
174 static inline void osc_pack_req_body(struct ptlrpc_request *req,
175                                      struct obd_info *oinfo)
176 {
177         struct ost_body *body;
178
179         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
180         LASSERT(body);
181
182         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
183                              oinfo->oi_oa);
184         osc_pack_capa(req, body, oinfo->oi_capa);
185 }
186
187 static inline void osc_set_capa_size(struct ptlrpc_request *req,
188                                      const struct req_msg_field *field,
189                                      struct obd_capa *oc)
190 {
191         if (oc == NULL)
192                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
193         else
194                 /* it is already calculated as sizeof struct obd_capa */
195                 ;
196 }
197
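/* Reply interpreter for async getattr: unpack the ost_body from the reply
 * into oinfo->oi_oa and pass the result to the caller's oi_cb_up upcall. */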
198 static int osc_getattr_interpret(const struct lu_env *env,
199                                  struct ptlrpc_request *req,
200                                  struct osc_async_args *aa, int rc)
201 {
202         struct ost_body *body;
203
204         if (rc != 0)
205                 GOTO(out, rc);
206
207         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
208         if (body) {
209                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
210                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
211                                      aa->aa_oi->oi_oa, &body->oa);
212
213                 /* This should really be sent by the OST */
214                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
215                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
216         } else {
217                 CDEBUG(D_INFO, "can't unpack ost_body\n");
218                 rc = -EPROTO;
219                 aa->aa_oi->oi_oa->o_valid = 0;
220         }
221 out:
222         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
223         return rc;
224 }
225
226 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
227                              struct ptlrpc_request_set *set)
228 {
229         struct ptlrpc_request *req;
230         struct osc_async_args *aa;
231         int                 rc;
232
233         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
234         if (req == NULL)
235                 return -ENOMEM;
236
237         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
238         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
239         if (rc) {
240                 ptlrpc_request_free(req);
241                 return rc;
242         }
243
244         osc_pack_req_body(req, oinfo);
245
246         ptlrpc_request_set_replen(req);
247         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
248
249         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
250         aa = ptlrpc_req_async_args(req);
251         aa->aa_oi = oinfo;
252
253         ptlrpc_set_add_req(set, req);
254         return 0;
255 }
256
257 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
258                        struct obd_info *oinfo)
259 {
260         struct ptlrpc_request *req;
261         struct ost_body       *body;
262         int                 rc;
263
264         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
265         if (req == NULL)
266                 return -ENOMEM;
267
268         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
269         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
270         if (rc) {
271                 ptlrpc_request_free(req);
272                 return rc;
273         }
274
275         osc_pack_req_body(req, oinfo);
276
277         ptlrpc_request_set_replen(req);
278
279         rc = ptlrpc_queue_wait(req);
280         if (rc)
281                 GOTO(out, rc);
282
283         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
284         if (body == NULL)
285                 GOTO(out, rc = -EPROTO);
286
287         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
288         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
289                              &body->oa);
290
291         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
292         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
293
294  out:
295         ptlrpc_req_finished(req);
296         return rc;
297 }
298
299 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
300                        struct obd_info *oinfo, struct obd_trans_info *oti)
301 {
302         struct ptlrpc_request *req;
303         struct ost_body       *body;
304         int                 rc;
305
306         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
309         if (req == NULL)
310                 return -ENOMEM;
311
312         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
313         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
314         if (rc) {
315                 ptlrpc_request_free(req);
316                 return rc;
317         }
318
319         osc_pack_req_body(req, oinfo);
320
321         ptlrpc_request_set_replen(req);
322
323         rc = ptlrpc_queue_wait(req);
324         if (rc)
325                 GOTO(out, rc);
326
327         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
328         if (body == NULL)
329                 GOTO(out, rc = -EPROTO);
330
331         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
332                              &body->oa);
333
334 out:
335         ptlrpc_req_finished(req);
336         return rc;
337 }
338
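/* Reply interpreter for setattr/punch: copy the returned attributes into
 * sa_oa and hand the result to the caller's upcall. */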
339 static int osc_setattr_interpret(const struct lu_env *env,
340                                  struct ptlrpc_request *req,
341                                  struct osc_setattr_args *sa, int rc)
342 {
343         struct ost_body *body;
344
345         if (rc != 0)
346                 GOTO(out, rc);
347
348         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
349         if (body == NULL)
350                 GOTO(out, rc = -EPROTO);
351
352         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
353                              &body->oa);
354 out:
355         rc = sa->sa_upcall(sa->sa_cookie, rc);
356         return rc;
357 }
358
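/* Send an OST_SETATTR request.  Without an rqset the request is handed to
 * ptlrpcd and the reply is ignored; otherwise osc_setattr_interpret() runs
 * the caller's upcall when the reply arrives. */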
359 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
360                            struct obd_trans_info *oti,
361                            obd_enqueue_update_f upcall, void *cookie,
362                            struct ptlrpc_request_set *rqset)
363 {
364         struct ptlrpc_request   *req;
365         struct osc_setattr_args *sa;
366         int                   rc;
367
368         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
369         if (req == NULL)
370                 return -ENOMEM;
371
372         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
373         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
374         if (rc) {
375                 ptlrpc_request_free(req);
376                 return rc;
377         }
378
379         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
380                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
381
382         osc_pack_req_body(req, oinfo);
383
384         ptlrpc_request_set_replen(req);
385
386         /* do mds to ost setattr asynchronously */
387         if (!rqset) {
388                 /* Do not wait for response. */
389                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
390         } else {
391                 req->rq_interpret_reply =
392                         (ptlrpc_interpterer_t)osc_setattr_interpret;
393
394                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
395                 sa = ptlrpc_req_async_args(req);
396                 sa->sa_oa = oinfo->oi_oa;
397                 sa->sa_upcall = upcall;
398                 sa->sa_cookie = cookie;
399
400                 if (rqset == PTLRPCD_SET)
401                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
402                 else
403                         ptlrpc_set_add_req(rqset, req);
404         }
405
406         return 0;
407 }
408
409 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
410                              struct obd_trans_info *oti,
411                              struct ptlrpc_request_set *rqset)
412 {
413         return osc_setattr_async_base(exp, oinfo, oti,
414                                       oinfo->oi_cb_up, oinfo, rqset);
415 }
416
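/* Create the object on the OST with a synchronous OST_CREATE RPC, copy the
 * returned attributes back into @oa, record the new object id in the stripe
 * md and, if requested, save the transno and llog cookie in @oti. */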
417 int osc_real_create(struct obd_export *exp, struct obdo *oa,
418                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
419 {
420         struct ptlrpc_request *req;
421         struct ost_body       *body;
422         struct lov_stripe_md  *lsm;
423         int                 rc;
424
425         LASSERT(oa);
426         LASSERT(ea);
427
428         lsm = *ea;
429         if (!lsm) {
430                 rc = obd_alloc_memmd(exp, &lsm);
431                 if (rc < 0)
432                         return rc;
433         }
434
435         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
436         if (req == NULL)
437                 GOTO(out, rc = -ENOMEM);
438
439         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
440         if (rc) {
441                 ptlrpc_request_free(req);
442                 GOTO(out, rc);
443         }
444
445         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
446         LASSERT(body);
447
448         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
449
450         ptlrpc_request_set_replen(req);
451
452         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
453             oa->o_flags == OBD_FL_DELORPHAN) {
454                 DEBUG_REQ(D_HA, req,
455                           "delorphan from OST integration");
456                 /* Don't resend the delorphan req */
457                 req->rq_no_resend = req->rq_no_delay = 1;
458         }
459
460         rc = ptlrpc_queue_wait(req);
461         if (rc)
462                 GOTO(out_req, rc);
463
464         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
465         if (body == NULL)
466                 GOTO(out_req, rc = -EPROTO);
467
468         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
469         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
470
471         oa->o_blksize = cli_brw_size(exp->exp_obd);
472         oa->o_valid |= OBD_MD_FLBLKSZ;
473
474         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
475          * have valid lsm_oinfo data structs, so don't go touching that.
476          * This needs to be fixed in a big way.
477          */
478         lsm->lsm_oi = oa->o_oi;
479         *ea = lsm;
480
481         if (oti != NULL) {
482                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
483
484                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
485                         if (!oti->oti_logcookies)
486                                 oti_alloc_cookies(oti, 1);
487                         *oti->oti_logcookies = oa->o_lcookie;
488                 }
489         }
490
491         CDEBUG(D_HA, "transno: "LPD64"\n",
492                lustre_msg_get_transno(req->rq_repmsg));
493 out_req:
494         ptlrpc_req_finished(req);
495 out:
496         if (rc && !*ea)
497                 obd_free_memmd(exp, &lsm);
498         return rc;
499 }
500
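/* Send an asynchronous OST_PUNCH (truncate).  The extent to punch is carried
 * in o_size/o_blocks; the reply is handled by osc_setattr_interpret(), which
 * invokes @upcall with @cookie. */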
501 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
502                    obd_enqueue_update_f upcall, void *cookie,
503                    struct ptlrpc_request_set *rqset)
504 {
505         struct ptlrpc_request   *req;
506         struct osc_setattr_args *sa;
507         struct ost_body  *body;
508         int                   rc;
509
510         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
511         if (req == NULL)
512                 return -ENOMEM;
513
514         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
515         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
516         if (rc) {
517                 ptlrpc_request_free(req);
518                 return rc;
519         }
520         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
521         ptlrpc_at_set_req_timeout(req);
522
523         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
524         LASSERT(body);
525         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
526                              oinfo->oi_oa);
527         osc_pack_capa(req, body, oinfo->oi_capa);
528
529         ptlrpc_request_set_replen(req);
530
531         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
532         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
533         sa = ptlrpc_req_async_args(req);
534         sa->sa_oa     = oinfo->oi_oa;
535         sa->sa_upcall = upcall;
536         sa->sa_cookie = cookie;
537         if (rqset == PTLRPCD_SET)
538                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
539         else
540                 ptlrpc_set_add_req(rqset, req);
541
542         return 0;
543 }
544
545 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
546                      struct obd_info *oinfo, struct obd_trans_info *oti,
547                      struct ptlrpc_request_set *rqset)
548 {
549         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
550         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
551         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
552         return osc_punch_base(exp, oinfo,
553                               oinfo->oi_cb_up, oinfo, rqset);
554 }
555
556 static int osc_sync_interpret(const struct lu_env *env,
557                               struct ptlrpc_request *req,
558                               void *arg, int rc)
559 {
560         struct osc_fsync_args *fa = arg;
561         struct ost_body *body;
562
563         if (rc)
564                 GOTO(out, rc);
565
566         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
567         if (body == NULL) {
568                 CERROR("can't unpack ost_body\n");
569                 GOTO(out, rc = -EPROTO);
570         }
571
572         *fa->fa_oi->oi_oa = body->oa;
573 out:
574         rc = fa->fa_upcall(fa->fa_cookie, rc);
575         return rc;
576 }
577
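/* Send an asynchronous OST_SYNC for the range carried in o_size/o_blocks;
 * osc_sync_interpret() passes the result to @upcall. */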
578 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
579                   obd_enqueue_update_f upcall, void *cookie,
580                   struct ptlrpc_request_set *rqset)
581 {
582         struct ptlrpc_request *req;
583         struct ost_body       *body;
584         struct osc_fsync_args *fa;
585         int                 rc;
586
587         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
588         if (req == NULL)
589                 return -ENOMEM;
590
591         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
592         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
593         if (rc) {
594                 ptlrpc_request_free(req);
595                 return rc;
596         }
597
598         /* overload the size and blocks fields in the oa with start/end */
599         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
600         LASSERT(body);
601         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
602                              oinfo->oi_oa);
603         osc_pack_capa(req, body, oinfo->oi_capa);
604
605         ptlrpc_request_set_replen(req);
606         req->rq_interpret_reply = osc_sync_interpret;
607
608         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
609         fa = ptlrpc_req_async_args(req);
610         fa->fa_oi = oinfo;
611         fa->fa_upcall = upcall;
612         fa->fa_cookie = cookie;
613
614         if (rqset == PTLRPCD_SET)
615                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
616         else
617                 ptlrpc_set_add_req(rqset, req);
618
619         return 0;
620 }
621
622 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
623                     struct obd_info *oinfo, obd_size start, obd_size end,
624                     struct ptlrpc_request_set *set)
625 {
626         if (!oinfo->oi_oa) {
627                 CDEBUG(D_INFO, "oa NULL\n");
628                 return -EINVAL;
629         }
630
631         oinfo->oi_oa->o_size = start;
632         oinfo->oi_oa->o_blocks = end;
633         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
634
635         return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
636 }
637
638 /* Find and cancel locks held locally that match @mode on the resource named
639  * by @oa. Found locks are added to the @cancels list. Returns the number of
640  * locks added to the @cancels list. */
641 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
642                                    struct list_head *cancels,
643                                    ldlm_mode_t mode, int lock_flags)
644 {
645         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
646         struct ldlm_res_id res_id;
647         struct ldlm_resource *res;
648         int count;
649
650         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
651          * export) but disabled through procfs (flag in NS).
652          *
653          * This is distinct from the case where ELC is not supported at all, in
654          * which we still want to cancel locks in advance and just cancel them
655          * locally, without sending any RPC. */
656         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
657                 return 0;
658
659         ostid_build_res_name(&oa->o_oi, &res_id);
660         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
661         if (res == NULL)
662                 return 0;
663
664         LDLM_RESOURCE_ADDREF(res);
665         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
666                                            lock_flags, 0, NULL);
667         LDLM_RESOURCE_DELREF(res);
668         ldlm_resource_putref(res);
669         return count;
670 }
671
672 static int osc_destroy_interpret(const struct lu_env *env,
673                                  struct ptlrpc_request *req, void *data,
674                                  int rc)
675 {
676         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
677
678         atomic_dec(&cli->cl_destroy_in_flight);
679         wake_up(&cli->cl_destroy_waitq);
680         return 0;
681 }
682
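/* Lock-free throttle for destroy RPCs: optimistically take a slot and return
 * 1 if we stay within cl_max_rpcs_in_flight; otherwise release the slot,
 * waking a waiter if one was freed between the two atomic operations, and
 * return 0. */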
683 static int osc_can_send_destroy(struct client_obd *cli)
684 {
685         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
686             cli->cl_max_rpcs_in_flight) {
687                 /* The destroy request can be sent */
688                 return 1;
689         }
690         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
691             cli->cl_max_rpcs_in_flight) {
692                 /*
693                  * The counter has been modified between the two atomic
694                  * operations.
695                  */
696                 wake_up(&cli->cl_destroy_waitq);
697         }
698         return 0;
699 }
700
701 int osc_create(const struct lu_env *env, struct obd_export *exp,
702                struct obdo *oa, struct lov_stripe_md **ea,
703                struct obd_trans_info *oti)
704 {
705         int rc = 0;
706
707         LASSERT(oa);
708         LASSERT(ea);
709         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
710
711         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
712             oa->o_flags == OBD_FL_RECREATE_OBJS) {
713                 return osc_real_create(exp, oa, ea, oti);
714         }
715
716         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
717                 return osc_real_create(exp, oa, ea, oti);
718
719         /* we should not get here anymore */
720         LBUG();
721
722         return rc;
723 }
724
725 /* Destroy requests can always be async on the client, and we don't even really
726  * care about the return code since the client cannot do anything at all about
727  * a destroy failure.
728  * When the MDS is unlinking a filename, it saves the file objects into a
729  * recovery llog, and these object records are cancelled when the OST reports
730  * they were destroyed and sync'd to disk (i.e. transaction committed).
731  * If the client dies, or the OST is down when the object should be destroyed,
732  * the records are not cancelled, and when the OST reconnects to the MDS next,
733  * it will retrieve the llog unlink logs and then send the log cancellation
734  * cookies to the MDS after committing destroy transactions. */
735 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
736                        struct obdo *oa, struct lov_stripe_md *ea,
737                        struct obd_trans_info *oti, struct obd_export *md_export,
738                        void *capa)
739 {
740         struct client_obd     *cli = &exp->exp_obd->u.cli;
741         struct ptlrpc_request *req;
742         struct ost_body       *body;
743         LIST_HEAD(cancels);
744         int rc, count;
745
746         if (!oa) {
747                 CDEBUG(D_INFO, "oa NULL\n");
748                 return -EINVAL;
749         }
750
751         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
752                                         LDLM_FL_DISCARD_DATA);
753
754         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
755         if (req == NULL) {
756                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
757                 return -ENOMEM;
758         }
759
760         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
761         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
762                                0, &cancels, count);
763         if (rc) {
764                 ptlrpc_request_free(req);
765                 return rc;
766         }
767
768         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
769         ptlrpc_at_set_req_timeout(req);
770
771         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
772                 oa->o_lcookie = *oti->oti_logcookies;
773         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
774         LASSERT(body);
775         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
776
777         osc_pack_capa(req, body, (struct obd_capa *)capa);
778         ptlrpc_request_set_replen(req);
779
780         /* If osc_destroy() is destroying an unlink orphan (a request
781          * sent from MDT to OST), it must not block here, because the
782          * call might be triggered by ptlrpcd, and it is not good to
783          * block a ptlrpcd thread (b=16006). */
784         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
785                 req->rq_interpret_reply = osc_destroy_interpret;
786                 if (!osc_can_send_destroy(cli)) {
787                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
788                                                           NULL);
789
790                         /*
791                          * Wait until the number of on-going destroy RPCs drops
792                          * under max_rpc_in_flight
793                          */
794                         l_wait_event_exclusive(cli->cl_destroy_waitq,
795                                                osc_can_send_destroy(cli), &lwi);
796                 }
797         }
798
799         /* Do not wait for response */
800         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
801         return 0;
802 }
803
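/* Piggy-back cache and grant state on an outgoing request: report the dirty
 * byte count (o_dirty), how much more we may dirty (o_undirty), the grant we
 * still hold (o_grant) and any grant we lost (o_dropped). */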
804 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
805                                 long writing_bytes)
806 {
807         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
808
809         LASSERT(!(oa->o_valid & bits));
810
811         oa->o_valid |= bits;
812         client_obd_list_lock(&cli->cl_loi_list_lock);
813         oa->o_dirty = cli->cl_dirty;
814         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
815                      cli->cl_dirty_max)) {
816                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
817                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
818                 oa->o_undirty = 0;
819         } else if (unlikely(atomic_read(&obd_dirty_pages) -
820                             atomic_read(&obd_dirty_transit_pages) >
821                             (long)(obd_max_dirty_pages + 1))) {
822                 /* The atomic_read() and the atomic_inc() are
823                  * not covered by a lock, thus they may harmlessly race and trip
824                  * this CERROR() unless we add in a small fudge factor (+1). */
825                 CERROR("dirty %d - %d > system dirty_max %d\n",
826                        atomic_read(&obd_dirty_pages),
827                        atomic_read(&obd_dirty_transit_pages),
828                        obd_max_dirty_pages);
829                 oa->o_undirty = 0;
830         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
831                 CERROR("dirty %lu - dirty_max %lu too big???\n",
832                        cli->cl_dirty, cli->cl_dirty_max);
833                 oa->o_undirty = 0;
834         } else {
835                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
836                                       PAGE_CACHE_SHIFT)*
837                                      (cli->cl_max_rpcs_in_flight + 1);
838                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
839         }
840         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
841         oa->o_dropped = cli->cl_lost_grant;
842         cli->cl_lost_grant = 0;
843         client_obd_list_unlock(&cli->cl_loi_list_lock);
844         CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
845                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
846
847 }
848
849 void osc_update_next_shrink(struct client_obd *cli)
850 {
851         cli->cl_next_shrink_grant =
852                 cfs_time_shift(cli->cl_grant_shrink_interval);
853         CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
854                cli->cl_next_shrink_grant);
855 }
856
857 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
858 {
859         client_obd_list_lock(&cli->cl_loi_list_lock);
860         cli->cl_avail_grant += grant;
861         client_obd_list_unlock(&cli->cl_loi_list_lock);
862 }
863
864 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
865 {
866         if (body->oa.o_valid & OBD_MD_FLGRANT) {
867                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
868                 __osc_update_grant(cli, body->oa.o_grant);
869         }
870 }
871
872 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
873                               obd_count keylen, void *key, obd_count vallen,
874                               void *val, struct ptlrpc_request_set *set);
875
876 static int osc_shrink_grant_interpret(const struct lu_env *env,
877                                       struct ptlrpc_request *req,
878                                       void *aa, int rc)
879 {
880         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
881         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
882         struct ost_body *body;
883
884         if (rc != 0) {
885                 __osc_update_grant(cli, oa->o_grant);
886                 GOTO(out, rc);
887         }
888
889         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
890         LASSERT(body);
891         osc_update_grant(cli, body);
892 out:
893         OBDO_FREE(oa);
894         return rc;
895 }
896
897 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
898 {
899         client_obd_list_lock(&cli->cl_loi_list_lock);
900         oa->o_grant = cli->cl_avail_grant / 4;
901         cli->cl_avail_grant -= oa->o_grant;
902         client_obd_list_unlock(&cli->cl_loi_list_lock);
903         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
904                 oa->o_valid |= OBD_MD_FLFLAGS;
905                 oa->o_flags = 0;
906         }
907         oa->o_flags |= OBD_FL_SHRINK_GRANT;
908         osc_update_next_shrink(cli);
909 }
910
911 /* Shrink the current grant, either from some large amount to enough for a
912  * full set of in-flight RPCs, or if we have already shrunk to that limit
913  * then to enough for a single RPC.  This avoids keeping more grant than
914  * needed, and avoids shrinking the grant piecemeal. */
915 static int osc_shrink_grant(struct client_obd *cli)
916 {
917         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
918                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
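        /* e.g. with 4 KiB pages, cl_max_pages_per_rpc = 256 (1 MiB RPCs) and
         * cl_max_rpcs_in_flight = 8, target_bytes is 9 MiB (hypothetical
         * values for illustration). */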
919
920         client_obd_list_lock(&cli->cl_loi_list_lock);
921         if (cli->cl_avail_grant <= target_bytes)
922                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924
925         return osc_shrink_grant_to_target(cli, target_bytes);
926 }
927
928 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
929 {
930         int                     rc = 0;
931         struct ost_body *body;
932
933         client_obd_list_lock(&cli->cl_loi_list_lock);
934         /* Don't shrink if we are already above or below the desired limit.
935          * We don't want to shrink below a single RPC, as that will negatively
936          * impact block allocation and long-term performance. */
937         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
938                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
939
940         if (target_bytes >= cli->cl_avail_grant) {
941                 client_obd_list_unlock(&cli->cl_loi_list_lock);
942                 return 0;
943         }
944         client_obd_list_unlock(&cli->cl_loi_list_lock);
945
946         OBD_ALLOC_PTR(body);
947         if (!body)
948                 return -ENOMEM;
949
950         osc_announce_cached(cli, &body->oa, 0);
951
952         client_obd_list_lock(&cli->cl_loi_list_lock);
953         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
954         cli->cl_avail_grant = target_bytes;
955         client_obd_list_unlock(&cli->cl_loi_list_lock);
956         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
957                 body->oa.o_valid |= OBD_MD_FLFLAGS;
958                 body->oa.o_flags = 0;
959         }
960         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
961         osc_update_next_shrink(cli);
962
963         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
964                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
965                                 sizeof(*body), body, NULL);
966         if (rc != 0)
967                 __osc_update_grant(cli, body->oa.o_grant);
968         OBD_FREE_PTR(body);
969         return rc;
970 }
971
972 static int osc_should_shrink_grant(struct client_obd *client)
973 {
974         cfs_time_t time = cfs_time_current();
975         cfs_time_t next_shrink = client->cl_next_shrink_grant;
976
977         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
978              OBD_CONNECT_GRANT_SHRINK) == 0)
979                 return 0;
980
981         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
982                 /* Get the current RPC size directly, instead of going via:
983                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
984                  * Keep comment here so that it can be found by searching. */
985                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
986
987                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
988                     client->cl_avail_grant > brw_size)
989                         return 1;
990                 else
991                         osc_update_next_shrink(client);
992         }
993         return 0;
994 }
995
996 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
997 {
998         struct client_obd *client;
999
1000         list_for_each_entry(client, &item->ti_obd_list,
1001                                 cl_grant_shrink_list) {
1002                 if (osc_should_shrink_grant(client))
1003                         osc_shrink_grant(client);
1004         }
1005         return 0;
1006 }
1007
1008 static int osc_add_shrink_grant(struct client_obd *client)
1009 {
1010         int rc;
1011
1012         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1013                                        TIMEOUT_GRANT,
1014                                        osc_grant_shrink_grant_cb, NULL,
1015                                        &client->cl_grant_shrink_list);
1016         if (rc) {
1017                 CERROR("add grant client %s error %d\n",
1018                         client->cl_import->imp_obd->obd_name, rc);
1019                 return rc;
1020         }
1021         CDEBUG(D_CACHE, "add grant client %s\n",
1022                client->cl_import->imp_obd->obd_name);
1023         osc_update_next_shrink(client);
1024         return 0;
1025 }
1026
1027 static int osc_del_shrink_grant(struct client_obd *client)
1028 {
1029         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1030                                          TIMEOUT_GRANT);
1031 }
1032
1033 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1034 {
1035         /*
1036          * ocd_grant is the total grant amount we expect to hold: if we've
1037          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1038          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1039          *
1040          * race is tolerable here: if we're evicted, but imp_state already
1041          * left EVICTED state, then cl_dirty must be 0 already.
1042          */
1043         client_obd_list_lock(&cli->cl_loi_list_lock);
1044         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1045                 cli->cl_avail_grant = ocd->ocd_grant;
1046         else
1047                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1048
1049         if (cli->cl_avail_grant < 0) {
1050                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1051                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1052                       ocd->ocd_grant, cli->cl_dirty);
1053                 /* workaround for servers which do not have the patch from
1054                  * LU-2679 */
1055                 cli->cl_avail_grant = ocd->ocd_grant;
1056         }
1057
1058         /* determine the appropriate chunk size used by osc_extent. */
1059         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
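        /* e.g. with 4 KiB pages (PAGE_CACHE_SHIFT = 12) and a server that
         * advertises ocd_blocksize = 16, extents are managed in 64 KiB
         * chunks (hypothetical values for illustration). */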
1060         client_obd_list_unlock(&cli->cl_loi_list_lock);
1061
1062         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
1063                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1064                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1065
1066         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1067             list_empty(&cli->cl_grant_shrink_list))
1068                 osc_add_shrink_grant(cli);
1069 }
1070
1071 /* We assume this OSC got a short read because it read beyond the end of a
1072  * stripe file; i.e. Lustre is reading a sparse file via the LOV, and it
1073  * _knows_ it's reading inside the file, it's just that this stripe never
1074  * got written at or beyond this stripe offset yet. */
1075 static void handle_short_read(int nob_read, obd_count page_count,
1076                               struct brw_page **pga)
1077 {
1078         char *ptr;
1079         int i = 0;
1080
1081         /* skip bytes read OK */
1082         while (nob_read > 0) {
1083                 LASSERT(page_count > 0);
1084
1085                 if (pga[i]->count > nob_read) {
1086                         /* EOF inside this page */
1087                         ptr = kmap(pga[i]->pg) +
1088                                 (pga[i]->off & ~CFS_PAGE_MASK);
1089                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1090                         kunmap(pga[i]->pg);
1091                         page_count--;
1092                         i++;
1093                         break;
1094                 }
1095
1096                 nob_read -= pga[i]->count;
1097                 page_count--;
1098                 i++;
1099         }
1100
1101         /* zero remaining pages */
1102         while (page_count-- > 0) {
1103                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1104                 memset(ptr, 0, pga[i]->count);
1105                 kunmap(pga[i]->pg);
1106                 i++;
1107         }
1108 }
1109
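/* Check the per-niobuf return codes in a BRW_WRITE reply and verify that the
 * bulk transferred exactly the number of bytes we requested. */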
1110 static int check_write_rcs(struct ptlrpc_request *req,
1111                            int requested_nob, int niocount,
1112                            obd_count page_count, struct brw_page **pga)
1113 {
1114         int     i;
1115         __u32   *remote_rcs;
1116
1117         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1118                                                   sizeof(*remote_rcs) *
1119                                                   niocount);
1120         if (remote_rcs == NULL) {
1121                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1122                 return(-EPROTO);
1123         }
1124
1125         /* return error if any niobuf was in error */
1126         for (i = 0; i < niocount; i++) {
1127                 if ((int)remote_rcs[i] < 0)
1128                         return(remote_rcs[i]);
1129
1130                 if (remote_rcs[i] != 0) {
1131                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1132                                 i, remote_rcs[i], req);
1133                         return(-EPROTO);
1134                 }
1135         }
1136
1137         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1138                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1139                        req->rq_bulk->bd_nob_transferred, requested_nob);
1140                 return(-EPROTO);
1141         }
1142
1143         return (0);
1144 }
1145
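/* Two brw_pages may share one remote niobuf only if their flags match (warn
 * on flag combinations not known to be safe) and they are contiguous in the
 * file. */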
1146 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1147 {
1148         if (p1->flag != p2->flag) {
1149                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1150                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1151
1152                 /* warn if we try to combine flags that we don't know to be
1153                  * safe to combine */
1154                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1155                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1156                               "report this at http://bugs.whamcloud.com/\n",
1157                               p1->flag, p2->flag);
1158                 }
1159                 return 0;
1160         }
1161
1162         return (p1->off + p1->count == p2->off);
1163 }
1164
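/* Compute the bulk checksum over the first @nob bytes of the page array
 * using the hash selected by @cksum_type.  Fault injection can corrupt read
 * data or bump the write checksum to exercise the checksum-retry paths. */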
1165 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1166                                    struct brw_page **pga, int opc,
1167                                    cksum_type_t cksum_type)
1168 {
1169         __u32                           cksum;
1170         int                             i = 0;
1171         struct cfs_crypto_hash_desc     *hdesc;
1172         unsigned int                    bufsize;
1173         int                             err;
1174         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1175
1176         LASSERT(pg_count > 0);
1177
1178         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1179         if (IS_ERR(hdesc)) {
1180                 CERROR("Unable to initialize checksum hash %s\n",
1181                        cfs_crypto_hash_name(cfs_alg));
1182                 return PTR_ERR(hdesc);
1183         }
1184
1185         while (nob > 0 && pg_count > 0) {
1186                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1187
1188                 /* corrupt the data before we compute the checksum, to
1189                  * simulate an OST->client data error */
1190                 if (i == 0 && opc == OST_READ &&
1191                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1192                         unsigned char *ptr = kmap(pga[i]->pg);
1193                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1194                         memcpy(ptr + off, "bad1", min(4, nob));
1195                         kunmap(pga[i]->pg);
1196                 }
1197                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1198                                   pga[i]->off & ~CFS_PAGE_MASK,
1199                                   count);
1200                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1201                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1202
1203                 nob -= pga[i]->count;
1204                 pg_count--;
1205                 i++;
1206         }
1207
1208         bufsize = 4;
1209         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1210
1211         if (err)
1212                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1213
1214         /* For sends we only compute a wrong checksum instead of
1215          * corrupting the data, so it is still correct on a redo */
1216         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1217                 cksum++;
1218
1219         return cksum;
1220 }
1221
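/* Build an OST_READ/OST_WRITE request and its bulk descriptor from @pga:
 * merge contiguous pages into remote niobufs, pack the obdo, ioobj and
 * capability, attach grant and checksum information, and stash the async
 * arguments for the reply handler. */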
1222 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1223                                 struct lov_stripe_md *lsm, obd_count page_count,
1224                                 struct brw_page **pga,
1225                                 struct ptlrpc_request **reqp,
1226                                 struct obd_capa *ocapa, int reserve,
1227                                 int resend)
1228 {
1229         struct ptlrpc_request   *req;
1230         struct ptlrpc_bulk_desc *desc;
1231         struct ost_body  *body;
1232         struct obd_ioobj        *ioobj;
1233         struct niobuf_remote    *niobuf;
1234         int niocount, i, requested_nob, opc, rc;
1235         struct osc_brw_async_args *aa;
1236         struct req_capsule      *pill;
1237         struct brw_page *pg_prev;
1238
1239         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1240                 return -ENOMEM; /* Recoverable */
1241         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1242                 return -EINVAL; /* Fatal */
1243
1244         if ((cmd & OBD_BRW_WRITE) != 0) {
1245                 opc = OST_WRITE;
1246                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1247                                                 cli->cl_import->imp_rq_pool,
1248                                                 &RQF_OST_BRW_WRITE);
1249         } else {
1250                 opc = OST_READ;
1251                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1252         }
1253         if (req == NULL)
1254                 return -ENOMEM;
1255
1256         for (niocount = i = 1; i < page_count; i++) {
1257                 if (!can_merge_pages(pga[i - 1], pga[i]))
1258                         niocount++;
1259         }
1260
1261         pill = &req->rq_pill;
1262         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1263                              sizeof(*ioobj));
1264         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1265                              niocount * sizeof(*niobuf));
1266         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1267
1268         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1269         if (rc) {
1270                 ptlrpc_request_free(req);
1271                 return rc;
1272         }
1273         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1274         ptlrpc_at_set_req_timeout(req);
1275         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1276          * retry logic */
1277         req->rq_no_retry_einprogress = 1;
1278
1279         desc = ptlrpc_prep_bulk_imp(req, page_count,
1280                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1281                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1282                 OST_BULK_PORTAL);
1283
1284         if (desc == NULL)
1285                 GOTO(out, rc = -ENOMEM);
1286         /* NB request now owns desc and will free it when it gets freed */
1287
1288         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1289         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1290         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1291         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1292
1293         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1294
1295         obdo_to_ioobj(oa, ioobj);
1296         ioobj->ioo_bufcnt = niocount;
1297         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1298          * bulks that might be sent for this request.  The actual number is
1299          * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
1300          * sends "max - 1" for compatibility with old clients sending "0", and
1301          * also so the actual maximum is a power-of-two number, not one less. LU-1431 */
1302         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1303         osc_pack_capa(req, body, ocapa);
1304         LASSERT(page_count > 0);
1305         pg_prev = pga[0];
1306         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1307                 struct brw_page *pg = pga[i];
1308                 int poff = pg->off & ~CFS_PAGE_MASK;
1309
1310                 LASSERT(pg->count > 0);
1311                 /* make sure there is no gap in the middle of the page array */
1312                 LASSERTF(page_count == 1 ||
1313                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1314                           ergo(i > 0 && i < page_count - 1,
1315                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1316                           ergo(i == page_count - 1, poff == 0)),
1317                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1318                          i, page_count, pg, pg->off, pg->count);
1319                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1320                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1321                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1322                          i, page_count,
1323                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1324                          pg_prev->pg, page_private(pg_prev->pg),
1325                          pg_prev->pg->index, pg_prev->off);
1326                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1327                         (pg->flag & OBD_BRW_SRVLOCK));
1328
1329                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1330                 requested_nob += pg->count;
1331
1332                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1333                         niobuf--;
1334                         niobuf->len += pg->count;
1335                 } else {
1336                         niobuf->offset = pg->off;
1337                         niobuf->len    = pg->count;
1338                         niobuf->flags  = pg->flag;
1339                 }
1340                 pg_prev = pg;
1341         }
1342
1343         LASSERTF((void *)(niobuf - niocount) ==
1344                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1345                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1346                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1347
1348         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1349         if (resend) {
1350                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1351                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1352                         body->oa.o_flags = 0;
1353                 }
1354                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1355         }
1356
1357         if (osc_should_shrink_grant(cli))
1358                 osc_shrink_grant_local(cli, &body->oa);
1359
1360         /* size[REQ_REC_OFF] still sizeof (*body) */
1361         if (opc == OST_WRITE) {
1362                 if (cli->cl_checksum &&
1363                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1364                         /* store cl_cksum_type in a local variable since
1365                          * it can be changed via lprocfs */
1366                         cksum_type_t cksum_type = cli->cl_cksum_type;
1367
1368                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1369                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1370                                 body->oa.o_flags = 0;
1371                         }
1372                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1373                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1374                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1375                                                              page_count, pga,
1376                                                              OST_WRITE,
1377                                                              cksum_type);
1378                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1379                                body->oa.o_cksum);
1380                         /* save this in 'oa', too, for later checking */
1381                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1382                         oa->o_flags |= cksum_type_pack(cksum_type);
1383                 } else {
1384                         /* clear out the checksum flag, in case this is a
1385                          * resend but cl_checksum is no longer set. b=11238 */
1386                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1387                 }
1388                 oa->o_cksum = body->oa.o_cksum;
1389                 /* 1 RC per niobuf */
1390                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1391                                      sizeof(__u32) * niocount);
1392         } else {
1393                 if (cli->cl_checksum &&
1394                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1395                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1396                                 body->oa.o_flags = 0;
1397                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1398                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1399                 }
1400         }
1401         ptlrpc_request_set_replen(req);
1402
1403         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1404         aa = ptlrpc_req_async_args(req);
1405         aa->aa_oa = oa;
1406         aa->aa_requested_nob = requested_nob;
1407         aa->aa_nio_count = niocount;
1408         aa->aa_page_count = page_count;
1409         aa->aa_resends = 0;
1410         aa->aa_ppga = pga;
1411         aa->aa_cli = cli;
1412         INIT_LIST_HEAD(&aa->aa_oaps);
1413         if (ocapa && reserve)
1414                 aa->aa_ocapa = capa_get(ocapa);
1415
1416         *reqp = req;
1417         return 0;
1418
1419  out:
1420         ptlrpc_req_finished(req);
1421         return rc;
1422 }
1423
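     /* Compare the client-computed write checksum with the one returned by the
      * server; on mismatch, recompute it locally, log a detailed error that
      * describes the likely cause, and return 1 (0 when the checksums agree). */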
1424 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1425                                 __u32 client_cksum, __u32 server_cksum, int nob,
1426                                 obd_count page_count, struct brw_page **pga,
1427                                 cksum_type_t client_cksum_type)
1428 {
1429         __u32 new_cksum;
1430         char *msg;
1431         cksum_type_t cksum_type;
1432
1433         if (server_cksum == client_cksum) {
1434                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1435                 return 0;
1436         }
1437
1438         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1439                                        oa->o_flags : 0);
1440         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1441                                       cksum_type);
1442
1443         if (cksum_type != client_cksum_type)
1444                 msg = "the server did not use the checksum type specified in "
1445                       "the original request - likely a protocol problem";
1446         else if (new_cksum == server_cksum)
1447                 msg = "changed on the client after we checksummed it - "
1448                       "likely false positive due to mmap IO (bug 11742)";
1449         else if (new_cksum == client_cksum)
1450                 msg = "changed in transit before arrival at OST";
1451         else
1452                 msg = "changed in transit AND doesn't match the original - "
1453                       "likely false positive due to mmap IO (bug 11742)";
1454
1455         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1456                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1457                            msg, libcfs_nid2str(peer->nid),
1458                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1459                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1460                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1461                            POSTID(&oa->o_oi), pga[0]->off,
1462                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1463         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1464                "client csum now %x\n", client_cksum, client_cksum_type,
1465                server_cksum, cksum_type, new_cksum);
1466         return 1;
1467 }
1468
1469 /* Note rc enters this function as number of bytes transferred */
1470 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1471 {
1472         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473         const lnet_process_id_t *peer =
1474                         &req->rq_import->imp_connection->c_peer;
1475         struct client_obd *cli = aa->aa_cli;
1476         struct ost_body *body;
1477         __u32 client_cksum = 0;
1478
1479         if (rc < 0 && rc != -EDQUOT) {
1480                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1481                 return rc;
1482         }
1483
1484         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1485         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1486         if (body == NULL) {
1487                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1488                 return -EPROTO;
1489         }
1490
1491         /* set/clear over quota flag for a uid/gid */
1492         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1493             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1494                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1495
1496                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1497                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1498                        body->oa.o_flags);
1499                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1500         }
1501
1502         osc_update_grant(cli, body);
1503
1504         if (rc < 0)
1505                 return rc;
1506
1507         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1508                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1509
1510         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1511                 if (rc > 0) {
1512                         CERROR("Unexpected +ve rc %d\n", rc);
1513                         return -EPROTO;
1514                 }
1515                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1516
1517                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1518                         return -EAGAIN;
1519
1520                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1521                     check_write_checksum(&body->oa, peer, client_cksum,
1522                                          body->oa.o_cksum, aa->aa_requested_nob,
1523                                          aa->aa_page_count, aa->aa_ppga,
1524                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1525                         return -EAGAIN;
1526
1527                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1528                                      aa->aa_page_count, aa->aa_ppga);
1529                 GOTO(out, rc);
1530         }
1531
1532         /* The rest of this function executes only for OST_READs */
1533
1534         /* if unwrap_bulk failed, return -EAGAIN to retry */
1535         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1536         if (rc < 0)
1537                 GOTO(out, rc = -EAGAIN);
1538
1539         if (rc > aa->aa_requested_nob) {
1540                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1541                        aa->aa_requested_nob);
1542                 return -EPROTO;
1543         }
1544
1545         if (rc != req->rq_bulk->bd_nob_transferred) {
1546                 CERROR("Unexpected rc %d (%d transferred)\n",
1547                        rc, req->rq_bulk->bd_nob_transferred);
1548                 return -EPROTO;
1549         }
1550
1551         if (rc < aa->aa_requested_nob)
1552                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1553
1554         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1555                 static int cksum_counter;
1556                 __u32      server_cksum = body->oa.o_cksum;
1557                 char      *via;
1558                 char      *router;
1559                 cksum_type_t cksum_type;
1560
1561                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1562                                                body->oa.o_flags : 0);
1563                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1564                                                  aa->aa_ppga, OST_READ,
1565                                                  cksum_type);
1566
1567                 if (peer->nid == req->rq_bulk->bd_sender) {
1568                         via = router = "";
1569                 } else {
1570                         via = " via ";
1571                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1572                 }
1573
1574                 if (server_cksum == ~0 && rc > 0) {
1575                         CERROR("Protocol error: server %s set the 'checksum' "
1576                                "bit, but didn't send a checksum.  Not fatal, "
1577                                "but please notify on http://bugs.whamcloud.com/\n",
1578                                libcfs_nid2str(peer->nid));
1579                 } else if (server_cksum != client_cksum) {
1580                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1581                                            "%s%s%s inode "DFID" object "DOSTID
1582                                            " extent ["LPU64"-"LPU64"]\n",
1583                                            req->rq_import->imp_obd->obd_name,
1584                                            libcfs_nid2str(peer->nid),
1585                                            via, router,
1586                                            body->oa.o_valid & OBD_MD_FLFID ?
1587                                                 body->oa.o_parent_seq : (__u64)0,
1588                                            body->oa.o_valid & OBD_MD_FLFID ?
1589                                                 body->oa.o_parent_oid : 0,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_ver : 0,
1592                                            POSTID(&body->oa.o_oi),
1593                                            aa->aa_ppga[0]->off,
1594                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1595                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1596                                                                         1);
1597                         CERROR("client %x, server %x, cksum_type %x\n",
1598                                client_cksum, server_cksum, cksum_type);
1599                         cksum_counter = 0;
1600                         aa->aa_oa->o_cksum = client_cksum;
1601                         rc = -EAGAIN;
1602                 } else {
1603                         cksum_counter++;
1604                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1605                         rc = 0;
1606                 }
1607         } else if (unlikely(client_cksum)) {
1608                 static int cksum_missed;
1609
1610                 cksum_missed++;
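                     /* Rate-limit: only log when cksum_missed is a power of two. */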
1611                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1612                         CERROR("Checksum %u requested from %s but not sent\n",
1613                                cksum_missed, libcfs_nid2str(peer->nid));
1614         } else {
1615                 rc = 0;
1616         }
1617 out:
1618         if (rc >= 0)
1619                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1620                                      aa->aa_oa, &body->oa);
1621
1622         return rc;
1623 }
1624
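     /* Send a single synchronous bulk read/write RPC, rebuilding and resending
      * it on bulk timeouts and recoverable errors until it succeeds, the resend
      * limit is reached, or the import generation changes (eviction). */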
1625 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1626                             struct lov_stripe_md *lsm,
1627                             obd_count page_count, struct brw_page **pga,
1628                             struct obd_capa *ocapa)
1629 {
1630         struct ptlrpc_request *req;
1631         int                 rc;
1632         wait_queue_head_t           waitq;
1633         int                 generation, resends = 0;
1634         struct l_wait_info     lwi;
1635
1636         init_waitqueue_head(&waitq);
1637         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1638
1639 restart_bulk:
1640         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1641                                   page_count, pga, &req, ocapa, 0, resends);
1642         if (rc != 0)
1643                 return rc;
1644
1645         if (resends) {
1646                 req->rq_generation_set = 1;
1647                 req->rq_import_generation = generation;
1648                 req->rq_sent = cfs_time_current_sec() + resends;
1649         }
1650
1651         rc = ptlrpc_queue_wait(req);
1652
1653         if (rc == -ETIMEDOUT && req->rq_resend) {
1654                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1655                 ptlrpc_req_finished(req);
1656                 goto restart_bulk;
1657         }
1658
1659         rc = osc_brw_fini_request(req, rc);
1660
1661         ptlrpc_req_finished(req);
1662         /* When the server returns -EINPROGRESS, the client should always retry
1663          * regardless of the number of times the bulk was resent already. */
1664         if (osc_recoverable_error(rc)) {
1665                 resends++;
1666                 if (rc != -EINPROGRESS &&
1667                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1668                         CERROR("%s: too many resend retries for object: "
1669                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1670                                POSTID(&oa->o_oi), rc);
1671                         goto out;
1672                 }
1673                 if (generation !=
1674                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1675                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1676                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1677                                POSTID(&oa->o_oi), rc);
1678                         goto out;
1679                 }
1680
1681                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1682                                        NULL);
1683                 l_wait_event(waitq, 0, &lwi);
1684
1685                 goto restart_bulk;
1686         }
1687 out:
1688         if (rc == -EAGAIN || rc == -EINPROGRESS)
1689                 rc = -EIO;
1690         return rc;
1691 }
1692
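     /* Rebuild a failed BRW RPC and hand it back to ptlrpcd; the new request
      * takes over the page array, async pages and extents of the old one. */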
1693 static int osc_brw_redo_request(struct ptlrpc_request *request,
1694                                 struct osc_brw_async_args *aa, int rc)
1695 {
1696         struct ptlrpc_request *new_req;
1697         struct osc_brw_async_args *new_aa;
1698         struct osc_async_page *oap;
1699
1700         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1701                   "redo for recoverable error %d", rc);
1702
1703         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1704                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1705                                   aa->aa_cli, aa->aa_oa,
1706                                   NULL /* lsm unused by osc currently */,
1707                                   aa->aa_page_count, aa->aa_ppga,
1708                                   &new_req, aa->aa_ocapa, 0, 1);
1709         if (rc)
1710                 return rc;
1711
1712         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1713                 if (oap->oap_request != NULL) {
1714                         LASSERTF(request == oap->oap_request,
1715                                  "request %p != oap_request %p\n",
1716                                  request, oap->oap_request);
1717                         if (oap->oap_interrupted) {
1718                                 ptlrpc_req_finished(new_req);
1719                                 return -EINTR;
1720                         }
1721                 }
1722         }
1723         /* New request takes over pga and oaps from old request.
1724          * Note that copying a list_head doesn't work, need to move it... */
1725         aa->aa_resends++;
1726         new_req->rq_interpret_reply = request->rq_interpret_reply;
1727         new_req->rq_async_args = request->rq_async_args;
1728         /* cap resend delay to the current request timeout, this is similar to
1729          * what ptlrpc does (see after_reply()) */
1730         if (aa->aa_resends > new_req->rq_timeout)
1731                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1732         else
1733                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1734         new_req->rq_generation_set = 1;
1735         new_req->rq_import_generation = request->rq_import_generation;
1736
1737         new_aa = ptlrpc_req_async_args(new_req);
1738
1739         INIT_LIST_HEAD(&new_aa->aa_oaps);
1740         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1741         INIT_LIST_HEAD(&new_aa->aa_exts);
1742         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1743         new_aa->aa_resends = aa->aa_resends;
1744
1745         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1746                 if (oap->oap_request) {
1747                         ptlrpc_req_finished(oap->oap_request);
1748                         oap->oap_request = ptlrpc_request_addref(new_req);
1749                 }
1750         }
1751
1752         new_aa->aa_ocapa = aa->aa_ocapa;
1753         aa->aa_ocapa = NULL;
1754
1755         /* XXX: This code will run into problem if we're going to support
1756          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1757          * and wait for all of them to be finished. We should inherit request
1758          * set from old request. */
1759         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1760
1761         DEBUG_REQ(D_INFO, new_req, "new request");
1762         return 0;
1763 }
1764
1765 /*
1766  * We want disk allocation on the target to happen in offset order, so we'll
1767  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1768  * fine for our small page arrays and doesn't require allocation.  It's an
1769  * insertion sort that swaps elements that are strides apart, shrinking the
1770  * stride down until it's '1' and the array is sorted.
1771  */
1772 static void sort_brw_pages(struct brw_page **array, int num)
1773 {
1774         int stride, i, j;
1775         struct brw_page *tmp;
1776
1777         if (num == 1)
1778                 return;
1779         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1780                 ;
1781
1782         do {
1783                 stride /= 3;
1784                 for (i = stride ; i < num ; i++) {
1785                         tmp = array[i];
1786                         j = i;
1787                         while (j >= stride && array[j - stride]->off > tmp->off) {
1788                                 array[j] = array[j - stride];
1789                                 j -= stride;
1790                         }
1791                         array[j] = tmp;
1792                 }
1793         } while (stride > 1);
1794 }
1795
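     /* Count how many leading pages of @pg can be sent without fragmenting the
      * bulk: each page must end on a page boundary and every following page
      * must start on one. */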
1796 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1797 {
1798         int count = 1;
1799         int offset;
1800         int i = 0;
1801
1802         LASSERT(pages > 0);
1803         offset = pg[i]->off & ~CFS_PAGE_MASK;
1804
1805         for (;;) {
1806                 pages--;
1807                 if (pages == 0)  /* that's all */
1808                         return count;
1809
1810                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1811                         return count;   /* doesn't end on page boundary */
1812
1813                 i++;
1814                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1815                 if (offset != 0)        /* doesn't start on page boundary */
1816                         return count;
1817
1818                 count++;
1819         }
1820 }
1821
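     /* Build an array of pointers to the entries of a contiguous brw_page
      * array; released with osc_release_ppga(). */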
1822 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1823 {
1824         struct brw_page **ppga;
1825         int i;
1826
1827         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1828         if (ppga == NULL)
1829                 return NULL;
1830
1831         for (i = 0; i < count; i++)
1832                 ppga[i] = pga + i;
1833         return ppga;
1834 }
1835
1836 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1837 {
1838         LASSERT(ppga != NULL);
1839         OBD_FREE(ppga, sizeof(*ppga) * count);
1840 }
1841
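     /* Synchronous bulk I/O entry point: sort the pages by offset, split them
      * into chunks of at most cl_max_pages_per_rpc unfragmented pages, and
      * issue one RPC per chunk via osc_brw_internal(). */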
1842 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1843                    obd_count page_count, struct brw_page *pga,
1844                    struct obd_trans_info *oti)
1845 {
1846         struct obdo *saved_oa = NULL;
1847         struct brw_page **ppga, **orig;
1848         struct obd_import *imp = class_exp2cliimp(exp);
1849         struct client_obd *cli;
1850         int rc, page_count_orig;
1851
1852         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1853         cli = &imp->imp_obd->u.cli;
1854
1855         if (cmd & OBD_BRW_CHECK) {
1856                 /* The caller just wants to know if there's a chance that this
1857                  * I/O can succeed */
1858
1859                 if (imp->imp_invalid)
1860                         return -EIO;
1861                 return 0;
1862         }
1863
1864         /* test_brw with a failed create can trip this, maybe others. */
1865         LASSERT(cli->cl_max_pages_per_rpc);
1866
1867         rc = 0;
1868
1869         orig = ppga = osc_build_ppga(pga, page_count);
1870         if (ppga == NULL)
1871                 return -ENOMEM;
1872         page_count_orig = page_count;
1873
1874         sort_brw_pages(ppga, page_count);
1875         while (page_count) {
1876                 obd_count pages_per_brw;
1877
1878                 if (page_count > cli->cl_max_pages_per_rpc)
1879                         pages_per_brw = cli->cl_max_pages_per_rpc;
1880                 else
1881                         pages_per_brw = page_count;
1882
1883                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1884
1885                 if (saved_oa != NULL) {
1886                         /* restore previously saved oa */
1887                         *oinfo->oi_oa = *saved_oa;
1888                 } else if (page_count > pages_per_brw) {
1889                         /* save a copy of oa (brw will clobber it) */
1890                         OBDO_ALLOC(saved_oa);
1891                         if (saved_oa == NULL)
1892                                 GOTO(out, rc = -ENOMEM);
1893                         *saved_oa = *oinfo->oi_oa;
1894                 }
1895
1896                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1897                                       pages_per_brw, ppga, oinfo->oi_capa);
1898
1899                 if (rc != 0)
1900                         break;
1901
1902                 page_count -= pages_per_brw;
1903                 ppga += pages_per_brw;
1904         }
1905
1906 out:
1907         osc_release_ppga(orig, page_count_orig);
1908
1909         if (saved_oa != NULL)
1910                 OBDO_FREE(saved_oa);
1911
1912         return rc;
1913 }
1914
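     /* Completion callback for asynchronous BRW RPCs: finish the request,
      * resend on recoverable errors, propagate the returned attributes to the
      * cl_object, release extents and pages, and update the in-flight RPC
      * accounting. */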
1915 static int brw_interpret(const struct lu_env *env,
1916                          struct ptlrpc_request *req, void *data, int rc)
1917 {
1918         struct osc_brw_async_args *aa = data;
1919         struct osc_extent *ext;
1920         struct osc_extent *tmp;
1921         struct cl_object  *obj = NULL;
1922         struct client_obd *cli = aa->aa_cli;
1923
1924         rc = osc_brw_fini_request(req, rc);
1925         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1926         /* When the server returns -EINPROGRESS, the client should always retry
1927          * regardless of the number of times the bulk was resent already. */
1928         if (osc_recoverable_error(rc)) {
1929                 if (req->rq_import_generation !=
1930                     req->rq_import->imp_generation) {
1931                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1932                                ""DOSTID", rc = %d.\n",
1933                                req->rq_import->imp_obd->obd_name,
1934                                POSTID(&aa->aa_oa->o_oi), rc);
1935                 } else if (rc == -EINPROGRESS ||
1936                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1937                         rc = osc_brw_redo_request(req, aa, rc);
1938                 } else {
1939                         CERROR("%s: too many resend retries for object: "
1940                                ""LPU64":"LPU64", rc = %d.\n",
1941                                req->rq_import->imp_obd->obd_name,
1942                                POSTID(&aa->aa_oa->o_oi), rc);
1943                 }
1944
1945                 if (rc == 0)
1946                         return 0;
1947                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1948                         rc = -EIO;
1949         }
1950
1951         if (aa->aa_ocapa) {
1952                 capa_put(aa->aa_ocapa);
1953                 aa->aa_ocapa = NULL;
1954         }
1955
1956         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1957                 if (obj == NULL && rc == 0) {
1958                         obj = osc2cl(ext->oe_obj);
1959                         cl_object_get(obj);
1960                 }
1961
1962                 list_del_init(&ext->oe_link);
1963                 osc_extent_finish(env, ext, 1, rc);
1964         }
1965         LASSERT(list_empty(&aa->aa_exts));
1966         LASSERT(list_empty(&aa->aa_oaps));
1967
1968         if (obj != NULL) {
1969                 struct obdo *oa = aa->aa_oa;
1970                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1971                 unsigned long valid = 0;
1972
1973                 LASSERT(rc == 0);
1974                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1975                         attr->cat_blocks = oa->o_blocks;
1976                         valid |= CAT_BLOCKS;
1977                 }
1978                 if (oa->o_valid & OBD_MD_FLMTIME) {
1979                         attr->cat_mtime = oa->o_mtime;
1980                         valid |= CAT_MTIME;
1981                 }
1982                 if (oa->o_valid & OBD_MD_FLATIME) {
1983                         attr->cat_atime = oa->o_atime;
1984                         valid |= CAT_ATIME;
1985                 }
1986                 if (oa->o_valid & OBD_MD_FLCTIME) {
1987                         attr->cat_ctime = oa->o_ctime;
1988                         valid |= CAT_CTIME;
1989                 }
1990                 if (valid != 0) {
1991                         cl_object_attr_lock(obj);
1992                         cl_object_attr_set(env, obj, attr, valid);
1993                         cl_object_attr_unlock(obj);
1994                 }
1995                 cl_object_put(env, obj);
1996         }
1997         OBDO_FREE(aa->aa_oa);
1998
1999         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2000                           req->rq_bulk->bd_nob_transferred);
2001         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2002         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2003
2004         client_obd_list_lock(&cli->cl_loi_list_lock);
2005         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2006          * is called so we know whether to go to sync BRWs or wait for more
2007          * RPCs to complete */
2008         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2009                 cli->cl_w_in_flight--;
2010         else
2011                 cli->cl_r_in_flight--;
2012         osc_wake_cache_waiters(cli);
2013         client_obd_list_unlock(&cli->cl_loi_list_lock);
2014
2015         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2016         return rc;
2017 }
2018
2019 /**
2020  * Build an RPC by the list of extent @ext_list. The caller must ensure
2021  * that the total pages in this list are NOT over max pages per RPC.
2022  * Extents in the list must be in OES_RPC state.
2023  */
2024 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2025                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
2026 {
2027         struct ptlrpc_request           *req = NULL;
2028         struct osc_extent               *ext;
2029         struct brw_page                 **pga = NULL;
2030         struct osc_brw_async_args       *aa = NULL;
2031         struct obdo                     *oa = NULL;
2032         struct osc_async_page           *oap;
2033         struct osc_async_page           *tmp;
2034         struct cl_req                   *clerq = NULL;
2035         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2036                                                                       CRT_READ;
2037         struct ldlm_lock                *lock = NULL;
2038         struct cl_req_attr              *crattr = NULL;
2039         obd_off                         starting_offset = OBD_OBJECT_EOF;
2040         obd_off                         ending_offset = 0;
2041         int                             mpflag = 0;
2042         int                             mem_tight = 0;
2043         int                             page_count = 0;
2044         int                             i;
2045         int                             rc;
2046         LIST_HEAD(rpc_list);
2047
2048         LASSERT(!list_empty(ext_list));
2049
2050         /* add pages into rpc_list to build BRW rpc */
2051         list_for_each_entry(ext, ext_list, oe_link) {
2052                 LASSERT(ext->oe_state == OES_RPC);
2053                 mem_tight |= ext->oe_memalloc;
2054                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2055                         ++page_count;
2056                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2057                         if (starting_offset > oap->oap_obj_off)
2058                                 starting_offset = oap->oap_obj_off;
2059                         else
2060                                 LASSERT(oap->oap_page_off == 0);
2061                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2062                                 ending_offset = oap->oap_obj_off +
2063                                                 oap->oap_count;
2064                         else
2065                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2066                                         PAGE_CACHE_SIZE);
2067                 }
2068         }
2069
2070         if (mem_tight)
2071                 mpflag = cfs_memory_pressure_get_and_set();
2072
2073         OBD_ALLOC(crattr, sizeof(*crattr));
2074         if (crattr == NULL)
2075                 GOTO(out, rc = -ENOMEM);
2076
2077         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2078         if (pga == NULL)
2079                 GOTO(out, rc = -ENOMEM);
2080
2081         OBDO_ALLOC(oa);
2082         if (oa == NULL)
2083                 GOTO(out, rc = -ENOMEM);
2084
2085         i = 0;
2086         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2087                 struct cl_page *page = oap2cl_page(oap);
2088                 if (clerq == NULL) {
2089                         clerq = cl_req_alloc(env, page, crt,
2090                                              1 /* only 1-object rpcs for now */);
2091                         if (IS_ERR(clerq))
2092                                 GOTO(out, rc = PTR_ERR(clerq));
2093                         lock = oap->oap_ldlm_lock;
2094                 }
2095                 if (mem_tight)
2096                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2097                 pga[i] = &oap->oap_brw_page;
2098                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2099                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2100                        pga[i]->pg, page_index(oap->oap_page), oap,
2101                        pga[i]->flag);
2102                 i++;
2103                 cl_req_page_add(env, clerq, page);
2104         }
2105
2106         /* Always fetch the attribute data into the obdo for the RPC. */
2107         LASSERT(clerq != NULL);
2108         crattr->cra_oa = oa;
2109         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2110         if (lock) {
2111                 oa->o_handle = lock->l_remote_handle;
2112                 oa->o_valid |= OBD_MD_FLHANDLE;
2113         }
2114
2115         rc = cl_req_prep(env, clerq);
2116         if (rc != 0) {
2117                 CERROR("cl_req_prep failed: %d\n", rc);
2118                 GOTO(out, rc);
2119         }
2120
2121         sort_brw_pages(pga, page_count);
2122         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2123                         pga, &req, crattr->cra_capa, 1, 0);
2124         if (rc != 0) {
2125                 CERROR("prep_req failed: %d\n", rc);
2126                 GOTO(out, rc);
2127         }
2128
2129         req->rq_interpret_reply = brw_interpret;
2130
2131         if (mem_tight != 0)
2132                 req->rq_memalloc = 1;
2133
2134         /* Need to update the timestamps after the request is built in case
2135          * we race with setattr (locally or in queue at OST).  If OST gets
2136          * later setattr before earlier BRW (as determined by the request xid),
2137          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2138          * way to do this in a single call.  bug 10150 */
2139         cl_req_attr_set(env, clerq, crattr,
2140                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2141
2142         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2143
2144         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2145         aa = ptlrpc_req_async_args(req);
2146         INIT_LIST_HEAD(&aa->aa_oaps);
2147         list_splice_init(&rpc_list, &aa->aa_oaps);
2148         INIT_LIST_HEAD(&aa->aa_exts);
2149         list_splice_init(ext_list, &aa->aa_exts);
2150         aa->aa_clerq = clerq;
2151
2152         /* Queued sync pages can be torn down while the pages
2153          * are between the pending list and the RPC. */
2154         tmp = NULL;
2155         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2156                 /* only one oap gets a request reference */
2157                 if (tmp == NULL)
2158                         tmp = oap;
2159                 if (oap->oap_interrupted && !req->rq_intr) {
2160                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2161                                         oap, req);
2162                         ptlrpc_mark_interrupted(req);
2163                 }
2164         }
2165         if (tmp != NULL)
2166                 tmp->oap_request = ptlrpc_request_addref(req);
2167
2168         client_obd_list_lock(&cli->cl_loi_list_lock);
2169         starting_offset >>= PAGE_CACHE_SHIFT;
2170         if (cmd == OBD_BRW_READ) {
2171                 cli->cl_r_in_flight++;
2172                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2173                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2174                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2175                                       starting_offset + 1);
2176         } else {
2177                 cli->cl_w_in_flight++;
2178                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2179                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2180                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2181                                       starting_offset + 1);
2182         }
2183         client_obd_list_unlock(&cli->cl_loi_list_lock);
2184
2185         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2186                   page_count, aa, cli->cl_r_in_flight,
2187                   cli->cl_w_in_flight);
2188
2189         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2190          * see which CPU/NUMA node the majority of pages were allocated
2191          * on, and try to assign the async RPC to the CPU core
2192          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2193          *
2194          * But on the other hand, we expect that multiple ptlrpcd
2195          * threads and the initial write sponsor can run in parallel,
2196          * especially when data checksum is enabled, which is CPU-bound
2197          * operation and single ptlrpcd thread cannot process in time.
2198          * So more ptlrpcd threads sharing BRW load
2199          * (with PDL_POLICY_ROUND) seems better.
2200          */
2201         ptlrpcd_add_req(req, pol, -1);
2202         rc = 0;
2203
2204 out:
2205         if (mem_tight != 0)
2206                 cfs_memory_pressure_restore(mpflag);
2207
2208         if (crattr != NULL) {
2209                 capa_put(crattr->cra_capa);
2210                 OBD_FREE(crattr, sizeof(*crattr));
2211         }
2212
2213         if (rc != 0) {
2214                 LASSERT(req == NULL);
2215
2216                 if (oa)
2217                         OBDO_FREE(oa);
2218                 if (pga)
2219                         OBD_FREE(pga, sizeof(*pga) * page_count);
2220                 /* this should happen rarely and is pretty bad, it makes the
2221                  * pending list not follow the dirty order */
2222                 while (!list_empty(ext_list)) {
2223                         ext = list_entry(ext_list->next, struct osc_extent,
2224                                              oe_link);
2225                         list_del_init(&ext->oe_link);
2226                         osc_extent_finish(env, ext, 0, rc);
2227                 }
2228                 if (clerq && !IS_ERR(clerq))
2229                         cl_req_completion(env, clerq, rc);
2230         }
2231         return rc;
2232 }
2233
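     /* Attach einfo->ei_cbdata to the lock's l_ast_data if it is not already
      * set; return 1 if the lock now carries that data, 0 otherwise. */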
2234 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2235                                         struct ldlm_enqueue_info *einfo)
2236 {
2237         void *data = einfo->ei_cbdata;
2238         int set = 0;
2239
2240         LASSERT(lock != NULL);
2241         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2242         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2243         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2244         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2245
2246         lock_res_and_lock(lock);
2247         spin_lock(&osc_ast_guard);
2248
2249         if (lock->l_ast_data == NULL)
2250                 lock->l_ast_data = data;
2251         if (lock->l_ast_data == data)
2252                 set = 1;
2253
2254         spin_unlock(&osc_ast_guard);
2255         unlock_res_and_lock(lock);
2256
2257         return set;
2258 }
2259
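     /* Resolve @lockh and set its callback data, warning if the handle no
      * longer maps to a lock (e.g. the client was evicted). */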
2260 static int osc_set_data_with_check(struct lustre_handle *lockh,
2261                                    struct ldlm_enqueue_info *einfo)
2262 {
2263         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2264         int set = 0;
2265
2266         if (lock != NULL) {
2267                 set = osc_set_lock_data_with_check(lock, einfo);
2268                 LDLM_LOCK_PUT(lock);
2269         } else
2270                 CERROR("lockh %p, data %p - client evicted?\n",
2271                        lockh, einfo->ei_cbdata);
2272         return set;
2273 }
2274
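     /* Iterate over all locks on this object's resource and apply @replace to
      * each of them. */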
2275 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2276                              ldlm_iterator_t replace, void *data)
2277 {
2278         struct ldlm_res_id res_id;
2279         struct obd_device *obd = class_exp2obd(exp);
2280
2281         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2282         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2283         return 0;
2284 }
2285
2286 /* Find any LDLM lock of the inode in the OSC.
2287  * Returns 0 if no lock is found,
2288  *         1 if one is found,
2289  *         < 0 on error. */
2290 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2291                            ldlm_iterator_t replace, void *data)
2292 {
2293         struct ldlm_res_id res_id;
2294         struct obd_device *obd = class_exp2obd(exp);
2295         int rc = 0;
2296
2297         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2298         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2299         if (rc == LDLM_ITER_STOP)
2300                 return 1;
2301         if (rc == LDLM_ITER_CONTINUE)
2302                 return 0;
2303         return rc;
2304 }
2305
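     /* Post-process an enqueue reply: extract the intent status from the DLM
      * reply, mark the LVB ready when the lock (or glimpse) succeeded, and
      * invoke the caller's upcall with the final result. */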
2306 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2307                             obd_enqueue_update_f upcall, void *cookie,
2308                             __u64 *flags, int agl, int rc)
2309 {
2310         int intent = *flags & LDLM_FL_HAS_INTENT;
2311
2312         if (intent) {
2313                 /* The request was created before ldlm_cli_enqueue call. */
2314                 if (rc == ELDLM_LOCK_ABORTED) {
2315                         struct ldlm_reply *rep;
2316                         rep = req_capsule_server_get(&req->rq_pill,
2317                                                      &RMF_DLM_REP);
2318
2319                         LASSERT(rep != NULL);
2320                         rep->lock_policy_res1 =
2321                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2322                         if (rep->lock_policy_res1)
2323                                 rc = rep->lock_policy_res1;
2324                 }
2325         }
2326
2327         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2328             (rc == 0)) {
2329                 *flags |= LDLM_FL_LVB_READY;
2330                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2331                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2332         }
2333
2334         /* Call the update callback. */
2335         rc = (*upcall)(cookie, rc);
2336         return rc;
2337 }
2338
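     /* Interpret callback for asynchronous lock enqueues: complete the LDLM
      * enqueue, run osc_enqueue_fini(), and drop the lock references taken to
      * keep the lock alive across the upcall. */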
2339 static int osc_enqueue_interpret(const struct lu_env *env,
2340                                  struct ptlrpc_request *req,
2341                                  struct osc_enqueue_args *aa, int rc)
2342 {
2343         struct ldlm_lock *lock;
2344         struct lustre_handle handle;
2345         __u32 mode;
2346         struct ost_lvb *lvb;
2347         __u32 lvb_len;
2348         __u64 *flags = aa->oa_flags;
2349
2350         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2351          * might be freed anytime after lock upcall has been called. */
2352         lustre_handle_copy(&handle, aa->oa_lockh);
2353         mode = aa->oa_ei->ei_mode;
2354
2355         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2356          * be valid. */
2357         lock = ldlm_handle2lock(&handle);
2358
2359         /* Take an additional reference so that a blocking AST that
2360          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2361          * to arrive after an upcall has been executed by
2362          * osc_enqueue_fini(). */
2363         ldlm_lock_addref(&handle, mode);
2364
2365         /* Let the CP AST grant the lock first. */
2366         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2367
2368         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2369                 lvb = NULL;
2370                 lvb_len = 0;
2371         } else {
2372                 lvb = aa->oa_lvb;
2373                 lvb_len = sizeof(*aa->oa_lvb);
2374         }
2375
2376         /* Complete the lock acquisition procedure. */
2377         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2378                                    mode, flags, lvb, lvb_len, &handle, rc);
2379         /* Complete osc stuff. */
2380         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2381                               flags, aa->oa_agl, rc);
2382
2383         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2384
2385         /* Release the lock for async request. */
2386         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2387                 /*
2388                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2389                  * not already released by
2390                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2391                  */
2392                 ldlm_lock_decref(&handle, mode);
2393
2394         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2395                  aa->oa_lockh, req, aa);
2396         ldlm_lock_decref(&handle, mode);
2397         LDLM_LOCK_PUT(lock);
2398         return rc;
2399 }
2400
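     /* Update the stripe's cached LVB and known minimum size (KMS) from a
      * granted or glimpsed lock, and allow the lock to be matched; on failure,
      * mark the lock as unmatchable. */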
2401 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2402                         struct lov_oinfo *loi, int flags,
2403                         struct ost_lvb *lvb, __u32 mode, int rc)
2404 {
2405         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2406
2407         if (rc == ELDLM_OK) {
2408                 __u64 tmp;
2409
2410                 LASSERT(lock != NULL);
2411                 loi->loi_lvb = *lvb;
2412                 tmp = loi->loi_lvb.lvb_size;
2413                 /* Extend KMS up to the end of this lock and no further
2414                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2415                 if (tmp > lock->l_policy_data.l_extent.end)
2416                         tmp = lock->l_policy_data.l_extent.end + 1;
2417                 if (tmp >= loi->loi_kms) {
2418                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2419                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2420                         loi_kms_set(loi, tmp);
2421                 } else {
2422                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2423                                    LPU64"; leaving kms="LPU64", end="LPU64,
2424                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2425                                    lock->l_policy_data.l_extent.end);
2426                 }
2427                 ldlm_lock_allow_match(lock);
2428         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2429                 LASSERT(lock != NULL);
2430                 loi->loi_lvb = *lvb;
2431                 ldlm_lock_allow_match(lock);
2432                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2433                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2434                 rc = ELDLM_OK;
2435         }
2436
2437         if (lock != NULL) {
2438                 if (rc != ELDLM_OK)
2439                         ldlm_lock_fail_match(lock);
2440
2441                 LDLM_LOCK_PUT(lock);
2442         }
2443 }
2444 EXPORT_SYMBOL(osc_update_enqueue);
2445
2446 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2447
2448 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2449  * lock from the 2nd OSC before a lock from the 1st one. This does not
2450  * deadlock with other synchronous requests; however, keeping some locks while
2451  * trying to obtain others may take a considerable amount of time in case of
2452  * an OST failure, and when other sync requests do not get a lock released
2453  * from a client, the client is excluded from the cluster -- such scenarios
2454  * make life difficult, so release locks just after they are obtained. */
2455 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2456                      __u64 *flags, ldlm_policy_data_t *policy,
2457                      struct ost_lvb *lvb, int kms_valid,
2458                      obd_enqueue_update_f upcall, void *cookie,
2459                      struct ldlm_enqueue_info *einfo,
2460                      struct lustre_handle *lockh,
2461                      struct ptlrpc_request_set *rqset, int async, int agl)
2462 {
2463         struct obd_device *obd = exp->exp_obd;
2464         struct ptlrpc_request *req = NULL;
2465         int intent = *flags & LDLM_FL_HAS_INTENT;
2466         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2467         ldlm_mode_t mode;
2468         int rc;
2469
2470         /* Filesystem lock extents are extended to page boundaries so that
2471          * dealing with the page cache is a little smoother.  */
2472         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2473         policy->l_extent.end |= ~CFS_PAGE_MASK;
2474
2475         /*
2476          * kms is not valid when either object is completely fresh (so that no
2477          * locks are cached), or object was evicted. In the latter case cached
2478          * lock cannot be used, because it would prime inode state with
2479          * potentially stale LVB.
2480          */
2481         if (!kms_valid)
2482                 goto no_match;
2483
2484         /* Next, search for already existing extent locks that will cover us */
2485         /* If we're trying to read, we also search for an existing PW lock.  The
2486          * VFS and page cache already protect us locally, so lots of readers/
2487          * writers can share a single PW lock.
2488          *
2489          * There are problems with conversion deadlocks, so instead of
2490          * converting a read lock to a write lock, we'll just enqueue a new
2491          * one.
2492          *
2493          * At some point we should cancel the read lock instead of making them
2494          * send us a blocking callback, but there are problems with canceling
2495          * locks out from other users right now, too. */
2496         mode = einfo->ei_mode;
2497         if (einfo->ei_mode == LCK_PR)
2498                 mode |= LCK_PW;
2499         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2500                                einfo->ei_type, policy, mode, lockh, 0);
2501         if (mode) {
2502                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2503
2504                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2505                         /* For AGL, if the enqueue RPC is sent but the lock is
2506                          * not granted, then skip processing this stripe.
2507                          * Return -ECANCELED to tell the caller. */
2508                         ldlm_lock_decref(lockh, mode);
2509                         LDLM_LOCK_PUT(matched);
2510                         return -ECANCELED;
2511                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2512                         *flags |= LDLM_FL_LVB_READY;
2513                         /* addref the lock only if not async requests and PW
2514                          * lock is matched whereas we asked for PR. */
2515                         if (!rqset && einfo->ei_mode != mode)
2516                                 ldlm_lock_addref(lockh, LCK_PR);
2517                         if (intent) {
2518                                 /* I would like to be able to ASSERT here that
2519                                  * rss <= kms, but I can't, for reasons which
2520                                  * are explained in lov_enqueue() */
2521                         }
2522
2523                         /* We already have a lock, and it's referenced.
2524                          *
2525                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2526                          * AGL upcall may change it to CLS_HELD directly. */
2527                         (*upcall)(cookie, ELDLM_OK);
2528
2529                         if (einfo->ei_mode != mode)
2530                                 ldlm_lock_decref(lockh, LCK_PW);
2531                         else if (rqset)
2532                                 /* For async requests, decref the lock. */
2533                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2534                         LDLM_LOCK_PUT(matched);
2535                         return ELDLM_OK;
2536                 } else {
2537                         ldlm_lock_decref(lockh, mode);
2538                         LDLM_LOCK_PUT(matched);
2539                 }
2540         }
2541
2542  no_match:
2543         if (intent) {
2544                 LIST_HEAD(cancels);
2545                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2546                                            &RQF_LDLM_ENQUEUE_LVB);
2547                 if (req == NULL)
2548                         return -ENOMEM;
2549
2550                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2551                 if (rc) {
2552                         ptlrpc_request_free(req);
2553                         return rc;
2554                 }
2555
2556                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2557                                      sizeof(*lvb));
2558                 ptlrpc_request_set_replen(req);
2559         }
2560
2561         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2562         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2563
2564         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2565                               sizeof(*lvb), LVB_T_OST, lockh, async);
2566         if (rqset) {
2567                 if (!rc) {
2568                         struct osc_enqueue_args *aa;
2569                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2570                         aa = ptlrpc_req_async_args(req);
2571                         aa->oa_ei = einfo;
2572                         aa->oa_exp = exp;
2573                         aa->oa_flags  = flags;
2574                         aa->oa_upcall = upcall;
2575                         aa->oa_cookie = cookie;
2576                         aa->oa_lvb    = lvb;
2577                         aa->oa_lockh  = lockh;
2578                         aa->oa_agl    = !!agl;
2579
2580                         req->rq_interpret_reply =
2581                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2582                         if (rqset == PTLRPCD_SET)
2583                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2584                         else
2585                                 ptlrpc_set_add_req(rqset, req);
2586                 } else if (intent) {
2587                         ptlrpc_req_finished(req);
2588                 }
2589                 return rc;
2590         }
2591
2592         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2593         if (intent)
2594                 ptlrpc_req_finished(req);
2595
2596         return rc;
2597 }
2598
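     /* Wrapper around osc_enqueue_base() that derives the resource name and
      * LVB/KMS state from the stripe metadata in @oinfo. */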
2599 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2600                        struct ldlm_enqueue_info *einfo,
2601                        struct ptlrpc_request_set *rqset)
2602 {
2603         struct ldlm_res_id res_id;
2604         int rc;
2605
2606         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2607         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2608                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2609                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2610                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2611                               rqset, rqset != NULL, 0);
2612         return rc;
2613 }
2614
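     /* Look for an already granted extent lock covering the request; a PR
      * request may also be satisfied by an existing PW lock. Returns the
      * matched mode, or 0 if nothing matched. */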
2615 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2616                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2617                    int *flags, void *data, struct lustre_handle *lockh,
2618                    int unref)
2619 {
2620         struct obd_device *obd = exp->exp_obd;
2621         int lflags = *flags;
2622         ldlm_mode_t rc;
2623
2624         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2625                 return -EIO;
2626
2627         /* Filesystem lock extents are extended to page boundaries so that
2628          * dealing with the page cache is a little smoother */
2629         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2630         policy->l_extent.end |= ~CFS_PAGE_MASK;
2631
2632         /* Next, search for already existing extent locks that will cover us */
2633         /* If we're trying to read, we also search for an existing PW lock.  The
2634          * VFS and page cache already protect us locally, so lots of readers/
2635          * writers can share a single PW lock. */
2636         rc = mode;
2637         if (mode == LCK_PR)
2638                 rc |= LCK_PW;
2639         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2640                              res_id, type, policy, rc, lockh, unref);
2641         if (rc) {
2642                 if (data != NULL) {
2643                         if (!osc_set_data_with_check(lockh, data)) {
2644                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2645                                         ldlm_lock_decref(lockh, rc);
2646                                 return 0;
2647                         }
2648                 }
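                     /* The match may have returned a PW lock even though only PR
                      * was requested (see above); if so, take a PR reference and
                      * drop the PW one so the reference is held in the mode the
                      * caller actually asked for. */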
2649                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2650                         ldlm_lock_addref(lockh, LCK_PR);
2651                         ldlm_lock_decref(lockh, LCK_PW);
2652                 }
2653                 return rc;
2654         }
2655         return rc;
2656 }
2657
2658 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2659 {
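             /* A group lock is dereferenced and cancelled in one step, since group
              * locks are released explicitly by their holder; for other modes a
              * plain decref is enough and the normal lock cancellation machinery
              * cleans up later. */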
2660         if (unlikely(mode == LCK_GROUP))
2661                 ldlm_lock_decref_and_cancel(lockh, mode);
2662         else
2663                 ldlm_lock_decref(lockh, mode);
2664
2665         return 0;
2666 }
2667
2668 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2669                       __u32 mode, struct lustre_handle *lockh)
2670 {
2671         return osc_cancel_base(lockh, mode);
2672 }
2673
2674 static int osc_cancel_unused(struct obd_export *exp,
2675                              struct lov_stripe_md *lsm,
2676                              ldlm_cancel_flags_t flags,
2677                              void *opaque)
2678 {
2679         struct obd_device *obd = class_exp2obd(exp);
2680         struct ldlm_res_id res_id, *resp = NULL;
2681
2682         if (lsm != NULL) {
2683                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2684                 resp = &res_id;
2685         }
2686
2687         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2688 }
2689
2690 static int osc_statfs_interpret(const struct lu_env *env,
2691                                 struct ptlrpc_request *req,
2692                                 struct osc_async_args *aa, int rc)
2693 {
2694         struct obd_statfs *msfs;
2695
2696         if (rc == -EBADR)
2697                 /* The request has in fact never been sent
2698                  * due to issues at a higher level (LOV).
2699                  * Exit immediately since the caller is
2700                  * aware of the problem and takes care
2701                  * of the cleanup. */
2702                 return rc;
2703
2704         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2705             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2706                 GOTO(out, rc = 0);
2707
2708         if (rc != 0)
2709                 GOTO(out, rc);
2710
2711         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2712         if (msfs == NULL) {
2713                 GOTO(out, rc = -EPROTO);
2714         }
2715
2716         *aa->aa_oi->oi_osfs = *msfs;
2717 out:
2718         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719         return rc;
2720 }
2721
2722 static int osc_statfs_async(struct obd_export *exp,
2723                             struct obd_info *oinfo, __u64 max_age,
2724                             struct ptlrpc_request_set *rqset)
2725 {
2726         struct obd_device     *obd = class_exp2obd(exp);
2727         struct ptlrpc_request *req;
2728         struct osc_async_args *aa;
2729         int                 rc;
2730
2731         /* We could possibly pass max_age in the request (as an absolute
2732          * timestamp or a "seconds.usec ago") so the target can avoid doing
2733          * extra calls into the filesystem if that isn't necessary (e.g.
2734          * during mount that would help a bit).  Having relative timestamps
2735          * is not so great if request processing is slow, while absolute
2736          * timestamps are not ideal because they need time synchronization. */
2737         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2738         if (req == NULL)
2739                 return -ENOMEM;
2740
2741         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2742         if (rc) {
2743                 ptlrpc_request_free(req);
2744                 return rc;
2745         }
2746         ptlrpc_request_set_replen(req);
2747         req->rq_request_portal = OST_CREATE_PORTAL;
2748         ptlrpc_at_set_req_timeout(req);
2749
2750         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2751                 /* procfs requests must not wait or be resent, to avoid deadlock */
2752                 req->rq_no_resend = 1;
2753                 req->rq_no_delay = 1;
2754         }
2755
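             /* Stash the obd_info in the request's async args so that
              * osc_statfs_interpret() can copy the statfs reply into oi_osfs and
              * call the caller's oi_cb_up() callback when the reply (or an error)
              * arrives. */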
2756         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2757         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2758         aa = ptlrpc_req_async_args(req);
2759         aa->aa_oi = oinfo;
2760
2761         ptlrpc_set_add_req(rqset, req);
2762         return 0;
2763 }
2764
2765 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2766                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2767 {
2768         struct obd_device     *obd = class_exp2obd(exp);
2769         struct obd_statfs     *msfs;
2770         struct ptlrpc_request *req;
2771         struct obd_import     *imp = NULL;
2772         int rc;
2773
2774         /* Since the request might also come from lprocfs, we need to
2775          * synchronize with client_disconnect_export() (bug 15684). */
2776         down_read(&obd->u.cli.cl_sem);
2777         if (obd->u.cli.cl_import)
2778                 imp = class_import_get(obd->u.cli.cl_import);
2779         up_read(&obd->u.cli.cl_sem);
2780         if (!imp)
2781                 return -ENODEV;
2782
2783         /* We could possibly pass max_age in the request (as an absolute
2784          * timestamp or a "seconds.usec ago") so the target can avoid doing
2785          * extra calls into the filesystem if that isn't necessary (e.g.
2786          * during mount that would help a bit).  Having relative timestamps
2787          * is not so great if request processing is slow, while absolute
2788          * timestamps are not ideal because they need time synchronization. */
2789         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2790
2791         class_import_put(imp);
2792
2793         if (req == NULL)
2794                 return -ENOMEM;
2795
2796         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2797         if (rc) {
2798                 ptlrpc_request_free(req);
2799                 return rc;
2800         }
2801         ptlrpc_request_set_replen(req);
2802         req->rq_request_portal = OST_CREATE_PORTAL;
2803         ptlrpc_at_set_req_timeout(req);
2804
2805         if (flags & OBD_STATFS_NODELAY) {
2806                 /* procfs requests must not wait or be resent, to avoid deadlock */
2807                 req->rq_no_resend = 1;
2808                 req->rq_no_delay = 1;
2809         }
2810
2811         rc = ptlrpc_queue_wait(req);
2812         if (rc)
2813                 GOTO(out, rc);
2814
2815         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2816         if (msfs == NULL) {
2817                 GOTO(out, rc = -EPROTO);
2818         }
2819
2820         *osfs = *msfs;
2821
2822  out:
2823         ptlrpc_req_finished(req);
2824         return rc;
2825 }
2826
2827 /* Retrieve object striping information.
2828  *
2829  * @lump is an in-core struct whose lmm_stripe_count indicates the maximum
2830  * number of OST indices which will fit in the user buffer.
2831  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (only one stripe slot is used).
2832  */
2833 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2834 {
2835         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2836         struct lov_user_md_v3 lum, *lumk;
2837         struct lov_user_ost_data_v1 *lmm_objects;
2838         int rc = 0, lum_size;
2839
2840         if (!lsm)
2841                 return -ENODATA;
2842
2843         /* we only need the header part from user space to get lmm_magic and
2844          * lmm_stripe_count (the header part is common to v1 and v3) */
2845         lum_size = sizeof(struct lov_user_md_v1);
2846         if (copy_from_user(&lum, lump, lum_size))
2847                 return -EFAULT;
2848
2849         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2850             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2851                 return -EINVAL;
2852
2853         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2854         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2855         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2856         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2857
2858         /* we can use lov_mds_md_size() to compute lum_size
2859          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2860         if (lum.lmm_stripe_count > 0) {
2861                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2862                 OBD_ALLOC(lumk, lum_size);
2863                 if (!lumk)
2864                         return -ENOMEM;
2865
2866                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2867                         lmm_objects =
2868                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2869                 else
2870                         lmm_objects = &(lumk->lmm_objects[0]);
2871                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2872         } else {
2873                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2874                 lumk = &lum;
2875         }
2876
2877         lumk->lmm_oi = lsm->lsm_oi;
2878         lumk->lmm_stripe_count = 1;
2879
2880         if (copy_to_user(lump, lumk, lum_size))
2881                 rc = -EFAULT;
2882
2883         if (lumk != &lum)
2884                 OBD_FREE(lumk, lum_size);
2885
2886         return rc;
2887 }
2888
2889
2890 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2891                          void *karg, void *uarg)
2892 {
2893         struct obd_device *obd = exp->exp_obd;
2894         struct obd_ioctl_data *data = karg;
2895         int err = 0;
2896
2897         if (!try_module_get(THIS_MODULE)) {
2898                 CERROR("Can't get module. Is it alive?\n");
2899                 return -EINVAL;
2900         }
2901         switch (cmd) {
2902         case OBD_IOC_LOV_GET_CONFIG: {
2903                 char *buf;
2904                 struct lov_desc *desc;
2905                 struct obd_uuid uuid;
2906
2907                 buf = NULL;
2908                 len = 0;
2909                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2910                         GOTO(out, err = -EINVAL);
2911
2912                 data = (struct obd_ioctl_data *)buf;
2913
2914                 if (sizeof(*desc) > data->ioc_inllen1) {
2915                         obd_ioctl_freedata(buf, len);
2916                         GOTO(out, err = -EINVAL);
2917                 }
2918
2919                 if (data->ioc_inllen2 < sizeof(uuid)) {
2920                         obd_ioctl_freedata(buf, len);
2921                         GOTO(out, err = -EINVAL);
2922                 }
2923
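                     /* The OSC answers as a degenerate single-target LOV: exactly
                      * one (active) OST and a default stripe count of one. */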
2924                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2925                 desc->ld_tgt_count = 1;
2926                 desc->ld_active_tgt_count = 1;
2927                 desc->ld_default_stripe_count = 1;
2928                 desc->ld_default_stripe_size = 0;
2929                 desc->ld_default_stripe_offset = 0;
2930                 desc->ld_pattern = 0;
2931                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2932
2933                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2934
2935                 err = copy_to_user((void *)uarg, buf, len);
2936                 if (err)
2937                         err = -EFAULT;
2938                 obd_ioctl_freedata(buf, len);
2939                 GOTO(out, err);
2940         }
2941         case LL_IOC_LOV_SETSTRIPE:
2942                 err = obd_alloc_memmd(exp, karg);
2943                 if (err > 0)
2944                         err = 0;
2945                 GOTO(out, err);
2946         case LL_IOC_LOV_GETSTRIPE:
2947                 err = osc_getstripe(karg, uarg);
2948                 GOTO(out, err);
2949         case OBD_IOC_CLIENT_RECOVER:
2950                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2951                                             data->ioc_inlbuf1, 0);
2952                 if (err > 0)
2953                         err = 0;
2954                 GOTO(out, err);
2955         case IOC_OSC_SET_ACTIVE:
2956                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2957                                                data->ioc_offset);
2958                 GOTO(out, err);
2959         case OBD_IOC_POLL_QUOTACHECK:
2960                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2961                 GOTO(out, err);
2962         case OBD_IOC_PING_TARGET:
2963                 err = ptlrpc_obd_ping(obd);
2964                 GOTO(out, err);
2965         default:
2966                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2967                        cmd, current_comm());
2968                 GOTO(out, err = -ENOTTY);
2969         }
2970 out:
2971         module_put(THIS_MODULE);
2972         return err;
2973 }
2974
2975 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2976                         obd_count keylen, void *key, __u32 *vallen, void *val,
2977                         struct lov_stripe_md *lsm)
2978 {
2979         if (!vallen || !val)
2980                 return -EFAULT;
2981
2982         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2983                 __u32 *stripe = val;
2984                 *vallen = sizeof(*stripe);
2985                 *stripe = 0;
2986                 return 0;
2987         } else if (KEY_IS(KEY_LAST_ID)) {
2988                 struct ptlrpc_request *req;
2989                 obd_id          *reply;
2990                 char              *tmp;
2991                 int                 rc;
2992
2993                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2994                                            &RQF_OST_GET_INFO_LAST_ID);
2995                 if (req == NULL)
2996                         return -ENOMEM;
2997
2998                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2999                                      RCL_CLIENT, keylen);
3000                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3001                 if (rc) {
3002                         ptlrpc_request_free(req);
3003                         return rc;
3004                 }
3005
3006                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3007                 memcpy(tmp, key, keylen);
3008
3009                 req->rq_no_delay = req->rq_no_resend = 1;
3010                 ptlrpc_request_set_replen(req);
3011                 rc = ptlrpc_queue_wait(req);
3012                 if (rc)
3013                         GOTO(out, rc);
3014
3015                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3016                 if (reply == NULL)
3017                         GOTO(out, rc = -EPROTO);
3018
3019                 *((obd_id *)val) = *reply;
3020         out:
3021                 ptlrpc_req_finished(req);
3022                 return rc;
3023         } else if (KEY_IS(KEY_FIEMAP)) {
3024                 struct ll_fiemap_info_key *fm_key =
3025                                 (struct ll_fiemap_info_key *)key;
3026                 struct ldlm_res_id       res_id;
3027                 ldlm_policy_data_t       policy;
3028                 struct lustre_handle     lockh;
3029                 ldlm_mode_t              mode = 0;
3030                 struct ptlrpc_request   *req;
3031                 struct ll_user_fiemap   *reply;
3032                 char                    *tmp;
3033                 int                      rc;
3034
3035                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3036                         goto skip_locking;
3037
3038                 policy.l_extent.start = fm_key->fiemap.fm_start &
3039                                                 CFS_PAGE_MASK;
3040
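                     /* Extend the end of the range to a page boundary as well; if
                      * doing so would reach or pass OBD_OBJECT_EOF, lock out to end
                      * of file instead so that the arithmetic cannot wrap. */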
3041                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3042                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3043                         policy.l_extent.end = OBD_OBJECT_EOF;
3044                 else
3045                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3046                                 fm_key->fiemap.fm_length +
3047                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3048
3049                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3050                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3051                                        LDLM_FL_BLOCK_GRANTED |
3052                                        LDLM_FL_LVB_READY,
3053                                        &res_id, LDLM_EXTENT, &policy,
3054                                        LCK_PR | LCK_PW, &lockh, 0);
3055                 if (mode) { /* lock is cached on client */
3056                         if (mode != LCK_PR) {
3057                                 ldlm_lock_addref(&lockh, LCK_PR);
3058                                 ldlm_lock_decref(&lockh, LCK_PW);
3059                         }
3060                 } else { /* no cached lock; have the server take the lock for us */
3061                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3062                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3063                 }
3064
3065 skip_locking:
3066                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3067                                            &RQF_OST_GET_INFO_FIEMAP);
3068                 if (req == NULL)
3069                         GOTO(drop_lock, rc = -ENOMEM);
3070
3071                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3072                                      RCL_CLIENT, keylen);
3073                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3074                                      RCL_CLIENT, *vallen);
3075                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3076                                      RCL_SERVER, *vallen);
3077
3078                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3079                 if (rc) {
3080                         ptlrpc_request_free(req);
3081                         GOTO(drop_lock, rc);
3082                 }
3083
3084                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3085                 memcpy(tmp, key, keylen);
3086                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3087                 memcpy(tmp, val, *vallen);
3088
3089                 ptlrpc_request_set_replen(req);
3090                 rc = ptlrpc_queue_wait(req);
3091                 if (rc)
3092                         GOTO(fini_req, rc);
3093
3094                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3095                 if (reply == NULL)
3096                         GOTO(fini_req, rc = -EPROTO);
3097
3098                 memcpy(val, reply, *vallen);
3099 fini_req:
3100                 ptlrpc_req_finished(req);
3101 drop_lock:
3102                 if (mode)
3103                         ldlm_lock_decref(&lockh, LCK_PR);
3104                 return rc;
3105         }
3106
3107         return -EINVAL;
3108 }
3109
3110 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3111                               obd_count keylen, void *key, obd_count vallen,
3112                               void *val, struct ptlrpc_request_set *set)
3113 {
3114         struct ptlrpc_request *req;
3115         struct obd_device     *obd = exp->exp_obd;
3116         struct obd_import     *imp = class_exp2cliimp(exp);
3117         char              *tmp;
3118         int                 rc;
3119
3120         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3121
3122         if (KEY_IS(KEY_CHECKSUM)) {
3123                 if (vallen != sizeof(int))
3124                         return -EINVAL;
3125                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3126                 return 0;
3127         }
3128
3129         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3130                 sptlrpc_conf_client_adapt(obd);
3131                 return 0;
3132         }
3133
3134         if (KEY_IS(KEY_FLUSH_CTX)) {
3135                 sptlrpc_import_flush_my_ctx(imp);
3136                 return 0;
3137         }
3138
3139         if (KEY_IS(KEY_CACHE_SET)) {
3140                 struct client_obd *cli = &obd->u.cli;
3141
3142                 LASSERT(cli->cl_cache == NULL); /* only once */
3143                 cli->cl_cache = (struct cl_client_cache *)val;
3144                 atomic_inc(&cli->cl_cache->ccc_users);
3145                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3146
3147                 /* add this osc into entity list */
3148                 LASSERT(list_empty(&cli->cl_lru_osc));
3149                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3150                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3151                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3152
3153                 return 0;
3154         }
3155
3156         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3157                 struct client_obd *cli = &obd->u.cli;
3158                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3159                 int target = *(int *)val;
3160
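                     /* Shrink at most half of this OSC's LRU pages per call, capped
                      * by the caller's remaining target; e.g. with 1000 pages on the
                      * LRU and a target of 300, ask osc_lru_shrink() for
                      * min(500, 300) = 300 and subtract whatever it actually freed
                      * from the target. */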
3161                 nr = osc_lru_shrink(cli, min(nr, target));
3162                 *(int *)val -= nr;
3163                 return 0;
3164         }
3165
3166         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3167                 return -EINVAL;
3168
3169         /* We pass all other commands directly to OST. Since nobody calls osc
3170            methods directly and everybody is supposed to go through LOV, we
3171            assume LOV has checked for invalid values on our behalf.
3172            The only recognised values so far are evict_by_nid and mds_conn.
3173            Even if something bad gets through, we'd get a -EINVAL from the OST
3174            anyway. */
3175
3176         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3177                                                 &RQF_OST_SET_GRANT_INFO :
3178                                                 &RQF_OBD_SET_INFO);
3179         if (req == NULL)
3180                 return -ENOMEM;
3181
3182         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3183                              RCL_CLIENT, keylen);
3184         if (!KEY_IS(KEY_GRANT_SHRINK))
3185                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3186                                      RCL_CLIENT, vallen);
3187         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3188         if (rc) {
3189                 ptlrpc_request_free(req);
3190                 return rc;
3191         }
3192
3193         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3194         memcpy(tmp, key, keylen);
3195         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3196                                                         &RMF_OST_BODY :
3197                                                         &RMF_SETINFO_VAL);
3198         memcpy(tmp, val, vallen);
3199
3200         if (KEY_IS(KEY_GRANT_SHRINK)) {
3201                 struct osc_grant_args *aa;
3202                 struct obdo *oa;
3203
3204                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3205                 aa = ptlrpc_req_async_args(req);
3206                 OBDO_ALLOC(oa);
3207                 if (!oa) {
3208                         ptlrpc_req_finished(req);
3209                         return -ENOMEM;
3210                 }
3211                 *oa = ((struct ost_body *)val)->oa;
3212                 aa->aa_oa = oa;
3213                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3214         }
3215
3216         ptlrpc_request_set_replen(req);
3217         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3218                 LASSERT(set != NULL);
3219                 ptlrpc_set_add_req(set, req);
3220                 ptlrpc_check_set(NULL, set);
3221         } else
3222                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3223
3224         return 0;
3225 }
3226
3227
3228 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3229                          struct obd_device *disk_obd, int *index)
3230 {
3231         /* this code is not supposed to be used with LOD/OSP
3232          * and is to be removed soon */
3233         LBUG();
3234         return 0;
3235 }
3236
3237 static int osc_llog_finish(struct obd_device *obd, int count)
3238 {
3239         struct llog_ctxt *ctxt;
3240
3241         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3242         if (ctxt) {
3243                 llog_cat_close(NULL, ctxt->loc_handle);
3244                 llog_cleanup(NULL, ctxt);
3245         }
3246
3247         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3248         if (ctxt)
3249                 llog_cleanup(NULL, ctxt);
3250         return 0;
3251 }
3252
3253 static int osc_reconnect(const struct lu_env *env,
3254                          struct obd_export *exp, struct obd_device *obd,
3255                          struct obd_uuid *cluuid,
3256                          struct obd_connect_data *data,
3257                          void *localdata)
3258 {
3259         struct client_obd *cli = &obd->u.cli;
3260
3261         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3262                 long lost_grant;
3263
3264                 client_obd_list_lock(&cli->cl_loi_list_lock);
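                     /* Ask the server to restore the grant we believe we still hold
                      * (available grant plus not-yet-written dirty data); on a fresh
                      * connection where both are zero, request two full BRW RPCs
                      * worth. */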
3265                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3266                                 2 * cli_brw_size(obd);
3267                 lost_grant = cli->cl_lost_grant;
3268                 cli->cl_lost_grant = 0;
3269                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3270
3271                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3272                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3273                        data->ocd_version, data->ocd_grant, lost_grant);
3274         }
3275
3276         return 0;
3277 }
3278
3279 static int osc_disconnect(struct obd_export *exp)
3280 {
3281         struct obd_device *obd = class_exp2obd(exp);
3282         struct llog_ctxt  *ctxt;
3283         int rc;
3284
3285         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3286         if (ctxt) {
3287                 if (obd->u.cli.cl_conn_count == 1) {
3288                         /* Flush any remaining cancel messages out to the
3289                          * target */
3290                         llog_sync(ctxt, exp, 0);
3291                 }
3292                 llog_ctxt_put(ctxt);
3293         } else {
3294                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3295                        obd);
3296         }
3297
3298         rc = client_disconnect_export(exp);
3299         /**
3300          * Initially we put del_shrink_grant before disconnect_export, but it
3301          * causes the following problem if setup (connect) and cleanup
3302          * (disconnect) are tangled together.
3303          *      connect p1                   disconnect p2
3304          *   ptlrpc_connect_import
3305          *     ...............         class_manual_cleanup
3306          *                                   osc_disconnect
3307          *                                   del_shrink_grant
3308          *   ptlrpc_connect_interpret
3309          *     init_grant_shrink
3310          *   add this client to shrink list
3311          *                                    cleanup_osc
3312          * Bang! the pinger triggers the shrink.
3313          * So the osc should only be removed from the shrink list after we
3314          * are sure the import has been destroyed (bug 18662).
3315          */
3316         if (obd->u.cli.cl_import == NULL)
3317                 osc_del_shrink_grant(&obd->u.cli);
3318         return rc;
3319 }
3320
3321 static int osc_import_event(struct obd_device *obd,
3322                             struct obd_import *imp,
3323                             enum obd_import_event event)
3324 {
3325         struct client_obd *cli;
3326         int rc = 0;
3327
3328         LASSERT(imp->imp_obd == obd);
3329
3330         switch (event) {
3331         case IMP_EVENT_DISCON: {
3332                 cli = &obd->u.cli;
3333                 client_obd_list_lock(&cli->cl_loi_list_lock);
3334                 cli->cl_avail_grant = 0;
3335                 cli->cl_lost_grant = 0;
3336                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3337                 break;
3338         }
3339         case IMP_EVENT_INACTIVE: {
3340                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3341                 break;
3342         }
3343         case IMP_EVENT_INVALIDATE: {
3344                 struct ldlm_namespace *ns = obd->obd_namespace;
3345                 struct lu_env    *env;
3346                 int                 refcheck;
3347
3348                 env = cl_env_get(&refcheck);
3349                 if (!IS_ERR(env)) {
3350                         /* Reset grants */
3351                         cli = &obd->u.cli;
3352                         /* all pages go to failing rpcs due to the invalid
3353                          * import */
3354                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3355
3356                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3357                         cl_env_put(env, &refcheck);
3358                 } else
3359                         rc = PTR_ERR(env);
3360                 break;
3361         }
3362         case IMP_EVENT_ACTIVE: {
3363                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3364                 break;
3365         }
3366         case IMP_EVENT_OCD: {
3367                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3368
3369                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3370                         osc_init_grant(&obd->u.cli, ocd);
3371
3372                 /* See bug 7198 */
3373                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3374                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3375
3376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3377                 break;
3378         }
3379         case IMP_EVENT_DEACTIVATE: {
3380                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3381                 break;
3382         }
3383         case IMP_EVENT_ACTIVATE: {
3384                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3385                 break;
3386         }
3387         default:
3388                 CERROR("Unknown import event %d\n", event);
3389                 LBUG();
3390         }
3391         return rc;
3392 }
3393
3394 /**
3395  * Determine whether the lock can be canceled before replaying the lock
3396  * during recovery, see bug16774 for detailed information.
3397  *
3398  * \retval zero the lock can't be canceled
3399  * \retval other ok to cancel
3400  */
3401 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3402 {
3403         check_res_locked(lock->l_resource);
3404
3405         /*
3406          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3407          *
3408          * XXX as a future improvement, we can also cancel unused write lock
3409          * if it doesn't have dirty data and active mmaps.
3410          */
3411         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3412             (lock->l_granted_mode == LCK_PR ||
3413              lock->l_granted_mode == LCK_CR) &&
3414             (osc_dlm_lock_pageref(lock) == 0))
3415                 return 1;
3416
3417         return 0;
3418 }
3419
3420 static int brw_queue_work(const struct lu_env *env, void *data)
3421 {
3422         struct client_obd *cli = data;
3423
3424         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3425
3426         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3427         return 0;
3428 }
3429
3430 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3431 {
3432         struct lprocfs_static_vars lvars = { 0 };
3433         struct client_obd         *cli = &obd->u.cli;
3434         void                   *handler;
3435         int                     rc;
3436
3437         rc = ptlrpcd_addref();
3438         if (rc)
3439                 return rc;
3440
3441         rc = client_obd_setup(obd, lcfg);
3442         if (rc)
3443                 GOTO(out_ptlrpcd, rc);
3444
3445         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3446         if (IS_ERR(handler))
3447                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3448         cli->cl_writeback_work = handler;
3449
3450         rc = osc_quota_setup(obd);
3451         if (rc)
3452                 GOTO(out_ptlrpcd_work, rc);
3453
3454         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3455         lprocfs_osc_init_vars(&lvars);
3456         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3457                 lproc_osc_attach_seqstat(obd);
3458                 sptlrpc_lprocfs_cliobd_attach(obd);
3459                 ptlrpc_lprocfs_register_obd(obd);
3460         }
3461
3462         /* We need to allocate a few extra requests, because
3463          * brw_interpret tries to create new requests before freeing
3464          * previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
3465          * but that might waste too much RAM, so adding 2 is a guess
3466          * that should still work. */
3467         cli->cl_import->imp_rq_pool =
3468                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3469                                     OST_MAXREQSIZE,
3470                                     ptlrpc_add_rqs_to_pool);
3471
3472         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3473         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3474         return rc;
3475
3476 out_ptlrpcd_work:
3477         ptlrpcd_destroy_work(handler);
3478 out_client_setup:
3479         client_obd_cleanup(obd);
3480 out_ptlrpcd:
3481         ptlrpcd_decref();
3482         return rc;
3483 }
3484
3485 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3486 {
3487         int rc = 0;
3488
3489         switch (stage) {
3490         case OBD_CLEANUP_EARLY: {
3491                 struct obd_import *imp;
3492                 imp = obd->u.cli.cl_import;
3493                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3494                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3495                 ptlrpc_deactivate_import(imp);
3496                 spin_lock(&imp->imp_lock);
3497                 imp->imp_pingable = 0;
3498                 spin_unlock(&imp->imp_lock);
3499                 break;
3500         }
3501         case OBD_CLEANUP_EXPORTS: {
3502                 struct client_obd *cli = &obd->u.cli;
3503                 /* LU-464
3504                  * for echo client, export may be on zombie list, wait for
3505                  * zombie thread to cull it, because cli.cl_import will be
3506                  * cleared in client_disconnect_export():
3507                  *   class_export_destroy() -> obd_cleanup() ->
3508                  *   echo_device_free() -> echo_client_cleanup() ->
3509                  *   obd_disconnect() -> osc_disconnect() ->
3510                  *   client_disconnect_export()
3511                  */
3512                 obd_zombie_barrier();
3513                 if (cli->cl_writeback_work) {
3514                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3515                         cli->cl_writeback_work = NULL;
3516                 }
3517                 obd_cleanup_client_import(obd);
3518                 ptlrpc_lprocfs_unregister_obd(obd);
3519                 lprocfs_obd_cleanup(obd);
3520                 rc = obd_llog_finish(obd, 0);
3521                 if (rc != 0)
3522                         CERROR("failed to clean up llog subsystems\n");
3523                 break;
3524         }
3525         }
3526         return rc;
3527 }
3528
3529 int osc_cleanup(struct obd_device *obd)
3530 {
3531         struct client_obd *cli = &obd->u.cli;
3532         int rc;
3533
3534         /* lru cleanup */
3535         if (cli->cl_cache != NULL) {
3536                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3537                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3538                 list_del_init(&cli->cl_lru_osc);
3539                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3540                 cli->cl_lru_left = NULL;
3541                 atomic_dec(&cli->cl_cache->ccc_users);
3542                 cli->cl_cache = NULL;
3543         }
3544
3545         /* free memory of osc quota cache */
3546         osc_quota_cleanup(obd);
3547
3548         rc = client_obd_cleanup(obd);
3549
3550         ptlrpcd_decref();
3551         return rc;
3552 }
3553
3554 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3555 {
3556         struct lprocfs_static_vars lvars = { 0 };
3557         int rc = 0;
3558
3559         lprocfs_osc_init_vars(&lvars);
3560
3561         switch (lcfg->lcfg_command) {
3562         default:
3563                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3564                                               lcfg, obd);
3565                 if (rc > 0)
3566                         rc = 0;
3567                 break;
3568         }
3569
3570         return rc;
3571 }
3572
3573 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3574 {
3575         return osc_process_config_base(obd, buf);
3576 }
3577
3578 struct obd_ops osc_obd_ops = {
3579         .o_owner           = THIS_MODULE,
3580         .o_setup           = osc_setup,
3581         .o_precleanup      = osc_precleanup,
3582         .o_cleanup         = osc_cleanup,
3583         .o_add_conn        = client_import_add_conn,
3584         .o_del_conn        = client_import_del_conn,
3585         .o_connect         = client_connect_import,
3586         .o_reconnect       = osc_reconnect,
3587         .o_disconnect      = osc_disconnect,
3588         .o_statfs          = osc_statfs,
3589         .o_statfs_async    = osc_statfs_async,
3590         .o_packmd          = osc_packmd,
3591         .o_unpackmd        = osc_unpackmd,
3592         .o_create          = osc_create,
3593         .o_destroy         = osc_destroy,
3594         .o_getattr         = osc_getattr,
3595         .o_getattr_async   = osc_getattr_async,
3596         .o_setattr         = osc_setattr,
3597         .o_setattr_async   = osc_setattr_async,
3598         .o_brw             = osc_brw,
3599         .o_punch           = osc_punch,
3600         .o_sync            = osc_sync,
3601         .o_enqueue         = osc_enqueue,
3602         .o_change_cbdata   = osc_change_cbdata,
3603         .o_find_cbdata     = osc_find_cbdata,
3604         .o_cancel          = osc_cancel,
3605         .o_cancel_unused   = osc_cancel_unused,
3606         .o_iocontrol       = osc_iocontrol,
3607         .o_get_info        = osc_get_info,
3608         .o_set_info_async  = osc_set_info_async,
3609         .o_import_event    = osc_import_event,
3610         .o_llog_init       = osc_llog_init,
3611         .o_llog_finish     = osc_llog_finish,
3612         .o_process_config  = osc_process_config,
3613         .o_quotactl        = osc_quotactl,
3614         .o_quotacheck      = osc_quotacheck,
3615 };
3616
3617 extern struct lu_kmem_descr osc_caches[];
3618 extern spinlock_t osc_ast_guard;
3619 extern struct lock_class_key osc_ast_guard_class;
3620
3621 int __init osc_init(void)
3622 {
3623         struct lprocfs_static_vars lvars = { 0 };
3624         int rc;
3625
3626         /* print an address of _any_ initialized kernel symbol from this
3627          * module, to allow debugging with gdb that doesn't support data
3628          * symbols from modules. */
3629         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3630
3631         rc = lu_kmem_init(osc_caches);
3632         if (rc)
3633                 return rc;
3634
3635         lprocfs_osc_init_vars(&lvars);
3636
3637         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3638                                  LUSTRE_OSC_NAME, &osc_device_type);
3639         if (rc) {
3640                 lu_kmem_fini(osc_caches);
3641                 return rc;
3642         }
3643
3644         spin_lock_init(&osc_ast_guard);
3645         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3646
3647         return rc;
3648 }
3649
3650 static void /*__exit*/ osc_exit(void)
3651 {
3652         class_unregister_type(LUSTRE_OSC_NAME);
3653         lu_kmem_fini(osc_caches);
3654 }
3655
3656 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3657 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3658 MODULE_LICENSE("GPL");
3659 MODULE_VERSION(LUSTRE_VERSION_STRING);
3660
3661 module_init(osc_init);
3662 module_exit(osc_exit);