cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

direct.c (27218B)


      1// SPDX-License-Identifier: GPL-2.0-only
      2/*
      3 * linux/fs/nfs/direct.c
      4 *
      5 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
      6 *
      7 * High-performance uncached I/O for the Linux NFS client
      8 *
      9 * There are important applications whose performance or correctness
     10 * depends on uncached access to file data.  Database clusters
     11 * (multiple copies of the same instance running on separate hosts)
     12 * implement their own cache coherency protocol that subsumes file
     13 * system cache protocols.  Applications that process datasets
     14 * considerably larger than the client's memory do not always benefit
     15 * from a local cache.  A streaming video server, for instance, has no
     16 * need to cache the contents of a file.
     17 *
     18 * When an application requests uncached I/O, all read and write requests
     19 * are made directly to the server; data stored or fetched via these
     20 * requests is not cached in the Linux page cache.  The client does not
     21 * correct unaligned requests from applications.  All requested bytes are
     22 * held on permanent storage before a direct write system call returns to
     23 * an application.
     24 *
     25 * Solaris implements an uncached I/O facility called directio() that
     26 * is used for backups and sequential I/O to very large files.  Solaris
     27 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
     28 * an undocumented mount option.
     29 *
     30 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
     31 * help from Andrew Morton.
     32 *
     33 * 18 Dec 2001	Initial implementation for 2.4  --cel
     34 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
     35 * 08 Jun 2003	Port to 2.5 APIs  --cel
     36 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
     37 * 15 Sep 2004	Parallel async reads  --cel
     38 * 04 May 2005	support O_DIRECT with aio  --cel
     39 *
     40 */
     41
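/*
 * A minimal user-space sketch of how uncached I/O is requested (standard
 * POSIX calls only; the path and sizes below are made-up examples): open
 * the file with O_DIRECT and issue suitably aligned transfers, which end
 * up in the direct read and write entry points implemented in this file.
 *
 *	int fd = open("/mnt/nfs/db/segment.dat", O_RDWR | O_DIRECT);
 *	void *buf;
 *	posix_memalign(&buf, 4096, 4096);
 *	pread(fd, buf, 4096, 0);	- serviced by direct NFS READs
 *	pwrite(fd, buf, 4096, 0);	- stable on the server before returning
 */
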
     42#include <linux/errno.h>
     43#include <linux/sched.h>
     44#include <linux/kernel.h>
     45#include <linux/file.h>
     46#include <linux/pagemap.h>
     47#include <linux/kref.h>
     48#include <linux/slab.h>
     49#include <linux/task_io_accounting_ops.h>
     50#include <linux/module.h>
     51
     52#include <linux/nfs_fs.h>
     53#include <linux/nfs_page.h>
     54#include <linux/sunrpc/clnt.h>
     55
     56#include <linux/uaccess.h>
     57#include <linux/atomic.h>
     58
     59#include "internal.h"
     60#include "iostat.h"
     61#include "pnfs.h"
     62#include "fscache.h"
     63
     64#define NFSDBG_FACILITY		NFSDBG_VFS
     65
     66static struct kmem_cache *nfs_direct_cachep;
     67
     68struct nfs_direct_req {
     69	struct kref		kref;		/* release manager */
     70
     71	/* I/O parameters */
     72	struct nfs_open_context	*ctx;		/* file open context info */
     73	struct nfs_lock_context *l_ctx;		/* Lock context info */
     74	struct kiocb *		iocb;		/* controlling i/o request */
     75	struct inode *		inode;		/* target file of i/o */
     76
     77	/* completion state */
     78	atomic_t		io_count;	/* i/os we're waiting for */
     79	spinlock_t		lock;		/* protect completion state */
     80
     81	loff_t			io_start;	/* Start offset for I/O */
     82	ssize_t			count,		/* bytes actually processed */
     83				max_count,	/* max expected count */
     84				bytes_left,	/* bytes left to be sent */
     85				error;		/* any reported error */
     86	struct completion	completion;	/* wait for i/o completion */
     87
     88	/* commit state */
     89	struct nfs_mds_commit_info mds_cinfo;	/* Storage for cinfo */
     90	struct pnfs_ds_commit_info ds_cinfo;	/* Storage for cinfo */
     91	struct work_struct	work;
     92	int			flags;
     93	/* for write */
     94#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
     95#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
     96	/* for read */
     97#define NFS_ODIRECT_SHOULD_DIRTY	(3)	/* dirty user-space page after read */
     98#define NFS_ODIRECT_DONE		INT_MAX	/* write request is done, stop rescheduling/commits */
     99};
    100
    101static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
    102static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
    103static void nfs_direct_write_complete(struct nfs_direct_req *dreq);
    104static void nfs_direct_write_schedule_work(struct work_struct *work);
    105
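/*
 * dreq->io_count counts outstanding I/O submissions: the scheduling loop
 * holds one reference while it dispatches requests, and each pgio header
 * takes another via nfs_direct_pgio_init().  put_dreq() drops a reference
 * and returns true once the last one is gone, i.e. when the direct request
 * as a whole may be completed.
 */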
    106static inline void get_dreq(struct nfs_direct_req *dreq)
    107{
    108	atomic_inc(&dreq->io_count);
    109}
    110
    111static inline int put_dreq(struct nfs_direct_req *dreq)
    112{
    113	return atomic_dec_and_test(&dreq->io_count);
    114}
    115
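/*
 * If a reply carried an error or hit EOF, the request may have been
 * truncated: clamp the expected and completed byte counts to what the
 * server actually returned.  For example, a 16384-byte direct read at
 * offset 0 that reaches EOF after 5000 good bytes ends up with
 * max_count == count == 5000 and any short-read error cleared.
 */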
    116static void
    117nfs_direct_handle_truncated(struct nfs_direct_req *dreq,
    118			    const struct nfs_pgio_header *hdr,
    119			    ssize_t dreq_len)
    120{
    121	if (!(test_bit(NFS_IOHDR_ERROR, &hdr->flags) ||
    122	      test_bit(NFS_IOHDR_EOF, &hdr->flags)))
    123		return;
    124	if (dreq->max_count >= dreq_len) {
    125		dreq->max_count = dreq_len;
    126		if (dreq->count > dreq_len)
    127			dreq->count = dreq_len;
    128
    129		if (test_bit(NFS_IOHDR_ERROR, &hdr->flags))
    130			dreq->error = hdr->error;
    131		else /* Clear outstanding error if this is EOF */
    132			dreq->error = 0;
    133	}
    134}
    135
    136static void
    137nfs_direct_count_bytes(struct nfs_direct_req *dreq,
    138		       const struct nfs_pgio_header *hdr)
    139{
    140	loff_t hdr_end = hdr->io_start + hdr->good_bytes;
    141	ssize_t dreq_len = 0;
    142
    143	if (hdr_end > dreq->io_start)
    144		dreq_len = hdr_end - dreq->io_start;
    145
    146	nfs_direct_handle_truncated(dreq, hdr, dreq_len);
    147
    148	if (dreq_len > dreq->max_count)
    149		dreq_len = dreq->max_count;
    150
    151	if (dreq->count < dreq_len)
    152		dreq->count = dreq_len;
    153}
    154
    155/**
    156 * nfs_swap_rw - NFS address space operation for swap I/O
    157 * @iocb: target I/O control block
    158 * @iter: I/O buffer
    159 *
    160 * Perform IO to the swap-file.  This is much like direct IO.
    161 */
    162int nfs_swap_rw(struct kiocb *iocb, struct iov_iter *iter)
    163{
    164	ssize_t ret;
    165
    166	VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);
    167
    168	if (iov_iter_rw(iter) == READ)
    169		ret = nfs_file_direct_read(iocb, iter, true);
    170	else
    171		ret = nfs_file_direct_write(iocb, iter, true);
    172	if (ret < 0)
    173		return ret;
    174	return 0;
    175}
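/*
 * nfs_swap_rw() has no callers in this file; it is wired up as the
 * ->swap_rw address_space operation for NFS regular files (nfs_file_aops
 * in fs/nfs/file.c), so swap-over-NFS reuses the direct I/O paths below,
 * one page at a time.
 */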
    176
    177static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
    178{
    179	unsigned int i;
    180	for (i = 0; i < npages; i++)
    181		put_page(pages[i]);
    182}
    183
    184void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
    185			      struct nfs_direct_req *dreq)
    186{
    187	cinfo->inode = dreq->inode;
    188	cinfo->mds = &dreq->mds_cinfo;
    189	cinfo->ds = &dreq->ds_cinfo;
    190	cinfo->dreq = dreq;
    191	cinfo->completion_ops = &nfs_direct_commit_completion_ops;
    192}
    193
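/*
 * A freshly allocated request starts with a kref count of two: one
 * reference is dropped when the I/O completes (or fails to be scheduled),
 * the other by the submitting caller through nfs_direct_req_release()
 * once it has finished waiting for the result.
 */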
    194static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
    195{
    196	struct nfs_direct_req *dreq;
    197
    198	dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
    199	if (!dreq)
    200		return NULL;
    201
    202	kref_init(&dreq->kref);
    203	kref_get(&dreq->kref);
    204	init_completion(&dreq->completion);
    205	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
    206	pnfs_init_ds_commit_info(&dreq->ds_cinfo);
    207	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
    208	spin_lock_init(&dreq->lock);
    209
    210	return dreq;
    211}
    212
    213static void nfs_direct_req_free(struct kref *kref)
    214{
    215	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
    216
    217	pnfs_release_ds_info(&dreq->ds_cinfo, dreq->inode);
    218	if (dreq->l_ctx != NULL)
    219		nfs_put_lock_context(dreq->l_ctx);
    220	if (dreq->ctx != NULL)
    221		put_nfs_open_context(dreq->ctx);
    222	kmem_cache_free(nfs_direct_cachep, dreq);
    223}
    224
    225static void nfs_direct_req_release(struct nfs_direct_req *dreq)
    226{
    227	kref_put(&dreq->kref, nfs_direct_req_free);
    228}
    229
    230ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
    231{
    232	return dreq->bytes_left;
    233}
    234EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
    235
    236/*
    237 * Collects and returns the final error value/byte-count.
    238 */
    239static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
    240{
    241	ssize_t result = -EIOCBQUEUED;
    242
    243	/* Async requests don't wait here */
    244	if (dreq->iocb)
    245		goto out;
    246
    247	result = wait_for_completion_killable(&dreq->completion);
    248
    249	if (!result) {
    250		result = dreq->count;
    251		WARN_ON_ONCE(dreq->count < 0);
    252	}
    253	if (!result)
    254		result = dreq->error;
    255
    256out:
    257	return (ssize_t) result;
    258}
    259
    260/*
    261 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
    262 * the iocb is still valid here if this is a synchronous request.
    263 */
    264static void nfs_direct_complete(struct nfs_direct_req *dreq)
    265{
    266	struct inode *inode = dreq->inode;
    267
    268	inode_dio_end(inode);
    269
    270	if (dreq->iocb) {
    271		long res = (long) dreq->error;
    272		if (dreq->count != 0) {
    273			res = (long) dreq->count;
    274			WARN_ON_ONCE(dreq->count < 0);
    275		}
    276		dreq->iocb->ki_complete(dreq->iocb, res);
    277	}
    278
    279	complete(&dreq->completion);
    280
    281	nfs_direct_req_release(dreq);
    282}
    283
    284static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
    285{
    286	unsigned long bytes = 0;
    287	struct nfs_direct_req *dreq = hdr->dreq;
    288
    289	spin_lock(&dreq->lock);
    290	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
    291		spin_unlock(&dreq->lock);
    292		goto out_put;
    293	}
    294
    295	nfs_direct_count_bytes(dreq, hdr);
    296	spin_unlock(&dreq->lock);
    297
    298	while (!list_empty(&hdr->pages)) {
    299		struct nfs_page *req = nfs_list_entry(hdr->pages.next);
    300		struct page *page = req->wb_page;
    301
    302		if (!PageCompound(page) && bytes < hdr->good_bytes &&
    303		    (dreq->flags == NFS_ODIRECT_SHOULD_DIRTY))
    304			set_page_dirty(page);
    305		bytes += req->wb_bytes;
    306		nfs_list_remove_request(req);
    307		nfs_release_request(req);
    308	}
    309out_put:
    310	if (put_dreq(dreq))
    311		nfs_direct_complete(dreq);
    312	hdr->release(hdr);
    313}
    314
    315static void nfs_read_sync_pgio_error(struct list_head *head, int error)
    316{
    317	struct nfs_page *req;
    318
    319	while (!list_empty(head)) {
    320		req = nfs_list_entry(head->next);
    321		nfs_list_remove_request(req);
    322		nfs_release_request(req);
    323	}
    324}
    325
    326static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
    327{
    328	get_dreq(hdr->dreq);
    329}
    330
    331static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
    332	.error_cleanup = nfs_read_sync_pgio_error,
    333	.init_hdr = nfs_direct_pgio_init,
    334	.completion = nfs_direct_read_completion,
    335};
    336
    337/*
    338 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
    339 * operation.  If iov_iter_get_pages_alloc() or nfs_create_request()
    340 * fails, bail and stop sending more reads.  Read length accounting is
    341 * handled automatically by nfs_direct_read_completion().  Otherwise,
    342 * if no requests have been sent, just return an error.
    343 */
    344
    345static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
    346					      struct iov_iter *iter,
    347					      loff_t pos)
    348{
    349	struct nfs_pageio_descriptor desc;
    350	struct inode *inode = dreq->inode;
    351	ssize_t result = -EINVAL;
    352	size_t requested_bytes = 0;
    353	size_t rsize = max_t(size_t, NFS_SERVER(inode)->rsize, PAGE_SIZE);
    354
    355	nfs_pageio_init_read(&desc, dreq->inode, false,
    356			     &nfs_direct_read_completion_ops);
    357	get_dreq(dreq);
    358	desc.pg_dreq = dreq;
    359	inode_dio_begin(inode);
    360
    361	while (iov_iter_count(iter)) {
    362		struct page **pagevec;
    363		size_t bytes;
    364		size_t pgbase;
    365		unsigned npages, i;
    366
    367		result = iov_iter_get_pages_alloc(iter, &pagevec, 
    368						  rsize, &pgbase);
    369		if (result < 0)
    370			break;
    371	
    372		bytes = result;
    373		iov_iter_advance(iter, bytes);
    374		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
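		/*
		 * pgbase is the offset of the data within the first page, so
		 * e.g. 300 bytes starting 4000 bytes into 4096-byte pages span
		 * (300 + 4000 + 4095) / 4096 = 2 pages.
		 */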
    375		for (i = 0; i < npages; i++) {
    376			struct nfs_page *req;
    377			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
    378			/* XXX do we need to do the eof zeroing found in async_filler? */
    379			req = nfs_create_request(dreq->ctx, pagevec[i],
    380						 pgbase, req_len);
    381			if (IS_ERR(req)) {
    382				result = PTR_ERR(req);
    383				break;
    384			}
    385			req->wb_index = pos >> PAGE_SHIFT;
    386			req->wb_offset = pos & ~PAGE_MASK;
    387			if (!nfs_pageio_add_request(&desc, req)) {
    388				result = desc.pg_error;
    389				nfs_release_request(req);
    390				break;
    391			}
    392			pgbase = 0;
    393			bytes -= req_len;
    394			requested_bytes += req_len;
    395			pos += req_len;
    396			dreq->bytes_left -= req_len;
    397		}
    398		nfs_direct_release_pages(pagevec, npages);
    399		kvfree(pagevec);
    400		if (result < 0)
    401			break;
    402	}
    403
    404	nfs_pageio_complete(&desc);
    405
    406	/*
    407	 * If no bytes were started, return the error, and let the
    408	 * generic layer handle the completion.
    409	 */
    410	if (requested_bytes == 0) {
    411		inode_dio_end(inode);
    412		nfs_direct_req_release(dreq);
    413		return result < 0 ? result : -EIO;
    414	}
    415
    416	if (put_dreq(dreq))
    417		nfs_direct_complete(dreq);
    418	return requested_bytes;
    419}
    420
    421/**
    422 * nfs_file_direct_read - file direct read operation for NFS files
    423 * @iocb: target I/O control block
    424 * @iter: vector of user buffers into which to read data
    425 * @swap: flag indicating this is swap IO, not O_DIRECT IO
    426 *
    427 * We use this function for direct reads instead of calling
    428 * generic_file_aio_read() in order to avoid gfar's check to see if
    429 * the request starts before the end of the file.  For that check
    430 * to work, we must generate a GETATTR before each direct read, and
    431 * even then there is a window between the GETATTR and the subsequent
    432 * READ where the file size could change.  Our preference is simply
    433 * to do all reads the application wants, and the server will take
    434 * care of managing the end of file boundary.
    435 *
    436 * This function also eliminates unnecessarily updating the file's
    437 * atime locally, as the NFS server sets the file's atime, and this
    438 * client must read the updated atime from the server back into its
    439 * cache.
    440 */
    441ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter,
    442			     bool swap)
    443{
    444	struct file *file = iocb->ki_filp;
    445	struct address_space *mapping = file->f_mapping;
    446	struct inode *inode = mapping->host;
    447	struct nfs_direct_req *dreq;
    448	struct nfs_lock_context *l_ctx;
    449	ssize_t result, requested;
    450	size_t count = iov_iter_count(iter);
    451	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
    452
    453	dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
    454		file, count, (long long) iocb->ki_pos);
    455
    456	result = 0;
    457	if (!count)
    458		goto out;
    459
    460	task_io_account_read(count);
    461
    462	result = -ENOMEM;
    463	dreq = nfs_direct_req_alloc();
    464	if (dreq == NULL)
    465		goto out;
    466
    467	dreq->inode = inode;
    468	dreq->bytes_left = dreq->max_count = count;
    469	dreq->io_start = iocb->ki_pos;
    470	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
    471	l_ctx = nfs_get_lock_context(dreq->ctx);
    472	if (IS_ERR(l_ctx)) {
    473		result = PTR_ERR(l_ctx);
    474		nfs_direct_req_release(dreq);
    475		goto out_release;
    476	}
    477	dreq->l_ctx = l_ctx;
    478	if (!is_sync_kiocb(iocb))
    479		dreq->iocb = iocb;
    480
    481	if (iter_is_iovec(iter))
    482		dreq->flags = NFS_ODIRECT_SHOULD_DIRTY;
    483
    484	if (!swap)
    485		nfs_start_io_direct(inode);
    486
    487	NFS_I(inode)->read_io += count;
    488	requested = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);
    489
    490	if (!swap)
    491		nfs_end_io_direct(inode);
    492
    493	if (requested > 0) {
    494		result = nfs_direct_wait(dreq);
    495		if (result > 0) {
    496			requested -= result;
    497			iocb->ki_pos += result;
    498		}
    499		iov_iter_revert(iter, requested);
    500	} else {
    501		result = requested;
    502	}
    503
    504out_release:
    505	nfs_direct_req_release(dreq);
    506out:
    507	return result;
    508}
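/*
 * A sketch of how the entry point above is reached; the real caller lives
 * in fs/nfs/file.c, so the wrapper below is paraphrased rather than quoted:
 *
 *	ssize_t nfs_file_read(struct kiocb *iocb, struct iov_iter *to)
 *	{
 *		if (iocb->ki_flags & IOCB_DIRECT)
 *			return nfs_file_direct_read(iocb, to, false);
 *		...
 *	}
 */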
    509
    510static void
    511nfs_direct_join_group(struct list_head *list, struct inode *inode)
    512{
    513	struct nfs_page *req, *next;
    514
    515	list_for_each_entry(req, list, wb_list) {
    516		if (req->wb_head != req || req->wb_this_page == req)
    517			continue;
    518		for (next = req->wb_this_page;
    519				next != req->wb_head;
    520				next = next->wb_this_page) {
    521			nfs_list_remove_request(next);
    522			nfs_release_request(next);
    523		}
    524		nfs_join_page_group(req, inode);
    525	}
    526}
    527
    528static void
    529nfs_direct_write_scan_commit_list(struct inode *inode,
    530				  struct list_head *list,
    531				  struct nfs_commit_info *cinfo)
    532{
    533	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
    534	pnfs_recover_commit_reqs(list, cinfo);
    535	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
    536	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
    537}
    538
    539static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
    540{
    541	struct nfs_pageio_descriptor desc;
    542	struct nfs_page *req, *tmp;
    543	LIST_HEAD(reqs);
    544	struct nfs_commit_info cinfo;
    545	LIST_HEAD(failed);
    546
    547	nfs_init_cinfo_from_dreq(&cinfo, dreq);
    548	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
    549
    550	nfs_direct_join_group(&reqs, dreq->inode);
    551
    552	dreq->count = 0;
    553	dreq->max_count = 0;
    554	list_for_each_entry(req, &reqs, wb_list)
    555		dreq->max_count += req->wb_bytes;
    556	nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
    557	get_dreq(dreq);
    558
    559	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
    560			      &nfs_direct_write_completion_ops);
    561	desc.pg_dreq = dreq;
    562
    563	list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
    564		/* Bump the transmission count */
    565		req->wb_nio++;
    566		if (!nfs_pageio_add_request(&desc, req)) {
    567			nfs_list_move_request(req, &failed);
    568			spin_lock(&cinfo.inode->i_lock);
    569			dreq->flags = 0;
    570			if (desc.pg_error < 0)
    571				dreq->error = desc.pg_error;
    572			else
    573				dreq->error = -EIO;
    574			spin_unlock(&cinfo.inode->i_lock);
    575		}
    576		nfs_release_request(req);
    577	}
    578	nfs_pageio_complete(&desc);
    579
    580	while (!list_empty(&failed)) {
    581		req = nfs_list_entry(failed.next);
    582		nfs_list_remove_request(req);
    583		nfs_unlock_and_release_request(req);
    584	}
    585
    586	if (put_dreq(dreq))
    587		nfs_direct_write_complete(dreq);
    588}
    589
    590static void nfs_direct_commit_complete(struct nfs_commit_data *data)
    591{
    592	const struct nfs_writeverf *verf = data->res.verf;
    593	struct nfs_direct_req *dreq = data->dreq;
    594	struct nfs_commit_info cinfo;
    595	struct nfs_page *req;
    596	int status = data->task.tk_status;
    597
    598	if (status < 0) {
    599		/* Errors in commit are fatal */
    600		dreq->error = status;
    601		dreq->max_count = 0;
    602		dreq->count = 0;
    603		dreq->flags = NFS_ODIRECT_DONE;
    604	} else if (dreq->flags == NFS_ODIRECT_DONE)
    605		status = dreq->error;
    606
    607	nfs_init_cinfo_from_dreq(&cinfo, dreq);
    608
    609	while (!list_empty(&data->pages)) {
    610		req = nfs_list_entry(data->pages.next);
    611		nfs_list_remove_request(req);
    612		if (status >= 0 && !nfs_write_match_verf(verf, req)) {
    613			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
    614			/*
    615			 * Despite the reboot, the write was successful,
    616			 * so reset wb_nio.
    617			 */
    618			req->wb_nio = 0;
    619			nfs_mark_request_commit(req, NULL, &cinfo, 0);
    620		} else /* Error or match */
    621			nfs_release_request(req);
    622		nfs_unlock_and_release_request(req);
    623	}
    624
    625	if (nfs_commit_end(cinfo.mds))
    626		nfs_direct_write_complete(dreq);
    627}
    628
    629static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
    630		struct nfs_page *req)
    631{
    632	struct nfs_direct_req *dreq = cinfo->dreq;
    633
    634	spin_lock(&dreq->lock);
    635	if (dreq->flags != NFS_ODIRECT_DONE)
    636		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
    637	spin_unlock(&dreq->lock);
    638	nfs_mark_request_commit(req, NULL, cinfo, 0);
    639}
    640
    641static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
    642	.completion = nfs_direct_commit_complete,
    643	.resched_write = nfs_direct_resched_write,
    644};
    645
    646static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
    647{
    648	int res;
    649	struct nfs_commit_info cinfo;
    650	LIST_HEAD(mds_list);
    651
    652	nfs_init_cinfo_from_dreq(&cinfo, dreq);
    653	nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
    654	res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
    655	if (res < 0) /* res == -ENOMEM */
    656		nfs_direct_write_reschedule(dreq);
    657}
    658
    659static void nfs_direct_write_clear_reqs(struct nfs_direct_req *dreq)
    660{
    661	struct nfs_commit_info cinfo;
    662	struct nfs_page *req;
    663	LIST_HEAD(reqs);
    664
    665	nfs_init_cinfo_from_dreq(&cinfo, dreq);
    666	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
    667
    668	while (!list_empty(&reqs)) {
    669		req = nfs_list_entry(reqs.next);
    670		nfs_list_remove_request(req);
    671		nfs_release_request(req);
    672		nfs_unlock_and_release_request(req);
    673	}
    674}
    675
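/*
 * Deferred completion of a direct write, run from the nfsiod workqueue.
 * dreq->flags selects the next step:
 *   NFS_ODIRECT_DO_COMMIT      - unstable replies were received, send COMMIT;
 *   NFS_ODIRECT_RESCHED_WRITES - the data must be resent (e.g. the commit
 *                                verifier did not match);
 *   otherwise                  - everything is stable: drop any cached pages
 *                                and complete the request.
 */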
    676static void nfs_direct_write_schedule_work(struct work_struct *work)
    677{
    678	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
    679	int flags = dreq->flags;
    680
    681	dreq->flags = 0;
    682	switch (flags) {
    683		case NFS_ODIRECT_DO_COMMIT:
    684			nfs_direct_commit_schedule(dreq);
    685			break;
    686		case NFS_ODIRECT_RESCHED_WRITES:
    687			nfs_direct_write_reschedule(dreq);
    688			break;
    689		default:
    690			nfs_direct_write_clear_reqs(dreq);
    691			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
    692			nfs_direct_complete(dreq);
    693	}
    694}
    695
    696static void nfs_direct_write_complete(struct nfs_direct_req *dreq)
    697{
    698	queue_work(nfsiod_workqueue, &dreq->work); /* Calls nfs_direct_write_schedule_work */
    699}
    700
    701static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
    702{
    703	struct nfs_direct_req *dreq = hdr->dreq;
    704	struct nfs_commit_info cinfo;
    705	struct nfs_page *req = nfs_list_entry(hdr->pages.next);
    706	int flags = NFS_ODIRECT_DONE;
    707
    708	nfs_init_cinfo_from_dreq(&cinfo, dreq);
    709
    710	spin_lock(&dreq->lock);
    711	if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) {
    712		spin_unlock(&dreq->lock);
    713		goto out_put;
    714	}
    715
    716	nfs_direct_count_bytes(dreq, hdr);
    717	if (hdr->good_bytes != 0 && nfs_write_need_commit(hdr)) {
    718		if (!dreq->flags)
    719			dreq->flags = NFS_ODIRECT_DO_COMMIT;
    720		flags = dreq->flags;
    721	}
    722	spin_unlock(&dreq->lock);
    723
    724	while (!list_empty(&hdr->pages)) {
    725
    726		req = nfs_list_entry(hdr->pages.next);
    727		nfs_list_remove_request(req);
    728		if (flags == NFS_ODIRECT_DO_COMMIT) {
    729			kref_get(&req->wb_kref);
    730			memcpy(&req->wb_verf, &hdr->verf.verifier,
    731			       sizeof(req->wb_verf));
    732			nfs_mark_request_commit(req, hdr->lseg, &cinfo,
    733				hdr->ds_commit_idx);
    734		} else if (flags == NFS_ODIRECT_RESCHED_WRITES) {
    735			kref_get(&req->wb_kref);
    736			nfs_mark_request_commit(req, NULL, &cinfo, 0);
    737		}
    738		nfs_unlock_and_release_request(req);
    739	}
    740
    741out_put:
    742	if (put_dreq(dreq))
    743		nfs_direct_write_complete(dreq);
    744	hdr->release(hdr);
    745}
    746
    747static void nfs_write_sync_pgio_error(struct list_head *head, int error)
    748{
    749	struct nfs_page *req;
    750
    751	while (!list_empty(head)) {
    752		req = nfs_list_entry(head->next);
    753		nfs_list_remove_request(req);
    754		nfs_unlock_and_release_request(req);
    755	}
    756}
    757
    758static void nfs_direct_write_reschedule_io(struct nfs_pgio_header *hdr)
    759{
    760	struct nfs_direct_req *dreq = hdr->dreq;
    761
    762	spin_lock(&dreq->lock);
    763	if (dreq->error == 0) {
    764		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
    765		/* fake unstable write to let common nfs resend pages */
    766		hdr->verf.committed = NFS_UNSTABLE;
    767		hdr->good_bytes = hdr->args.offset + hdr->args.count -
    768			hdr->io_start;
    769	}
    770	spin_unlock(&dreq->lock);
    771}
    772
    773static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
    774	.error_cleanup = nfs_write_sync_pgio_error,
    775	.init_hdr = nfs_direct_pgio_init,
    776	.completion = nfs_direct_write_completion,
    777	.reschedule_io = nfs_direct_write_reschedule_io,
    778};
    779
    780
    781/*
    782 * NB: Return the value of the first error return code.  Subsequent
    783 *     errors after the first one are ignored.
    784 */
    785/*
    786 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
    787 * operation.  If iov_iter_get_pages_alloc() or nfs_create_request()
    788 * fails, bail and stop sending more writes.  Write length accounting is
    789 * handled automatically by nfs_direct_write_completion().  Otherwise,
    790 * if no requests have been sent, just return an error.
    791 */
    792static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
    793					       struct iov_iter *iter,
    794					       loff_t pos, int ioflags)
    795{
    796	struct nfs_pageio_descriptor desc;
    797	struct inode *inode = dreq->inode;
    798	ssize_t result = 0;
    799	size_t requested_bytes = 0;
    800	size_t wsize = max_t(size_t, NFS_SERVER(inode)->wsize, PAGE_SIZE);
    801
    802	nfs_pageio_init_write(&desc, inode, ioflags, false,
    803			      &nfs_direct_write_completion_ops);
    804	desc.pg_dreq = dreq;
    805	get_dreq(dreq);
    806	inode_dio_begin(inode);
    807
    808	NFS_I(inode)->write_io += iov_iter_count(iter);
    809	while (iov_iter_count(iter)) {
    810		struct page **pagevec;
    811		size_t bytes;
    812		size_t pgbase;
    813		unsigned npages, i;
    814
    815		result = iov_iter_get_pages_alloc(iter, &pagevec, 
    816						  wsize, &pgbase);
    817		if (result < 0)
    818			break;
    819
    820		bytes = result;
    821		iov_iter_advance(iter, bytes);
    822		npages = (result + pgbase + PAGE_SIZE - 1) / PAGE_SIZE;
    823		for (i = 0; i < npages; i++) {
    824			struct nfs_page *req;
    825			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
    826
    827			req = nfs_create_request(dreq->ctx, pagevec[i],
    828						 pgbase, req_len);
    829			if (IS_ERR(req)) {
    830				result = PTR_ERR(req);
    831				break;
    832			}
    833
    834			if (desc.pg_error < 0) {
    835				nfs_free_request(req);
    836				result = desc.pg_error;
    837				break;
    838			}
    839
    840			nfs_lock_request(req);
    841			req->wb_index = pos >> PAGE_SHIFT;
    842			req->wb_offset = pos & ~PAGE_MASK;
    843			if (!nfs_pageio_add_request(&desc, req)) {
    844				result = desc.pg_error;
    845				nfs_unlock_and_release_request(req);
    846				break;
    847			}
    848			pgbase = 0;
    849			bytes -= req_len;
    850			requested_bytes += req_len;
    851			pos += req_len;
    852			dreq->bytes_left -= req_len;
    853		}
    854		nfs_direct_release_pages(pagevec, npages);
    855		kvfree(pagevec);
    856		if (result < 0)
    857			break;
    858	}
    859	nfs_pageio_complete(&desc);
    860
    861	/*
    862	 * If no bytes were started, return the error, and let the
    863	 * generic layer handle the completion.
    864	 */
    865	if (requested_bytes == 0) {
    866		inode_dio_end(inode);
    867		nfs_direct_req_release(dreq);
    868		return result < 0 ? result : -EIO;
    869	}
    870
    871	if (put_dreq(dreq))
    872		nfs_direct_write_complete(dreq);
    873	return requested_bytes;
    874}
    875
    876/**
    877 * nfs_file_direct_write - file direct write operation for NFS files
    878 * @iocb: target I/O control block
    879 * @iter: vector of user buffers from which to write data
    880 * @swap: flag indicating this is swap IO, not O_DIRECT IO
    881 *
    882 * We use this function for direct writes instead of calling
    883 * generic_file_aio_write() in order to avoid taking the inode
    884 * semaphore and updating the i_size.  The NFS server will set
    885 * the new i_size and this client must read the updated size
    886 * back into its cache.  We let the server do generic write
    887 * parameter checking and report problems.
    888 *
    889 * We eliminate local atime updates, see direct read above.
    890 *
    891 * We avoid unnecessary page cache invalidations for normal cached
    892 * readers of this file.
    893 *
    894 * Note that O_APPEND is not supported for NFS direct writes, as there
    895 * is no atomic O_APPEND write facility in the NFS protocol.
    896 */
    897ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter,
    898			      bool swap)
    899{
    900	ssize_t result, requested;
    901	size_t count;
    902	struct file *file = iocb->ki_filp;
    903	struct address_space *mapping = file->f_mapping;
    904	struct inode *inode = mapping->host;
    905	struct nfs_direct_req *dreq;
    906	struct nfs_lock_context *l_ctx;
    907	loff_t pos, end;
    908
    909	dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
    910		file, iov_iter_count(iter), (long long) iocb->ki_pos);
    911
    912	if (swap)
    913		/* bypass generic checks */
    914		result =  iov_iter_count(iter);
    915	else
    916		result = generic_write_checks(iocb, iter);
    917	if (result <= 0)
    918		return result;
    919	count = result;
    920	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
    921
    922	pos = iocb->ki_pos;
    923	end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;
    924
    925	task_io_account_write(count);
    926
    927	result = -ENOMEM;
    928	dreq = nfs_direct_req_alloc();
    929	if (!dreq)
    930		goto out;
    931
    932	dreq->inode = inode;
    933	dreq->bytes_left = dreq->max_count = count;
    934	dreq->io_start = pos;
    935	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
    936	l_ctx = nfs_get_lock_context(dreq->ctx);
    937	if (IS_ERR(l_ctx)) {
    938		result = PTR_ERR(l_ctx);
    939		nfs_direct_req_release(dreq);
    940		goto out_release;
    941	}
    942	dreq->l_ctx = l_ctx;
    943	if (!is_sync_kiocb(iocb))
    944		dreq->iocb = iocb;
    945	pnfs_init_ds_commit_info_ops(&dreq->ds_cinfo, inode);
    946
    947	if (swap) {
    948		requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
    949							    FLUSH_STABLE);
    950	} else {
    951		nfs_start_io_direct(inode);
    952
    953		requested = nfs_direct_write_schedule_iovec(dreq, iter, pos,
    954							    FLUSH_COND_STABLE);
    955
    956		if (mapping->nrpages) {
    957			invalidate_inode_pages2_range(mapping,
    958						      pos >> PAGE_SHIFT, end);
    959		}
    960
    961		nfs_end_io_direct(inode);
    962	}
    963
    964	if (requested > 0) {
    965		result = nfs_direct_wait(dreq);
    966		if (result > 0) {
    967			requested -= result;
    968			iocb->ki_pos = pos + result;
    969			/* XXX: should check the generic_write_sync retval */
    970			generic_write_sync(iocb, result);
    971		}
    972		iov_iter_revert(iter, requested);
    973	} else {
    974		result = requested;
    975	}
    976	nfs_fscache_invalidate(inode, FSCACHE_INVAL_DIO_WRITE);
    977out_release:
    978	nfs_direct_req_release(dreq);
    979out:
    980	return result;
    981}
    982
    983/**
    984 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
    985 *
    986 */
    987int __init nfs_init_directcache(void)
    988{
    989	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
    990						sizeof(struct nfs_direct_req),
    991						0, (SLAB_RECLAIM_ACCOUNT|
    992							SLAB_MEM_SPREAD),
    993						NULL);
    994	if (nfs_direct_cachep == NULL)
    995		return -ENOMEM;
    996
    997	return 0;
    998}
    999
   1000/**
   1001 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
   1002 *
   1003 */
   1004void nfs_destroy_directcache(void)
   1005{
   1006	kmem_cache_destroy(nfs_direct_cachep);
   1007}