cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

drbd_worker.c (65439B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3   drbd_worker.c
      4
      5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
      6
      7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
      8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
      9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
     10
     11
     12*/
     13
     14#include <linux/module.h>
     15#include <linux/drbd.h>
     16#include <linux/sched/signal.h>
     17#include <linux/wait.h>
     18#include <linux/mm.h>
     19#include <linux/memcontrol.h>
     20#include <linux/mm_inline.h>
     21#include <linux/slab.h>
     22#include <linux/random.h>
     23#include <linux/string.h>
     24#include <linux/scatterlist.h>
     25#include <linux/part_stat.h>
     26
     27#include "drbd_int.h"
     28#include "drbd_protocol.h"
     29#include "drbd_req.h"
     30
     31static int make_ov_request(struct drbd_device *, int);
     32static int make_resync_request(struct drbd_device *, int);
     33
     34/* endio handlers:
     35 *   drbd_md_endio (defined here)
     36 *   drbd_request_endio (defined here)
     37 *   drbd_peer_request_endio (defined here)
     38 *   drbd_bm_endio (defined in drbd_bitmap.c)
     39 *
     40 * For all these callbacks, note the following:
     41 * The callbacks will be called in irq context by the IDE drivers,
     42 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
     43 * Try to get the locking right :)
     44 *
     45 */
     46
     47/* used for synchronous meta data and bitmap IO
     48 * submitted by drbd_md_sync_page_io()
     49 */
     50void drbd_md_endio(struct bio *bio)
     51{
     52	struct drbd_device *device;
     53
     54	device = bio->bi_private;
     55	device->md_io.error = blk_status_to_errno(bio->bi_status);
     56
     57	/* special case: drbd_md_read() during drbd_adm_attach() */
     58	if (device->ldev)
     59		put_ldev(device);
     60	bio_put(bio);
     61
     62	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
     63	 * to timeout on the lower level device, and eventually detach from it.
     64	 * If this io completion runs after that timeout expired, this
     65	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
     66	 * During normal operation, this only puts that extra reference
     67	 * down to 1 again.
     68	 * Make sure we first drop the reference, and only then signal
     69	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
     70	 * next drbd_md_sync_page_io(), that we trigger the
     71	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
     72	 */
     73	drbd_md_put_buffer(device);
     74	device->md_io.done = 1;
     75	wake_up(&device->misc_wait);
     76}
     77
     78/* reads on behalf of the partner,
     79 * "submitted" by the receiver
     80 */
     81static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
     82{
     83	unsigned long flags = 0;
     84	struct drbd_peer_device *peer_device = peer_req->peer_device;
     85	struct drbd_device *device = peer_device->device;
     86
     87	spin_lock_irqsave(&device->resource->req_lock, flags);
     88	device->read_cnt += peer_req->i.size >> 9;
     89	list_del(&peer_req->w.list);
     90	if (list_empty(&device->read_ee))
     91		wake_up(&device->ee_wait);
     92	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
     93		__drbd_chk_io_error(device, DRBD_READ_ERROR);
     94	spin_unlock_irqrestore(&device->resource->req_lock, flags);
     95
     96	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
     97	put_ldev(device);
     98}
     99
    100/* writes on behalf of the partner, or resync writes,
    101 * "submitted" by the receiver, final stage.  */
    102void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
    103{
    104	unsigned long flags = 0;
    105	struct drbd_peer_device *peer_device = peer_req->peer_device;
    106	struct drbd_device *device = peer_device->device;
    107	struct drbd_connection *connection = peer_device->connection;
    108	struct drbd_interval i;
    109	int do_wake;
    110	u64 block_id;
    111	int do_al_complete_io;
    112
    113	/* after we moved peer_req to done_ee,
    114	 * we may no longer access it,
    115	 * it may be freed/reused already!
    116	 * (as soon as we release the req_lock) */
    117	i = peer_req->i;
    118	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
    119	block_id = peer_req->block_id;
    120	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
    121
    122	if (peer_req->flags & EE_WAS_ERROR) {
    123		/* In protocol != C, we usually do not send write acks.
    124		 * In case of a write error, send the neg ack anyways. */
    125		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
    126			inc_unacked(device);
    127		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
    128	}
    129
    130	spin_lock_irqsave(&device->resource->req_lock, flags);
    131	device->writ_cnt += peer_req->i.size >> 9;
    132	list_move_tail(&peer_req->w.list, &device->done_ee);
    133
    134	/*
    135	 * Do not remove from the write_requests tree here: we did not send the
    136	 * Ack yet and did not wake possibly waiting conflicting requests.
    137	 * Removed from the tree from "drbd_process_done_ee" within the
    138	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
    139	 * _drbd_clear_done_ee.
    140	 */
    141
    142	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
    143
    144	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
    145	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
    146	if (peer_req->flags & EE_WAS_ERROR)
    147		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
    148
    149	if (connection->cstate >= C_WF_REPORT_PARAMS) {
    150		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
    151		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
    152			kref_put(&device->kref, drbd_destroy_device);
    153	}
    154	spin_unlock_irqrestore(&device->resource->req_lock, flags);
    155
    156	if (block_id == ID_SYNCER)
    157		drbd_rs_complete_io(device, i.sector);
    158
    159	if (do_wake)
    160		wake_up(&device->ee_wait);
    161
    162	if (do_al_complete_io)
    163		drbd_al_complete_io(device, &i);
    164
    165	put_ldev(device);
    166}
    167
    168/* writes on behalf of the partner, or resync writes,
    169 * "submitted" by the receiver.
    170 */
    171void drbd_peer_request_endio(struct bio *bio)
    172{
    173	struct drbd_peer_request *peer_req = bio->bi_private;
    174	struct drbd_device *device = peer_req->peer_device->device;
    175	bool is_write = bio_data_dir(bio) == WRITE;
    176	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
    177			  bio_op(bio) == REQ_OP_DISCARD;
    178
    179	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
    180		drbd_warn(device, "%s: error=%d s=%llus\n",
    181				is_write ? (is_discard ? "discard" : "write")
    182					: "read", bio->bi_status,
    183				(unsigned long long)peer_req->i.sector);
    184
    185	if (bio->bi_status)
    186		set_bit(__EE_WAS_ERROR, &peer_req->flags);
    187
    188	bio_put(bio); /* no need for the bio anymore */
    189	if (atomic_dec_and_test(&peer_req->pending_bios)) {
    190		if (is_write)
    191			drbd_endio_write_sec_final(peer_req);
    192		else
    193			drbd_endio_read_sec_final(peer_req);
    194	}
    195}
    196
    197static void
    198drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
    199{
    200	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
    201		device->minor, device->resource->name, device->vnr);
    202}
    203
    204/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
    205 */
    206void drbd_request_endio(struct bio *bio)
    207{
    208	unsigned long flags;
    209	struct drbd_request *req = bio->bi_private;
    210	struct drbd_device *device = req->device;
    211	struct bio_and_error m;
    212	enum drbd_req_event what;
    213
    214	/* If this request was aborted locally before,
    215	 * but now was completed "successfully",
    216	 * chances are that this caused arbitrary data corruption.
    217	 *
    218	 * "aborting" requests, or force-detaching the disk, is intended for
    219	 * completely blocked/hung local backing devices which do no longer
    220	 * complete requests at all, not even do error completions.  In this
    221	 * situation, usually a hard-reset and failover is the only way out.
    222	 *
    223	 * By "aborting", basically faking a local error-completion,
    224	 * we allow for a more graceful swichover by cleanly migrating services.
    225	 * Still the affected node has to be rebooted "soon".
    226	 *
    227	 * By completing these requests, we allow the upper layers to re-use
    228	 * the associated data pages.
    229	 *
    230	 * If later the local backing device "recovers", and now DMAs some data
    231	 * from disk into the original request pages, in the best case it will
    232	 * just put random data into unused pages; but typically it will corrupt
    233	 * meanwhile completely unrelated data, causing all sorts of damage.
    234	 *
    235	 * Which means delayed successful completion,
    236	 * especially for READ requests,
    237	 * is a reason to panic().
    238	 *
    239	 * We assume that a delayed *error* completion is OK,
    240	 * though we still will complain noisily about it.
    241	 */
    242	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
    243		if (__ratelimit(&drbd_ratelimit_state))
    244			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
    245
    246		if (!bio->bi_status)
    247			drbd_panic_after_delayed_completion_of_aborted_request(device);
    248	}
    249
    250	/* to avoid recursion in __req_mod */
    251	if (unlikely(bio->bi_status)) {
    252		switch (bio_op(bio)) {
    253		case REQ_OP_WRITE_ZEROES:
    254		case REQ_OP_DISCARD:
    255			if (bio->bi_status == BLK_STS_NOTSUPP)
    256				what = DISCARD_COMPLETED_NOTSUPP;
    257			else
    258				what = DISCARD_COMPLETED_WITH_ERROR;
    259			break;
    260		case REQ_OP_READ:
    261			if (bio->bi_opf & REQ_RAHEAD)
    262				what = READ_AHEAD_COMPLETED_WITH_ERROR;
    263			else
    264				what = READ_COMPLETED_WITH_ERROR;
    265			break;
    266		default:
    267			what = WRITE_COMPLETED_WITH_ERROR;
    268			break;
    269		}
    270	} else {
    271		what = COMPLETED_OK;
    272	}
    273
    274	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
    275	bio_put(bio);
    276
    277	/* not req_mod(), we need irqsave here! */
    278	spin_lock_irqsave(&device->resource->req_lock, flags);
    279	__req_mod(req, what, &m);
    280	spin_unlock_irqrestore(&device->resource->req_lock, flags);
    281	put_ldev(device);
    282
    283	if (m.bio)
    284		complete_master_bio(device, &m);
    285}
    286
    287void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
    288{
    289	SHASH_DESC_ON_STACK(desc, tfm);
    290	struct page *page = peer_req->pages;
    291	struct page *tmp;
    292	unsigned len;
    293	void *src;
    294
    295	desc->tfm = tfm;
    296
    297	crypto_shash_init(desc);
    298
    299	src = kmap_atomic(page);
    300	while ((tmp = page_chain_next(page))) {
    301		/* all but the last page will be fully used */
    302		crypto_shash_update(desc, src, PAGE_SIZE);
    303		kunmap_atomic(src);
    304		page = tmp;
    305		src = kmap_atomic(page);
    306	}
    307	/* and now the last, possibly only partially used page */
    308	len = peer_req->i.size & (PAGE_SIZE - 1);
    309	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
    310	kunmap_atomic(src);
    311
    312	crypto_shash_final(desc, digest);
    313	shash_desc_zero(desc);
    314}
    315
    316void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
    317{
    318	SHASH_DESC_ON_STACK(desc, tfm);
    319	struct bio_vec bvec;
    320	struct bvec_iter iter;
    321
    322	desc->tfm = tfm;
    323
    324	crypto_shash_init(desc);
    325
    326	bio_for_each_segment(bvec, bio, iter) {
    327		u8 *src;
    328
    329		src = bvec_kmap_local(&bvec);
    330		crypto_shash_update(desc, src, bvec.bv_len);
    331		kunmap_local(src);
    332	}
    333	crypto_shash_final(desc, digest);
    334	shash_desc_zero(desc);
    335}
    336
    337/* MAYBE merge common code with w_e_end_ov_req */
    338static int w_e_send_csum(struct drbd_work *w, int cancel)
    339{
    340	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
    341	struct drbd_peer_device *peer_device = peer_req->peer_device;
    342	struct drbd_device *device = peer_device->device;
    343	int digest_size;
    344	void *digest;
    345	int err = 0;
    346
    347	if (unlikely(cancel))
    348		goto out;
    349
    350	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
    351		goto out;
    352
    353	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
    354	digest = kmalloc(digest_size, GFP_NOIO);
    355	if (digest) {
    356		sector_t sector = peer_req->i.sector;
    357		unsigned int size = peer_req->i.size;
    358		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
    359		/* Free peer_req and pages before send.
    360		 * In case we block on congestion, we could otherwise run into
    361		 * some distributed deadlock, if the other side blocks on
    362		 * congestion as well, because our receiver blocks in
    363		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
    364		drbd_free_peer_req(device, peer_req);
    365		peer_req = NULL;
    366		inc_rs_pending(device);
    367		err = drbd_send_drequest_csum(peer_device, sector, size,
    368					      digest, digest_size,
    369					      P_CSUM_RS_REQUEST);
    370		kfree(digest);
    371	} else {
    372		drbd_err(device, "kmalloc() of digest failed.\n");
    373		err = -ENOMEM;
    374	}
    375
    376out:
    377	if (peer_req)
    378		drbd_free_peer_req(device, peer_req);
    379
    380	if (unlikely(err))
    381		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
    382	return err;
    383}
    384
    385#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
    386
    387static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
    388{
    389	struct drbd_device *device = peer_device->device;
    390	struct drbd_peer_request *peer_req;
    391
    392	if (!get_ldev(device))
    393		return -EIO;
    394
    395	/* GFP_TRY, because if there is no memory available right now, this may
    396	 * be rescheduled for later. It is "only" background resync, after all. */
    397	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
    398				       size, size, GFP_TRY);
    399	if (!peer_req)
    400		goto defer;
    401
    402	peer_req->w.cb = w_e_send_csum;
    403	spin_lock_irq(&device->resource->req_lock);
    404	list_add_tail(&peer_req->w.list, &device->read_ee);
    405	spin_unlock_irq(&device->resource->req_lock);
    406
    407	atomic_add(size >> 9, &device->rs_sect_ev);
    408	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
    409				     DRBD_FAULT_RS_RD) == 0)
    410		return 0;
    411
    412	/* If it failed because of ENOMEM, retry should help.  If it failed
    413	 * because bio_add_page failed (probably broken lower level driver),
    414	 * retry may or may not help.
    415	 * If it does not, you may need to force disconnect. */
    416	spin_lock_irq(&device->resource->req_lock);
    417	list_del(&peer_req->w.list);
    418	spin_unlock_irq(&device->resource->req_lock);
    419
    420	drbd_free_peer_req(device, peer_req);
    421defer:
    422	put_ldev(device);
    423	return -EAGAIN;
    424}
    425
    426int w_resync_timer(struct drbd_work *w, int cancel)
    427{
    428	struct drbd_device *device =
    429		container_of(w, struct drbd_device, resync_work);
    430
    431	switch (device->state.conn) {
    432	case C_VERIFY_S:
    433		make_ov_request(device, cancel);
    434		break;
    435	case C_SYNC_TARGET:
    436		make_resync_request(device, cancel);
    437		break;
    438	}
    439
    440	return 0;
    441}
    442
    443void resync_timer_fn(struct timer_list *t)
    444{
    445	struct drbd_device *device = from_timer(device, t, resync_timer);
    446
    447	drbd_queue_work_if_unqueued(
    448		&first_peer_device(device)->connection->sender_work,
    449		&device->resync_work);
    450}
    451
    452static void fifo_set(struct fifo_buffer *fb, int value)
    453{
    454	int i;
    455
    456	for (i = 0; i < fb->size; i++)
    457		fb->values[i] = value;
    458}
    459
    460static int fifo_push(struct fifo_buffer *fb, int value)
    461{
    462	int ov;
    463
    464	ov = fb->values[fb->head_index];
    465	fb->values[fb->head_index++] = value;
    466
    467	if (fb->head_index >= fb->size)
    468		fb->head_index = 0;
    469
    470	return ov;
    471}
    472
    473static void fifo_add_val(struct fifo_buffer *fb, int value)
    474{
    475	int i;
    476
    477	for (i = 0; i < fb->size; i++)
    478		fb->values[i] += value;
    479}
    480
    481struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
    482{
    483	struct fifo_buffer *fb;
    484
    485	fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
    486	if (!fb)
    487		return NULL;
    488
    489	fb->head_index = 0;
    490	fb->size = fifo_size;
    491	fb->total = 0;
    492
    493	return fb;
    494}
    495
    496static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
    497{
    498	struct disk_conf *dc;
    499	unsigned int want;     /* The number of sectors we want in-flight */
    500	int req_sect; /* Number of sectors to request in this turn */
    501	int correction; /* Number of sectors more we need in-flight */
    502	int cps; /* correction per invocation of drbd_rs_controller() */
    503	int steps; /* Number of time steps to plan ahead */
    504	int curr_corr;
    505	int max_sect;
    506	struct fifo_buffer *plan;
    507
    508	dc = rcu_dereference(device->ldev->disk_conf);
    509	plan = rcu_dereference(device->rs_plan_s);
    510
    511	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
    512
    513	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
    514		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
    515	} else { /* normal path */
    516		want = dc->c_fill_target ? dc->c_fill_target :
    517			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
    518	}
    519
    520	correction = want - device->rs_in_flight - plan->total;
    521
    522	/* Plan ahead */
    523	cps = correction / steps;
    524	fifo_add_val(plan, cps);
    525	plan->total += cps * steps;
    526
    527	/* What we do in this step */
    528	curr_corr = fifo_push(plan, 0);
    529	plan->total -= curr_corr;
    530
    531	req_sect = sect_in + curr_corr;
    532	if (req_sect < 0)
    533		req_sect = 0;
    534
    535	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
    536	if (req_sect > max_sect)
    537		req_sect = max_sect;
    538
    539	/*
    540	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
    541		 sect_in, device->rs_in_flight, want, correction,
    542		 steps, cps, device->rs_planed, curr_corr, req_sect);
    543	*/
    544
    545	return req_sect;
    546}
    547
    548static int drbd_rs_number_requests(struct drbd_device *device)
    549{
    550	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
    551	int number, mxb;
    552
    553	sect_in = atomic_xchg(&device->rs_sect_in, 0);
    554	device->rs_in_flight -= sect_in;
    555
    556	rcu_read_lock();
    557	mxb = drbd_get_max_buffers(device) / 2;
    558	if (rcu_dereference(device->rs_plan_s)->size) {
    559		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
    560		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
    561	} else {
    562		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
    563		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
    564	}
    565	rcu_read_unlock();
    566
    567	/* Don't have more than "max-buffers"/2 in-flight.
    568	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
    569	 * potentially causing a distributed deadlock on congestion during
    570	 * online-verify or (checksum-based) resync, if max-buffers,
    571	 * socket buffer sizes and resync rate settings are mis-configured. */
    572
    573	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
    574	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
    575	 * "number of pages" (typically also 4k),
    576	 * but "rs_in_flight" is in "sectors" (512 Byte). */
    577	if (mxb - device->rs_in_flight/8 < number)
    578		number = mxb - device->rs_in_flight/8;
    579
    580	return number;
    581}
    582
    583static int make_resync_request(struct drbd_device *const device, int cancel)
    584{
    585	struct drbd_peer_device *const peer_device = first_peer_device(device);
    586	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
    587	unsigned long bit;
    588	sector_t sector;
    589	const sector_t capacity = get_capacity(device->vdisk);
    590	int max_bio_size;
    591	int number, rollback_i, size;
    592	int align, requeue = 0;
    593	int i = 0;
    594	int discard_granularity = 0;
    595
    596	if (unlikely(cancel))
    597		return 0;
    598
    599	if (device->rs_total == 0) {
    600		/* empty resync? */
    601		drbd_resync_finished(device);
    602		return 0;
    603	}
    604
    605	if (!get_ldev(device)) {
    606		/* Since we only need to access device->rsync a
    607		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
    608		   to continue resync with a broken disk makes no sense at
    609		   all */
    610		drbd_err(device, "Disk broke down during resync!\n");
    611		return 0;
    612	}
    613
    614	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
    615		rcu_read_lock();
    616		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
    617		rcu_read_unlock();
    618	}
    619
    620	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
    621	number = drbd_rs_number_requests(device);
    622	if (number <= 0)
    623		goto requeue;
    624
    625	for (i = 0; i < number; i++) {
    626		/* Stop generating RS requests when half of the send buffer is filled,
    627		 * but notify TCP that we'd like to have more space. */
    628		mutex_lock(&connection->data.mutex);
    629		if (connection->data.socket) {
    630			struct sock *sk = connection->data.socket->sk;
    631			int queued = sk->sk_wmem_queued;
    632			int sndbuf = sk->sk_sndbuf;
    633			if (queued > sndbuf / 2) {
    634				requeue = 1;
    635				if (sk->sk_socket)
    636					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
    637			}
    638		} else
    639			requeue = 1;
    640		mutex_unlock(&connection->data.mutex);
    641		if (requeue)
    642			goto requeue;
    643
    644next_sector:
    645		size = BM_BLOCK_SIZE;
    646		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
    647
    648		if (bit == DRBD_END_OF_BITMAP) {
    649			device->bm_resync_fo = drbd_bm_bits(device);
    650			put_ldev(device);
    651			return 0;
    652		}
    653
    654		sector = BM_BIT_TO_SECT(bit);
    655
    656		if (drbd_try_rs_begin_io(device, sector)) {
    657			device->bm_resync_fo = bit;
    658			goto requeue;
    659		}
    660		device->bm_resync_fo = bit + 1;
    661
    662		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
    663			drbd_rs_complete_io(device, sector);
    664			goto next_sector;
    665		}
    666
    667#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
    668		/* try to find some adjacent bits.
    669		 * we stop if we have already the maximum req size.
    670		 *
    671		 * Additionally always align bigger requests, in order to
    672		 * be prepared for all stripe sizes of software RAIDs.
    673		 */
    674		align = 1;
    675		rollback_i = i;
    676		while (i < number) {
    677			if (size + BM_BLOCK_SIZE > max_bio_size)
    678				break;
    679
    680			/* Be always aligned */
    681			if (sector & ((1<<(align+3))-1))
    682				break;
    683
    684			if (discard_granularity && size == discard_granularity)
    685				break;
    686
    687			/* do not cross extent boundaries */
    688			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
    689				break;
    690			/* now, is it actually dirty, after all?
    691			 * caution, drbd_bm_test_bit is tri-state for some
    692			 * obscure reason; ( b == 0 ) would get the out-of-band
    693			 * only accidentally right because of the "oddly sized"
    694			 * adjustment below */
    695			if (drbd_bm_test_bit(device, bit+1) != 1)
    696				break;
    697			bit++;
    698			size += BM_BLOCK_SIZE;
    699			if ((BM_BLOCK_SIZE << align) <= size)
    700				align++;
    701			i++;
    702		}
    703		/* if we merged some,
    704		 * reset the offset to start the next drbd_bm_find_next from */
    705		if (size > BM_BLOCK_SIZE)
    706			device->bm_resync_fo = bit + 1;
    707#endif
    708
    709		/* adjust very last sectors, in case we are oddly sized */
    710		if (sector + (size>>9) > capacity)
    711			size = (capacity-sector)<<9;
    712
    713		if (device->use_csums) {
    714			switch (read_for_csum(peer_device, sector, size)) {
    715			case -EIO: /* Disk failure */
    716				put_ldev(device);
    717				return -EIO;
    718			case -EAGAIN: /* allocation failed, or ldev busy */
    719				drbd_rs_complete_io(device, sector);
    720				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
    721				i = rollback_i;
    722				goto requeue;
    723			case 0:
    724				/* everything ok */
    725				break;
    726			default:
    727				BUG();
    728			}
    729		} else {
    730			int err;
    731
    732			inc_rs_pending(device);
    733			err = drbd_send_drequest(peer_device,
    734						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
    735						 sector, size, ID_SYNCER);
    736			if (err) {
    737				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
    738				dec_rs_pending(device);
    739				put_ldev(device);
    740				return err;
    741			}
    742		}
    743	}
    744
    745	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
    746		/* last syncer _request_ was sent,
    747		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
    748		 * next sync group will resume), as soon as we receive the last
    749		 * resync data block, and the last bit is cleared.
    750		 * until then resync "work" is "inactive" ...
    751		 */
    752		put_ldev(device);
    753		return 0;
    754	}
    755
    756 requeue:
    757	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
    758	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
    759	put_ldev(device);
    760	return 0;
    761}
    762
    763static int make_ov_request(struct drbd_device *device, int cancel)
    764{
    765	int number, i, size;
    766	sector_t sector;
    767	const sector_t capacity = get_capacity(device->vdisk);
    768	bool stop_sector_reached = false;
    769
    770	if (unlikely(cancel))
    771		return 1;
    772
    773	number = drbd_rs_number_requests(device);
    774
    775	sector = device->ov_position;
    776	for (i = 0; i < number; i++) {
    777		if (sector >= capacity)
    778			return 1;
    779
    780		/* We check for "finished" only in the reply path:
    781		 * w_e_end_ov_reply().
    782		 * We need to send at least one request out. */
    783		stop_sector_reached = i > 0
    784			&& verify_can_do_stop_sector(device)
    785			&& sector >= device->ov_stop_sector;
    786		if (stop_sector_reached)
    787			break;
    788
    789		size = BM_BLOCK_SIZE;
    790
    791		if (drbd_try_rs_begin_io(device, sector)) {
    792			device->ov_position = sector;
    793			goto requeue;
    794		}
    795
    796		if (sector + (size>>9) > capacity)
    797			size = (capacity-sector)<<9;
    798
    799		inc_rs_pending(device);
    800		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
    801			dec_rs_pending(device);
    802			return 0;
    803		}
    804		sector += BM_SECT_PER_BIT;
    805	}
    806	device->ov_position = sector;
    807
    808 requeue:
    809	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
    810	if (i == 0 || !stop_sector_reached)
    811		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
    812	return 1;
    813}
    814
    815int w_ov_finished(struct drbd_work *w, int cancel)
    816{
    817	struct drbd_device_work *dw =
    818		container_of(w, struct drbd_device_work, w);
    819	struct drbd_device *device = dw->device;
    820	kfree(dw);
    821	ov_out_of_sync_print(device);
    822	drbd_resync_finished(device);
    823
    824	return 0;
    825}
    826
    827static int w_resync_finished(struct drbd_work *w, int cancel)
    828{
    829	struct drbd_device_work *dw =
    830		container_of(w, struct drbd_device_work, w);
    831	struct drbd_device *device = dw->device;
    832	kfree(dw);
    833
    834	drbd_resync_finished(device);
    835
    836	return 0;
    837}
    838
    839static void ping_peer(struct drbd_device *device)
    840{
    841	struct drbd_connection *connection = first_peer_device(device)->connection;
    842
    843	clear_bit(GOT_PING_ACK, &connection->flags);
    844	request_ping(connection);
    845	wait_event(connection->ping_wait,
    846		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
    847}
    848
    849int drbd_resync_finished(struct drbd_device *device)
    850{
    851	struct drbd_connection *connection = first_peer_device(device)->connection;
    852	unsigned long db, dt, dbdt;
    853	unsigned long n_oos;
    854	union drbd_state os, ns;
    855	struct drbd_device_work *dw;
    856	char *khelper_cmd = NULL;
    857	int verify_done = 0;
    858
    859	/* Remove all elements from the resync LRU. Since future actions
    860	 * might set bits in the (main) bitmap, then the entries in the
    861	 * resync LRU would be wrong. */
    862	if (drbd_rs_del_all(device)) {
    863		/* In case this is not possible now, most probably because
    864		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
    865		 * queue (or even the read operations for those packets
    866		 * is not finished by now).   Retry in 100ms. */
    867
    868		schedule_timeout_interruptible(HZ / 10);
    869		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
    870		if (dw) {
    871			dw->w.cb = w_resync_finished;
    872			dw->device = device;
    873			drbd_queue_work(&connection->sender_work, &dw->w);
    874			return 1;
    875		}
    876		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
    877	}
    878
    879	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
    880	if (dt <= 0)
    881		dt = 1;
    882
    883	db = device->rs_total;
    884	/* adjust for verify start and stop sectors, respective reached position */
    885	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
    886		db -= device->ov_left;
    887
    888	dbdt = Bit2KB(db/dt);
    889	device->rs_paused /= HZ;
    890
    891	if (!get_ldev(device))
    892		goto out;
    893
    894	ping_peer(device);
    895
    896	spin_lock_irq(&device->resource->req_lock);
    897	os = drbd_read_state(device);
    898
    899	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
    900
    901	/* This protects us against multiple calls (that can happen in the presence
    902	   of application IO), and against connectivity loss just before we arrive here. */
    903	if (os.conn <= C_CONNECTED)
    904		goto out_unlock;
    905
    906	ns = os;
    907	ns.conn = C_CONNECTED;
    908
    909	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
    910	     verify_done ? "Online verify" : "Resync",
    911	     dt + device->rs_paused, device->rs_paused, dbdt);
    912
    913	n_oos = drbd_bm_total_weight(device);
    914
    915	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
    916		if (n_oos) {
    917			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
    918			      n_oos, Bit2KB(1));
    919			khelper_cmd = "out-of-sync";
    920		}
    921	} else {
    922		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
    923
    924		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
    925			khelper_cmd = "after-resync-target";
    926
    927		if (device->use_csums && device->rs_total) {
    928			const unsigned long s = device->rs_same_csum;
    929			const unsigned long t = device->rs_total;
    930			const int ratio =
    931				(t == 0)     ? 0 :
    932			(t < 100000) ? ((s*100)/t) : (s/(t/100));
    933			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
    934			     "transferred %luK total %luK\n",
    935			     ratio,
    936			     Bit2KB(device->rs_same_csum),
    937			     Bit2KB(device->rs_total - device->rs_same_csum),
    938			     Bit2KB(device->rs_total));
    939		}
    940	}
    941
    942	if (device->rs_failed) {
    943		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
    944
    945		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
    946			ns.disk = D_INCONSISTENT;
    947			ns.pdsk = D_UP_TO_DATE;
    948		} else {
    949			ns.disk = D_UP_TO_DATE;
    950			ns.pdsk = D_INCONSISTENT;
    951		}
    952	} else {
    953		ns.disk = D_UP_TO_DATE;
    954		ns.pdsk = D_UP_TO_DATE;
    955
    956		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
    957			if (device->p_uuid) {
    958				int i;
    959				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
    960					_drbd_uuid_set(device, i, device->p_uuid[i]);
    961				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
    962				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
    963			} else {
    964				drbd_err(device, "device->p_uuid is NULL! BUG\n");
    965			}
    966		}
    967
    968		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
    969			/* for verify runs, we don't update uuids here,
    970			 * so there would be nothing to report. */
    971			drbd_uuid_set_bm(device, 0UL);
    972			drbd_print_uuids(device, "updated UUIDs");
    973			if (device->p_uuid) {
    974				/* Now the two UUID sets are equal, update what we
    975				 * know of the peer. */
    976				int i;
    977				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
    978					device->p_uuid[i] = device->ldev->md.uuid[i];
    979			}
    980		}
    981	}
    982
    983	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
    984out_unlock:
    985	spin_unlock_irq(&device->resource->req_lock);
    986
    987	/* If we have been sync source, and have an effective fencing-policy,
    988	 * once *all* volumes are back in sync, call "unfence". */
    989	if (os.conn == C_SYNC_SOURCE) {
    990		enum drbd_disk_state disk_state = D_MASK;
    991		enum drbd_disk_state pdsk_state = D_MASK;
    992		enum drbd_fencing_p fp = FP_DONT_CARE;
    993
    994		rcu_read_lock();
    995		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
    996		if (fp != FP_DONT_CARE) {
    997			struct drbd_peer_device *peer_device;
    998			int vnr;
    999			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   1000				struct drbd_device *device = peer_device->device;
   1001				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
   1002				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
   1003			}
   1004		}
   1005		rcu_read_unlock();
   1006		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
   1007			conn_khelper(connection, "unfence-peer");
   1008	}
   1009
   1010	put_ldev(device);
   1011out:
   1012	device->rs_total  = 0;
   1013	device->rs_failed = 0;
   1014	device->rs_paused = 0;
   1015
   1016	/* reset start sector, if we reached end of device */
   1017	if (verify_done && device->ov_left == 0)
   1018		device->ov_start_sector = 0;
   1019
   1020	drbd_md_sync(device);
   1021
   1022	if (khelper_cmd)
   1023		drbd_khelper(device, khelper_cmd);
   1024
   1025	return 1;
   1026}
   1027
   1028/* helper */
   1029static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
   1030{
   1031	if (drbd_peer_req_has_active_page(peer_req)) {
   1032		/* This might happen if sendpage() has not finished */
   1033		int i = PFN_UP(peer_req->i.size);
   1034		atomic_add(i, &device->pp_in_use_by_net);
   1035		atomic_sub(i, &device->pp_in_use);
   1036		spin_lock_irq(&device->resource->req_lock);
   1037		list_add_tail(&peer_req->w.list, &device->net_ee);
   1038		spin_unlock_irq(&device->resource->req_lock);
   1039		wake_up(&drbd_pp_wait);
   1040	} else
   1041		drbd_free_peer_req(device, peer_req);
   1042}
   1043
   1044/**
   1045 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
   1046 * @w:		work object.
   1047 * @cancel:	The connection will be closed anyways
   1048 */
   1049int w_e_end_data_req(struct drbd_work *w, int cancel)
   1050{
   1051	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
   1052	struct drbd_peer_device *peer_device = peer_req->peer_device;
   1053	struct drbd_device *device = peer_device->device;
   1054	int err;
   1055
   1056	if (unlikely(cancel)) {
   1057		drbd_free_peer_req(device, peer_req);
   1058		dec_unacked(device);
   1059		return 0;
   1060	}
   1061
   1062	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
   1063		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
   1064	} else {
   1065		if (__ratelimit(&drbd_ratelimit_state))
   1066			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
   1067			    (unsigned long long)peer_req->i.sector);
   1068
   1069		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
   1070	}
   1071
   1072	dec_unacked(device);
   1073
   1074	move_to_net_ee_or_free(device, peer_req);
   1075
   1076	if (unlikely(err))
   1077		drbd_err(device, "drbd_send_block() failed\n");
   1078	return err;
   1079}
   1080
   1081static bool all_zero(struct drbd_peer_request *peer_req)
   1082{
   1083	struct page *page = peer_req->pages;
   1084	unsigned int len = peer_req->i.size;
   1085
   1086	page_chain_for_each(page) {
   1087		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
   1088		unsigned int i, words = l / sizeof(long);
   1089		unsigned long *d;
   1090
   1091		d = kmap_atomic(page);
   1092		for (i = 0; i < words; i++) {
   1093			if (d[i]) {
   1094				kunmap_atomic(d);
   1095				return false;
   1096			}
   1097		}
   1098		kunmap_atomic(d);
   1099		len -= l;
   1100	}
   1101
   1102	return true;
   1103}
   1104
   1105/**
   1106 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
   1107 * @w:		work object.
   1108 * @cancel:	The connection will be closed anyways
   1109 */
   1110int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
   1111{
   1112	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
   1113	struct drbd_peer_device *peer_device = peer_req->peer_device;
   1114	struct drbd_device *device = peer_device->device;
   1115	int err;
   1116
   1117	if (unlikely(cancel)) {
   1118		drbd_free_peer_req(device, peer_req);
   1119		dec_unacked(device);
   1120		return 0;
   1121	}
   1122
   1123	if (get_ldev_if_state(device, D_FAILED)) {
   1124		drbd_rs_complete_io(device, peer_req->i.sector);
   1125		put_ldev(device);
   1126	}
   1127
   1128	if (device->state.conn == C_AHEAD) {
   1129		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
   1130	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
   1131		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
   1132			inc_rs_pending(device);
   1133			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
   1134				err = drbd_send_rs_deallocated(peer_device, peer_req);
   1135			else
   1136				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
   1137		} else {
   1138			if (__ratelimit(&drbd_ratelimit_state))
   1139				drbd_err(device, "Not sending RSDataReply, "
   1140				    "partner DISKLESS!\n");
   1141			err = 0;
   1142		}
   1143	} else {
   1144		if (__ratelimit(&drbd_ratelimit_state))
   1145			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
   1146			    (unsigned long long)peer_req->i.sector);
   1147
   1148		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
   1149
   1150		/* update resync data with failure */
   1151		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
   1152	}
   1153
   1154	dec_unacked(device);
   1155
   1156	move_to_net_ee_or_free(device, peer_req);
   1157
   1158	if (unlikely(err))
   1159		drbd_err(device, "drbd_send_block() failed\n");
   1160	return err;
   1161}
   1162
   1163int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
   1164{
   1165	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
   1166	struct drbd_peer_device *peer_device = peer_req->peer_device;
   1167	struct drbd_device *device = peer_device->device;
   1168	struct digest_info *di;
   1169	int digest_size;
   1170	void *digest = NULL;
   1171	int err, eq = 0;
   1172
   1173	if (unlikely(cancel)) {
   1174		drbd_free_peer_req(device, peer_req);
   1175		dec_unacked(device);
   1176		return 0;
   1177	}
   1178
   1179	if (get_ldev(device)) {
   1180		drbd_rs_complete_io(device, peer_req->i.sector);
   1181		put_ldev(device);
   1182	}
   1183
   1184	di = peer_req->digest;
   1185
   1186	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
   1187		/* quick hack to try to avoid a race against reconfiguration.
   1188		 * a real fix would be much more involved,
   1189		 * introducing more locking mechanisms */
   1190		if (peer_device->connection->csums_tfm) {
   1191			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
   1192			D_ASSERT(device, digest_size == di->digest_size);
   1193			digest = kmalloc(digest_size, GFP_NOIO);
   1194		}
   1195		if (digest) {
   1196			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
   1197			eq = !memcmp(digest, di->digest, digest_size);
   1198			kfree(digest);
   1199		}
   1200
   1201		if (eq) {
   1202			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
   1203			/* rs_same_csums unit is BM_BLOCK_SIZE */
   1204			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
   1205			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
   1206		} else {
   1207			inc_rs_pending(device);
   1208			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
   1209			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
   1210			kfree(di);
   1211			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
   1212		}
   1213	} else {
   1214		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
   1215		if (__ratelimit(&drbd_ratelimit_state))
   1216			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
   1217	}
   1218
   1219	dec_unacked(device);
   1220	move_to_net_ee_or_free(device, peer_req);
   1221
   1222	if (unlikely(err))
   1223		drbd_err(device, "drbd_send_block/ack() failed\n");
   1224	return err;
   1225}
   1226
   1227int w_e_end_ov_req(struct drbd_work *w, int cancel)
   1228{
   1229	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
   1230	struct drbd_peer_device *peer_device = peer_req->peer_device;
   1231	struct drbd_device *device = peer_device->device;
   1232	sector_t sector = peer_req->i.sector;
   1233	unsigned int size = peer_req->i.size;
   1234	int digest_size;
   1235	void *digest;
   1236	int err = 0;
   1237
   1238	if (unlikely(cancel))
   1239		goto out;
   1240
   1241	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
   1242	digest = kmalloc(digest_size, GFP_NOIO);
   1243	if (!digest) {
   1244		err = 1;	/* terminate the connection in case the allocation failed */
   1245		goto out;
   1246	}
   1247
   1248	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
   1249		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
   1250	else
   1251		memset(digest, 0, digest_size);
   1252
   1253	/* Free e and pages before send.
   1254	 * In case we block on congestion, we could otherwise run into
   1255	 * some distributed deadlock, if the other side blocks on
   1256	 * congestion as well, because our receiver blocks in
   1257	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
   1258	drbd_free_peer_req(device, peer_req);
   1259	peer_req = NULL;
   1260	inc_rs_pending(device);
   1261	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
   1262	if (err)
   1263		dec_rs_pending(device);
   1264	kfree(digest);
   1265
   1266out:
   1267	if (peer_req)
   1268		drbd_free_peer_req(device, peer_req);
   1269	dec_unacked(device);
   1270	return err;
   1271}
   1272
   1273void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
   1274{
   1275	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
   1276		device->ov_last_oos_size += size>>9;
   1277	} else {
   1278		device->ov_last_oos_start = sector;
   1279		device->ov_last_oos_size = size>>9;
   1280	}
   1281	drbd_set_out_of_sync(device, sector, size);
   1282}
   1283
   1284int w_e_end_ov_reply(struct drbd_work *w, int cancel)
   1285{
   1286	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
   1287	struct drbd_peer_device *peer_device = peer_req->peer_device;
   1288	struct drbd_device *device = peer_device->device;
   1289	struct digest_info *di;
   1290	void *digest;
   1291	sector_t sector = peer_req->i.sector;
   1292	unsigned int size = peer_req->i.size;
   1293	int digest_size;
   1294	int err, eq = 0;
   1295	bool stop_sector_reached = false;
   1296
   1297	if (unlikely(cancel)) {
   1298		drbd_free_peer_req(device, peer_req);
   1299		dec_unacked(device);
   1300		return 0;
   1301	}
   1302
   1303	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
   1304	 * the resync lru has been cleaned up already */
   1305	if (get_ldev(device)) {
   1306		drbd_rs_complete_io(device, peer_req->i.sector);
   1307		put_ldev(device);
   1308	}
   1309
   1310	di = peer_req->digest;
   1311
   1312	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
   1313		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
   1314		digest = kmalloc(digest_size, GFP_NOIO);
   1315		if (digest) {
   1316			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
   1317
   1318			D_ASSERT(device, digest_size == di->digest_size);
   1319			eq = !memcmp(digest, di->digest, digest_size);
   1320			kfree(digest);
   1321		}
   1322	}
   1323
   1324	/* Free peer_req and pages before send.
   1325	 * In case we block on congestion, we could otherwise run into
   1326	 * some distributed deadlock, if the other side blocks on
   1327	 * congestion as well, because our receiver blocks in
   1328	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
   1329	drbd_free_peer_req(device, peer_req);
   1330	if (!eq)
   1331		drbd_ov_out_of_sync_found(device, sector, size);
   1332	else
   1333		ov_out_of_sync_print(device);
   1334
   1335	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
   1336			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
   1337
   1338	dec_unacked(device);
   1339
   1340	--device->ov_left;
   1341
   1342	/* let's advance progress step marks only for every other megabyte */
   1343	if ((device->ov_left & 0x200) == 0x200)
   1344		drbd_advance_rs_marks(device, device->ov_left);
   1345
   1346	stop_sector_reached = verify_can_do_stop_sector(device) &&
   1347		(sector + (size>>9)) >= device->ov_stop_sector;
   1348
   1349	if (device->ov_left == 0 || stop_sector_reached) {
   1350		ov_out_of_sync_print(device);
   1351		drbd_resync_finished(device);
   1352	}
   1353
   1354	return err;
   1355}
   1356
   1357/* FIXME
   1358 * We need to track the number of pending barrier acks,
   1359 * and to be able to wait for them.
   1360 * See also comment in drbd_adm_attach before drbd_suspend_io.
   1361 */
   1362static int drbd_send_barrier(struct drbd_connection *connection)
   1363{
   1364	struct p_barrier *p;
   1365	struct drbd_socket *sock;
   1366
   1367	sock = &connection->data;
   1368	p = conn_prepare_command(connection, sock);
   1369	if (!p)
   1370		return -EIO;
   1371	p->barrier = connection->send.current_epoch_nr;
   1372	p->pad = 0;
   1373	connection->send.current_epoch_writes = 0;
   1374	connection->send.last_sent_barrier_jif = jiffies;
   1375
   1376	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
   1377}
   1378
   1379static int pd_send_unplug_remote(struct drbd_peer_device *pd)
   1380{
   1381	struct drbd_socket *sock = &pd->connection->data;
   1382	if (!drbd_prepare_command(pd, sock))
   1383		return -EIO;
   1384	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
   1385}
   1386
   1387int w_send_write_hint(struct drbd_work *w, int cancel)
   1388{
   1389	struct drbd_device *device =
   1390		container_of(w, struct drbd_device, unplug_work);
   1391
   1392	if (cancel)
   1393		return 0;
   1394	return pd_send_unplug_remote(first_peer_device(device));
   1395}
   1396
   1397static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
   1398{
   1399	if (!connection->send.seen_any_write_yet) {
   1400		connection->send.seen_any_write_yet = true;
   1401		connection->send.current_epoch_nr = epoch;
   1402		connection->send.current_epoch_writes = 0;
   1403		connection->send.last_sent_barrier_jif = jiffies;
   1404	}
   1405}
   1406
   1407static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
   1408{
   1409	/* re-init if first write on this connection */
   1410	if (!connection->send.seen_any_write_yet)
   1411		return;
   1412	if (connection->send.current_epoch_nr != epoch) {
   1413		if (connection->send.current_epoch_writes)
   1414			drbd_send_barrier(connection);
   1415		connection->send.current_epoch_nr = epoch;
   1416	}
   1417}
   1418
   1419int w_send_out_of_sync(struct drbd_work *w, int cancel)
   1420{
   1421	struct drbd_request *req = container_of(w, struct drbd_request, w);
   1422	struct drbd_device *device = req->device;
   1423	struct drbd_peer_device *const peer_device = first_peer_device(device);
   1424	struct drbd_connection *const connection = peer_device->connection;
   1425	int err;
   1426
   1427	if (unlikely(cancel)) {
   1428		req_mod(req, SEND_CANCELED);
   1429		return 0;
   1430	}
   1431	req->pre_send_jif = jiffies;
   1432
   1433	/* this time, no connection->send.current_epoch_writes++;
   1434	 * If it was sent, it was the closing barrier for the last
   1435	 * replicated epoch, before we went into AHEAD mode.
   1436	 * No more barriers will be sent, until we leave AHEAD mode again. */
   1437	maybe_send_barrier(connection, req->epoch);
   1438
   1439	err = drbd_send_out_of_sync(peer_device, req);
   1440	req_mod(req, OOS_HANDED_TO_NETWORK);
   1441
   1442	return err;
   1443}
   1444
   1445/**
   1446 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
   1447 * @w:		work object.
   1448 * @cancel:	The connection will be closed anyways
   1449 */
   1450int w_send_dblock(struct drbd_work *w, int cancel)
   1451{
   1452	struct drbd_request *req = container_of(w, struct drbd_request, w);
   1453	struct drbd_device *device = req->device;
   1454	struct drbd_peer_device *const peer_device = first_peer_device(device);
   1455	struct drbd_connection *connection = peer_device->connection;
   1456	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
   1457	int err;
   1458
   1459	if (unlikely(cancel)) {
   1460		req_mod(req, SEND_CANCELED);
   1461		return 0;
   1462	}
   1463	req->pre_send_jif = jiffies;
   1464
   1465	re_init_if_first_write(connection, req->epoch);
   1466	maybe_send_barrier(connection, req->epoch);
   1467	connection->send.current_epoch_writes++;
   1468
   1469	err = drbd_send_dblock(peer_device, req);
   1470	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
   1471
   1472	if (do_send_unplug && !err)
   1473		pd_send_unplug_remote(peer_device);
   1474
   1475	return err;
   1476}
   1477
   1478/**
   1479 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
   1480 * @w:		work object.
   1481 * @cancel:	The connection will be closed anyways
   1482 */
   1483int w_send_read_req(struct drbd_work *w, int cancel)
   1484{
   1485	struct drbd_request *req = container_of(w, struct drbd_request, w);
   1486	struct drbd_device *device = req->device;
   1487	struct drbd_peer_device *const peer_device = first_peer_device(device);
   1488	struct drbd_connection *connection = peer_device->connection;
   1489	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
   1490	int err;
   1491
   1492	if (unlikely(cancel)) {
   1493		req_mod(req, SEND_CANCELED);
   1494		return 0;
   1495	}
   1496	req->pre_send_jif = jiffies;
   1497
   1498	/* Even read requests may close a write epoch,
   1499	 * if there was any yet. */
   1500	maybe_send_barrier(connection, req->epoch);
   1501
   1502	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
   1503				 (unsigned long)req);
   1504
   1505	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
   1506
   1507	if (do_send_unplug && !err)
   1508		pd_send_unplug_remote(peer_device);
   1509
   1510	return err;
   1511}
   1512
   1513int w_restart_disk_io(struct drbd_work *w, int cancel)
   1514{
   1515	struct drbd_request *req = container_of(w, struct drbd_request, w);
   1516	struct drbd_device *device = req->device;
   1517
   1518	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
   1519		drbd_al_begin_io(device, &req->i);
   1520
   1521	req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
   1522					   req->master_bio, GFP_NOIO,
   1523					  &drbd_io_bio_set);
   1524	req->private_bio->bi_private = req;
   1525	req->private_bio->bi_end_io = drbd_request_endio;
   1526	submit_bio_noacct(req->private_bio);
   1527
   1528	return 0;
   1529}
   1530
   1531static int _drbd_may_sync_now(struct drbd_device *device)
   1532{
   1533	struct drbd_device *odev = device;
   1534	int resync_after;
   1535
   1536	while (1) {
   1537		if (!odev->ldev || odev->state.disk == D_DISKLESS)
   1538			return 1;
   1539		rcu_read_lock();
   1540		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
   1541		rcu_read_unlock();
   1542		if (resync_after == -1)
   1543			return 1;
   1544		odev = minor_to_device(resync_after);
   1545		if (!odev)
   1546			return 1;
   1547		if ((odev->state.conn >= C_SYNC_SOURCE &&
   1548		     odev->state.conn <= C_PAUSED_SYNC_T) ||
   1549		    odev->state.aftr_isp || odev->state.peer_isp ||
   1550		    odev->state.user_isp)
   1551			return 0;
   1552	}
   1553}
   1554
   1555/**
   1556 * drbd_pause_after() - Pause resync on all devices that may not resync now
   1557 * @device:	DRBD device.
   1558 *
   1559 * Called from process context only (admin command and after_state_ch).
   1560 */
   1561static bool drbd_pause_after(struct drbd_device *device)
   1562{
   1563	bool changed = false;
   1564	struct drbd_device *odev;
   1565	int i;
   1566
   1567	rcu_read_lock();
   1568	idr_for_each_entry(&drbd_devices, odev, i) {
   1569		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
   1570			continue;
   1571		if (!_drbd_may_sync_now(odev) &&
   1572		    _drbd_set_state(_NS(odev, aftr_isp, 1),
   1573				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
   1574			changed = true;
   1575	}
   1576	rcu_read_unlock();
   1577
   1578	return changed;
   1579}
   1580
   1581/**
   1582 * drbd_resume_next() - Resume resync on all devices that may resync now
   1583 * @device:	DRBD device.
   1584 *
   1585 * Called from process context only (admin command and worker).
   1586 */
   1587static bool drbd_resume_next(struct drbd_device *device)
   1588{
   1589	bool changed = false;
   1590	struct drbd_device *odev;
   1591	int i;
   1592
   1593	rcu_read_lock();
   1594	idr_for_each_entry(&drbd_devices, odev, i) {
   1595		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
   1596			continue;
   1597		if (odev->state.aftr_isp) {
   1598			if (_drbd_may_sync_now(odev) &&
   1599			    _drbd_set_state(_NS(odev, aftr_isp, 0),
   1600					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
   1601				changed = true;
   1602		}
   1603	}
   1604	rcu_read_unlock();
   1605	return changed;
   1606}
   1607
   1608void resume_next_sg(struct drbd_device *device)
   1609{
   1610	lock_all_resources();
   1611	drbd_resume_next(device);
   1612	unlock_all_resources();
   1613}
   1614
   1615void suspend_other_sg(struct drbd_device *device)
   1616{
   1617	lock_all_resources();
   1618	drbd_pause_after(device);
   1619	unlock_all_resources();
   1620}
   1621
   1622/* caller must lock_all_resources() */
   1623enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
   1624{
   1625	struct drbd_device *odev;
   1626	int resync_after;
   1627
   1628	if (o_minor == -1)
   1629		return NO_ERROR;
   1630	if (o_minor < -1 || o_minor > MINORMASK)
   1631		return ERR_RESYNC_AFTER;
   1632
   1633	/* check for loops */
   1634	odev = minor_to_device(o_minor);
   1635	while (1) {
   1636		if (odev == device)
   1637			return ERR_RESYNC_AFTER_CYCLE;
   1638
   1639		/* You are free to depend on diskless, non-existing,
   1640		 * or not yet/no longer existing minors.
   1641		 * We only reject dependency loops.
   1642		 * We cannot follow the dependency chain beyond a detached or
   1643		 * missing minor.
   1644		 */
   1645		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
   1646			return NO_ERROR;
   1647
   1648		rcu_read_lock();
   1649		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
   1650		rcu_read_unlock();
   1651		/* dependency chain ends here, no cycles. */
   1652		if (resync_after == -1)
   1653			return NO_ERROR;
   1654
   1655		/* follow the dependency chain */
   1656		odev = minor_to_device(resync_after);
   1657	}
   1658}
   1659
   1660/* caller must lock_all_resources() */
   1661void drbd_resync_after_changed(struct drbd_device *device)
   1662{
   1663	int changed;
   1664
   1665	do {
   1666		changed  = drbd_pause_after(device);
   1667		changed |= drbd_resume_next(device);
   1668	} while (changed);
   1669}
   1670
   1671void drbd_rs_controller_reset(struct drbd_device *device)
   1672{
   1673	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
   1674	struct fifo_buffer *plan;
   1675
   1676	atomic_set(&device->rs_sect_in, 0);
   1677	atomic_set(&device->rs_sect_ev, 0);
   1678	device->rs_in_flight = 0;
   1679	device->rs_last_events =
   1680		(int)part_stat_read_accum(disk->part0, sectors);
   1681
   1682	/* Updating the RCU protected object in place is necessary since
   1683	   this function gets called from atomic context.
   1684	   It is valid since all other updates also lead to an completely
   1685	   empty fifo */
   1686	rcu_read_lock();
   1687	plan = rcu_dereference(device->rs_plan_s);
   1688	plan->total = 0;
   1689	fifo_set(plan, 0);
   1690	rcu_read_unlock();
   1691}
   1692
   1693void start_resync_timer_fn(struct timer_list *t)
   1694{
   1695	struct drbd_device *device = from_timer(device, t, start_resync_timer);
   1696	drbd_device_post_work(device, RS_START);
   1697}
   1698
   1699static void do_start_resync(struct drbd_device *device)
   1700{
   1701	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
   1702		drbd_warn(device, "postponing start_resync ...\n");
   1703		device->start_resync_timer.expires = jiffies + HZ/10;
   1704		add_timer(&device->start_resync_timer);
   1705		return;
   1706	}
   1707
   1708	drbd_start_resync(device, C_SYNC_SOURCE);
   1709	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
   1710}
   1711
   1712static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
   1713{
   1714	bool csums_after_crash_only;
   1715	rcu_read_lock();
   1716	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
   1717	rcu_read_unlock();
   1718	return connection->agreed_pro_version >= 89 &&		/* supported? */
   1719		connection->csums_tfm &&			/* configured? */
   1720		(csums_after_crash_only == false		/* use for each resync? */
   1721		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
   1722}
   1723
   1724/**
   1725 * drbd_start_resync() - Start the resync process
   1726 * @device:	DRBD device.
   1727 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
   1728 *
   1729 * This function might bring you directly into one of the
   1730 * C_PAUSED_SYNC_* states.
   1731 */
   1732void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
   1733{
   1734	struct drbd_peer_device *peer_device = first_peer_device(device);
   1735	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
   1736	union drbd_state ns;
   1737	int r;
   1738
   1739	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
   1740		drbd_err(device, "Resync already running!\n");
   1741		return;
   1742	}
   1743
   1744	if (!connection) {
   1745		drbd_err(device, "No connection to peer, aborting!\n");
   1746		return;
   1747	}
   1748
   1749	if (!test_bit(B_RS_H_DONE, &device->flags)) {
   1750		if (side == C_SYNC_TARGET) {
   1751			/* Since application IO was locked out during C_WF_BITMAP_T and
   1752			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
   1753			   we check that we might make the data inconsistent. */
   1754			r = drbd_khelper(device, "before-resync-target");
   1755			r = (r >> 8) & 0xff;
   1756			if (r > 0) {
   1757				drbd_info(device, "before-resync-target handler returned %d, "
   1758					 "dropping connection.\n", r);
   1759				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
   1760				return;
   1761			}
   1762		} else /* C_SYNC_SOURCE */ {
   1763			r = drbd_khelper(device, "before-resync-source");
   1764			r = (r >> 8) & 0xff;
   1765			if (r > 0) {
   1766				if (r == 3) {
   1767					drbd_info(device, "before-resync-source handler returned %d, "
   1768						 "ignoring. Old userland tools?", r);
   1769				} else {
   1770					drbd_info(device, "before-resync-source handler returned %d, "
   1771						 "dropping connection.\n", r);
   1772					conn_request_state(connection,
   1773							   NS(conn, C_DISCONNECTING), CS_HARD);
   1774					return;
   1775				}
   1776			}
   1777		}
   1778	}
   1779
   1780	if (current == connection->worker.task) {
   1781		/* The worker should not sleep waiting for state_mutex,
   1782		   that can take long */
   1783		if (!mutex_trylock(device->state_mutex)) {
   1784			set_bit(B_RS_H_DONE, &device->flags);
   1785			device->start_resync_timer.expires = jiffies + HZ/5;
   1786			add_timer(&device->start_resync_timer);
   1787			return;
   1788		}
   1789	} else {
   1790		mutex_lock(device->state_mutex);
   1791	}
   1792
   1793	lock_all_resources();
   1794	clear_bit(B_RS_H_DONE, &device->flags);
   1795	/* Did some connection breakage or IO error race with us? */
   1796	if (device->state.conn < C_CONNECTED
   1797	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
   1798		unlock_all_resources();
   1799		goto out;
   1800	}
   1801
   1802	ns = drbd_read_state(device);
   1803
   1804	ns.aftr_isp = !_drbd_may_sync_now(device);
   1805
   1806	ns.conn = side;
   1807
   1808	if (side == C_SYNC_TARGET)
   1809		ns.disk = D_INCONSISTENT;
   1810	else /* side == C_SYNC_SOURCE */
   1811		ns.pdsk = D_INCONSISTENT;
   1812
   1813	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
   1814	ns = drbd_read_state(device);
   1815
   1816	if (ns.conn < C_CONNECTED)
   1817		r = SS_UNKNOWN_ERROR;
   1818
   1819	if (r == SS_SUCCESS) {
   1820		unsigned long tw = drbd_bm_total_weight(device);
   1821		unsigned long now = jiffies;
   1822		int i;
   1823
   1824		device->rs_failed    = 0;
   1825		device->rs_paused    = 0;
   1826		device->rs_same_csum = 0;
   1827		device->rs_last_sect_ev = 0;
   1828		device->rs_total     = tw;
   1829		device->rs_start     = now;
   1830		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
   1831			device->rs_mark_left[i] = tw;
   1832			device->rs_mark_time[i] = now;
   1833		}
   1834		drbd_pause_after(device);
   1835		/* Forget potentially stale cached per resync extent bit-counts.
   1836		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
   1837		 * disabled, and know the disk state is ok. */
   1838		spin_lock(&device->al_lock);
   1839		lc_reset(device->resync);
   1840		device->resync_locked = 0;
   1841		device->resync_wenr = LC_FREE;
   1842		spin_unlock(&device->al_lock);
   1843	}
   1844	unlock_all_resources();
   1845
   1846	if (r == SS_SUCCESS) {
   1847		wake_up(&device->al_wait); /* for lc_reset() above */
   1848		/* reset rs_last_bcast when a resync or verify is started,
   1849		 * to deal with potential jiffies wrap. */
   1850		device->rs_last_bcast = jiffies - HZ;
   1851
   1852		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
   1853		     drbd_conn_str(ns.conn),
   1854		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
   1855		     (unsigned long) device->rs_total);
   1856		if (side == C_SYNC_TARGET) {
   1857			device->bm_resync_fo = 0;
   1858			device->use_csums = use_checksum_based_resync(connection, device);
   1859		} else {
   1860			device->use_csums = false;
   1861		}
   1862
   1863		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
   1864		 * with w_send_oos, or the sync target will get confused as to
   1865		 * how much bits to resync.  We cannot do that always, because for an
   1866		 * empty resync and protocol < 95, we need to do it here, as we call
   1867		 * drbd_resync_finished from here in that case.
   1868		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
   1869		 * and from after_state_ch otherwise. */
   1870		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
   1871			drbd_gen_and_send_sync_uuid(peer_device);
   1872
   1873		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
   1874			/* This still has a race (about when exactly the peers
   1875			 * detect connection loss) that can lead to a full sync
   1876			 * on next handshake. In 8.3.9 we fixed this with explicit
   1877			 * resync-finished notifications, but the fix
   1878			 * introduces a protocol change.  Sleeping for some
   1879			 * time longer than the ping interval + timeout on the
   1880			 * SyncSource, to give the SyncTarget the chance to
   1881			 * detect connection loss, then waiting for a ping
   1882			 * response (implicit in drbd_resync_finished) reduces
   1883			 * the race considerably, but does not solve it. */
   1884			if (side == C_SYNC_SOURCE) {
   1885				struct net_conf *nc;
   1886				int timeo;
   1887
   1888				rcu_read_lock();
   1889				nc = rcu_dereference(connection->net_conf);
   1890				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
   1891				rcu_read_unlock();
   1892				schedule_timeout_interruptible(timeo);
   1893			}
   1894			drbd_resync_finished(device);
   1895		}
   1896
   1897		drbd_rs_controller_reset(device);
   1898		/* ns.conn may already be != device->state.conn,
   1899		 * we may have been paused in between, or become paused until
   1900		 * the timer triggers.
   1901		 * No matter, that is handled in resync_timer_fn() */
   1902		if (ns.conn == C_SYNC_TARGET)
   1903			mod_timer(&device->resync_timer, jiffies);
   1904
   1905		drbd_md_sync(device);
   1906	}
   1907	put_ldev(device);
   1908out:
   1909	mutex_unlock(device->state_mutex);
   1910}
   1911
   1912static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
   1913{
   1914	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
   1915	device->rs_last_bcast = jiffies;
   1916
   1917	if (!get_ldev(device))
   1918		return;
   1919
   1920	drbd_bm_write_lazy(device, 0);
   1921	if (resync_done && is_sync_state(device->state.conn))
   1922		drbd_resync_finished(device);
   1923
   1924	drbd_bcast_event(device, &sib);
   1925	/* update timestamp, in case it took a while to write out stuff */
   1926	device->rs_last_bcast = jiffies;
   1927	put_ldev(device);
   1928}
   1929
   1930static void drbd_ldev_destroy(struct drbd_device *device)
   1931{
   1932	lc_destroy(device->resync);
   1933	device->resync = NULL;
   1934	lc_destroy(device->act_log);
   1935	device->act_log = NULL;
   1936
   1937	__acquire(local);
   1938	drbd_backing_dev_free(device, device->ldev);
   1939	device->ldev = NULL;
   1940	__release(local);
   1941
   1942	clear_bit(GOING_DISKLESS, &device->flags);
   1943	wake_up(&device->misc_wait);
   1944}
   1945
   1946static void go_diskless(struct drbd_device *device)
   1947{
   1948	D_ASSERT(device, device->state.disk == D_FAILED);
   1949	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
   1950	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
   1951	 * the protected members anymore, though, so once put_ldev reaches zero
   1952	 * again, it will be safe to free them. */
   1953
   1954	/* Try to write changed bitmap pages, read errors may have just
   1955	 * set some bits outside the area covered by the activity log.
   1956	 *
   1957	 * If we have an IO error during the bitmap writeout,
   1958	 * we will want a full sync next time, just in case.
   1959	 * (Do we want a specific meta data flag for this?)
   1960	 *
   1961	 * If that does not make it to stable storage either,
   1962	 * we cannot do anything about that anymore.
   1963	 *
   1964	 * We still need to check if both bitmap and ldev are present, we may
   1965	 * end up here after a failed attach, before ldev was even assigned.
   1966	 */
   1967	if (device->bitmap && device->ldev) {
   1968		/* An interrupted resync or similar is allowed to recounts bits
   1969		 * while we detach.
   1970		 * Any modifications would not be expected anymore, though.
   1971		 */
   1972		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
   1973					"detach", BM_LOCKED_TEST_ALLOWED)) {
   1974			if (test_bit(WAS_READ_ERROR, &device->flags)) {
   1975				drbd_md_set_flag(device, MDF_FULL_SYNC);
   1976				drbd_md_sync(device);
   1977			}
   1978		}
   1979	}
   1980
   1981	drbd_force_state(device, NS(disk, D_DISKLESS));
   1982}
   1983
   1984static int do_md_sync(struct drbd_device *device)
   1985{
   1986	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
   1987	drbd_md_sync(device);
   1988	return 0;
   1989}
   1990
   1991/* only called from drbd_worker thread, no locking */
   1992void __update_timing_details(
   1993		struct drbd_thread_timing_details *tdp,
   1994		unsigned int *cb_nr,
   1995		void *cb,
   1996		const char *fn, const unsigned int line)
   1997{
   1998	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
   1999	struct drbd_thread_timing_details *td = tdp + i;
   2000
   2001	td->start_jif = jiffies;
   2002	td->cb_addr = cb;
   2003	td->caller_fn = fn;
   2004	td->line = line;
   2005	td->cb_nr = *cb_nr;
   2006
   2007	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
   2008	td = tdp + i;
   2009	memset(td, 0, sizeof(*td));
   2010
   2011	++(*cb_nr);
   2012}
   2013
   2014static void do_device_work(struct drbd_device *device, const unsigned long todo)
   2015{
   2016	if (test_bit(MD_SYNC, &todo))
   2017		do_md_sync(device);
   2018	if (test_bit(RS_DONE, &todo) ||
   2019	    test_bit(RS_PROGRESS, &todo))
   2020		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
   2021	if (test_bit(GO_DISKLESS, &todo))
   2022		go_diskless(device);
   2023	if (test_bit(DESTROY_DISK, &todo))
   2024		drbd_ldev_destroy(device);
   2025	if (test_bit(RS_START, &todo))
   2026		do_start_resync(device);
   2027}
   2028
   2029#define DRBD_DEVICE_WORK_MASK	\
   2030	((1UL << GO_DISKLESS)	\
   2031	|(1UL << DESTROY_DISK)	\
   2032	|(1UL << MD_SYNC)	\
   2033	|(1UL << RS_START)	\
   2034	|(1UL << RS_PROGRESS)	\
   2035	|(1UL << RS_DONE)	\
   2036	)
   2037
   2038static unsigned long get_work_bits(unsigned long *flags)
   2039{
   2040	unsigned long old, new;
   2041	do {
   2042		old = *flags;
   2043		new = old & ~DRBD_DEVICE_WORK_MASK;
   2044	} while (cmpxchg(flags, old, new) != old);
   2045	return old & DRBD_DEVICE_WORK_MASK;
   2046}
   2047
   2048static void do_unqueued_work(struct drbd_connection *connection)
   2049{
   2050	struct drbd_peer_device *peer_device;
   2051	int vnr;
   2052
   2053	rcu_read_lock();
   2054	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   2055		struct drbd_device *device = peer_device->device;
   2056		unsigned long todo = get_work_bits(&device->flags);
   2057		if (!todo)
   2058			continue;
   2059
   2060		kref_get(&device->kref);
   2061		rcu_read_unlock();
   2062		do_device_work(device, todo);
   2063		kref_put(&device->kref, drbd_destroy_device);
   2064		rcu_read_lock();
   2065	}
   2066	rcu_read_unlock();
   2067}
   2068
   2069static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
   2070{
   2071	spin_lock_irq(&queue->q_lock);
   2072	list_splice_tail_init(&queue->q, work_list);
   2073	spin_unlock_irq(&queue->q_lock);
   2074	return !list_empty(work_list);
   2075}
   2076
   2077static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
   2078{
   2079	DEFINE_WAIT(wait);
   2080	struct net_conf *nc;
   2081	int uncork, cork;
   2082
   2083	dequeue_work_batch(&connection->sender_work, work_list);
   2084	if (!list_empty(work_list))
   2085		return;
   2086
   2087	/* Still nothing to do?
   2088	 * Maybe we still need to close the current epoch,
   2089	 * even if no new requests are queued yet.
   2090	 *
   2091	 * Also, poke TCP, just in case.
   2092	 * Then wait for new work (or signal). */
   2093	rcu_read_lock();
   2094	nc = rcu_dereference(connection->net_conf);
   2095	uncork = nc ? nc->tcp_cork : 0;
   2096	rcu_read_unlock();
   2097	if (uncork) {
   2098		mutex_lock(&connection->data.mutex);
   2099		if (connection->data.socket)
   2100			tcp_sock_set_cork(connection->data.socket->sk, false);
   2101		mutex_unlock(&connection->data.mutex);
   2102	}
   2103
   2104	for (;;) {
   2105		int send_barrier;
   2106		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
   2107		spin_lock_irq(&connection->resource->req_lock);
   2108		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
   2109		if (!list_empty(&connection->sender_work.q))
   2110			list_splice_tail_init(&connection->sender_work.q, work_list);
   2111		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
   2112		if (!list_empty(work_list) || signal_pending(current)) {
   2113			spin_unlock_irq(&connection->resource->req_lock);
   2114			break;
   2115		}
   2116
   2117		/* We found nothing new to do, no to-be-communicated request,
   2118		 * no other work item.  We may still need to close the last
   2119		 * epoch.  Next incoming request epoch will be connection ->
   2120		 * current transfer log epoch number.  If that is different
   2121		 * from the epoch of the last request we communicated, it is
   2122		 * safe to send the epoch separating barrier now.
   2123		 */
   2124		send_barrier =
   2125			atomic_read(&connection->current_tle_nr) !=
   2126			connection->send.current_epoch_nr;
   2127		spin_unlock_irq(&connection->resource->req_lock);
   2128
   2129		if (send_barrier)
   2130			maybe_send_barrier(connection,
   2131					connection->send.current_epoch_nr + 1);
   2132
   2133		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
   2134			break;
   2135
   2136		/* drbd_send() may have called flush_signals() */
   2137		if (get_t_state(&connection->worker) != RUNNING)
   2138			break;
   2139
   2140		schedule();
   2141		/* may be woken up for other things but new work, too,
   2142		 * e.g. if the current epoch got closed.
   2143		 * In which case we send the barrier above. */
   2144	}
   2145	finish_wait(&connection->sender_work.q_wait, &wait);
   2146
   2147	/* someone may have changed the config while we have been waiting above. */
   2148	rcu_read_lock();
   2149	nc = rcu_dereference(connection->net_conf);
   2150	cork = nc ? nc->tcp_cork : 0;
   2151	rcu_read_unlock();
   2152	mutex_lock(&connection->data.mutex);
   2153	if (connection->data.socket) {
   2154		if (cork)
   2155			tcp_sock_set_cork(connection->data.socket->sk, true);
   2156		else if (!uncork)
   2157			tcp_sock_set_cork(connection->data.socket->sk, false);
   2158	}
   2159	mutex_unlock(&connection->data.mutex);
   2160}
   2161
   2162int drbd_worker(struct drbd_thread *thi)
   2163{
   2164	struct drbd_connection *connection = thi->connection;
   2165	struct drbd_work *w = NULL;
   2166	struct drbd_peer_device *peer_device;
   2167	LIST_HEAD(work_list);
   2168	int vnr;
   2169
   2170	while (get_t_state(thi) == RUNNING) {
   2171		drbd_thread_current_set_cpu(thi);
   2172
   2173		if (list_empty(&work_list)) {
   2174			update_worker_timing_details(connection, wait_for_work);
   2175			wait_for_work(connection, &work_list);
   2176		}
   2177
   2178		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
   2179			update_worker_timing_details(connection, do_unqueued_work);
   2180			do_unqueued_work(connection);
   2181		}
   2182
   2183		if (signal_pending(current)) {
   2184			flush_signals(current);
   2185			if (get_t_state(thi) == RUNNING) {
   2186				drbd_warn(connection, "Worker got an unexpected signal\n");
   2187				continue;
   2188			}
   2189			break;
   2190		}
   2191
   2192		if (get_t_state(thi) != RUNNING)
   2193			break;
   2194
   2195		if (!list_empty(&work_list)) {
   2196			w = list_first_entry(&work_list, struct drbd_work, list);
   2197			list_del_init(&w->list);
   2198			update_worker_timing_details(connection, w->cb);
   2199			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
   2200				continue;
   2201			if (connection->cstate >= C_WF_REPORT_PARAMS)
   2202				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
   2203		}
   2204	}
   2205
   2206	do {
   2207		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
   2208			update_worker_timing_details(connection, do_unqueued_work);
   2209			do_unqueued_work(connection);
   2210		}
   2211		if (!list_empty(&work_list)) {
   2212			w = list_first_entry(&work_list, struct drbd_work, list);
   2213			list_del_init(&w->list);
   2214			update_worker_timing_details(connection, w->cb);
   2215			w->cb(w, 1);
   2216		} else
   2217			dequeue_work_batch(&connection->sender_work, &work_list);
   2218	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
   2219
   2220	rcu_read_lock();
   2221	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   2222		struct drbd_device *device = peer_device->device;
   2223		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
   2224		kref_get(&device->kref);
   2225		rcu_read_unlock();
   2226		drbd_device_cleanup(device);
   2227		kref_put(&device->kref, drbd_destroy_device);
   2228		rcu_read_lock();
   2229	}
   2230	rcu_read_unlock();
   2231
   2232	return 0;
   2233}