cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

drbd_req.c (56759B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3   drbd_req.c
      4
      5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
      6
      7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
      8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
      9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
     10
     11
     12 */
     13
     14#include <linux/module.h>
     15
     16#include <linux/slab.h>
     17#include <linux/drbd.h>
     18#include "drbd_int.h"
     19#include "drbd_req.h"
     20
     21
     22static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);
     23
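        /* Allocate a drbd_request for @bio_src from the request mempool,
         * clone the bio onto the local backing device as ->private_bio,
         * and initialize state flags, interval and list heads.
         * The new request starts with one completion_ref and one kref. */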
     24static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
     25{
     26	struct drbd_request *req;
     27
     28	req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);
     29	if (!req)
     30		return NULL;
     31	memset(req, 0, sizeof(*req));
     32
     33	req->private_bio = bio_alloc_clone(device->ldev->backing_bdev, bio_src,
     34					   GFP_NOIO, &drbd_io_bio_set);
     35	req->private_bio->bi_private = req;
     36	req->private_bio->bi_end_io = drbd_request_endio;
     37
     38	req->rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
     39		      | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
     40		      | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
     41	req->device = device;
     42	req->master_bio = bio_src;
     43	req->epoch = 0;
     44
     45	drbd_clear_interval(&req->i);
     46	req->i.sector     = bio_src->bi_iter.bi_sector;
     47	req->i.size      = bio_src->bi_iter.bi_size;
     48	req->i.local = true;
     49	req->i.waiting = false;
     50
     51	INIT_LIST_HEAD(&req->tl_requests);
     52	INIT_LIST_HEAD(&req->w.list);
     53	INIT_LIST_HEAD(&req->req_pending_master_completion);
     54	INIT_LIST_HEAD(&req->req_pending_local);
     55
     56	/* one reference to be put by __drbd_make_request */
     57	atomic_set(&req->completion_ref, 1);
     58	/* one kref as long as completion_ref > 0 */
     59	kref_init(&req->kref);
     60	return req;
     61}
     62
     63static void drbd_remove_request_interval(struct rb_root *root,
     64					 struct drbd_request *req)
     65{
     66	struct drbd_device *device = req->device;
     67	struct drbd_interval *i = &req->i;
     68
     69	drbd_remove_interval(root, i);
     70
     71	/* Wake up any processes waiting for this request to complete.  */
     72	if (i->waiting)
     73		wake_up(&device->misc_wait);
     74}
     75
     76void drbd_req_destroy(struct kref *kref)
     77{
     78	struct drbd_request *req = container_of(kref, struct drbd_request, kref);
     79	struct drbd_device *device = req->device;
     80	const unsigned s = req->rq_state;
     81
     82	if ((req->master_bio && !(s & RQ_POSTPONED)) ||
     83		atomic_read(&req->completion_ref) ||
     84		(s & RQ_LOCAL_PENDING) ||
     85		((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
     86		drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
     87				s, atomic_read(&req->completion_ref));
     88		return;
     89	}
     90
     91	/* If called from mod_rq_state (expected normal case) or
     92	 * drbd_send_and_submit (the less likely normal path), this holds the
      93	 * req_lock, and req->tl_requests will typically be on ->transfer_log,
     94	 * though it may be still empty (never added to the transfer log).
     95	 *
     96	 * If called from do_retry(), we do NOT hold the req_lock, but we are
     97	 * still allowed to unconditionally list_del(&req->tl_requests),
     98	 * because it will be on a local on-stack list only. */
     99	list_del_init(&req->tl_requests);
    100
    101	/* finally remove the request from the conflict detection
    102	 * respective block_id verification interval tree. */
    103	if (!drbd_interval_empty(&req->i)) {
    104		struct rb_root *root;
    105
    106		if (s & RQ_WRITE)
    107			root = &device->write_requests;
    108		else
    109			root = &device->read_requests;
    110		drbd_remove_request_interval(root, req);
    111	} else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
    112		drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
    113			s, (unsigned long long)req->i.sector, req->i.size);
    114
    115	/* if it was a write, we may have to set the corresponding
    116	 * bit(s) out-of-sync first. If it had a local part, we need to
    117	 * release the reference to the activity log. */
    118	if (s & RQ_WRITE) {
    119		/* Set out-of-sync unless both OK flags are set
    120		 * (local only or remote failed).
    121		 * Other places where we set out-of-sync:
    122		 * READ with local io-error */
    123
    124		/* There is a special case:
    125		 * we may notice late that IO was suspended,
    126		 * and postpone, or schedule for retry, a write,
    127		 * before it even was submitted or sent.
    128		 * In that case we do not want to touch the bitmap at all.
    129		 */
    130		if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
    131			if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
    132				drbd_set_out_of_sync(device, req->i.sector, req->i.size);
    133
    134			if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
    135				drbd_set_in_sync(device, req->i.sector, req->i.size);
    136		}
    137
    138		/* one might be tempted to move the drbd_al_complete_io
    139		 * to the local io completion callback drbd_request_endio.
    140		 * but, if this was a mirror write, we may only
    141		 * drbd_al_complete_io after this is RQ_NET_DONE,
    142		 * otherwise the extent could be dropped from the al
    143		 * before it has actually been written on the peer.
    144		 * if we crash before our peer knows about the request,
    145		 * but after the extent has been dropped from the al,
    146		 * we would forget to resync the corresponding extent.
    147		 */
    148		if (s & RQ_IN_ACT_LOG) {
    149			if (get_ldev_if_state(device, D_FAILED)) {
    150				drbd_al_complete_io(device, &req->i);
    151				put_ldev(device);
    152			} else if (__ratelimit(&drbd_ratelimit_state)) {
    153				drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
    154					 "but my Disk seems to have failed :(\n",
    155					 (unsigned long long) req->i.sector, req->i.size);
    156			}
    157		}
    158	}
    159
    160	mempool_free(req, &drbd_request_mempool);
    161}
    162
    163static void wake_all_senders(struct drbd_connection *connection)
    164{
    165	wake_up(&connection->sender_work.q_wait);
    166}
    167
    168/* must hold resource->req_lock */
    169void start_new_tl_epoch(struct drbd_connection *connection)
    170{
    171	/* no point closing an epoch, if it is empty, anyways. */
    172	if (connection->current_tle_writes == 0)
    173		return;
    174
    175	connection->current_tle_writes = 0;
    176	atomic_inc(&connection->current_tle_nr);
    177	wake_all_senders(connection);
    178}
    179
    180void complete_master_bio(struct drbd_device *device,
    181		struct bio_and_error *m)
    182{
    183	if (unlikely(m->error))
    184		m->bio->bi_status = errno_to_blk_status(m->error);
    185	bio_endio(m->bio);
    186	dec_ap_bio(device);
    187}
    188
    189
    190/* Helper for __req_mod().
    191 * Set m->bio to the master bio, if it is fit to be completed,
    192 * or leave it alone (it is initialized to NULL in __req_mod),
    193 * if it has already been completed, or cannot be completed yet.
    194 * If m->bio is set, the error status to be returned is placed in m->error.
    195 */
    196static
    197void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
    198{
    199	const unsigned s = req->rq_state;
    200	struct drbd_device *device = req->device;
    201	int error, ok;
    202
    203	/* we must not complete the master bio, while it is
    204	 *	still being processed by _drbd_send_zc_bio (drbd_send_dblock)
    205	 *	not yet acknowledged by the peer
    206	 *	not yet completed by the local io subsystem
    207	 * these flags may get cleared in any order by
    208	 *	the worker,
    209	 *	the receiver,
    210	 *	the bio_endio completion callbacks.
    211	 */
    212	if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
    213	    (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
    214	    (s & RQ_COMPLETION_SUSP)) {
    215		drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
    216		return;
    217	}
    218
    219	if (!req->master_bio) {
    220		drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
    221		return;
    222	}
    223
    224	/*
    225	 * figure out whether to report success or failure.
    226	 *
    227	 * report success when at least one of the operations succeeded.
     228	 * or, to put it the other way,
     229	 * only report failure when both operations failed.
    230	 *
    231	 * what to do about the failures is handled elsewhere.
    232	 * what we need to do here is just: complete the master_bio.
    233	 *
    234	 * local completion error, if any, has been stored as ERR_PTR
    235	 * in private_bio within drbd_request_endio.
    236	 */
    237	ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
    238	error = PTR_ERR(req->private_bio);
    239
    240	/* Before we can signal completion to the upper layers,
    241	 * we may need to close the current transfer log epoch.
    242	 * We are within the request lock, so we can simply compare
    243	 * the request epoch number with the current transfer log
    244	 * epoch number.  If they match, increase the current_tle_nr,
    245	 * and reset the transfer log epoch write_cnt.
    246	 */
    247	if (op_is_write(bio_op(req->master_bio)) &&
    248	    req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
    249		start_new_tl_epoch(first_peer_device(device)->connection);
    250
    251	/* Update disk stats */
    252	bio_end_io_acct(req->master_bio, req->start_jif);
    253
    254	/* If READ failed,
    255	 * have it be pushed back to the retry work queue,
    256	 * so it will re-enter __drbd_make_request(),
    257	 * and be re-assigned to a suitable local or remote path,
    258	 * or failed if we do not have access to good data anymore.
    259	 *
    260	 * Unless it was failed early by __drbd_make_request(),
    261	 * because no path was available, in which case
    262	 * it was not even added to the transfer_log.
    263	 *
    264	 * read-ahead may fail, and will not be retried.
    265	 *
    266	 * WRITE should have used all available paths already.
    267	 */
    268	if (!ok &&
    269	    bio_op(req->master_bio) == REQ_OP_READ &&
    270	    !(req->master_bio->bi_opf & REQ_RAHEAD) &&
    271	    !list_empty(&req->tl_requests))
    272		req->rq_state |= RQ_POSTPONED;
    273
    274	if (!(req->rq_state & RQ_POSTPONED)) {
    275		m->error = ok ? 0 : (error ?: -EIO);
    276		m->bio = req->master_bio;
    277		req->master_bio = NULL;
    278		/* We leave it in the tree, to be able to verify later
    279		 * write-acks in protocol != C during resync.
    280		 * But we mark it as "complete", so it won't be counted as
    281		 * conflict in a multi-primary setup. */
    282		req->i.completed = true;
    283	}
    284
    285	if (req->i.waiting)
    286		wake_up(&device->misc_wait);
    287
    288	/* Either we are about to complete to upper layers,
    289	 * or we will restart this request.
    290	 * In either case, the request object will be destroyed soon,
    291	 * so better remove it from all lists. */
    292	list_del_init(&req->req_pending_master_completion);
    293}
    294
    295/* still holds resource->req_lock */
    296static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
    297{
    298	struct drbd_device *device = req->device;
    299	D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));
    300
    301	if (!put)
    302		return;
    303
    304	if (!atomic_sub_and_test(put, &req->completion_ref))
    305		return;
    306
    307	drbd_req_complete(req, m);
    308
    309	/* local completion may still come in later,
    310	 * we need to keep the req object around. */
    311	if (req->rq_state & RQ_LOCAL_ABORTED)
    312		return;
    313
    314	if (req->rq_state & RQ_POSTPONED) {
    315		/* don't destroy the req object just yet,
    316		 * but queue it for retry */
    317		drbd_restart_request(req);
    318		return;
    319	}
    320
    321	kref_put(&req->kref, drbd_req_destroy);
    322}
    323
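        /* The connection keeps caching pointers into the transfer log
         * (req_next, req_ack_pending, req_not_net_done).  The set_if_null_*
         * helpers below install a request as the cached pointer if none is
         * set yet; the advance_* helpers move the pointer forward to the next
         * request in the transfer log that still matches the respective
         * state, once the cached request no longer does. */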
    324static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
    325{
    326	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
    327	if (!connection)
    328		return;
    329	if (connection->req_next == NULL)
    330		connection->req_next = req;
    331}
    332
    333static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
    334{
    335	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
    336	struct drbd_request *iter = req;
    337	if (!connection)
    338		return;
    339	if (connection->req_next != req)
    340		return;
    341
    342	req = NULL;
    343	list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
    344		const unsigned int s = iter->rq_state;
    345
    346		if (s & RQ_NET_QUEUED) {
    347			req = iter;
    348			break;
    349		}
    350	}
    351	connection->req_next = req;
    352}
    353
    354static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
    355{
    356	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
    357	if (!connection)
    358		return;
    359	if (connection->req_ack_pending == NULL)
    360		connection->req_ack_pending = req;
    361}
    362
    363static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
    364{
    365	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
    366	struct drbd_request *iter = req;
    367	if (!connection)
    368		return;
    369	if (connection->req_ack_pending != req)
    370		return;
    371
    372	req = NULL;
    373	list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
    374		const unsigned int s = iter->rq_state;
    375
    376		if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING)) {
    377			req = iter;
    378			break;
    379		}
    380	}
    381	connection->req_ack_pending = req;
    382}
    383
    384static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
    385{
    386	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
    387	if (!connection)
    388		return;
    389	if (connection->req_not_net_done == NULL)
    390		connection->req_not_net_done = req;
    391}
    392
    393static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
    394{
    395	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
    396	struct drbd_request *iter = req;
    397	if (!connection)
    398		return;
    399	if (connection->req_not_net_done != req)
    400		return;
    401
    402	req = NULL;
    403	list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
    404		const unsigned int s = iter->rq_state;
    405
    406		if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE)) {
    407			req = iter;
    408			break;
    409		}
    410	}
    411	connection->req_not_net_done = req;
    412}
    413
    414/* I'd like this to be the only place that manipulates
    415 * req->completion_ref and req->kref. */
    416static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
    417		int clear, int set)
    418{
    419	struct drbd_device *device = req->device;
    420	struct drbd_peer_device *peer_device = first_peer_device(device);
    421	unsigned s = req->rq_state;
    422	int c_put = 0;
    423
    424	if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
    425		set |= RQ_COMPLETION_SUSP;
    426
    427	/* apply */
    428
    429	req->rq_state &= ~clear;
    430	req->rq_state |= set;
    431
    432	/* no change? */
    433	if (req->rq_state == s)
    434		return;
    435
    436	/* intent: get references */
    437
    438	kref_get(&req->kref);
    439
    440	if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
    441		atomic_inc(&req->completion_ref);
    442
    443	if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
    444		inc_ap_pending(device);
    445		atomic_inc(&req->completion_ref);
    446	}
    447
    448	if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
    449		atomic_inc(&req->completion_ref);
    450		set_if_null_req_next(peer_device, req);
    451	}
    452
    453	if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
    454		kref_get(&req->kref); /* wait for the DONE */
    455
    456	if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
    457		/* potentially already completed in the ack_receiver thread */
    458		if (!(s & RQ_NET_DONE)) {
    459			atomic_add(req->i.size >> 9, &device->ap_in_flight);
    460			set_if_null_req_not_net_done(peer_device, req);
    461		}
    462		if (req->rq_state & RQ_NET_PENDING)
    463			set_if_null_req_ack_pending(peer_device, req);
    464	}
    465
    466	if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
    467		atomic_inc(&req->completion_ref);
    468
    469	/* progress: put references */
    470
    471	if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
    472		++c_put;
    473
    474	if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
    475		D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
    476		++c_put;
    477	}
    478
    479	if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
    480		if (req->rq_state & RQ_LOCAL_ABORTED)
    481			kref_put(&req->kref, drbd_req_destroy);
    482		else
    483			++c_put;
    484		list_del_init(&req->req_pending_local);
    485	}
    486
    487	if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
    488		dec_ap_pending(device);
    489		++c_put;
    490		req->acked_jif = jiffies;
    491		advance_conn_req_ack_pending(peer_device, req);
    492	}
    493
    494	if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
    495		++c_put;
    496		advance_conn_req_next(peer_device, req);
    497	}
    498
    499	if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
    500		if (s & RQ_NET_SENT)
    501			atomic_sub(req->i.size >> 9, &device->ap_in_flight);
    502		if (s & RQ_EXP_BARR_ACK)
    503			kref_put(&req->kref, drbd_req_destroy);
    504		req->net_done_jif = jiffies;
    505
    506		/* in ahead/behind mode, or just in case,
    507		 * before we finally destroy this request,
    508		 * the caching pointers must not reference it anymore */
    509		advance_conn_req_next(peer_device, req);
    510		advance_conn_req_ack_pending(peer_device, req);
    511		advance_conn_req_not_net_done(peer_device, req);
    512	}
    513
    514	/* potentially complete and destroy */
    515
    516	/* If we made progress, retry conflicting peer requests, if any. */
    517	if (req->i.waiting)
    518		wake_up(&device->misc_wait);
    519
    520	drbd_req_put_completion_ref(req, m, c_put);
    521	kref_put(&req->kref, drbd_req_destroy);
    522}
    523
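        /* Rate-limited warning about an I/O error on the local backing device. */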
    524static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
    525{
     526	char b[BDEVNAME_SIZE];
    527
    528	if (!__ratelimit(&drbd_ratelimit_state))
    529		return;
    530
    531	drbd_warn(device, "local %s IO error sector %llu+%u on %s\n",
    532			(req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
    533			(unsigned long long)req->i.sector,
    534			req->i.size >> 9,
    535			bdevname(device->ldev->backing_bdev, b));
    536}
    537
    538/* Helper for HANDED_OVER_TO_NETWORK.
    539 * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
    540 * Is it also still "PENDING"?
    541 * --> If so, clear PENDING and set NET_OK below.
    542 * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
    543 * (and we must not set RQ_NET_OK) */
    544static inline bool is_pending_write_protocol_A(struct drbd_request *req)
    545{
    546	return (req->rq_state &
    547		   (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
    548		== (RQ_WRITE|RQ_NET_PENDING);
    549}
    550
    551/* obviously this could be coded as many single functions
    552 * instead of one huge switch,
    553 * or by putting the code directly in the respective locations
    554 * (as it has been before).
    555 *
    556 * but having it this way
    557 *  enforces that it is all in this one place, where it is easier to audit,
    558 *  it makes it obvious that whatever "event" "happens" to a request should
    559 *  happen "atomically" within the req_lock,
    560 *  and it enforces that we have to think in a very structured manner
    561 *  about the "events" that may happen to a request during its life time ...
    562 */
    563int __req_mod(struct drbd_request *req, enum drbd_req_event what,
    564		struct bio_and_error *m)
    565{
    566	struct drbd_device *const device = req->device;
    567	struct drbd_peer_device *const peer_device = first_peer_device(device);
    568	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
    569	struct net_conf *nc;
    570	int p, rv = 0;
    571
    572	if (m)
    573		m->bio = NULL;
    574
    575	switch (what) {
    576	default:
    577		drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
    578		break;
    579
    580	/* does not happen...
    581	 * initialization done in drbd_req_new
    582	case CREATED:
    583		break;
    584		*/
    585
    586	case TO_BE_SENT: /* via network */
    587		/* reached via __drbd_make_request
    588		 * and from w_read_retry_remote */
    589		D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
    590		rcu_read_lock();
    591		nc = rcu_dereference(connection->net_conf);
    592		p = nc->wire_protocol;
    593		rcu_read_unlock();
    594		req->rq_state |=
    595			p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
    596			p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
    597		mod_rq_state(req, m, 0, RQ_NET_PENDING);
    598		break;
    599
    600	case TO_BE_SUBMITTED: /* locally */
    601		/* reached via __drbd_make_request */
    602		D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
    603		mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
    604		break;
    605
    606	case COMPLETED_OK:
    607		if (req->rq_state & RQ_WRITE)
    608			device->writ_cnt += req->i.size >> 9;
    609		else
    610			device->read_cnt += req->i.size >> 9;
    611
    612		mod_rq_state(req, m, RQ_LOCAL_PENDING,
    613				RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
    614		break;
    615
    616	case ABORT_DISK_IO:
    617		mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
    618		break;
    619
    620	case WRITE_COMPLETED_WITH_ERROR:
    621		drbd_report_io_error(device, req);
    622		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
    623		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
    624		break;
    625
    626	case READ_COMPLETED_WITH_ERROR:
    627		drbd_set_out_of_sync(device, req->i.sector, req->i.size);
    628		drbd_report_io_error(device, req);
    629		__drbd_chk_io_error(device, DRBD_READ_ERROR);
    630		fallthrough;
    631	case READ_AHEAD_COMPLETED_WITH_ERROR:
    632		/* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
    633		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
    634		break;
    635
    636	case DISCARD_COMPLETED_NOTSUPP:
    637	case DISCARD_COMPLETED_WITH_ERROR:
    638		/* I'd rather not detach from local disk just because it
    639		 * failed a REQ_OP_DISCARD. */
    640		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
    641		break;
    642
    643	case QUEUE_FOR_NET_READ:
    644		/* READ, and
    645		 * no local disk,
    646		 * or target area marked as invalid,
    647		 * or just got an io-error. */
    648		/* from __drbd_make_request
    649		 * or from bio_endio during read io-error recovery */
    650
    651		/* So we can verify the handle in the answer packet.
    652		 * Corresponding drbd_remove_request_interval is in
    653		 * drbd_req_complete() */
    654		D_ASSERT(device, drbd_interval_empty(&req->i));
    655		drbd_insert_interval(&device->read_requests, &req->i);
    656
    657		set_bit(UNPLUG_REMOTE, &device->flags);
    658
    659		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
    660		D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
    661		mod_rq_state(req, m, 0, RQ_NET_QUEUED);
    662		req->w.cb = w_send_read_req;
    663		drbd_queue_work(&connection->sender_work,
    664				&req->w);
    665		break;
    666
    667	case QUEUE_FOR_NET_WRITE:
    668		/* assert something? */
    669		/* from __drbd_make_request only */
    670
    671		/* Corresponding drbd_remove_request_interval is in
    672		 * drbd_req_complete() */
    673		D_ASSERT(device, drbd_interval_empty(&req->i));
    674		drbd_insert_interval(&device->write_requests, &req->i);
    675
    676		/* NOTE
    677		 * In case the req ended up on the transfer log before being
    678		 * queued on the worker, it could lead to this request being
    679		 * missed during cleanup after connection loss.
    680		 * So we have to do both operations here,
    681		 * within the same lock that protects the transfer log.
    682		 *
    683		 * _req_add_to_epoch(req); this has to be after the
    684		 * _maybe_start_new_epoch(req); which happened in
    685		 * __drbd_make_request, because we now may set the bit
    686		 * again ourselves to close the current epoch.
    687		 *
    688		 * Add req to the (now) current epoch (barrier). */
    689
    690		/* otherwise we may lose an unplug, which may cause some remote
    691		 * io-scheduler timeout to expire, increasing maximum latency,
    692		 * hurting performance. */
    693		set_bit(UNPLUG_REMOTE, &device->flags);
    694
    695		/* queue work item to send data */
    696		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
    697		mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
    698		req->w.cb =  w_send_dblock;
    699		drbd_queue_work(&connection->sender_work,
    700				&req->w);
    701
    702		/* close the epoch, in case it outgrew the limit */
    703		rcu_read_lock();
    704		nc = rcu_dereference(connection->net_conf);
    705		p = nc->max_epoch_size;
    706		rcu_read_unlock();
    707		if (connection->current_tle_writes >= p)
    708			start_new_tl_epoch(connection);
    709
    710		break;
    711
    712	case QUEUE_FOR_SEND_OOS:
    713		mod_rq_state(req, m, 0, RQ_NET_QUEUED);
    714		req->w.cb =  w_send_out_of_sync;
    715		drbd_queue_work(&connection->sender_work,
    716				&req->w);
    717		break;
    718
    719	case READ_RETRY_REMOTE_CANCELED:
    720	case SEND_CANCELED:
    721	case SEND_FAILED:
    722		/* real cleanup will be done from tl_clear.  just update flags
    723		 * so it is no longer marked as on the worker queue */
    724		mod_rq_state(req, m, RQ_NET_QUEUED, 0);
    725		break;
    726
    727	case HANDED_OVER_TO_NETWORK:
    728		/* assert something? */
    729		if (is_pending_write_protocol_A(req))
    730			/* this is what is dangerous about protocol A:
    731			 * pretend it was successfully written on the peer. */
    732			mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
    733						RQ_NET_SENT|RQ_NET_OK);
    734		else
    735			mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
    736		/* It is still not yet RQ_NET_DONE until the
    737		 * corresponding epoch barrier got acked as well,
    738		 * so we know what to dirty on connection loss. */
    739		break;
    740
    741	case OOS_HANDED_TO_NETWORK:
    742		/* Was not set PENDING, no longer QUEUED, so is now DONE
    743		 * as far as this connection is concerned. */
    744		mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
    745		break;
    746
    747	case CONNECTION_LOST_WHILE_PENDING:
    748		/* transfer log cleanup after connection loss */
    749		mod_rq_state(req, m,
    750				RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
    751				RQ_NET_DONE);
    752		break;
    753
    754	case CONFLICT_RESOLVED:
    755		/* for superseded conflicting writes of multiple primaries,
    756		 * there is no need to keep anything in the tl, potential
    757		 * node crashes are covered by the activity log.
    758		 *
    759		 * If this request had been marked as RQ_POSTPONED before,
    760		 * it will actually not be completed, but "restarted",
    761		 * resubmitted from the retry worker context. */
    762		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
    763		D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
    764		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
    765		break;
    766
    767	case WRITE_ACKED_BY_PEER_AND_SIS:
    768		req->rq_state |= RQ_NET_SIS;
    769		fallthrough;
    770	case WRITE_ACKED_BY_PEER:
    771		/* Normal operation protocol C: successfully written on peer.
    772		 * During resync, even in protocol != C,
    773		 * we requested an explicit write ack anyways.
    774		 * Which means we cannot even assert anything here.
    775		 * Nothing more to do here.
    776		 * We want to keep the tl in place for all protocols, to cater
    777		 * for volatile write-back caches on lower level devices. */
    778		goto ack_common;
    779	case RECV_ACKED_BY_PEER:
    780		D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
    781		/* protocol B; pretends to be successfully written on peer.
    782		 * see also notes above in HANDED_OVER_TO_NETWORK about
    783		 * protocol != C */
    784	ack_common:
    785		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
    786		break;
    787
    788	case POSTPONE_WRITE:
    789		D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
    790		/* If this node has already detected the write conflict, the
    791		 * worker will be waiting on misc_wait.  Wake it up once this
    792		 * request has completed locally.
    793		 */
    794		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
    795		req->rq_state |= RQ_POSTPONED;
    796		if (req->i.waiting)
    797			wake_up(&device->misc_wait);
    798		/* Do not clear RQ_NET_PENDING. This request will make further
    799		 * progress via restart_conflicting_writes() or
    800		 * fail_postponed_requests(). Hopefully. */
    801		break;
    802
    803	case NEG_ACKED:
    804		mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
    805		break;
    806
    807	case FAIL_FROZEN_DISK_IO:
    808		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
    809			break;
    810		mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
    811		break;
    812
    813	case RESTART_FROZEN_DISK_IO:
    814		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
    815			break;
    816
    817		mod_rq_state(req, m,
    818				RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
    819				RQ_LOCAL_PENDING);
    820
    821		rv = MR_READ;
    822		if (bio_data_dir(req->master_bio) == WRITE)
    823			rv = MR_WRITE;
    824
    825		get_ldev(device); /* always succeeds in this call path */
    826		req->w.cb = w_restart_disk_io;
    827		drbd_queue_work(&connection->sender_work,
    828				&req->w);
    829		break;
    830
    831	case RESEND:
    832		/* Simply complete (local only) READs. */
    833		if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
    834			mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
    835			break;
    836		}
    837
    838		/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
    839		   before the connection loss (B&C only); only P_BARRIER_ACK
    840		   (or the local completion?) was missing when we suspended.
    841		   Throwing them out of the TL here by pretending we got a BARRIER_ACK.
    842		   During connection handshake, we ensure that the peer was not rebooted. */
    843		if (!(req->rq_state & RQ_NET_OK)) {
    844			/* FIXME could this possibly be a req->dw.cb == w_send_out_of_sync?
    845			 * in that case we must not set RQ_NET_PENDING. */
    846
    847			mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
    848			if (req->w.cb) {
    849				/* w.cb expected to be w_send_dblock, or w_send_read_req */
    850				drbd_queue_work(&connection->sender_work,
    851						&req->w);
    852				rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
    853			} /* else: FIXME can this happen? */
    854			break;
    855		}
    856		fallthrough;	/* to BARRIER_ACKED */
    857
    858	case BARRIER_ACKED:
    859		/* barrier ack for READ requests does not make sense */
    860		if (!(req->rq_state & RQ_WRITE))
    861			break;
    862
    863		if (req->rq_state & RQ_NET_PENDING) {
    864			/* barrier came in before all requests were acked.
    865			 * this is bad, because if the connection is lost now,
    866			 * we won't be able to clean them up... */
    867			drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
    868		}
    869		/* Allowed to complete requests, even while suspended.
    870		 * As this is called for all requests within a matching epoch,
    871		 * we need to filter, and only set RQ_NET_DONE for those that
    872		 * have actually been on the wire. */
    873		mod_rq_state(req, m, RQ_COMPLETION_SUSP,
    874				(req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
    875		break;
    876
    877	case DATA_RECEIVED:
    878		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
    879		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
    880		break;
    881
    882	case QUEUE_AS_DRBD_BARRIER:
    883		start_new_tl_epoch(connection);
    884		mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
    885		break;
    886	}
    887
    888	return rv;
    889}
    890
    891/* we may do a local read if:
    892 * - we are consistent (of course),
    893 * - or we are generally inconsistent,
    894 *   BUT we are still/already IN SYNC for this area.
    895 *   since size may be bigger than BM_BLOCK_SIZE,
    896 *   we may need to check several bits.
    897 */
    898static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
    899{
    900	unsigned long sbnr, ebnr;
    901	sector_t esector, nr_sectors;
    902
    903	if (device->state.disk == D_UP_TO_DATE)
    904		return true;
    905	if (device->state.disk != D_INCONSISTENT)
    906		return false;
    907	esector = sector + (size >> 9) - 1;
    908	nr_sectors = get_capacity(device->vdisk);
    909	D_ASSERT(device, sector  < nr_sectors);
    910	D_ASSERT(device, esector < nr_sectors);
    911
    912	sbnr = BM_SECT_TO_BIT(sector);
    913	ebnr = BM_SECT_TO_BIT(esector);
    914
    915	return drbd_bm_count_bits(device, sbnr, ebnr) == 0;
    916}
    917
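        /* Decide whether a READ at @sector should be served by the peer,
         * according to the configured read-balancing policy @rbm. */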
    918static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
    919		enum drbd_read_balancing rbm)
    920{
    921	int stripe_shift;
    922
    923	switch (rbm) {
    924	case RB_CONGESTED_REMOTE:
    925		return false;
    926	case RB_LEAST_PENDING:
    927		return atomic_read(&device->local_cnt) >
    928			atomic_read(&device->ap_pending_cnt) + atomic_read(&device->rs_pending_cnt);
    929	case RB_32K_STRIPING:  /* stripe_shift = 15 */
    930	case RB_64K_STRIPING:
    931	case RB_128K_STRIPING:
    932	case RB_256K_STRIPING:
    933	case RB_512K_STRIPING:
    934	case RB_1M_STRIPING:   /* stripe_shift = 20 */
    935		stripe_shift = (rbm - RB_32K_STRIPING + 15);
    936		return (sector >> (stripe_shift - 9)) & 1;
    937	case RB_ROUND_ROBIN:
    938		return test_and_change_bit(READ_BALANCE_RR, &device->flags);
    939	case RB_PREFER_REMOTE:
    940		return true;
    941	case RB_PREFER_LOCAL:
    942	default:
    943		return false;
    944	}
    945}
    946
    947/*
    948 * complete_conflicting_writes  -  wait for any conflicting write requests
    949 *
    950 * The write_requests tree contains all active write requests which we
    951 * currently know about.  Wait for any requests to complete which conflict with
    952 * the new one.
    953 *
    954 * Only way out: remove the conflicting intervals from the tree.
    955 */
    956static void complete_conflicting_writes(struct drbd_request *req)
    957{
    958	DEFINE_WAIT(wait);
    959	struct drbd_device *device = req->device;
    960	struct drbd_interval *i;
    961	sector_t sector = req->i.sector;
    962	int size = req->i.size;
    963
    964	for (;;) {
    965		drbd_for_each_overlap(i, &device->write_requests, sector, size) {
    966			/* Ignore, if already completed to upper layers. */
    967			if (i->completed)
    968				continue;
    969			/* Handle the first found overlap.  After the schedule
    970			 * we have to restart the tree walk. */
    971			break;
    972		}
    973		if (!i)	/* if any */
    974			break;
    975
    976		/* Indicate to wake up device->misc_wait on progress.  */
    977		prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
    978		i->waiting = true;
    979		spin_unlock_irq(&device->resource->req_lock);
    980		schedule();
    981		spin_lock_irq(&device->resource->req_lock);
    982	}
    983	finish_wait(&device->misc_wait, &wait);
    984}
    985
    986/* called within req_lock */
    987static void maybe_pull_ahead(struct drbd_device *device)
    988{
    989	struct drbd_connection *connection = first_peer_device(device)->connection;
    990	struct net_conf *nc;
    991	bool congested = false;
    992	enum drbd_on_congestion on_congestion;
    993
    994	rcu_read_lock();
    995	nc = rcu_dereference(connection->net_conf);
    996	on_congestion = nc ? nc->on_congestion : OC_BLOCK;
    997	rcu_read_unlock();
    998	if (on_congestion == OC_BLOCK ||
    999	    connection->agreed_pro_version < 96)
   1000		return;
   1001
   1002	if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
   1003		return; /* nothing to do ... */
   1004
   1005	/* If I don't even have good local storage, we can not reasonably try
   1006	 * to pull ahead of the peer. We also need the local reference to make
   1007	 * sure device->act_log is there.
   1008	 */
   1009	if (!get_ldev_if_state(device, D_UP_TO_DATE))
   1010		return;
   1011
   1012	if (nc->cong_fill &&
   1013	    atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
   1014		drbd_info(device, "Congestion-fill threshold reached\n");
   1015		congested = true;
   1016	}
   1017
   1018	if (device->act_log->used >= nc->cong_extents) {
   1019		drbd_info(device, "Congestion-extents threshold reached\n");
   1020		congested = true;
   1021	}
   1022
   1023	if (congested) {
   1024		/* start a new epoch for non-mirrored writes */
   1025		start_new_tl_epoch(first_peer_device(device)->connection);
   1026
   1027		if (on_congestion == OC_PULL_AHEAD)
   1028			_drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
   1029		else  /*nc->on_congestion == OC_DISCONNECT */
   1030			_drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
   1031	}
   1032	put_ldev(device);
   1033}
   1034
   1035/* If this returns false, and req->private_bio is still set,
   1036 * this should be submitted locally.
   1037 *
   1038 * If it returns false, but req->private_bio is not set,
   1039 * we do not have access to good data :(
   1040 *
   1041 * Otherwise, this destroys req->private_bio, if any,
   1042 * and returns true.
   1043 */
   1044static bool do_remote_read(struct drbd_request *req)
   1045{
   1046	struct drbd_device *device = req->device;
   1047	enum drbd_read_balancing rbm;
   1048
   1049	if (req->private_bio) {
   1050		if (!drbd_may_do_local_read(device,
   1051					req->i.sector, req->i.size)) {
   1052			bio_put(req->private_bio);
   1053			req->private_bio = NULL;
   1054			put_ldev(device);
   1055		}
   1056	}
   1057
   1058	if (device->state.pdsk != D_UP_TO_DATE)
   1059		return false;
   1060
   1061	if (req->private_bio == NULL)
   1062		return true;
   1063
   1064	/* TODO: improve read balancing decisions, take into account drbd
   1065	 * protocol, pending requests etc. */
   1066
   1067	rcu_read_lock();
   1068	rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
   1069	rcu_read_unlock();
   1070
   1071	if (rbm == RB_PREFER_LOCAL && req->private_bio)
   1072		return false; /* submit locally */
   1073
   1074	if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
   1075		if (req->private_bio) {
   1076			bio_put(req->private_bio);
   1077			req->private_bio = NULL;
   1078			put_ldev(device);
   1079		}
   1080		return true;
   1081	}
   1082
   1083	return false;
   1084}
   1085
   1086bool drbd_should_do_remote(union drbd_dev_state s)
   1087{
   1088	return s.pdsk == D_UP_TO_DATE ||
   1089		(s.pdsk >= D_INCONSISTENT &&
   1090		 s.conn >= C_WF_BITMAP_T &&
   1091		 s.conn < C_AHEAD);
   1092	/* Before proto 96 that was >= CONNECTED instead of >= C_WF_BITMAP_T.
   1093	   That is equivalent since before 96 IO was frozen in the C_WF_BITMAP*
   1094	   states. */
   1095}
   1096
   1097static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
   1098{
   1099	return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
   1100	/* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
   1101	   since we enter state C_AHEAD only if proto >= 96 */
   1102}
   1103
   1104/* returns number of connections (== 1, for drbd 8.4)
   1105 * expected to actually write this data,
   1106 * which does NOT include those that we are L_AHEAD for. */
   1107static int drbd_process_write_request(struct drbd_request *req)
   1108{
   1109	struct drbd_device *device = req->device;
   1110	int remote, send_oos;
   1111
   1112	remote = drbd_should_do_remote(device->state);
   1113	send_oos = drbd_should_send_out_of_sync(device->state);
   1114
   1115	/* Need to replicate writes.  Unless it is an empty flush,
   1116	 * which is better mapped to a DRBD P_BARRIER packet,
   1117	 * also for drbd wire protocol compatibility reasons.
   1118	 * If this was a flush, just start a new epoch.
   1119	 * Unless the current epoch was empty anyways, or we are not currently
   1120	 * replicating, in which case there is no point. */
   1121	if (unlikely(req->i.size == 0)) {
   1122		/* The only size==0 bios we expect are empty flushes. */
   1123		D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
   1124		if (remote)
   1125			_req_mod(req, QUEUE_AS_DRBD_BARRIER);
   1126		return remote;
   1127	}
   1128
   1129	if (!remote && !send_oos)
   1130		return 0;
   1131
   1132	D_ASSERT(device, !(remote && send_oos));
   1133
   1134	if (remote) {
   1135		_req_mod(req, TO_BE_SENT);
   1136		_req_mod(req, QUEUE_FOR_NET_WRITE);
   1137	} else if (drbd_set_out_of_sync(device, req->i.sector, req->i.size))
   1138		_req_mod(req, QUEUE_FOR_SEND_OOS);
   1139
   1140	return remote;
   1141}
   1142
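        /* Issue a discard or zero-out against the local backing device and
         * complete the private bio with the result. */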
   1143static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
   1144{
   1145	int err = drbd_issue_discard_or_zero_out(req->device,
   1146				req->i.sector, req->i.size >> 9, flags);
   1147	if (err)
   1148		req->private_bio->bi_status = BLK_STS_IOERR;
   1149	bio_endio(req->private_bio);
   1150}
   1151
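        /* Submit the request's private bio to the local backing device:
         * route discards and zero-out requests through
         * drbd_process_discard_or_zeroes_req(), honor fault injection,
         * and fail the bio if the ldev reference cannot be obtained. */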
   1152static void
   1153drbd_submit_req_private_bio(struct drbd_request *req)
   1154{
   1155	struct drbd_device *device = req->device;
   1156	struct bio *bio = req->private_bio;
   1157	unsigned int type;
   1158
   1159	if (bio_op(bio) != REQ_OP_READ)
   1160		type = DRBD_FAULT_DT_WR;
   1161	else if (bio->bi_opf & REQ_RAHEAD)
   1162		type = DRBD_FAULT_DT_RA;
   1163	else
   1164		type = DRBD_FAULT_DT_RD;
   1165
   1166	/* State may have changed since we grabbed our reference on the
   1167	 * ->ldev member. Double check, and short-circuit to endio.
   1168	 * In case the last activity log transaction failed to get on
   1169	 * stable storage, and this is a WRITE, we may not even submit
   1170	 * this bio. */
   1171	if (get_ldev(device)) {
   1172		if (drbd_insert_fault(device, type))
   1173			bio_io_error(bio);
   1174		else if (bio_op(bio) == REQ_OP_WRITE_ZEROES)
   1175			drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
   1176			    ((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
   1177		else if (bio_op(bio) == REQ_OP_DISCARD)
   1178			drbd_process_discard_or_zeroes_req(req, EE_TRIM);
   1179		else
   1180			submit_bio_noacct(bio);
   1181		put_ldev(device);
   1182	} else
   1183		bio_io_error(bio);
   1184}
   1185
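        /* Hand a write over to the submitter work queue (do_submit()), which
         * takes care of the activity log transaction before submitting it. */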
   1186static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
   1187{
   1188	spin_lock_irq(&device->resource->req_lock);
   1189	list_add_tail(&req->tl_requests, &device->submit.writes);
   1190	list_add_tail(&req->req_pending_master_completion,
   1191			&device->pending_master_completion[1 /* WRITE */]);
   1192	spin_unlock_irq(&device->resource->req_lock);
   1193	queue_work(device->submit.wq, &device->submit.worker);
   1194	/* do_submit() may sleep internally on al_wait, too */
   1195	wake_up(&device->al_wait);
   1196}
   1197
   1198/* returns the new drbd_request pointer, if the caller is expected to
   1199 * drbd_send_and_submit() it (to save latency), or NULL if we queued the
   1200 * request on the submitter thread.
   1201 * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
   1202 */
   1203static struct drbd_request *
   1204drbd_request_prepare(struct drbd_device *device, struct bio *bio)
   1205{
   1206	const int rw = bio_data_dir(bio);
   1207	struct drbd_request *req;
   1208
   1209	/* allocate outside of all locks; */
   1210	req = drbd_req_new(device, bio);
   1211	if (!req) {
   1212		dec_ap_bio(device);
   1213		/* only pass the error to the upper layers.
   1214		 * if user cannot handle io errors, that's not our business. */
   1215		drbd_err(device, "could not kmalloc() req\n");
   1216		bio->bi_status = BLK_STS_RESOURCE;
   1217		bio_endio(bio);
   1218		return ERR_PTR(-ENOMEM);
   1219	}
   1220
   1221	/* Update disk stats */
   1222	req->start_jif = bio_start_io_acct(req->master_bio);
   1223
   1224	if (!get_ldev(device)) {
   1225		bio_put(req->private_bio);
   1226		req->private_bio = NULL;
   1227	}
   1228
   1229	/* process discards always from our submitter thread */
   1230	if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
   1231	    bio_op(bio) == REQ_OP_DISCARD)
   1232		goto queue_for_submitter_thread;
   1233
   1234	if (rw == WRITE && req->private_bio && req->i.size
   1235	&& !test_bit(AL_SUSPENDED, &device->flags)) {
   1236		if (!drbd_al_begin_io_fastpath(device, &req->i))
   1237			goto queue_for_submitter_thread;
   1238		req->rq_state |= RQ_IN_ACT_LOG;
   1239		req->in_actlog_jif = jiffies;
   1240	}
   1241	return req;
   1242
   1243 queue_for_submitter_thread:
   1244	atomic_inc(&device->ap_actlog_cnt);
   1245	drbd_queue_write(device, req);
   1246	return NULL;
   1247}
   1248
   1249/* Require at least one path to current data.
   1250 * We don't want to allow writes on C_STANDALONE D_INCONSISTENT:
   1251 * We would not allow to read what was written,
   1252 * we would not have bumped the data generation uuids,
   1253 * we would cause data divergence for all the wrong reasons.
   1254 *
   1255 * If we don't see at least one D_UP_TO_DATE, we will fail this request,
   1256 * which either returns EIO, or, if OND_SUSPEND_IO is set, suspends IO,
   1257 * and queues for retry later.
   1258 */
   1259static bool may_do_writes(struct drbd_device *device)
   1260{
   1261	const union drbd_dev_state s = device->state;
   1262	return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
   1263}
   1264
   1265struct drbd_plug_cb {
   1266	struct blk_plug_cb cb;
   1267	struct drbd_request *most_recent_req;
   1268	/* do we need more? */
   1269};
   1270
   1271static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
   1272{
   1273	struct drbd_plug_cb *plug = container_of(cb, struct drbd_plug_cb, cb);
   1274	struct drbd_resource *resource = plug->cb.data;
   1275	struct drbd_request *req = plug->most_recent_req;
   1276
   1277	kfree(cb);
   1278	if (!req)
   1279		return;
   1280
   1281	spin_lock_irq(&resource->req_lock);
   1282	/* In case the sender did not process it yet, raise the flag to
   1283	 * have it followed with P_UNPLUG_REMOTE just after. */
   1284	req->rq_state |= RQ_UNPLUG;
   1285	/* but also queue a generic unplug */
   1286	drbd_queue_unplug(req->device);
   1287	kref_put(&req->kref, drbd_req_destroy);
   1288	spin_unlock_irq(&resource->req_lock);
   1289}
   1290
   1291static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
   1292{
   1293	/* A lot of text to say
   1294	 * return (struct drbd_plug_cb*)blk_check_plugged(); */
   1295	struct drbd_plug_cb *plug;
   1296	struct blk_plug_cb *cb = blk_check_plugged(drbd_unplug, resource, sizeof(*plug));
   1297
   1298	if (cb)
   1299		plug = container_of(cb, struct drbd_plug_cb, cb);
   1300	else
   1301		plug = NULL;
   1302	return plug;
   1303}
   1304
   1305static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
   1306{
   1307	struct drbd_request *tmp = plug->most_recent_req;
   1308	/* Will be sent to some peer.
   1309	 * Remember to tag it with UNPLUG_REMOTE on unplug */
   1310	kref_get(&req->kref);
   1311	plug->most_recent_req = req;
   1312	if (tmp)
   1313		kref_put(&tmp->kref, drbd_req_destroy);
   1314}
   1315
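        /* Decide the fate of a prepared request under the req_lock: add it to
         * the transfer log, queue it for the network and/or mark it for local
         * submission.  If neither local nor remote data is accessible, the
         * master bio is completed with an error.  The private bio, if any, is
         * submitted only after the lock has been dropped. */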
   1316static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
   1317{
   1318	struct drbd_resource *resource = device->resource;
   1319	const int rw = bio_data_dir(req->master_bio);
   1320	struct bio_and_error m = { NULL, };
   1321	bool no_remote = false;
   1322	bool submit_private_bio = false;
   1323
   1324	spin_lock_irq(&resource->req_lock);
   1325	if (rw == WRITE) {
   1326		/* This may temporarily give up the req_lock,
    1327		 * but will re-acquire it before it returns here.
   1328		 * Needs to be before the check on drbd_suspended() */
   1329		complete_conflicting_writes(req);
   1330		/* no more giving up req_lock from now on! */
   1331
   1332		/* check for congestion, and potentially stop sending
   1333		 * full data updates, but start sending "dirty bits" only. */
   1334		maybe_pull_ahead(device);
   1335	}
   1336
   1337
   1338	if (drbd_suspended(device)) {
   1339		/* push back and retry: */
   1340		req->rq_state |= RQ_POSTPONED;
   1341		if (req->private_bio) {
   1342			bio_put(req->private_bio);
   1343			req->private_bio = NULL;
   1344			put_ldev(device);
   1345		}
   1346		goto out;
   1347	}
   1348
   1349	/* We fail READ early, if we can not serve it.
   1350	 * We must do this before req is registered on any lists.
   1351	 * Otherwise, drbd_req_complete() will queue failed READ for retry. */
   1352	if (rw != WRITE) {
   1353		if (!do_remote_read(req) && !req->private_bio)
   1354			goto nodata;
   1355	}
   1356
   1357	/* which transfer log epoch does this belong to? */
   1358	req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);
   1359
   1360	/* no point in adding empty flushes to the transfer log,
   1361	 * they are mapped to drbd barriers already. */
   1362	if (likely(req->i.size!=0)) {
   1363		if (rw == WRITE)
   1364			first_peer_device(device)->connection->current_tle_writes++;
   1365
   1366		list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
   1367	}
   1368
   1369	if (rw == WRITE) {
   1370		if (req->private_bio && !may_do_writes(device)) {
   1371			bio_put(req->private_bio);
   1372			req->private_bio = NULL;
   1373			put_ldev(device);
   1374			goto nodata;
   1375		}
   1376		if (!drbd_process_write_request(req))
   1377			no_remote = true;
   1378	} else {
   1379		/* We either have a private_bio, or we can read from remote.
   1380		 * Otherwise we had done the goto nodata above. */
   1381		if (req->private_bio == NULL) {
   1382			_req_mod(req, TO_BE_SENT);
   1383			_req_mod(req, QUEUE_FOR_NET_READ);
   1384		} else
   1385			no_remote = true;
   1386	}
   1387
   1388	if (no_remote == false) {
   1389		struct drbd_plug_cb *plug = drbd_check_plugged(resource);
   1390		if (plug)
   1391			drbd_update_plug(plug, req);
   1392	}
   1393
   1394	/* If it took the fast path in drbd_request_prepare, add it here.
   1395	 * The slow path has added it already. */
   1396	if (list_empty(&req->req_pending_master_completion))
   1397		list_add_tail(&req->req_pending_master_completion,
   1398			&device->pending_master_completion[rw == WRITE]);
   1399	if (req->private_bio) {
   1400		/* needs to be marked within the same spinlock */
   1401		req->pre_submit_jif = jiffies;
   1402		list_add_tail(&req->req_pending_local,
   1403			&device->pending_completion[rw == WRITE]);
   1404		_req_mod(req, TO_BE_SUBMITTED);
   1405		/* but we need to give up the spinlock to submit */
   1406		submit_private_bio = true;
   1407	} else if (no_remote) {
   1408nodata:
   1409		if (__ratelimit(&drbd_ratelimit_state))
   1410			drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
   1411					(unsigned long long)req->i.sector, req->i.size >> 9);
   1412		/* A write may have been queued for send_oos, however.
   1413		 * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
   1414	}
   1415
   1416out:
   1417	drbd_req_put_completion_ref(req, &m, 1);
   1418	spin_unlock_irq(&resource->req_lock);
   1419
   1420	/* Even though above is a kref_put(), this is safe.
   1421	 * As long as we still need to submit our private bio,
   1422	 * we hold a completion ref, and the request cannot disappear.
   1423	 * If however this request did not even have a private bio to submit
   1424	 * (e.g. remote read), req may already be invalid now.
   1425	 * That's why we cannot check on req->private_bio. */
   1426	if (submit_private_bio)
   1427		drbd_submit_req_private_bio(req);
   1428	if (m.bio)
   1429		complete_master_bio(device, &m);
   1430}
   1431
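        /* Prepare a request for @bio and, unless it was handed to the
         * submitter thread (or allocation failed), send and submit it
         * directly from this context. */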
   1432void __drbd_make_request(struct drbd_device *device, struct bio *bio)
   1433{
   1434	struct drbd_request *req = drbd_request_prepare(device, bio);
   1435	if (IS_ERR_OR_NULL(req))
   1436		return;
   1437	drbd_send_and_submit(device, req);
   1438}
   1439
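        /* Submit those requests from @incoming that do not need a new
         * activity log transaction (their extents are already active, or they
         * carry no local write); the others are left on the list for the
         * caller to handle. */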
   1440static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
   1441{
   1442	struct blk_plug plug;
   1443	struct drbd_request *req, *tmp;
   1444
   1445	blk_start_plug(&plug);
   1446	list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
   1447		const int rw = bio_data_dir(req->master_bio);
   1448
   1449		if (rw == WRITE /* rw != WRITE should not even end up here! */
   1450		&& req->private_bio && req->i.size
   1451		&& !test_bit(AL_SUSPENDED, &device->flags)) {
   1452			if (!drbd_al_begin_io_fastpath(device, &req->i))
   1453				continue;
   1454
   1455			req->rq_state |= RQ_IN_ACT_LOG;
   1456			req->in_actlog_jif = jiffies;
   1457			atomic_dec(&device->ap_actlog_cnt);
   1458		}
   1459
   1460		list_del_init(&req->tl_requests);
   1461		drbd_send_and_submit(device, req);
   1462	}
   1463	blk_finish_plug(&plug);
   1464}
   1465
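        /* Try, without blocking, to reserve activity log extents for the
         * requests on @incoming: move them to @pending on success, to @later
         * if their extent cannot be activated right now, and stop early once
         * the current AL transaction has no free update slots left (-ENOBUFS).
         * Returns true if anything was moved to @pending. */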
   1466static bool prepare_al_transaction_nonblock(struct drbd_device *device,
   1467					    struct list_head *incoming,
   1468					    struct list_head *pending,
   1469					    struct list_head *later)
   1470{
   1471	struct drbd_request *req;
   1472	int wake = 0;
   1473	int err;
   1474
   1475	spin_lock_irq(&device->al_lock);
   1476	while ((req = list_first_entry_or_null(incoming, struct drbd_request, tl_requests))) {
   1477		err = drbd_al_begin_io_nonblock(device, &req->i);
   1478		if (err == -ENOBUFS)
   1479			break;
   1480		if (err == -EBUSY)
   1481			wake = 1;
   1482		if (err)
   1483			list_move_tail(&req->tl_requests, later);
   1484		else
   1485			list_move_tail(&req->tl_requests, pending);
   1486	}
   1487	spin_unlock_irq(&device->al_lock);
   1488	if (wake)
   1489		wake_up(&device->al_wait);
   1490	return !list_empty(pending);
   1491}
   1492
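        /* The requests on @pending have their activity log extents reserved;
         * mark them as RQ_IN_ACT_LOG and send/submit them, batched under a
         * block plug. */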
   1493static void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
   1494{
   1495	struct blk_plug plug;
   1496	struct drbd_request *req;
   1497
   1498	blk_start_plug(&plug);
   1499	while ((req = list_first_entry_or_null(pending, struct drbd_request, tl_requests))) {
   1500		req->rq_state |= RQ_IN_ACT_LOG;
   1501		req->in_actlog_jif = jiffies;
   1502		atomic_dec(&device->ap_actlog_cnt);
   1503		list_del_init(&req->tl_requests);
   1504		drbd_send_and_submit(device, req);
   1505	}
   1506	blk_finish_plug(&plug);
   1507}
   1508
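        /* Work function of the per-device submitter (device->submit.worker):
         * drain writes queued on device->submit.writes, try the AL fastpath
         * first, then batch the rest into non-blocking AL transactions,
         * committing and submitting each batch before gathering more. */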
   1509void do_submit(struct work_struct *ws)
   1510{
   1511	struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
   1512	LIST_HEAD(incoming);	/* from drbd_make_request() */
   1513	LIST_HEAD(pending);	/* to be submitted after next AL-transaction commit */
   1514	LIST_HEAD(busy);	/* blocked by resync requests */
   1515
   1516	/* grab new incoming requests */
   1517	spin_lock_irq(&device->resource->req_lock);
   1518	list_splice_tail_init(&device->submit.writes, &incoming);
   1519	spin_unlock_irq(&device->resource->req_lock);
   1520
   1521	for (;;) {
   1522		DEFINE_WAIT(wait);
   1523
   1524		/* move used-to-be-busy back to front of incoming */
   1525		list_splice_init(&busy, &incoming);
   1526		submit_fast_path(device, &incoming);
   1527		if (list_empty(&incoming))
   1528			break;
   1529
   1530		for (;;) {
   1531			prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
   1532
   1533			list_splice_init(&busy, &incoming);
   1534			prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
   1535			if (!list_empty(&pending))
   1536				break;
   1537
   1538			schedule();
   1539
   1540			/* If all currently "hot" activity log extents are kept busy by
   1541			 * incoming requests, we still must not totally starve new
   1542			 * requests to "cold" extents.
   1543			 * Something left on &incoming means there had not been
   1544			 * enough update slots available, and the activity log
   1545			 * has been marked as "starving".
   1546			 *
   1547			 * Try again now, without looking for new requests,
   1548			 * effectively blocking all new requests until we made
   1549			 * at least _some_ progress with what we currently have.
   1550			 */
   1551			if (!list_empty(&incoming))
   1552				continue;
   1553
   1554			/* Nothing moved to pending, but nothing left
   1555			 * on incoming: all moved to busy!
   1556			 * Grab new and iterate. */
   1557			spin_lock_irq(&device->resource->req_lock);
   1558			list_splice_tail_init(&device->submit.writes, &incoming);
   1559			spin_unlock_irq(&device->resource->req_lock);
   1560		}
   1561		finish_wait(&device->al_wait, &wait);
   1562
    1563		/* If the transaction was full before all incoming requests
    1564		 * had been processed, skip ahead to commit and iterate
    1565		 * without splicing in more incoming requests from upper layers.
    1566		 *
    1567		 * Else, if all incoming have been processed,
    1568		 * they have become either "pending" (to be submitted after
    1569		 * the next transaction commit) or "busy" (blocked by resync).
    1570		 *
    1571		 * Maybe more was queued while we prepared the transaction?
    1572		 * Try to stuff those into this transaction as well.
    1573		 * Be strictly non-blocking here;
    1574		 * we already have something to commit.
    1575		 *
    1576		 * Commit if we don't make any more progress.
    1577		 */
   1578
   1579		while (list_empty(&incoming)) {
   1580			LIST_HEAD(more_pending);
   1581			LIST_HEAD(more_incoming);
   1582			bool made_progress;
   1583
    1584			/* It is ok to look outside the lock;
    1585			 * it's only an optimization anyway. */
   1586			if (list_empty(&device->submit.writes))
   1587				break;
   1588
   1589			spin_lock_irq(&device->resource->req_lock);
   1590			list_splice_tail_init(&device->submit.writes, &more_incoming);
   1591			spin_unlock_irq(&device->resource->req_lock);
   1592
   1593			if (list_empty(&more_incoming))
   1594				break;
   1595
   1596			made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
   1597
   1598			list_splice_tail_init(&more_pending, &pending);
   1599			list_splice_tail_init(&more_incoming, &incoming);
   1600			if (!made_progress)
   1601				break;
   1602		}
   1603
   1604		drbd_al_begin_io_commit(device);
   1605		send_and_submit_pending(device, &pending);
   1606	}
   1607}
   1608
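        /* Entry point for bios the block layer submits to a DRBD device:
         * split if necessary, account the bio as application I/O
         * (inc_ap_bio()) and hand it to __drbd_make_request(). */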
   1609void drbd_submit_bio(struct bio *bio)
   1610{
   1611	struct drbd_device *device = bio->bi_bdev->bd_disk->private_data;
   1612
   1613	blk_queue_split(&bio);
   1614
   1615	/*
   1616	 * what we "blindly" assume:
   1617	 */
   1618	D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));
   1619
   1620	inc_ap_bio(device);
   1621	__drbd_make_request(device, bio);
   1622}
   1623
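        /* Check whether the oldest request towards the peer has outlived the
         * effective network timeout "ent" (ko-count * timeout).  A request
         * still waiting for its ACK times out based on its own pre_send_jif;
         * one waiting only for the epoch-closing barrier ack times out based
         * on the last sent P_BARRIER, and not at all while that barrier has
         * not even been sent yet.  A grace period applies right after a
         * reconnect.  Returning true makes request_timer_fn() switch the
         * connection to C_TIMEOUT. */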
   1624static bool net_timeout_reached(struct drbd_request *net_req,
   1625		struct drbd_connection *connection,
   1626		unsigned long now, unsigned long ent,
   1627		unsigned int ko_count, unsigned int timeout)
   1628{
   1629	struct drbd_device *device = net_req->device;
   1630
   1631	if (!time_after(now, net_req->pre_send_jif + ent))
   1632		return false;
   1633
   1634	if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
   1635		return false;
   1636
   1637	if (net_req->rq_state & RQ_NET_PENDING) {
   1638		drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
   1639			jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
   1640		return true;
   1641	}
   1642
   1643	/* We received an ACK already (or are using protocol A),
   1644	 * but are waiting for the epoch closing barrier ack.
   1645	 * Check if we sent the barrier already.  We should not blame the peer
    1646	 * for being unresponsive if we did not even ask it yet. */
   1647	if (net_req->epoch == connection->send.current_epoch_nr) {
   1648		drbd_warn(device,
   1649			"We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
   1650			jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
   1651		return false;
   1652	}
   1653
    1654	/* Worst case: we may have been blocked for whatever reason, then
    1655	 * suddenly are able to send a lot of requests (and epoch separating
    1656	 * barriers) in quick succession.
    1657	 * The timestamp of the net_req may be much too old and not correspond
    1658	 * to the sending time of the relevant unack'ed barrier packet, so it
    1659	 * would trigger a spurious timeout.  The latest barrier packet may
    1660	 * have a timestamp too recent to trigger the timeout, so we could
    1661	 * miss a real timeout.  Right now we don't have a place to
    1662	 * conveniently store these timestamps.
    1663	 * But in this particular situation, the application requests are still
    1664	 * completed to upper layers, so DRBD should still "feel" responsive.
    1665	 * No need yet to kill this connection; it may still recover.
    1666	 * If not, we will eventually have queued enough into the network for
    1667	 * us to block.  From that point of view, the timestamp of the last
    1668	 * sent barrier packet is relevant enough.
    1669	 */
   1670	if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
   1671		drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
   1672			connection->send.last_sent_barrier_jif, now,
   1673			jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
   1674		return true;
   1675	}
   1676	return false;
   1677}
   1678
    1679/* A request is considered timed out if
    1680 * - we have some effective timeout from the configuration,
    1681 *   with some state restrictions applied,
    1682 * - the oldest request is waiting for a response from the network
    1683 *   (or from the local disk, respectively),
    1684 * - the oldest request is in fact older than the effective timeout,
    1685 * - the connection was established (or the disk was attached)
    1686 *   for longer than the timeout already.
    1687 * Note that for 32bit jiffies and very stable connections/disks,
    1688 * we may have a wrap around, which is caught by
    1689 *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
    1690 *
    1691 * Side effect: once per 32bit wrap-around interval, which means every
    1692 * ~198 days with 250 HZ, we have a window where the timeout would need
    1693 * to expire twice (worst case) to become effective.  Good enough.
    1694 */
   1695
   1696void request_timer_fn(struct timer_list *t)
   1697{
   1698	struct drbd_device *device = from_timer(device, t, request_timer);
   1699	struct drbd_connection *connection = first_peer_device(device)->connection;
   1700	struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
   1701	struct net_conf *nc;
   1702	unsigned long oldest_submit_jif;
   1703	unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
   1704	unsigned long now;
   1705	unsigned int ko_count = 0, timeout = 0;
   1706
   1707	rcu_read_lock();
   1708	nc = rcu_dereference(connection->net_conf);
   1709	if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
   1710		ko_count = nc->ko_count;
   1711		timeout = nc->timeout;
   1712	}
   1713
   1714	if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
   1715		dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
   1716		put_ldev(device);
   1717	}
   1718	rcu_read_unlock();
   1719
   1720
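        	/* Both timeouts are configured in 0.1s units; convert to jiffies.
        	 * "ent" is the effective network timeout (ko-count * timeout),
        	 * "dt" the disk timeout, and "et" the shorter non-zero of the two. */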
   1721	ent = timeout * HZ/10 * ko_count;
   1722	et = min_not_zero(dt, ent);
   1723
   1724	if (!et)
   1725		return; /* Recurring timer stopped */
   1726
   1727	now = jiffies;
   1728	nt = now + et;
   1729
   1730	spin_lock_irq(&device->resource->req_lock);
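        	/* The heads of pending_completion[0] (reads) and [1] (writes) are
        	 * the oldest requests still waiting for local completion; see the
        	 * list_add_tail() in drbd_send_and_submit(). */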
   1731	req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
   1732	req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
   1733
   1734	/* maybe the oldest request waiting for the peer is in fact still
    1735	 * blocking in tcp sendmsg.  That's ok, though; that's handled via the
   1736	 * socket send timeout, requesting a ping, and bumping ko-count in
   1737	 * we_should_drop_the_connection().
   1738	 */
   1739
    1740	/* check the oldest request we successfully sent,
   1741	 * but which is still waiting for an ACK. */
   1742	req_peer = connection->req_ack_pending;
   1743
    1744	/* if we don't have such a request (e.g. protocol A),
    1745	 * check the oldest request which is still waiting on its epoch
    1746	 * closing barrier ack. */
   1747	if (!req_peer)
   1748		req_peer = connection->req_not_net_done;
   1749
   1750	/* evaluate the oldest peer request only in one timer! */
   1751	if (req_peer && req_peer->device != device)
   1752		req_peer = NULL;
   1753
   1754	/* do we have something to evaluate? */
   1755	if (req_peer == NULL && req_write == NULL && req_read == NULL)
   1756		goto out;
   1757
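        	/* Oldest pre_submit_jif of any read or write still waiting for
        	 * local completion; "now" if there is none, which disables the
        	 * disk-timeout check below (it requires oldest_submit_jif != now). */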
   1758	oldest_submit_jif =
   1759		(req_write && req_read)
   1760		? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
   1761		  ? req_write->pre_submit_jif : req_read->pre_submit_jif )
   1762		: req_write ? req_write->pre_submit_jif
   1763		: req_read ? req_read->pre_submit_jif : now;
   1764
   1765	if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
   1766		_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);
   1767
   1768	if (dt && oldest_submit_jif != now &&
   1769		 time_after(now, oldest_submit_jif + dt) &&
   1770		!time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
   1771		drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
   1772		__drbd_chk_io_error(device, DRBD_FORCE_DETACH);
   1773	}
   1774
   1775	/* Reschedule timer for the nearest not already expired timeout.
    1776	 * Fall back to now + min(effective network timeout, disk timeout). */
   1777	ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
   1778		? req_peer->pre_send_jif + ent : now + et;
   1779	dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
   1780		? oldest_submit_jif + dt : now + et;
   1781	nt = time_before(ent, dt) ? ent : dt;
   1782out:
   1783	spin_unlock_irq(&device->resource->req_lock);
   1784	mod_timer(&device->request_timer, nt);
   1785}