cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

xfs_log_cil.c (51727B)


      1// SPDX-License-Identifier: GPL-2.0
      2/*
      3 * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
      4 */
      5
      6#include "xfs.h"
      7#include "xfs_fs.h"
      8#include "xfs_format.h"
      9#include "xfs_log_format.h"
     10#include "xfs_shared.h"
     11#include "xfs_trans_resv.h"
     12#include "xfs_mount.h"
     13#include "xfs_extent_busy.h"
     14#include "xfs_trans.h"
     15#include "xfs_trans_priv.h"
     16#include "xfs_log.h"
     17#include "xfs_log_priv.h"
     18#include "xfs_trace.h"
     19
/*
 * Workqueue onto which xlog_discard_endio() queues a checkpoint context's
 * discard_endio_work once its discard bio has completed.
 */
struct workqueue_struct *xfs_discard_wq;
     21
     22/*
     23 * Allocate a new ticket. Failing to get a new ticket makes it really hard to
     24 * recover, so we don't allow failure here. Also, we allocate in a context that
     25 * we don't want to be issuing transactions from, so we need to tell the
     26 * allocation code this as well.
     27 *
     28 * We don't reserve any space for the ticket - we are going to steal whatever
     29 * space we require from transactions as they commit. To ensure we reserve all
     30 * the space required, we need to set the current reservation of the ticket to
     31 * zero so that we know to steal the initial transaction overhead from the
     32 * first transaction commit.
     33 */
     34static struct xlog_ticket *
     35xlog_cil_ticket_alloc(
     36	struct xlog	*log)
     37{
     38	struct xlog_ticket *tic;
     39
     40	tic = xlog_ticket_alloc(log, 0, 1, 0);
     41
     42	/*
     43	 * set the current reservation to zero so we know to steal the basic
     44	 * transaction overhead reservation from the first transaction commit.
     45	 */
     46	tic->t_curr_res = 0;
     47	return tic;
     48}
     49
     50/*
     51 * Check if the current log item was first committed in this sequence.
     52 * We can't rely on just the log item being in the CIL, we have to check
     53 * the recorded commit sequence number.
     54 *
     55 * Note: for this to be used in a non-racy manner, it has to be called with
     56 * CIL flushing locked out. As a result, it should only be used during the
     57 * transaction commit process when deciding what to format into the item.
     58 */
     59static bool
     60xlog_item_in_current_chkpt(
     61	struct xfs_cil		*cil,
     62	struct xfs_log_item	*lip)
     63{
     64	if (list_empty(&lip->li_cil))
     65		return false;
     66
     67	/*
     68	 * li_seq is written on the first commit of a log item to record the
     69	 * first checkpoint it is written to. Hence if it is different to the
     70	 * current sequence, we're in a new checkpoint.
     71	 */
     72	return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
     73}
     74
     75bool
     76xfs_log_item_in_current_chkpt(
     77	struct xfs_log_item *lip)
     78{
     79	return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip);
     80}
     81
     82/*
     83 * Unavoidable forward declaration - xlog_cil_push_work() calls
     84 * xlog_cil_ctx_alloc() itself.
     85 */
     86static void xlog_cil_push_work(struct work_struct *work);
     87
     88static struct xfs_cil_ctx *
     89xlog_cil_ctx_alloc(void)
     90{
     91	struct xfs_cil_ctx	*ctx;
     92
     93	ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
     94	INIT_LIST_HEAD(&ctx->committing);
     95	INIT_LIST_HEAD(&ctx->busy_extents);
     96	INIT_WORK(&ctx->push_work, xlog_cil_push_work);
     97	return ctx;
     98}
     99
    100static void
    101xlog_cil_ctx_switch(
    102	struct xfs_cil		*cil,
    103	struct xfs_cil_ctx	*ctx)
    104{
    105	ctx->sequence = ++cil->xc_current_sequence;
    106	ctx->cil = cil;
    107	cil->xc_ctx = ctx;
    108}
    109
    110/*
    111 * After the first stage of log recovery is done, we know where the head and
    112 * tail of the log are. We need this log initialisation done before we can
    113 * initialise the first CIL checkpoint context.
    114 *
    115 * Here we allocate a log ticket to track space usage during a CIL push.  This
    116 * ticket is passed to xlog_write() directly so that we don't slowly leak log
    117 * space by failing to account for space used by log headers and additional
    118 * region headers for split regions.
    119 */
    120void
    121xlog_cil_init_post_recovery(
    122	struct xlog	*log)
    123{
    124	log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
    125	log->l_cilp->xc_ctx->sequence = 1;
    126}
    127
    128static inline int
    129xlog_cil_iovec_space(
    130	uint	niovecs)
    131{
    132	return round_up((sizeof(struct xfs_log_vec) +
    133					niovecs * sizeof(struct xfs_log_iovec)),
    134			sizeof(uint64_t));
    135}
    136
    137/*
    138 * Allocate or pin log vector buffers for CIL insertion.
    139 *
    140 * The CIL currently uses disposable buffers for copying a snapshot of the
    141 * modified items into the log during a push. The biggest problem with this is
    142 * the requirement to allocate the disposable buffer during the commit if:
    143 *	a) does not exist; or
    144 *	b) it is too small
    145 *
    146 * If we do this allocation within xlog_cil_insert_format_items(), it is done
    147 * under the xc_ctx_lock, which means that a CIL push cannot occur during
    148 * the memory allocation. This means that we have a potential deadlock situation
    149 * under low memory conditions when we have lots of dirty metadata pinned in
    150 * the CIL and we need a CIL commit to occur to free memory.
    151 *
    152 * To avoid this, we need to move the memory allocation outside the
    153 * xc_ctx_lock, but because the log vector buffers are disposable, that opens
    154 * up a TOCTOU race condition w.r.t. the CIL committing and removing the log
    155 * vector buffers between the check and the formatting of the item into the
    156 * log vector buffer within the xc_ctx_lock.
    157 *
    158 * Because the log vector buffer needs to be unchanged during the CIL push
    159 * process, we cannot share the buffer between the transaction commit (which
    160 * modifies the buffer) and the CIL push context that is writing the changes
    161 * into the log. This means skipping preallocation of buffer space is
    162 * unreliable, but we most definitely do not want to be allocating and freeing
    163 * buffers unnecessarily during commits when overwrites can be done safely.
    164 *
    165 * The simplest solution to this problem is to allocate a shadow buffer when a
    166 * log item is committed for the second time, and then to only use this buffer
    167 * if necessary. The buffer can remain attached to the log item until such time
    168 * it is needed, and this is the buffer that is reallocated to match the size of
    169 * the incoming modification. Then during the formatting of the item we can swap
    170 * the active buffer with the new one if we can't reuse the existing buffer. We
    171 * don't free the old buffer as it may be reused on the next modification if
 * its size is right, otherwise we'll free and reallocate it at that point.
    173 *
    174 * This function builds a vector for the changes in each log item in the
    175 * transaction. It then works out the length of the buffer needed for each log
    176 * item, allocates them and attaches the vector to the log item in preparation
    177 * for the formatting step which occurs under the xc_ctx_lock.
    178 *
    179 * While this means the memory footprint goes up, it avoids the repeated
    180 * alloc/free pattern that repeated modifications of an item would otherwise
    181 * cause, and hence minimises the CPU overhead of such behaviour.
    182 */
    183static void
    184xlog_cil_alloc_shadow_bufs(
    185	struct xlog		*log,
    186	struct xfs_trans	*tp)
    187{
    188	struct xfs_log_item	*lip;
    189
    190	list_for_each_entry(lip, &tp->t_items, li_trans) {
    191		struct xfs_log_vec *lv;
    192		int	niovecs = 0;
    193		int	nbytes = 0;
    194		int	buf_size;
    195		bool	ordered = false;
    196
    197		/* Skip items which aren't dirty in this transaction. */
    198		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
    199			continue;
    200
    201		/* get number of vecs and size of data to be stored */
    202		lip->li_ops->iop_size(lip, &niovecs, &nbytes);
    203
    204		/*
    205		 * Ordered items need to be tracked but we do not wish to write
    206		 * them. We need a logvec to track the object, but we do not
    207		 * need an iovec or buffer to be allocated for copying data.
    208		 */
    209		if (niovecs == XFS_LOG_VEC_ORDERED) {
    210			ordered = true;
    211			niovecs = 0;
    212			nbytes = 0;
    213		}
    214
    215		/*
    216		 * We 64-bit align the length of each iovec so that the start of
    217		 * the next one is naturally aligned.  We'll need to account for
    218		 * that slack space here.
    219		 *
    220		 * We also add the xlog_op_header to each region when
    221		 * formatting, but that's not accounted to the size of the item
    222		 * at this point. Hence we'll need an addition number of bytes
    223		 * for each vector to hold an opheader.
    224		 *
    225		 * Then round nbytes up to 64-bit alignment so that the initial
    226		 * buffer alignment is easy to calculate and verify.
    227		 */
    228		nbytes += niovecs *
    229			(sizeof(uint64_t) + sizeof(struct xlog_op_header));
    230		nbytes = round_up(nbytes, sizeof(uint64_t));
    231
    232		/*
    233		 * The data buffer needs to start 64-bit aligned, so round up
    234		 * that space to ensure we can align it appropriately and not
    235		 * overrun the buffer.
    236		 */
    237		buf_size = nbytes + xlog_cil_iovec_space(niovecs);
    238
    239		/*
    240		 * if we have no shadow buffer, or it is too small, we need to
    241		 * reallocate it.
    242		 */
    243		if (!lip->li_lv_shadow ||
    244		    buf_size > lip->li_lv_shadow->lv_size) {
    245			/*
    246			 * We free and allocate here as a realloc would copy
    247			 * unnecessary data. We don't use kvzalloc() for the
    248			 * same reason - we don't need to zero the data area in
    249			 * the buffer, only the log vector header and the iovec
    250			 * storage.
    251			 */
    252			kmem_free(lip->li_lv_shadow);
    253			lv = xlog_kvmalloc(buf_size);
    254
    255			memset(lv, 0, xlog_cil_iovec_space(niovecs));
    256
    257			lv->lv_item = lip;
    258			lv->lv_size = buf_size;
    259			if (ordered)
    260				lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
    261			else
    262				lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1];
    263			lip->li_lv_shadow = lv;
    264		} else {
    265			/* same or smaller, optimise common overwrite case */
    266			lv = lip->li_lv_shadow;
    267			if (ordered)
    268				lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
    269			else
    270				lv->lv_buf_len = 0;
    271			lv->lv_bytes = 0;
    272			lv->lv_next = NULL;
    273		}
    274
    275		/* Ensure the lv is set up according to ->iop_size */
    276		lv->lv_niovecs = niovecs;
    277
    278		/* The allocated data region lies beyond the iovec region */
    279		lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs);
    280	}
    281
    282}
    283
    284/*
    285 * Prepare the log item for insertion into the CIL. Calculate the difference in
    286 * log space it will consume, and if it is a new item pin it as well.
    287 */
    288STATIC void
    289xfs_cil_prepare_item(
    290	struct xlog		*log,
    291	struct xfs_log_vec	*lv,
    292	struct xfs_log_vec	*old_lv,
    293	int			*diff_len)
    294{
    295	/* Account for the new LV being passed in */
    296	if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
    297		*diff_len += lv->lv_bytes;
    298
    299	/*
    300	 * If there is no old LV, this is the first time we've seen the item in
    301	 * this CIL context and so we need to pin it. If we are replacing the
    302	 * old_lv, then remove the space it accounts for and make it the shadow
    303	 * buffer for later freeing. In both cases we are now switching to the
    304	 * shadow buffer, so update the pointer to it appropriately.
    305	 */
    306	if (!old_lv) {
    307		if (lv->lv_item->li_ops->iop_pin)
    308			lv->lv_item->li_ops->iop_pin(lv->lv_item);
    309		lv->lv_item->li_lv_shadow = NULL;
    310	} else if (old_lv != lv) {
    311		ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);
    312
    313		*diff_len -= old_lv->lv_bytes;
    314		lv->lv_item->li_lv_shadow = old_lv;
    315	}
    316
    317	/* attach new log vector to log item */
    318	lv->lv_item->li_lv = lv;
    319
    320	/*
    321	 * If this is the first time the item is being committed to the
    322	 * CIL, store the sequence number on the log item so we can
    323	 * tell in future commits whether this is the first checkpoint
    324	 * the item is being committed into.
    325	 */
    326	if (!lv->lv_item->li_seq)
    327		lv->lv_item->li_seq = log->l_cilp->xc_ctx->sequence;
    328}
    329
/*
 * Format log items into flat buffers
 *
 * For delayed logging, we need to hold a formatted buffer containing all the
 * changes on the log item. This enables us to relog the item in memory and
 * write it out asynchronously without needing to relock the object that was
 * modified at the time it gets written into the iclog.
 *
 * This function takes the prepared log vectors attached to each log item, and
 * formats the changes into the log vector buffer. The buffer it uses is
 * dependent on the current state of the vector in the CIL - the shadow lv is
 * guaranteed to be large enough for the current modification, but we will only
 * use that if we can't reuse the existing lv. If we can't reuse the existing
 * lv, then simply swap it out for the shadow lv. We don't free it - that is
 * done lazily either by the next modification or the freeing of the log item.
 *
 * We don't set up region headers during this process; we simply copy the
 * regions into the flat buffer. We can do this because we still have to do a
 * formatting step to write the regions into the iclog buffer.  Writing the
 * ophdrs during the iclog write means that we can support splitting large
 * regions across iclog boundaries without needing a change in the format of
 * the item/region encapsulation.
 *
 * Hence what we need to do now is rewrite the vector array to point to the
 * copied region inside the buffer we just allocated. This allows us to
 * format the regions into the iclog as though they are being formatted
 * directly out of the objects themselves.
 */
    358static void
    359xlog_cil_insert_format_items(
    360	struct xlog		*log,
    361	struct xfs_trans	*tp,
    362	int			*diff_len)
    363{
    364	struct xfs_log_item	*lip;
    365
    366	/* Bail out if we didn't find a log item.  */
    367	if (list_empty(&tp->t_items)) {
    368		ASSERT(0);
    369		return;
    370	}
    371
    372	list_for_each_entry(lip, &tp->t_items, li_trans) {
    373		struct xfs_log_vec *lv;
    374		struct xfs_log_vec *old_lv = NULL;
    375		struct xfs_log_vec *shadow;
    376		bool	ordered = false;
    377
    378		/* Skip items which aren't dirty in this transaction. */
    379		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
    380			continue;
    381
    382		/*
    383		 * The formatting size information is already attached to
    384		 * the shadow lv on the log item.
    385		 */
    386		shadow = lip->li_lv_shadow;
    387		if (shadow->lv_buf_len == XFS_LOG_VEC_ORDERED)
    388			ordered = true;
    389
    390		/* Skip items that do not have any vectors for writing */
    391		if (!shadow->lv_niovecs && !ordered)
    392			continue;
    393
    394		/* compare to existing item size */
    395		old_lv = lip->li_lv;
    396		if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
    397			/* same or smaller, optimise common overwrite case */
    398			lv = lip->li_lv;
    399			lv->lv_next = NULL;
    400
    401			if (ordered)
    402				goto insert;
    403
    404			/*
    405			 * set the item up as though it is a new insertion so
    406			 * that the space reservation accounting is correct.
    407			 */
    408			*diff_len -= lv->lv_bytes;
    409
    410			/* Ensure the lv is set up according to ->iop_size */
    411			lv->lv_niovecs = shadow->lv_niovecs;
    412
    413			/* reset the lv buffer information for new formatting */
    414			lv->lv_buf_len = 0;
    415			lv->lv_bytes = 0;
    416			lv->lv_buf = (char *)lv +
    417					xlog_cil_iovec_space(lv->lv_niovecs);
    418		} else {
    419			/* switch to shadow buffer! */
    420			lv = shadow;
    421			lv->lv_item = lip;
    422			if (ordered) {
    423				/* track as an ordered logvec */
    424				ASSERT(lip->li_lv == NULL);
    425				goto insert;
    426			}
    427		}
    428
    429		ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
    430		lip->li_ops->iop_format(lip, lv);
    431insert:
    432		xfs_cil_prepare_item(log, lv, old_lv, diff_len);
    433	}
    434}
    435
    436/*
    437 * Insert the log items into the CIL and calculate the difference in space
    438 * consumed by the item. Add the space to the checkpoint ticket and calculate
    439 * if the change requires additional log metadata. If it does, take that space
    440 * as well. Remove the amount of space we added to the checkpoint ticket from
    441 * the current transaction ticket so that the accounting works out correctly.
    442 */
    443static void
    444xlog_cil_insert_items(
    445	struct xlog		*log,
    446	struct xfs_trans	*tp,
    447	uint32_t		released_space)
    448{
    449	struct xfs_cil		*cil = log->l_cilp;
    450	struct xfs_cil_ctx	*ctx = cil->xc_ctx;
    451	struct xfs_log_item	*lip;
    452	int			len = 0;
    453	int			iclog_space;
    454	int			iovhdr_res = 0, split_res = 0, ctx_res = 0;
    455
    456	ASSERT(tp);
    457
    458	/*
    459	 * We can do this safely because the context can't checkpoint until we
    460	 * are done so it doesn't matter exactly how we update the CIL.
    461	 */
    462	xlog_cil_insert_format_items(log, tp, &len);
    463
    464	spin_lock(&cil->xc_cil_lock);
    465
    466	/* attach the transaction to the CIL if it has any busy extents */
    467	if (!list_empty(&tp->t_busy))
    468		list_splice_init(&tp->t_busy, &ctx->busy_extents);
    469
    470	/*
    471	 * Now transfer enough transaction reservation to the context ticket
    472	 * for the checkpoint. The context ticket is special - the unit
    473	 * reservation has to grow as well as the current reservation as we
    474	 * steal from tickets so we can correctly determine the space used
    475	 * during the transaction commit.
    476	 */
    477	if (ctx->ticket->t_curr_res == 0) {
    478		ctx_res = ctx->ticket->t_unit_res;
    479		ctx->ticket->t_curr_res = ctx_res;
    480		tp->t_ticket->t_curr_res -= ctx_res;
    481	}
    482
    483	/* do we need space for more log record headers? */
    484	iclog_space = log->l_iclog_size - log->l_iclog_hsize;
    485	if (len > 0 && (ctx->space_used / iclog_space !=
    486				(ctx->space_used + len) / iclog_space)) {
    487		split_res = (len + iclog_space - 1) / iclog_space;
    488		/* need to take into account split region headers, too */
    489		split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
    490		ctx->ticket->t_unit_res += split_res;
    491		ctx->ticket->t_curr_res += split_res;
    492		tp->t_ticket->t_curr_res -= split_res;
    493		ASSERT(tp->t_ticket->t_curr_res >= len);
    494	}
    495	tp->t_ticket->t_curr_res -= len;
    496	tp->t_ticket->t_curr_res += released_space;
    497	ctx->space_used += len;
    498	ctx->space_used -= released_space;
    499
    500	/*
    501	 * If we've overrun the reservation, dump the tx details before we move
    502	 * the log items. Shutdown is imminent...
    503	 */
    504	if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
    505		xfs_warn(log->l_mp, "Transaction log reservation overrun:");
    506		xfs_warn(log->l_mp,
    507			 "  log items: %d bytes (iov hdrs: %d bytes)",
    508			 len, iovhdr_res);
    509		xfs_warn(log->l_mp, "  split region headers: %d bytes",
    510			 split_res);
    511		xfs_warn(log->l_mp, "  ctx ticket: %d bytes", ctx_res);
    512		xlog_print_trans(tp);
    513	}
    514
    515	/*
    516	 * Now (re-)position everything modified at the tail of the CIL.
    517	 * We do this here so we only need to take the CIL lock once during
    518	 * the transaction commit.
    519	 */
    520	list_for_each_entry(lip, &tp->t_items, li_trans) {
    521
    522		/* Skip items which aren't dirty in this transaction. */
    523		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
    524			continue;
    525
    526		/*
    527		 * Only move the item if it isn't already at the tail. This is
    528		 * to prevent a transient list_empty() state when reinserting
    529		 * an item that is already the only item in the CIL.
    530		 */
    531		if (!list_is_last(&lip->li_cil, &cil->xc_cil))
    532			list_move_tail(&lip->li_cil, &cil->xc_cil);
    533	}
    534
    535	spin_unlock(&cil->xc_cil_lock);
    536
    537	if (tp->t_ticket->t_curr_res < 0)
    538		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
    539}
    540
    541static void
    542xlog_cil_free_logvec(
    543	struct xfs_log_vec	*log_vector)
    544{
    545	struct xfs_log_vec	*lv;
    546
    547	for (lv = log_vector; lv; ) {
    548		struct xfs_log_vec *next = lv->lv_next;
    549		kmem_free(lv);
    550		lv = next;
    551	}
    552}
    553
    554static void
    555xlog_discard_endio_work(
    556	struct work_struct	*work)
    557{
    558	struct xfs_cil_ctx	*ctx =
    559		container_of(work, struct xfs_cil_ctx, discard_endio_work);
    560	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;
    561
    562	xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
    563	kmem_free(ctx);
    564}
    565
    566/*
    567 * Queue up the actual completion to a thread to avoid IRQ-safe locking for
    568 * pagb_lock.  Note that we need a unbounded workqueue, otherwise we might
    569 * get the execution delayed up to 30 seconds for weird reasons.
    570 */
    571static void
    572xlog_discard_endio(
    573	struct bio		*bio)
    574{
    575	struct xfs_cil_ctx	*ctx = bio->bi_private;
    576
    577	INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
    578	queue_work(xfs_discard_wq, &ctx->discard_endio_work);
    579	bio_put(bio);
    580}
    581
    582static void
    583xlog_discard_busy_extents(
    584	struct xfs_mount	*mp,
    585	struct xfs_cil_ctx	*ctx)
    586{
    587	struct list_head	*list = &ctx->busy_extents;
    588	struct xfs_extent_busy	*busyp;
    589	struct bio		*bio = NULL;
    590	struct blk_plug		plug;
    591	int			error = 0;
    592
    593	ASSERT(xfs_has_discard(mp));
    594
    595	blk_start_plug(&plug);
    596	list_for_each_entry(busyp, list, list) {
    597		trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
    598					 busyp->length);
    599
    600		error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
    601				XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
    602				XFS_FSB_TO_BB(mp, busyp->length),
    603				GFP_NOFS, &bio);
    604		if (error && error != -EOPNOTSUPP) {
    605			xfs_info(mp,
    606	 "discard failed for extent [0x%llx,%u], error %d",
    607				 (unsigned long long)busyp->bno,
    608				 busyp->length,
    609				 error);
    610			break;
    611		}
    612	}
    613
    614	if (bio) {
    615		bio->bi_private = ctx;
    616		bio->bi_end_io = xlog_discard_endio;
    617		submit_bio(bio);
    618	} else {
    619		xlog_discard_endio_work(&ctx->discard_endio_work);
    620	}
    621	blk_finish_plug(&plug);
    622}
    623
    624/*
    625 * Mark all items committed and clear busy extents. We free the log vector
    626 * chains in a separate pass so that we unpin the log items as quickly as
    627 * possible.
    628 */
    629static void
    630xlog_cil_committed(
    631	struct xfs_cil_ctx	*ctx)
    632{
    633	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;
    634	bool			abort = xlog_is_shutdown(ctx->cil->xc_log);
    635
    636	/*
    637	 * If the I/O failed, we're aborting the commit and already shutdown.
    638	 * Wake any commit waiters before aborting the log items so we don't
    639	 * block async log pushers on callbacks. Async log pushers explicitly do
    640	 * not wait on log force completion because they may be holding locks
    641	 * required to unpin items.
    642	 */
    643	if (abort) {
    644		spin_lock(&ctx->cil->xc_push_lock);
    645		wake_up_all(&ctx->cil->xc_start_wait);
    646		wake_up_all(&ctx->cil->xc_commit_wait);
    647		spin_unlock(&ctx->cil->xc_push_lock);
    648	}
    649
    650	xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
    651					ctx->start_lsn, abort);
    652
    653	xfs_extent_busy_sort(&ctx->busy_extents);
    654	xfs_extent_busy_clear(mp, &ctx->busy_extents,
    655			      xfs_has_discard(mp) && !abort);
    656
    657	spin_lock(&ctx->cil->xc_push_lock);
    658	list_del(&ctx->committing);
    659	spin_unlock(&ctx->cil->xc_push_lock);
    660
    661	xlog_cil_free_logvec(ctx->lv_chain);
    662
    663	if (!list_empty(&ctx->busy_extents))
    664		xlog_discard_busy_extents(mp, ctx);
    665	else
    666		kmem_free(ctx);
    667}
    668
    669void
    670xlog_cil_process_committed(
    671	struct list_head	*list)
    672{
    673	struct xfs_cil_ctx	*ctx;
    674
    675	while ((ctx = list_first_entry_or_null(list,
    676			struct xfs_cil_ctx, iclog_entry))) {
    677		list_del(&ctx->iclog_entry);
    678		xlog_cil_committed(ctx);
    679	}
    680}
    681
    682/*
    683* Record the LSN of the iclog we were just granted space to start writing into.
    684* If the context doesn't have a start_lsn recorded, then this iclog will
    685* contain the start record for the checkpoint. Otherwise this write contains
    686* the commit record for the checkpoint.
    687*/
    688void
    689xlog_cil_set_ctx_write_state(
    690	struct xfs_cil_ctx	*ctx,
    691	struct xlog_in_core	*iclog)
    692{
    693	struct xfs_cil		*cil = ctx->cil;
    694	xfs_lsn_t		lsn = be64_to_cpu(iclog->ic_header.h_lsn);
    695
    696	ASSERT(!ctx->commit_lsn);
    697	if (!ctx->start_lsn) {
    698		spin_lock(&cil->xc_push_lock);
    699		/*
    700		 * The LSN we need to pass to the log items on transaction
    701		 * commit is the LSN reported by the first log vector write, not
    702		 * the commit lsn. If we use the commit record lsn then we can
    703		 * move the grant write head beyond the tail LSN and overwrite
    704		 * it.
    705		 */
    706		ctx->start_lsn = lsn;
    707		wake_up_all(&cil->xc_start_wait);
    708		spin_unlock(&cil->xc_push_lock);
    709
    710		/*
    711		 * Make sure the metadata we are about to overwrite in the log
    712		 * has been flushed to stable storage before this iclog is
    713		 * issued.
    714		 */
    715		spin_lock(&cil->xc_log->l_icloglock);
    716		iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
    717		spin_unlock(&cil->xc_log->l_icloglock);
    718		return;
    719	}
    720
    721	/*
    722	 * Take a reference to the iclog for the context so that we still hold
    723	 * it when xlog_write is done and has released it. This means the
    724	 * context controls when the iclog is released for IO.
    725	 */
    726	atomic_inc(&iclog->ic_refcnt);
    727
    728	/*
    729	 * xlog_state_get_iclog_space() guarantees there is enough space in the
    730	 * iclog for an entire commit record, so we can attach the context
    731	 * callbacks now.  This needs to be done before we make the commit_lsn
    732	 * visible to waiters so that checkpoints with commit records in the
    733	 * same iclog order their IO completion callbacks in the same order that
    734	 * the commit records appear in the iclog.
    735	 */
    736	spin_lock(&cil->xc_log->l_icloglock);
    737	list_add_tail(&ctx->iclog_entry, &iclog->ic_callbacks);
    738	spin_unlock(&cil->xc_log->l_icloglock);
    739
    740	/*
    741	 * Now we can record the commit LSN and wake anyone waiting for this
    742	 * sequence to have the ordered commit record assigned to a physical
    743	 * location in the log.
    744	 */
    745	spin_lock(&cil->xc_push_lock);
    746	ctx->commit_iclog = iclog;
    747	ctx->commit_lsn = lsn;
    748	wake_up_all(&cil->xc_commit_wait);
    749	spin_unlock(&cil->xc_push_lock);
    750}
    751
    752
    753/*
    754 * Ensure that the order of log writes follows checkpoint sequence order. This
    755 * relies on the context LSN being zero until the log write has guaranteed the
    756 * LSN that the log write will start at via xlog_state_get_iclog_space().
    757 */
    758enum _record_type {
    759	_START_RECORD,
    760	_COMMIT_RECORD,
    761};
    762
    763static int
    764xlog_cil_order_write(
    765	struct xfs_cil		*cil,
    766	xfs_csn_t		sequence,
    767	enum _record_type	record)
    768{
    769	struct xfs_cil_ctx	*ctx;
    770
    771restart:
    772	spin_lock(&cil->xc_push_lock);
    773	list_for_each_entry(ctx, &cil->xc_committing, committing) {
    774		/*
    775		 * Avoid getting stuck in this loop because we were woken by the
    776		 * shutdown, but then went back to sleep once already in the
    777		 * shutdown state.
    778		 */
    779		if (xlog_is_shutdown(cil->xc_log)) {
    780			spin_unlock(&cil->xc_push_lock);
    781			return -EIO;
    782		}
    783
    784		/*
    785		 * Higher sequences will wait for this one so skip them.
    786		 * Don't wait for our own sequence, either.
    787		 */
    788		if (ctx->sequence >= sequence)
    789			continue;
    790
    791		/* Wait until the LSN for the record has been recorded. */
    792		switch (record) {
    793		case _START_RECORD:
    794			if (!ctx->start_lsn) {
    795				xlog_wait(&cil->xc_start_wait, &cil->xc_push_lock);
    796				goto restart;
    797			}
    798			break;
    799		case _COMMIT_RECORD:
    800			if (!ctx->commit_lsn) {
    801				xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
    802				goto restart;
    803			}
    804			break;
    805		}
    806	}
    807	spin_unlock(&cil->xc_push_lock);
    808	return 0;
    809}
    810
    811/*
    812 * Write out the log vector change now attached to the CIL context. This will
    813 * write a start record that needs to be strictly ordered in ascending CIL
    814 * sequence order so that log recovery will always use in-order start LSNs when
    815 * replaying checkpoints.
    816 */
    817static int
    818xlog_cil_write_chain(
    819	struct xfs_cil_ctx	*ctx,
    820	struct xfs_log_vec	*chain,
    821	uint32_t		chain_len)
    822{
    823	struct xlog		*log = ctx->cil->xc_log;
    824	int			error;
    825
    826	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
    827	if (error)
    828		return error;
    829	return xlog_write(log, ctx, chain, ctx->ticket, chain_len);
    830}
    831
    832/*
    833 * Write out the commit record of a checkpoint transaction to close off a
    834 * running log write. These commit records are strictly ordered in ascending CIL
    835 * sequence order so that log recovery will always replay the checkpoints in the
    836 * correct order.
    837 */
    838static int
    839xlog_cil_write_commit_record(
    840	struct xfs_cil_ctx	*ctx)
    841{
    842	struct xlog		*log = ctx->cil->xc_log;
    843	struct xlog_op_header	ophdr = {
    844		.oh_clientid = XFS_TRANSACTION,
    845		.oh_tid = cpu_to_be32(ctx->ticket->t_tid),
    846		.oh_flags = XLOG_COMMIT_TRANS,
    847	};
    848	struct xfs_log_iovec	reg = {
    849		.i_addr = &ophdr,
    850		.i_len = sizeof(struct xlog_op_header),
    851		.i_type = XLOG_REG_TYPE_COMMIT,
    852	};
    853	struct xfs_log_vec	vec = {
    854		.lv_niovecs = 1,
    855		.lv_iovecp = &reg,
    856	};
    857	int			error;
    858
    859	if (xlog_is_shutdown(log))
    860		return -EIO;
    861
    862	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _COMMIT_RECORD);
    863	if (error)
    864		return error;
    865
    866	/* account for space used by record data */
    867	ctx->ticket->t_curr_res -= reg.i_len;
    868	error = xlog_write(log, ctx, &vec, ctx->ticket, reg.i_len);
    869	if (error)
    870		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
    871	return error;
    872}
    873
/*
 * Storage for the header of a checkpoint transaction, filled in by
 * xlog_cil_build_trans_hdr().
 */
struct xlog_cil_trans_hdr {
	/* [0]: start record opheader; [1]: opheader wrapping thdr */
	struct xlog_op_header	oph[2];
	/*
	 * Checkpoint transaction header. It is logged as a single region
	 * together with oph[1] (the region starts at oph[1] and its length
	 * covers both), so it is expected to sit directly after oph[1].
	 */
	struct xfs_trans_header	thdr;
	/* [0]: region for oph[0]; [1]: region spanning oph[1] and thdr */
	struct xfs_log_iovec	lhdr[2];
};
    879
    880/*
    881 * Build a checkpoint transaction header to begin the journal transaction.  We
    882 * need to account for the space used by the transaction header here as it is
    883 * not accounted for in xlog_write().
    884 *
    885 * This is the only place we write a transaction header, so we also build the
    886 * log opheaders that indicate the start of a log transaction and wrap the
    887 * transaction header. We keep the start record in it's own log vector rather
    888 * than compacting them into a single region as this ends up making the logic
    889 * in xlog_write() for handling empty opheaders for start, commit and unmount
    890 * records much simpler.
    891 */
    892static void
    893xlog_cil_build_trans_hdr(
    894	struct xfs_cil_ctx	*ctx,
    895	struct xlog_cil_trans_hdr *hdr,
    896	struct xfs_log_vec	*lvhdr,
    897	int			num_iovecs)
    898{
    899	struct xlog_ticket	*tic = ctx->ticket;
    900	__be32			tid = cpu_to_be32(tic->t_tid);
    901
    902	memset(hdr, 0, sizeof(*hdr));
    903
    904	/* Log start record */
    905	hdr->oph[0].oh_tid = tid;
    906	hdr->oph[0].oh_clientid = XFS_TRANSACTION;
    907	hdr->oph[0].oh_flags = XLOG_START_TRANS;
    908
    909	/* log iovec region pointer */
    910	hdr->lhdr[0].i_addr = &hdr->oph[0];
    911	hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
    912	hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;
    913
    914	/* log opheader */
    915	hdr->oph[1].oh_tid = tid;
    916	hdr->oph[1].oh_clientid = XFS_TRANSACTION;
    917	hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header));
    918
    919	/* transaction header in host byte order format */
    920	hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
    921	hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
    922	hdr->thdr.th_tid = tic->t_tid;
    923	hdr->thdr.th_num_items = num_iovecs;
    924
    925	/* log iovec region pointer */
    926	hdr->lhdr[1].i_addr = &hdr->oph[1];
    927	hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
    928				sizeof(struct xfs_trans_header);
    929	hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;
    930
    931	lvhdr->lv_niovecs = 2;
    932	lvhdr->lv_iovecp = &hdr->lhdr[0];
    933	lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
    934	lvhdr->lv_next = ctx->lv_chain;
    935
    936	tic->t_curr_res -= lvhdr->lv_bytes;
    937}
    938
    939/*
    940 * Pull all the log vectors off the items in the CIL, and remove the items from
    941 * the CIL. We don't need the CIL lock here because it's only needed on the
    942 * transaction commit side which is currently locked out by the flush lock.
    943 *
    944 * If a log item is marked with a whiteout, we do not need to write it to the
    945 * journal and so we just move them to the whiteout list for the caller to
    946 * dispose of appropriately.
    947 */
    948static void
    949xlog_cil_build_lv_chain(
    950	struct xfs_cil		*cil,
    951	struct xfs_cil_ctx	*ctx,
    952	struct list_head	*whiteouts,
    953	uint32_t		*num_iovecs,
    954	uint32_t		*num_bytes)
    955{
    956	struct xfs_log_vec	*lv = NULL;
    957
    958	while (!list_empty(&cil->xc_cil)) {
    959		struct xfs_log_item	*item;
    960
    961		item = list_first_entry(&cil->xc_cil,
    962					struct xfs_log_item, li_cil);
    963
    964		if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
    965			list_move(&item->li_cil, whiteouts);
    966			trace_xfs_cil_whiteout_skip(item);
    967			continue;
    968		}
    969
    970		list_del_init(&item->li_cil);
    971		if (!ctx->lv_chain)
    972			ctx->lv_chain = item->li_lv;
    973		else
    974			lv->lv_next = item->li_lv;
    975		lv = item->li_lv;
    976		item->li_lv = NULL;
    977		*num_iovecs += lv->lv_niovecs;
    978
    979		/* we don't write ordered log vectors */
    980		if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
    981			*num_bytes += lv->lv_bytes;
    982	}
    983}
    984
    985static void
    986xlog_cil_cleanup_whiteouts(
    987	struct list_head	*whiteouts)
    988{
    989	while (!list_empty(whiteouts)) {
    990		struct xfs_log_item *item = list_first_entry(whiteouts,
    991						struct xfs_log_item, li_cil);
    992		list_del_init(&item->li_cil);
    993		trace_xfs_cil_whiteout_unpin(item);
    994		item->li_ops->iop_unpin(item, 1);
    995	}
    996}
    997
    998/*
    999 * Push the Committed Item List to the log.
   1000 *
   1001 * If the current sequence is the same as xc_push_seq we need to do a flush. If
   1002 * xc_push_seq is less than the current sequence, then it has already been
   1003 * flushed and we don't need to do anything - the caller will wait for it to
   1004 * complete if necessary.
   1005 *
   1006 * xc_push_seq is checked unlocked against the sequence number for a match.
   1007 * Hence we can allow log forces to run racily and not issue pushes for the
   1008 * same sequence twice.  If we get a race between multiple pushes for the same
   1009 * sequence they will block on the first one and then abort, hence avoiding
   1010 * needless pushes.
   1011 */
   1012static void
   1013xlog_cil_push_work(
   1014	struct work_struct	*work)
   1015{
   1016	struct xfs_cil_ctx	*ctx =
   1017		container_of(work, struct xfs_cil_ctx, push_work);
   1018	struct xfs_cil		*cil = ctx->cil;
   1019	struct xlog		*log = cil->xc_log;
   1020	struct xfs_cil_ctx	*new_ctx;
   1021	int			num_iovecs = 0;
   1022	int			num_bytes = 0;
   1023	int			error = 0;
   1024	struct xlog_cil_trans_hdr thdr;
   1025	struct xfs_log_vec	lvhdr = { NULL };
   1026	xfs_csn_t		push_seq;
   1027	bool			push_commit_stable;
   1028	LIST_HEAD		(whiteouts);
   1029
   1030	new_ctx = xlog_cil_ctx_alloc();
   1031	new_ctx->ticket = xlog_cil_ticket_alloc(log);
   1032
   1033	down_write(&cil->xc_ctx_lock);
   1034
   1035	spin_lock(&cil->xc_push_lock);
   1036	push_seq = cil->xc_push_seq;
   1037	ASSERT(push_seq <= ctx->sequence);
   1038	push_commit_stable = cil->xc_push_commit_stable;
   1039	cil->xc_push_commit_stable = false;
   1040
   1041	/*
   1042	 * As we are about to switch to a new, empty CIL context, we no longer
   1043	 * need to throttle tasks on CIL space overruns. Wake any waiters that
   1044	 * the hard push throttle may have caught so they can start committing
   1045	 * to the new context. The ctx->xc_push_lock provides the serialisation
   1046	 * necessary for safely using the lockless waitqueue_active() check in
   1047	 * this context.
   1048	 */
   1049	if (waitqueue_active(&cil->xc_push_wait))
   1050		wake_up_all(&cil->xc_push_wait);
   1051
   1052	/*
   1053	 * Check if we've anything to push. If there is nothing, then we don't
   1054	 * move on to a new sequence number and so we have to be able to push
   1055	 * this sequence again later.
   1056	 */
   1057	if (list_empty(&cil->xc_cil)) {
   1058		cil->xc_push_seq = 0;
   1059		spin_unlock(&cil->xc_push_lock);
   1060		goto out_skip;
   1061	}
   1062
   1063
   1064	/* check for a previously pushed sequence */
   1065	if (push_seq < ctx->sequence) {
   1066		spin_unlock(&cil->xc_push_lock);
   1067		goto out_skip;
   1068	}
   1069
   1070	/*
   1071	 * We are now going to push this context, so add it to the committing
   1072	 * list before we do anything else. This ensures that anyone waiting on
   1073	 * this push can easily detect the difference between a "push in
   1074	 * progress" and "CIL is empty, nothing to do".
   1075	 *
   1076	 * IOWs, a wait loop can now check for:
   1077	 *	the current sequence not being found on the committing list;
   1078	 *	an empty CIL; and
   1079	 *	an unchanged sequence number
   1080	 * to detect a push that had nothing to do and therefore does not need
   1081	 * waiting on. If the CIL is not empty, we get put on the committing
   1082	 * list before emptying the CIL and bumping the sequence number. Hence
   1083	 * an empty CIL and an unchanged sequence number means we jumped out
   1084	 * above after doing nothing.
   1085	 *
   1086	 * Hence the waiter will either find the commit sequence on the
   1087	 * committing list or the sequence number will be unchanged and the CIL
   1088	 * still dirty. In that latter case, the push has not yet started, and
   1089	 * so the waiter will have to continue trying to check the CIL
   1090	 * committing list until it is found. In extreme cases of delay, the
   1091	 * sequence may fully commit between the attempts the wait makes to wait
   1092	 * on the commit sequence.
   1093	 */
   1094	list_add(&ctx->committing, &cil->xc_committing);
   1095	spin_unlock(&cil->xc_push_lock);
   1096
   1097	xlog_cil_build_lv_chain(cil, ctx, &whiteouts, &num_iovecs, &num_bytes);
   1098
   1099	/*
   1100	 * Switch the contexts so we can drop the context lock and move out
   1101	 * of a shared context. We can't just go straight to the commit record,
   1102	 * though - we need to synchronise with previous and future commits so
   1103	 * that the commit records are correctly ordered in the log to ensure
   1104	 * that we process items during log IO completion in the correct order.
   1105	 *
   1106	 * For example, if we get an EFI in one checkpoint and the EFD in the
   1107	 * next (e.g. due to log forces), we do not want the checkpoint with
   1108	 * the EFD to be committed before the checkpoint with the EFI.  Hence
   1109	 * we must strictly order the commit records of the checkpoints so
   1110	 * that: a) the checkpoint callbacks are attached to the iclogs in the
   1111	 * correct order; and b) the checkpoints are replayed in correct order
   1112	 * in log recovery.
   1113	 *
   1114	 * Hence we need to add this context to the committing context list so
   1115	 * that higher sequences will wait for us to write out a commit record
   1116	 * before they do.
   1117	 *
   1118	 * xfs_log_force_seq requires us to mirror the new sequence into the cil
   1119	 * structure atomically with the addition of this sequence to the
   1120	 * committing list. This also ensures that we can do unlocked checks
   1121	 * against the current sequence in log forces without risking
   1122	 * deferencing a freed context pointer.
   1123	 */
   1124	spin_lock(&cil->xc_push_lock);
   1125	xlog_cil_ctx_switch(cil, new_ctx);
   1126	spin_unlock(&cil->xc_push_lock);
   1127	up_write(&cil->xc_ctx_lock);
   1128
   1129	/*
   1130	 * Build a checkpoint transaction header and write it to the log to
   1131	 * begin the transaction. We need to account for the space used by the
   1132	 * transaction header here as it is not accounted for in xlog_write().
   1133	 */
   1134	xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
   1135	num_bytes += lvhdr.lv_bytes;
   1136
   1137	error = xlog_cil_write_chain(ctx, &lvhdr, num_bytes);
   1138	if (error)
   1139		goto out_abort_free_ticket;
   1140
   1141	error = xlog_cil_write_commit_record(ctx);
   1142	if (error)
   1143		goto out_abort_free_ticket;
   1144
   1145	xfs_log_ticket_ungrant(log, ctx->ticket);
   1146
   1147	/*
   1148	 * If the checkpoint spans multiple iclogs, wait for all previous iclogs
   1149	 * to complete before we submit the commit_iclog. We can't use state
   1150	 * checks for this - ACTIVE can be either a past completed iclog or a
   1151	 * future iclog being filled, while WANT_SYNC through SYNC_DONE can be a
   1152	 * past or future iclog awaiting IO or ordered IO completion to be run.
   1153	 * In the latter case, if it's a future iclog and we wait on it, the we
   1154	 * will hang because it won't get processed through to ic_force_wait
   1155	 * wakeup until this commit_iclog is written to disk.  Hence we use the
   1156	 * iclog header lsn and compare it to the commit lsn to determine if we
   1157	 * need to wait on iclogs or not.
   1158	 */
   1159	spin_lock(&log->l_icloglock);
   1160	if (ctx->start_lsn != ctx->commit_lsn) {
   1161		xfs_lsn_t	plsn;
   1162
   1163		plsn = be64_to_cpu(ctx->commit_iclog->ic_prev->ic_header.h_lsn);
   1164		if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
   1165			/*
   1166			 * Waiting on ic_force_wait orders the completion of
   1167			 * iclogs older than ic_prev. Hence we only need to wait
   1168			 * on the most recent older iclog here.
   1169			 */
   1170			xlog_wait_on_iclog(ctx->commit_iclog->ic_prev);
   1171			spin_lock(&log->l_icloglock);
   1172		}
   1173
   1174		/*
   1175		 * We need to issue a pre-flush so that the ordering for this
   1176		 * checkpoint is correctly preserved down to stable storage.
   1177		 */
   1178		ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
   1179	}
   1180
   1181	/*
   1182	 * The commit iclog must be written to stable storage to guarantee
   1183	 * journal IO vs metadata writeback IO is correctly ordered on stable
   1184	 * storage.
   1185	 *
   1186	 * If the push caller needs the commit to be immediately stable and the
   1187	 * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
   1188	 * will be written when released, switch it's state to WANT_SYNC right
   1189	 * now.
   1190	 */
   1191	ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
   1192	if (push_commit_stable &&
   1193	    ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
   1194		xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
   1195	xlog_state_release_iclog(log, ctx->commit_iclog);
   1196
   1197	/* Not safe to reference ctx now! */
   1198
   1199	spin_unlock(&log->l_icloglock);
   1200	xlog_cil_cleanup_whiteouts(&whiteouts);
   1201	return;
   1202
   1203out_skip:
   1204	up_write(&cil->xc_ctx_lock);
   1205	xfs_log_ticket_put(new_ctx->ticket);
   1206	kmem_free(new_ctx);
   1207	return;
   1208
   1209out_abort_free_ticket:
   1210	xfs_log_ticket_ungrant(log, ctx->ticket);
   1211	ASSERT(xlog_is_shutdown(log));
   1212	xlog_cil_cleanup_whiteouts(&whiteouts);
   1213	if (!ctx->commit_iclog) {
   1214		xlog_cil_committed(ctx);
   1215		return;
   1216	}
   1217	spin_lock(&log->l_icloglock);
   1218	xlog_state_release_iclog(log, ctx->commit_iclog);
   1219	/* Not safe to reference ctx now! */
   1220	spin_unlock(&log->l_icloglock);
   1221}
   1222
   1223/*
   1224 * We need to push CIL every so often so we don't cache more than we can fit in
   1225 * the log. The limit really is that a checkpoint can't be more than half the
   1226 * log (the current checkpoint is not allowed to overwrite the previous
   1227 * checkpoint), but commit latency and memory usage limit this to a smaller
   1228 * size.
   1229 */
   1230static void
   1231xlog_cil_push_background(
   1232	struct xlog	*log) __releases(cil->xc_ctx_lock)
   1233{
   1234	struct xfs_cil	*cil = log->l_cilp;
   1235
   1236	/*
   1237	 * The cil won't be empty because we are called while holding the
   1238	 * context lock so whatever we added to the CIL will still be there
   1239	 */
   1240	ASSERT(!list_empty(&cil->xc_cil));
   1241
   1242	/*
   1243	 * Don't do a background push if we haven't used up all the
   1244	 * space available yet.
   1245	 */
   1246	if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
   1247		up_read(&cil->xc_ctx_lock);
   1248		return;
   1249	}
   1250
   1251	spin_lock(&cil->xc_push_lock);
   1252	if (cil->xc_push_seq < cil->xc_current_sequence) {
   1253		cil->xc_push_seq = cil->xc_current_sequence;
   1254		queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
   1255	}
   1256
   1257	/*
   1258	 * Drop the context lock now, we can't hold that if we need to sleep
   1259	 * because we are over the blocking threshold. The push_lock is still
   1260	 * held, so blocking threshold sleep/wakeup is still correctly
   1261	 * serialised here.
   1262	 */
   1263	up_read(&cil->xc_ctx_lock);
   1264
   1265	/*
   1266	 * If we are well over the space limit, throttle the work that is being
   1267	 * done until the push work on this context has begun. Enforce the hard
   1268	 * throttle on all transaction commits once it has been activated, even
   1269	 * if the committing transactions have resulted in the space usage
   1270	 * dipping back down under the hard limit.
   1271	 *
   1272	 * The ctx->xc_push_lock provides the serialisation necessary for safely
   1273	 * using the lockless waitqueue_active() check in this context.
   1274	 */
   1275	if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
   1276	    waitqueue_active(&cil->xc_push_wait)) {
   1277		trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
   1278		ASSERT(cil->xc_ctx->space_used < log->l_logsize);
   1279		xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
   1280		return;
   1281	}
   1282
   1283	spin_unlock(&cil->xc_push_lock);
   1284
   1285}
   1286
   1287/*
   1288 * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
   1289 * number that is passed. When it returns, the work will be queued for
   1290 * @push_seq, but it won't be completed.
   1291 *
   1292 * If the caller is performing a synchronous force, we will flush the workqueue
   1293 * to get previously queued work moving to minimise the wait time they will
   1294 * undergo waiting for all outstanding pushes to complete. The caller is
   1295 * expected to do the required waiting for push_seq to complete.
   1296 *
   1297 * If the caller is performing an async push, we need to ensure that the
   1298 * checkpoint is fully flushed out of the iclogs when we finish the push. If we
   1299 * don't do this, then the commit record may remain sitting in memory in an
   1300 * ACTIVE iclog. This then requires another full log force to push to disk,
   1301 * which defeats the purpose of having an async, non-blocking CIL force
   1302 * mechanism. Hence in this case we need to pass a flag to the push work to
   1303 * indicate it needs to flush the commit record itself.
   1304 */
   1305static void
   1306xlog_cil_push_now(
   1307	struct xlog	*log,
   1308	xfs_lsn_t	push_seq,
   1309	bool		async)
   1310{
   1311	struct xfs_cil	*cil = log->l_cilp;
   1312
   1313	if (!cil)
   1314		return;
   1315
   1316	ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
   1317
   1318	/* start on any pending background push to minimise wait time on it */
   1319	if (!async)
   1320		flush_workqueue(cil->xc_push_wq);
   1321
   1322	spin_lock(&cil->xc_push_lock);
   1323
   1324	/*
   1325	 * If this is an async flush request, we always need to set the
   1326	 * xc_push_commit_stable flag even if something else has already queued
   1327	 * a push. The flush caller is asking for the CIL to be on stable
   1328	 * storage when the next push completes, so regardless of who has queued
   1329	 * the push, the flush requires stable semantics from it.
   1330	 */
   1331	cil->xc_push_commit_stable = async;
   1332
   1333	/*
   1334	 * If the CIL is empty or we've already pushed the sequence then
   1335	 * there's no more work that we need to do.
   1336	 */
   1337	if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
   1338		spin_unlock(&cil->xc_push_lock);
   1339		return;
   1340	}
   1341
   1342	cil->xc_push_seq = push_seq;
   1343	queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
   1344	spin_unlock(&cil->xc_push_lock);
   1345}
   1346
   1347bool
   1348xlog_cil_empty(
   1349	struct xlog	*log)
   1350{
   1351	struct xfs_cil	*cil = log->l_cilp;
   1352	bool		empty = false;
   1353
   1354	spin_lock(&cil->xc_push_lock);
   1355	if (list_empty(&cil->xc_cil))
   1356		empty = true;
   1357	spin_unlock(&cil->xc_push_lock);
   1358	return empty;
   1359}
   1360
   1361/*
   1362 * If there are intent done items in this transaction and the related intent was
   1363 * committed in the current (same) CIL checkpoint, we don't need to write either
   1364 * the intent or intent done item to the journal as the change will be
   1365 * journalled atomically within this checkpoint. As we cannot remove items from
   1366 * the CIL here, mark the related intent with a whiteout so that the CIL push
   1367 * can remove it rather than writing it to the journal. Then remove the intent
   1368 * done item from the current transaction and release it so it doesn't get put
   1369 * into the CIL at all.
   1370 */
   1371static uint32_t
   1372xlog_cil_process_intents(
   1373	struct xfs_cil		*cil,
   1374	struct xfs_trans	*tp)
   1375{
   1376	struct xfs_log_item	*lip, *ilip, *next;
   1377	uint32_t		len = 0;
   1378
   1379	list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
   1380		if (!(lip->li_ops->flags & XFS_ITEM_INTENT_DONE))
   1381			continue;
   1382
   1383		ilip = lip->li_ops->iop_intent(lip);
   1384		if (!ilip || !xlog_item_in_current_chkpt(cil, ilip))
   1385			continue;
   1386		set_bit(XFS_LI_WHITEOUT, &ilip->li_flags);
   1387		trace_xfs_cil_whiteout_mark(ilip);
   1388		len += ilip->li_lv->lv_bytes;
   1389		kmem_free(ilip->li_lv);
   1390		ilip->li_lv = NULL;
   1391
   1392		xfs_trans_del_item(lip);
   1393		lip->li_ops->iop_release(lip);
   1394	}
   1395	return len;
   1396}
   1397
   1398/*
   1399 * Commit a transaction with the given vector to the Committed Item List.
   1400 *
   1401 * To do this, we need to format the item, pin it in memory if required and
   1402 * account for the space used by the transaction. Once we have done that we
   1403 * need to release the unused reservation for the transaction, attach the
   1404 * transaction to the checkpoint context so we carry the busy extents through
   1405 * to checkpoint completion, and then unlock all the items in the transaction.
   1406 *
   1407 * Called with the context lock already held in read mode to lock out
   1408 * background commit, returns without it held once background commits are
   1409 * allowed again.
   1410 */
   1411void
   1412xlog_cil_commit(
   1413	struct xlog		*log,
   1414	struct xfs_trans	*tp,
   1415	xfs_csn_t		*commit_seq,
   1416	bool			regrant)
   1417{
   1418	struct xfs_cil		*cil = log->l_cilp;
   1419	struct xfs_log_item	*lip, *next;
   1420	uint32_t		released_space = 0;
   1421
   1422	/*
   1423	 * Do all necessary memory allocation before we lock the CIL.
   1424	 * This ensures the allocation does not deadlock with a CIL
   1425	 * push in memory reclaim (e.g. from kswapd).
   1426	 */
   1427	xlog_cil_alloc_shadow_bufs(log, tp);
   1428
   1429	/* lock out background commit */
   1430	down_read(&cil->xc_ctx_lock);
   1431
   1432	if (tp->t_flags & XFS_TRANS_HAS_INTENT_DONE)
   1433		released_space = xlog_cil_process_intents(cil, tp);
   1434
   1435	xlog_cil_insert_items(log, tp, released_space);
   1436
   1437	if (regrant && !xlog_is_shutdown(log))
   1438		xfs_log_ticket_regrant(log, tp->t_ticket);
   1439	else
   1440		xfs_log_ticket_ungrant(log, tp->t_ticket);
   1441	tp->t_ticket = NULL;
   1442	xfs_trans_unreserve_and_mod_sb(tp);
   1443
   1444	/*
   1445	 * Once all the items of the transaction have been copied to the CIL,
   1446	 * the items can be unlocked and possibly freed.
   1447	 *
   1448	 * This needs to be done before we drop the CIL context lock because we
   1449	 * have to update state in the log items and unlock them before they go
   1450	 * to disk. If we don't, then the CIL checkpoint can race with us and
   1451	 * we can run checkpoint completion before we've updated and unlocked
   1452	 * the log items. This affects (at least) processing of stale buffers,
   1453	 * inodes and EFIs.
   1454	 */
   1455	trace_xfs_trans_commit_items(tp, _RET_IP_);
   1456	list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
   1457		xfs_trans_del_item(lip);
   1458		if (lip->li_ops->iop_committing)
   1459			lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence);
   1460	}
   1461	if (commit_seq)
   1462		*commit_seq = cil->xc_ctx->sequence;
   1463
   1464	/* xlog_cil_push_background() releases cil->xc_ctx_lock */
   1465	xlog_cil_push_background(log);
   1466}
   1467
   1468/*
   1469 * Flush the CIL to stable storage but don't wait for it to complete. This
   1470 * requires the CIL push to ensure the commit record for the push hits the disk,
   1471 * but otherwise is no different to a push done from a log force.
   1472 */
   1473void
   1474xlog_cil_flush(
   1475	struct xlog	*log)
   1476{
   1477	xfs_csn_t	seq = log->l_cilp->xc_current_sequence;
   1478
   1479	trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
   1480	xlog_cil_push_now(log, seq, true);
   1481
   1482	/*
   1483	 * If the CIL is empty, make sure that any previous checkpoint that may
   1484	 * still be in an active iclog is pushed to stable storage.
   1485	 */
   1486	if (list_empty(&log->l_cilp->xc_cil))
   1487		xfs_log_force(log->l_mp, 0);
   1488}
   1489
/*
 * Conditionally push the CIL based on the sequence passed in.
 *
 * We only need to push if we haven't already pushed the sequence number given.
 * Hence the only time we will trigger a push here is if the push sequence is
 * the same as the current context.
 *
 * We return the current commit lsn to allow the callers to determine if a
 * iclog flush is necessary following this call.
 *
 * @sequence:	CIL sequence to force; 0 means the current sequence.
 *
 * Returns the commit LSN of the context matching @sequence if it was found on
 * the committing list, NULLCOMMITLSN if no matching context was found, or 0
 * if a log shutdown was detected while scanning.
 */
xfs_lsn_t
xlog_cil_force_seq(
	struct xlog	*log,
	xfs_csn_t	sequence)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_cil_ctx	*ctx;
	xfs_lsn_t		commit_lsn = NULLCOMMITLSN;

	ASSERT(sequence <= cil->xc_current_sequence);

	/* A zero sequence means "whatever the current sequence is". */
	if (!sequence)
		sequence = cil->xc_current_sequence;
	trace_xfs_log_force(log->l_mp, sequence, _RET_IP_);

	/*
	 * Check to see if we need to force out the current context.
	 * xlog_cil_push_now() handles racing pushes for the same sequence,
	 * so no need to deal with it here.
	 */
restart:
	xlog_cil_push_now(log, sequence, false);

	/*
	 * See if we can find a previous sequence still committing.
	 * We need to wait for all previous sequence commits to complete
	 * before allowing the force of push_seq to go ahead. Hence block
	 * on commits for those as well.
	 */
	spin_lock(&cil->xc_push_lock);
	list_for_each_entry(ctx, &cil->xc_committing, committing) {
		/*
		 * Avoid getting stuck in this loop because we were woken by the
		 * shutdown, but then went back to sleep once already in the
		 * shutdown state.
		 */
		if (xlog_is_shutdown(log))
			goto out_shutdown;
		/* Later checkpoints are of no interest to this force. */
		if (ctx->sequence > sequence)
			continue;
		if (!ctx->commit_lsn) {
			/*
			 * It is still being pushed! Wait for the push to
			 * complete, then start again from the beginning.
			 * The restart path takes xc_push_lock again, so
			 * xlog_wait() is relied upon to have dropped it.
			 */
			XFS_STATS_INC(log->l_mp, xs_log_force_sleep);
			xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
			goto restart;
		}
		if (ctx->sequence != sequence)
			continue;
		/* found it! */
		commit_lsn = ctx->commit_lsn;
	}

	/*
	 * The call to xlog_cil_push_now() executes the push in the background.
	 * Hence by the time we have got here our sequence may not have been
	 * pushed yet. This is true if the current sequence still matches the
	 * push sequence after the above wait loop and the CIL still contains
	 * dirty objects. This is guaranteed by the push code first adding the
	 * context to the committing list before emptying the CIL.
	 *
	 * Hence if we don't find the context in the committing list and the
	 * current sequence number is unchanged then the CIL contents are
	 * significant.  If the CIL is empty, it means there was nothing to push
	 * and that means there is nothing to wait for. If the CIL is not empty,
	 * it means we haven't yet started the push, because if it had started
	 * we would have found the context on the committing list.
	 */
	if (sequence == cil->xc_current_sequence &&
	    !list_empty(&cil->xc_cil)) {
		spin_unlock(&cil->xc_push_lock);
		goto restart;
	}

	spin_unlock(&cil->xc_push_lock);
	return commit_lsn;

	/*
	 * We detected a shutdown in progress. We need to trigger the log force
	 * to pass through its iclog state machine error handling, even though
	 * we are already in a shutdown state. Hence we can't return
	 * NULLCOMMITLSN here as that has special meaning to log forces (i.e.
	 * LSN is already stable), so we return a zero LSN instead.
	 */
out_shutdown:
	spin_unlock(&cil->xc_push_lock);
	return 0;
}
   1590
   1591/*
   1592 * Perform initial CIL structure initialisation.
   1593 */
   1594int
   1595xlog_cil_init(
   1596	struct xlog	*log)
   1597{
   1598	struct xfs_cil	*cil;
   1599	struct xfs_cil_ctx *ctx;
   1600
   1601	cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
   1602	if (!cil)
   1603		return -ENOMEM;
   1604	/*
   1605	 * Limit the CIL pipeline depth to 4 concurrent works to bound the
   1606	 * concurrency the log spinlocks will be exposed to.
   1607	 */
   1608	cil->xc_push_wq = alloc_workqueue("xfs-cil/%s",
   1609			XFS_WQFLAGS(WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_UNBOUND),
   1610			4, log->l_mp->m_super->s_id);
   1611	if (!cil->xc_push_wq)
   1612		goto out_destroy_cil;
   1613
   1614	INIT_LIST_HEAD(&cil->xc_cil);
   1615	INIT_LIST_HEAD(&cil->xc_committing);
   1616	spin_lock_init(&cil->xc_cil_lock);
   1617	spin_lock_init(&cil->xc_push_lock);
   1618	init_waitqueue_head(&cil->xc_push_wait);
   1619	init_rwsem(&cil->xc_ctx_lock);
   1620	init_waitqueue_head(&cil->xc_start_wait);
   1621	init_waitqueue_head(&cil->xc_commit_wait);
   1622	cil->xc_log = log;
   1623	log->l_cilp = cil;
   1624
   1625	ctx = xlog_cil_ctx_alloc();
   1626	xlog_cil_ctx_switch(cil, ctx);
   1627
   1628	return 0;
   1629
   1630out_destroy_cil:
   1631	kmem_free(cil);
   1632	return -ENOMEM;
   1633}
   1634
   1635void
   1636xlog_cil_destroy(
   1637	struct xlog	*log)
   1638{
   1639	if (log->l_cilp->xc_ctx) {
   1640		if (log->l_cilp->xc_ctx->ticket)
   1641			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
   1642		kmem_free(log->l_cilp->xc_ctx);
   1643	}
   1644
   1645	ASSERT(list_empty(&log->l_cilp->xc_cil));
   1646	destroy_workqueue(log->l_cilp->xc_push_wq);
   1647	kmem_free(log->l_cilp);
   1648}
   1649