1 /*
2  * linux/fs/nfs/direct.c
3  *
4  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
5  *
6  * High-performance uncached I/O for the Linux NFS client
7  *
8  * There are important applications whose performance or correctness
9  * depends on uncached access to file data.  Database clusters
10  * (multiple copies of the same instance running on separate hosts)
11  * implement their own cache coherency protocol that subsumes file
12  * system cache protocols.  Applications that process datasets
13  * considerably larger than the client's memory do not always benefit
14  * from a local cache.  A streaming video server, for instance, has no
15  * need to cache the contents of a file.
16  *
17  * When an application requests uncached I/O, all read and write requests
18  * are made directly to the server; data stored or fetched via these
19  * requests is not cached in the Linux page cache.  The client does not
20  * correct unaligned requests from applications.  All requested bytes are
21  * held on permanent storage before a direct write system call returns to
22  * an application.
23  *
24  * Solaris implements an uncached I/O facility called directio() that
25  * is used for backups and sequential I/O to very large files.  Solaris
26  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
27  * an undocumented mount option.
28  *
29  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
30  * help from Andrew Morton.
31  *
32  * 18 Dec 2001  Initial implementation for 2.4  --cel
33  * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
34  * 08 Jun 2003  Port to 2.5 APIs  --cel
35  * 31 Mar 2004  Handle direct I/O without VFS support  --cel
36  * 15 Sep 2004  Parallel async reads  --cel
37  * 04 May 2005  support O_DIRECT with aio  --cel
38  *
39  */
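/*
 * Illustrative sketch (not part of this file): how an application might
 * request the uncached I/O described above.  The path and sizes are made
 * up, and the page-aligned buffer is shown only because that is the
 * common convention for O_DIRECT callers; as noted above, this client
 * does not itself correct unaligned requests.  Data moved this way is
 * served by the NFS server directly, never from the local page cache.
 *
 *	int fd = open("/mnt/nfs/data", O_RDWR | O_DIRECT);
 *	void *buf;
 *
 *	posix_memalign(&buf, 4096, 65536);
 *	pread(fd, buf, 65536, 0);
 */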
40
41 #include <linux/errno.h>
42 #include <linux/sched.h>
43 #include <linux/kernel.h>
44 #include <linux/file.h>
45 #include <linux/pagemap.h>
46 #include <linux/kref.h>
47 #include <linux/slab.h>
48 #include <linux/task_io_accounting_ops.h>
49
50 #include <linux/nfs_fs.h>
51 #include <linux/nfs_page.h>
52 #include <linux/sunrpc/clnt.h>
53
54 #include <asm/uaccess.h>
55 #include <linux/atomic.h>
56
57 #include "internal.h"
58 #include "iostat.h"
59 #include "pnfs.h"
60
61 #define NFSDBG_FACILITY         NFSDBG_VFS
62
63 static struct kmem_cache *nfs_direct_cachep;
64
65 /*
66  * This represents a set of asynchronous requests that we're waiting on
67  */
68 struct nfs_direct_req {
69         struct kref             kref;           /* release manager */
70
71         /* I/O parameters */
72         struct nfs_open_context *ctx;           /* file open context info */
73         struct nfs_lock_context *l_ctx;         /* Lock context info */
74         struct kiocb *          iocb;           /* controlling i/o request */
75         struct inode *          inode;          /* target file of i/o */
76
77         /* completion state */
78         atomic_t                io_count;       /* i/os we're waiting for */
79         spinlock_t              lock;           /* protect completion state */
80         ssize_t                 count,          /* bytes actually processed */
81                                 error;          /* any reported error */
82         struct completion       completion;     /* wait for i/o completion */
83
84         /* commit state */
85         struct nfs_mds_commit_info mds_cinfo;   /* Storage for cinfo */
86         struct pnfs_ds_commit_info ds_cinfo;    /* Storage for cinfo */
87         struct work_struct      work;
88         int                     flags;
89 #define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
90 #define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
91         struct nfs_writeverf    verf;           /* unstable write verifier */
92 };
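/*
 * Commit-state note: an UNSTABLE write reply carries the server's write
 * verifier.  The first verifier seen is cached in 'verf' and the dreq is
 * flagged NFS_ODIRECT_DO_COMMIT so that a COMMIT is sent once every
 * WRITE has finished.  If a later WRITE or the COMMIT reply carries a
 * different verifier, the server may have rebooted and discarded the
 * uncommitted data, so the dreq is flagged NFS_ODIRECT_RESCHED_WRITES
 * and the data is resent as stable writes.
 */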
93
94 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
95 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
96 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
97 static void nfs_direct_write_schedule_work(struct work_struct *work);
98
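/*
 * io_count tracks the asynchronous operations still outstanding for a
 * dreq: the scheduling path takes one reference up front and each I/O
 * header takes another through nfs_direct_pgio_init().  Whichever caller
 * of put_dreq() drops the count to zero runs the completion path.
 */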
99 static inline void get_dreq(struct nfs_direct_req *dreq)
100 {
101         atomic_inc(&dreq->io_count);
102 }
103
104 static inline int put_dreq(struct nfs_direct_req *dreq)
105 {
106         return atomic_dec_and_test(&dreq->io_count);
107 }
108
109 /**
110  * nfs_direct_IO - NFS address space operation for direct I/O
111  * @rw: direction (read or write)
112  * @iocb: target I/O control block
113  * @iov: array of vectors that define I/O buffer
114  * @pos: offset in file to begin the operation
115  * @nr_segs: size of iovec array
116  *
117  * The presence of this routine in the address space ops vector means
118  * the NFS client supports direct I/O. However, for most direct IO, we
119  * shunt off direct read and write requests before the VFS gets them,
120  * so this method is only ever called for swap.
121  */
122 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
123 {
124 #ifndef CONFIG_NFS_SWAP
125         dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
126                         iocb->ki_filp->f_path.dentry->d_name.name,
127                         (long long) pos, nr_segs);
128
129         return -EINVAL;
130 #else
131         VM_BUG_ON(iocb->ki_left != PAGE_SIZE);
132         VM_BUG_ON(iocb->ki_nbytes != PAGE_SIZE);
133
134         if (rw == READ || rw == KERNEL_READ)
135                 return nfs_file_direct_read(iocb, iov, nr_segs, pos,
136                                 rw == READ ? true : false);
137         return nfs_file_direct_write(iocb, iov, nr_segs, pos,
138                                 rw == WRITE ? true : false);
139 #endif /* CONFIG_NFS_SWAP */
140 }
141
142 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
143 {
144         unsigned int i;
145         for (i = 0; i < npages; i++)
146                 page_cache_release(pages[i]);
147 }
148
149 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
150                               struct nfs_direct_req *dreq)
151 {
152         cinfo->lock = &dreq->lock;
153         cinfo->mds = &dreq->mds_cinfo;
154         cinfo->ds = &dreq->ds_cinfo;
155         cinfo->dreq = dreq;
156         cinfo->completion_ops = &nfs_direct_commit_completion_ops;
157 }
158
159 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
160 {
161         struct nfs_direct_req *dreq;
162
163         dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
164         if (!dreq)
165                 return NULL;
166
167         kref_init(&dreq->kref);
168         kref_get(&dreq->kref);
169         init_completion(&dreq->completion);
170         INIT_LIST_HEAD(&dreq->mds_cinfo.list);
171         INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
172         spin_lock_init(&dreq->lock);
173
174         return dreq;
175 }
176
177 static void nfs_direct_req_free(struct kref *kref)
178 {
179         struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
180
181         if (dreq->l_ctx != NULL)
182                 nfs_put_lock_context(dreq->l_ctx);
183         if (dreq->ctx != NULL)
184                 put_nfs_open_context(dreq->ctx);
185         kmem_cache_free(nfs_direct_cachep, dreq);
186 }
187
188 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
189 {
190         kref_put(&dreq->kref, nfs_direct_req_free);
191 }
192
193 /*
194  * Collects and returns the final error value/byte-count.
195  */
196 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
197 {
198         ssize_t result = -EIOCBQUEUED;
199
200         /* Async requests don't wait here */
201         if (dreq->iocb)
202                 goto out;
203
204         result = wait_for_completion_killable(&dreq->completion);
205
206         if (!result)
207                 result = dreq->error;
208         if (!result)
209                 result = dreq->count;
210
211 out:
212         return (ssize_t) result;
213 }
214
215 /*
216  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust that
217  * the iocb is still valid here if this is a synchronous request.
218  */
219 static void nfs_direct_complete(struct nfs_direct_req *dreq)
220 {
221         if (dreq->iocb) {
222                 long res = (long) dreq->error;
223                 if (!res)
224                         res = (long) dreq->count;
225                 aio_complete(dreq->iocb, res, 0);
226         }
227         complete_all(&dreq->completion);
228
229         nfs_direct_req_release(dreq);
230 }
231
232 static void nfs_direct_readpage_release(struct nfs_page *req)
233 {
234         dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
235                 req->wb_context->dentry->d_inode->i_sb->s_id,
236                 (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
237                 req->wb_bytes,
238                 (long long)req_offset(req));
239         nfs_release_request(req);
240 }
241
242 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
243 {
244         unsigned long bytes = 0;
245         struct nfs_direct_req *dreq = hdr->dreq;
246
247         if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
248                 goto out_put;
249
250         spin_lock(&dreq->lock);
251         if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
252                 dreq->error = hdr->error;
253         else
254                 dreq->count += hdr->good_bytes;
255         spin_unlock(&dreq->lock);
256
257         while (!list_empty(&hdr->pages)) {
258                 struct nfs_page *req = nfs_list_entry(hdr->pages.next);
259                 struct page *page = req->wb_page;
260
261                 if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
262                         if (bytes > hdr->good_bytes)
263                                 zero_user(page, 0, PAGE_SIZE);
264                         else if (hdr->good_bytes - bytes < PAGE_SIZE)
265                                 zero_user_segment(page,
266                                         hdr->good_bytes & ~PAGE_MASK,
267                                         PAGE_SIZE);
268                 }
269                 if (!PageCompound(page)) {
270                         if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
271                                 if (bytes < hdr->good_bytes)
272                                         set_page_dirty(page);
273                         } else
274                                 set_page_dirty(page);
275                 }
276                 bytes += req->wb_bytes;
277                 nfs_list_remove_request(req);
278                 nfs_direct_readpage_release(req);
279         }
280 out_put:
281         if (put_dreq(dreq))
282                 nfs_direct_complete(dreq);
283         hdr->release(hdr);
284 }
285
286 static void nfs_read_sync_pgio_error(struct list_head *head)
287 {
288         struct nfs_page *req;
289
290         while (!list_empty(head)) {
291                 req = nfs_list_entry(head->next);
292                 nfs_list_remove_request(req);
293                 nfs_release_request(req);
294         }
295 }
296
297 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
298 {
299         get_dreq(hdr->dreq);
300 }
301
302 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
303         .error_cleanup = nfs_read_sync_pgio_error,
304         .init_hdr = nfs_direct_pgio_init,
305         .completion = nfs_direct_read_completion,
306 };
307
308 /*
309  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
310  * operation.  If get_user_pages() or an allocation fails, bail
311  * and stop sending more reads.  Read length accounting is
312  * handled automatically by nfs_direct_read_completion().  Otherwise, if
313  * no requests have been sent, just return an error.
314  */
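/*
 * Worked example (illustrative numbers): with PAGE_SIZE = 4096,
 * rsize = 32768 and a 48 KB iovec starting 1 KB into a page, the first
 * pass takes bytes = 32768 and pgbase = 1024, maps
 * nfs_page_array_len(1024, 32768) = 9 pages, and creates requests of
 * 3072, 7 x 4096 and 1024 bytes; the outer loop then repeats for the
 * remaining 16 KB of the segment.
 */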
315 static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
316                                                 const struct iovec *iov,
317                                                 loff_t pos, bool uio)
318 {
319         struct nfs_direct_req *dreq = desc->pg_dreq;
320         struct nfs_open_context *ctx = dreq->ctx;
321         struct inode *inode = ctx->dentry->d_inode;
322         unsigned long user_addr = (unsigned long)iov->iov_base;
323         size_t count = iov->iov_len;
324         size_t rsize = NFS_SERVER(inode)->rsize;
325         unsigned int pgbase;
326         int result;
327         ssize_t started = 0;
328         struct page **pagevec = NULL;
329         unsigned int npages;
330
331         do {
332                 size_t bytes;
333                 int i;
334
335                 pgbase = user_addr & ~PAGE_MASK;
336                 bytes = min(max_t(size_t, rsize, PAGE_SIZE), count);
337
338                 result = -ENOMEM;
339                 npages = nfs_page_array_len(pgbase, bytes);
340                 if (!pagevec)
341                         pagevec = kmalloc(npages * sizeof(struct page *),
342                                           GFP_KERNEL);
343                 if (!pagevec)
344                         break;
345                 if (uio) {
346                         down_read(&current->mm->mmap_sem);
347                         result = get_user_pages(current, current->mm, user_addr,
348                                         npages, 1, 0, pagevec, NULL);
349                         up_read(&current->mm->mmap_sem);
350                         if (result < 0)
351                                 break;
352                 } else {
353                         WARN_ON(npages != 1);
354                         result = get_kernel_page(user_addr, 1, pagevec);
355                         if (WARN_ON(result != 1))
356                                 break;
357                 }
358
359                 if ((unsigned)result < npages) {
360                         bytes = result * PAGE_SIZE;
361                         if (bytes <= pgbase) {
362                                 nfs_direct_release_pages(pagevec, result);
363                                 break;
364                         }
365                         bytes -= pgbase;
366                         npages = result;
367                 }
368
369                 for (i = 0; i < npages; i++) {
370                         struct nfs_page *req;
371                         unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
372                         /* XXX do we need to do the eof zeroing found in async_filler? */
373                         req = nfs_create_request(dreq->ctx, dreq->inode,
374                                                  pagevec[i],
375                                                  pgbase, req_len);
376                         if (IS_ERR(req)) {
377                                 result = PTR_ERR(req);
378                                 break;
379                         }
380                         req->wb_index = pos >> PAGE_SHIFT;
381                         req->wb_offset = pos & ~PAGE_MASK;
382                         if (!nfs_pageio_add_request(desc, req)) {
383                                 result = desc->pg_error;
384                                 nfs_release_request(req);
385                                 break;
386                         }
387                         pgbase = 0;
388                         bytes -= req_len;
389                         started += req_len;
390                         user_addr += req_len;
391                         pos += req_len;
392                         count -= req_len;
393                 }
394                 /* The nfs_page requests now hold references to these pages */
395                 nfs_direct_release_pages(pagevec, npages);
396         } while (count != 0 && result >= 0);
397
398         kfree(pagevec);
399
400         if (started)
401                 return started;
402         return result < 0 ? (ssize_t) result : -EFAULT;
403 }
404
405 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
406                                               const struct iovec *iov,
407                                               unsigned long nr_segs,
408                                               loff_t pos, bool uio)
409 {
410         struct nfs_pageio_descriptor desc;
411         ssize_t result = -EINVAL;
412         size_t requested_bytes = 0;
413         unsigned long seg;
414
415         NFS_PROTO(dreq->inode)->read_pageio_init(&desc, dreq->inode,
416                              &nfs_direct_read_completion_ops);
417         get_dreq(dreq);
418         desc.pg_dreq = dreq;
419
420         for (seg = 0; seg < nr_segs; seg++) {
421                 const struct iovec *vec = &iov[seg];
422                 result = nfs_direct_read_schedule_segment(&desc, vec, pos, uio);
423                 if (result < 0)
424                         break;
425                 requested_bytes += result;
426                 if ((size_t)result < vec->iov_len)
427                         break;
428                 pos += vec->iov_len;
429         }
430
431         nfs_pageio_complete(&desc);
432
433         /*
434          * If no bytes were started, return the error, and let the
435          * generic layer handle the completion.
436          */
437         if (requested_bytes == 0) {
438                 nfs_direct_req_release(dreq);
439                 return result < 0 ? result : -EIO;
440         }
441
442         if (put_dreq(dreq))
443                 nfs_direct_complete(dreq);
444         return 0;
445 }
446
447 static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
448                                unsigned long nr_segs, loff_t pos, bool uio)
449 {
450         ssize_t result = -ENOMEM;
451         struct inode *inode = iocb->ki_filp->f_mapping->host;
452         struct nfs_direct_req *dreq;
453
454         dreq = nfs_direct_req_alloc();
455         if (dreq == NULL)
456                 goto out;
457
458         dreq->inode = inode;
459         dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
460         dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
461         if (dreq->l_ctx == NULL)
462                 goto out_release;
463         if (!is_sync_kiocb(iocb))
464                 dreq->iocb = iocb;
465
466         result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos, uio);
467         if (!result)
468                 result = nfs_direct_wait(dreq);
469         NFS_I(inode)->read_io += result;
470 out_release:
471         nfs_direct_req_release(dreq);
472 out:
473         return result;
474 }
475
476 static void nfs_inode_dio_write_done(struct inode *inode)
477 {
478         nfs_zap_mapping(inode, inode->i_mapping);
479         inode_dio_done(inode);
480 }
481
482 #if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
483 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
484 {
485         struct nfs_pageio_descriptor desc;
486         struct nfs_page *req, *tmp;
487         LIST_HEAD(reqs);
488         struct nfs_commit_info cinfo;
489         LIST_HEAD(failed);
490
491         nfs_init_cinfo_from_dreq(&cinfo, dreq);
492         pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
493         spin_lock(cinfo.lock);
494         nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
495         spin_unlock(cinfo.lock);
496
497         dreq->count = 0;
498         get_dreq(dreq);
499
500         NFS_PROTO(dreq->inode)->write_pageio_init(&desc, dreq->inode, FLUSH_STABLE,
501                               &nfs_direct_write_completion_ops);
502         desc.pg_dreq = dreq;
503
504         list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
505                 if (!nfs_pageio_add_request(&desc, req)) {
506                         nfs_list_remove_request(req);
507                         nfs_list_add_request(req, &failed);
508                         spin_lock(cinfo.lock);
509                         dreq->flags = 0;
510                         dreq->error = -EIO;
511                         spin_unlock(cinfo.lock);
512                 }
513                 nfs_release_request(req);
514         }
515         nfs_pageio_complete(&desc);
516
517         while (!list_empty(&failed)) {
518                 req = nfs_list_entry(failed.next);
519                 nfs_list_remove_request(req);
520                 nfs_unlock_and_release_request(req);
521         }
522
523         if (put_dreq(dreq))
524                 nfs_direct_write_complete(dreq, dreq->inode);
525 }
526
527 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
528 {
529         struct nfs_direct_req *dreq = data->dreq;
530         struct nfs_commit_info cinfo;
531         struct nfs_page *req;
532         int status = data->task.tk_status;
533
534         nfs_init_cinfo_from_dreq(&cinfo, dreq);
535         if (status < 0) {
536                 dprintk("NFS: %5u commit failed with error %d.\n",
537                         data->task.tk_pid, status);
538                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
539         } else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
540                 dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
541                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
542         }
543
544         dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
545         while (!list_empty(&data->pages)) {
546                 req = nfs_list_entry(data->pages.next);
547                 nfs_list_remove_request(req);
548                 if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
549                         /* Note the rewrite will go through mds */
550                         nfs_mark_request_commit(req, NULL, &cinfo);
551                 } else
552                         nfs_release_request(req);
553                 nfs_unlock_and_release_request(req);
554         }
555
556         if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
557                 nfs_direct_write_complete(dreq, data->inode);
558 }
559
560 static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
561 {
562         /* There is no lock to clear */
563 }
564
565 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
566         .completion = nfs_direct_commit_complete,
567         .error_cleanup = nfs_direct_error_cleanup,
568 };
569
570 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
571 {
572         int res;
573         struct nfs_commit_info cinfo;
574         LIST_HEAD(mds_list);
575
576         nfs_init_cinfo_from_dreq(&cinfo, dreq);
577         nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
578         res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
579         if (res < 0) /* res == -ENOMEM */
580                 nfs_direct_write_reschedule(dreq);
581 }
582
583 static void nfs_direct_write_schedule_work(struct work_struct *work)
584 {
585         struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
586         int flags = dreq->flags;
587
588         dreq->flags = 0;
589         switch (flags) {
590                 case NFS_ODIRECT_DO_COMMIT:
591                         nfs_direct_commit_schedule(dreq);
592                         break;
593                 case NFS_ODIRECT_RESCHED_WRITES:
594                         nfs_direct_write_reschedule(dreq);
595                         break;
596                 default:
597                         nfs_inode_dio_write_done(dreq->inode);
598                         nfs_direct_complete(dreq);
599         }
600 }
601
602 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
603 {
604         schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
605 }
606
607 #else
608 static void nfs_direct_write_schedule_work(struct work_struct *work)
609 {
610 }
611
612 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
613 {
614         nfs_inode_dio_write_done(inode);
615         nfs_direct_complete(dreq);
616 }
617 #endif
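/*
 * The stubs above cover the case where only NFSv2 is configured: NFSv2
 * WRITE replies are always stable, so there is never anything to commit
 * or reschedule.
 */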
618
619 /*
620  * NB: Return the value of the first error return code.  Subsequent
621  *     errors after the first one are ignored.
622  */
623 /*
624  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
625  * operation.  If get_user_pages() or an allocation fails, bail
626  * and stop sending more writes.  Write length accounting is
627  * handled automatically by nfs_direct_write_completion().  Otherwise, if
628  * no requests have been sent, just return an error.
629  */
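/*
 * The per-page chunking below mirrors nfs_direct_read_schedule_segment()
 * above; the extra nfs_lock_request() keeps each request locked until
 * its WRITE (and any COMMIT) has completed.
 */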
630 static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *desc,
631                                                  const struct iovec *iov,
632                                                  loff_t pos, bool uio)
633 {
634         struct nfs_direct_req *dreq = desc->pg_dreq;
635         struct nfs_open_context *ctx = dreq->ctx;
636         struct inode *inode = ctx->dentry->d_inode;
637         unsigned long user_addr = (unsigned long)iov->iov_base;
638         size_t count = iov->iov_len;
639         size_t wsize = NFS_SERVER(inode)->wsize;
640         unsigned int pgbase;
641         int result;
642         ssize_t started = 0;
643         struct page **pagevec = NULL;
644         unsigned int npages;
645
646         do {
647                 size_t bytes;
648                 int i;
649
650                 pgbase = user_addr & ~PAGE_MASK;
651                 bytes = min(max_t(size_t, wsize, PAGE_SIZE), count);
652
653                 result = -ENOMEM;
654                 npages = nfs_page_array_len(pgbase, bytes);
655                 if (!pagevec)
656                         pagevec = kmalloc(npages * sizeof(struct page *), GFP_KERNEL);
657                 if (!pagevec)
658                         break;
659
660                 if (uio) {
661                         down_read(&current->mm->mmap_sem);
662                         result = get_user_pages(current, current->mm, user_addr,
663                                                 npages, 0, 0, pagevec, NULL);
664                         up_read(&current->mm->mmap_sem);
665                         if (result < 0)
666                                 break;
667                 } else {
668                         WARN_ON(npages != 1);
669                         result = get_kernel_page(user_addr, 0, pagevec);
670                         if (WARN_ON(result != 1))
671                                 break;
672                 }
673
674                 if ((unsigned)result < npages) {
675                         bytes = result * PAGE_SIZE;
676                         if (bytes <= pgbase) {
677                                 nfs_direct_release_pages(pagevec, result);
678                                 break;
679                         }
680                         bytes -= pgbase;
681                         npages = result;
682                 }
683
684                 for (i = 0; i < npages; i++) {
685                         struct nfs_page *req;
686                         unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
687
688                         req = nfs_create_request(dreq->ctx, dreq->inode,
689                                                  pagevec[i],
690                                                  pgbase, req_len);
691                         if (IS_ERR(req)) {
692                                 result = PTR_ERR(req);
693                                 break;
694                         }
695                         nfs_lock_request(req);
696                         req->wb_index = pos >> PAGE_SHIFT;
697                         req->wb_offset = pos & ~PAGE_MASK;
698                         if (!nfs_pageio_add_request(desc, req)) {
699                                 result = desc->pg_error;
700                                 nfs_unlock_and_release_request(req);
701                                 break;
702                         }
703                         pgbase = 0;
704                         bytes -= req_len;
705                         started += req_len;
706                         user_addr += req_len;
707                         pos += req_len;
708                         count -= req_len;
709                 }
710                 /* The nfs_page requests now hold references to these pages */
711                 nfs_direct_release_pages(pagevec, npages);
712         } while (count != 0 && result >= 0);
713
714         kfree(pagevec);
715
716         if (started)
717                 return started;
718         return result < 0 ? (ssize_t) result : -EFAULT;
719 }
720
721 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
722 {
723         struct nfs_direct_req *dreq = hdr->dreq;
724         struct nfs_commit_info cinfo;
725         int bit = -1;
726         struct nfs_page *req = nfs_list_entry(hdr->pages.next);
727
728         if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
729                 goto out_put;
730
731         nfs_init_cinfo_from_dreq(&cinfo, dreq);
732
733         spin_lock(&dreq->lock);
734
735         if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
736                 dreq->flags = 0;
737                 dreq->error = hdr->error;
738         }
739         if (dreq->error != 0)
740                 bit = NFS_IOHDR_ERROR;
741         else {
742                 dreq->count += hdr->good_bytes;
743                 if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
744                         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
745                         bit = NFS_IOHDR_NEED_RESCHED;
746                 } else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
747                         if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
748                                 bit = NFS_IOHDR_NEED_RESCHED;
749                         else if (dreq->flags == 0) {
750                                 memcpy(&dreq->verf, hdr->verf,
751                                        sizeof(dreq->verf));
752                                 bit = NFS_IOHDR_NEED_COMMIT;
753                                 dreq->flags = NFS_ODIRECT_DO_COMMIT;
754                         } else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
755                                 if (memcmp(&dreq->verf, hdr->verf, sizeof(dreq->verf))) {
756                                         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
757                                         bit = NFS_IOHDR_NEED_RESCHED;
758                                 } else
759                                         bit = NFS_IOHDR_NEED_COMMIT;
760                         }
761                 }
762         }
763         spin_unlock(&dreq->lock);
764
765         while (!list_empty(&hdr->pages)) {
766                 req = nfs_list_entry(hdr->pages.next);
767                 nfs_list_remove_request(req);
768                 switch (bit) {
769                 case NFS_IOHDR_NEED_RESCHED:
770                 case NFS_IOHDR_NEED_COMMIT:
771                         kref_get(&req->wb_kref);
772                         nfs_mark_request_commit(req, hdr->lseg, &cinfo);
773                 }
774                 nfs_unlock_and_release_request(req);
775         }
776
777 out_put:
778         if (put_dreq(dreq))
779                 nfs_direct_write_complete(dreq, hdr->inode);
780         hdr->release(hdr);
781 }
782
783 static void nfs_write_sync_pgio_error(struct list_head *head)
784 {
785         struct nfs_page *req;
786
787         while (!list_empty(head)) {
788                 req = nfs_list_entry(head->next);
789                 nfs_list_remove_request(req);
790                 nfs_unlock_and_release_request(req);
791         }
792 }
793
794 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
795         .error_cleanup = nfs_write_sync_pgio_error,
796         .init_hdr = nfs_direct_pgio_init,
797         .completion = nfs_direct_write_completion,
798 };
799
800 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
801                                                const struct iovec *iov,
802                                                unsigned long nr_segs,
803                                                loff_t pos, bool uio)
804 {
805         struct nfs_pageio_descriptor desc;
806         struct inode *inode = dreq->inode;
807         ssize_t result = 0;
808         size_t requested_bytes = 0;
809         unsigned long seg;
810
811         NFS_PROTO(inode)->write_pageio_init(&desc, inode, FLUSH_COND_STABLE,
812                               &nfs_direct_write_completion_ops);
813         desc.pg_dreq = dreq;
814         get_dreq(dreq);
815         atomic_inc(&inode->i_dio_count);
816
817         for (seg = 0; seg < nr_segs; seg++) {
818                 const struct iovec *vec = &iov[seg];
819                 result = nfs_direct_write_schedule_segment(&desc, vec, pos, uio);
820                 if (result < 0)
821                         break;
822                 requested_bytes += result;
823                 if ((size_t)result < vec->iov_len)
824                         break;
825                 pos += vec->iov_len;
826         }
827         nfs_pageio_complete(&desc);
828         NFS_I(dreq->inode)->write_io += desc.pg_bytes_written;
829
830         /*
831          * If no bytes were started, return the error, and let the
832          * generic layer handle the completion.
833          */
834         if (requested_bytes == 0) {
835                 inode_dio_done(inode);
836                 nfs_direct_req_release(dreq);
837                 return result < 0 ? result : -EIO;
838         }
839
840         if (put_dreq(dreq))
841                 nfs_direct_write_complete(dreq, dreq->inode);
842         return 0;
843 }
844
845 static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
846                                 unsigned long nr_segs, loff_t pos,
847                                 size_t count, bool uio)
848 {
849         ssize_t result = -ENOMEM;
850         struct inode *inode = iocb->ki_filp->f_mapping->host;
851         struct nfs_direct_req *dreq;
852
853         dreq = nfs_direct_req_alloc();
854         if (!dreq)
855                 goto out;
856
857         dreq->inode = inode;
858         dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
859         dreq->l_ctx = nfs_get_lock_context(dreq->ctx);
860         if (dreq->l_ctx == NULL)
861                 goto out_release;
862         if (!is_sync_kiocb(iocb))
863                 dreq->iocb = iocb;
864
865         result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, uio);
866         if (!result)
867                 result = nfs_direct_wait(dreq);
868 out_release:
869         nfs_direct_req_release(dreq);
870 out:
871         return result;
872 }
873
874 /**
875  * nfs_file_direct_read - file direct read operation for NFS files
876  * @iocb: target I/O control block
877  * @iov: vector of user buffers into which to read data
878  * @nr_segs: size of iov vector
879  * @pos: byte offset in file where reading starts
880  *
881  * We use this function for direct reads instead of calling
882  * generic_file_aio_read() in order to avoid its check to see if
883  * the request starts before the end of the file.  For that check
884  * to work, we must generate a GETATTR before each direct read, and
885  * even then there is a window between the GETATTR and the subsequent
886  * READ where the file size could change.  Our preference is simply
887  * to do all reads the application wants, and the server will take
888  * care of managing the end of file boundary.
889  *
890  * This function also eliminates unnecessarily updating the file's
891  * atime locally, as the NFS server sets the file's atime, and this
892  * client must read the updated atime from the server back into its
893  * cache.
894  */
895 ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
896                                 unsigned long nr_segs, loff_t pos, bool uio)
897 {
898         ssize_t retval = -EINVAL;
899         struct file *file = iocb->ki_filp;
900         struct address_space *mapping = file->f_mapping;
901         size_t count;
902
903         count = iov_length(iov, nr_segs);
904         nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
905
906         dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
907                 file->f_path.dentry->d_parent->d_name.name,
908                 file->f_path.dentry->d_name.name,
909                 count, (long long) pos);
910
911         retval = 0;
912         if (!count)
913                 goto out;
914
915         retval = nfs_sync_mapping(mapping);
916         if (retval)
917                 goto out;
918
919         task_io_account_read(count);
920
921         retval = nfs_direct_read(iocb, iov, nr_segs, pos, uio);
922         if (retval > 0)
923                 iocb->ki_pos = pos + retval;
924
925 out:
926         return retval;
927 }
928
929 /**
930  * nfs_file_direct_write - file direct write operation for NFS files
931  * @iocb: target I/O control block
932  * @iov: vector of user buffers from which to write data
933  * @nr_segs: size of iov vector
934  * @pos: byte offset in file where writing starts
935  *
936  * We use this function for direct writes instead of calling
937  * generic_file_aio_write() in order to avoid taking the inode
938  * semaphore and updating the i_size.  The NFS server will set
939  * the new i_size and this client must read the updated size
940  * back into its cache.  We let the server do generic write
941  * parameter checking and report problems.
942  *
943  * We eliminate local atime updates, see direct read above.
944  *
945  * We avoid unnecessary page cache invalidations for normal cached
946  * readers of this file.
947  *
948  * Note that O_APPEND is not supported for NFS direct writes, as there
949  * is no atomic O_APPEND write facility in the NFS protocol.
950  */
951 ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
952                                 unsigned long nr_segs, loff_t pos, bool uio)
953 {
954         ssize_t retval = -EINVAL;
955         struct file *file = iocb->ki_filp;
956         struct address_space *mapping = file->f_mapping;
957         size_t count;
958
959         count = iov_length(iov, nr_segs);
960         nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
961
962         dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
963                 file->f_path.dentry->d_parent->d_name.name,
964                 file->f_path.dentry->d_name.name,
965                 count, (long long) pos);
966
967         retval = generic_write_checks(file, &pos, &count, 0);
968         if (retval)
969                 goto out;
970
971         retval = -EINVAL;
972         if ((ssize_t) count < 0)
973                 goto out;
974         retval = 0;
975         if (!count)
976                 goto out;
977
978         retval = nfs_sync_mapping(mapping);
979         if (retval)
980                 goto out;
981
982         task_io_account_write(count);
983
984         retval = nfs_direct_write(iocb, iov, nr_segs, pos, count, uio);
985         if (retval > 0) {
986                 struct inode *inode = mapping->host;
987
988                 iocb->ki_pos = pos + retval;
989                 spin_lock(&inode->i_lock);
990                 if (i_size_read(inode) < iocb->ki_pos)
991                         i_size_write(inode, iocb->ki_pos);
992                 spin_unlock(&inode->i_lock);
993         }
994 out:
995         return retval;
996 }
997
998 /**
999  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1000  *
1001  */
1002 int __init nfs_init_directcache(void)
1003 {
1004         nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1005                                                 sizeof(struct nfs_direct_req),
1006                                                 0, (SLAB_RECLAIM_ACCOUNT|
1007                                                         SLAB_MEM_SPREAD),
1008                                                 NULL);
1009         if (nfs_direct_cachep == NULL)
1010                 return -ENOMEM;
1011
1012         return 0;
1013 }
1014
1015 /**
1016  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1017  *
1018  */
1019 void nfs_destroy_directcache(void)
1020 {
1021         kmem_cache_destroy(nfs_direct_cachep);
1022 }