cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

drbd_receiver.c (180393B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3   drbd_receiver.c
      4
      5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
      6
      7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
      8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
      9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
     10
     11 */
     12
     13
     14#include <linux/module.h>
     15
     16#include <linux/uaccess.h>
     17#include <net/sock.h>
     18
     19#include <linux/drbd.h>
     20#include <linux/fs.h>
     21#include <linux/file.h>
     22#include <linux/in.h>
     23#include <linux/mm.h>
     24#include <linux/memcontrol.h>
     25#include <linux/mm_inline.h>
     26#include <linux/slab.h>
     27#include <uapi/linux/sched/types.h>
     28#include <linux/sched/signal.h>
     29#include <linux/pkt_sched.h>
     30#define __KERNEL_SYSCALLS__
     31#include <linux/unistd.h>
     32#include <linux/vmalloc.h>
     33#include <linux/random.h>
     34#include <linux/string.h>
     35#include <linux/scatterlist.h>
     36#include <linux/part_stat.h>
     37#include "drbd_int.h"
     38#include "drbd_protocol.h"
     39#include "drbd_req.h"
     40#include "drbd_vli.h"
     41
     42#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
     43
     44struct packet_info {
     45	enum drbd_packet cmd;
     46	unsigned int size;
     47	unsigned int vnr;
     48	void *data;
     49};
     50
     51enum finish_epoch {
     52	FE_STILL_LIVE,
     53	FE_DESTROYED,
     54	FE_RECYCLED,
     55};
     56
     57static int drbd_do_features(struct drbd_connection *connection);
     58static int drbd_do_auth(struct drbd_connection *connection);
     59static int drbd_disconnected(struct drbd_peer_device *);
     60static void conn_wait_active_ee_empty(struct drbd_connection *connection);
     61static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
     62static int e_end_block(struct drbd_work *, int);
     63
     64
     65#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
     66
     67/*
     68 * some helper functions to deal with single linked page lists,
     69 * page->private being our "next" pointer.
     70 */
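/*
 * Illustrative sketch (not part of the original file): the chain is a
 * singly linked list threaded through page->private, terminated by 0.
 * Assuming page_chain_next() from drbd_int.h simply reads that field,
 * walking a chain built by the helpers below looks roughly like this:
 *
 *	struct page *p = head;
 *	while (p) {
 *		// ... use p ...
 *		p = (struct page *)page_private(p);	// "next" pointer
 *	}
 */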
     71
     72/* If at least n pages are linked at head, get n pages off.
     73 * Otherwise, don't modify head, and return NULL.
     74 * Locking is the responsibility of the caller.
     75 */
     76static struct page *page_chain_del(struct page **head, int n)
     77{
     78	struct page *page;
     79	struct page *tmp;
     80
     81	BUG_ON(!n);
     82	BUG_ON(!head);
     83
     84	page = *head;
     85
     86	if (!page)
     87		return NULL;
     88
     89	while (page) {
     90		tmp = page_chain_next(page);
     91		if (--n == 0)
     92			break; /* found sufficient pages */
     93		if (tmp == NULL)
     94			/* insufficient pages, don't use any of them. */
     95			return NULL;
     96		page = tmp;
     97	}
     98
     99	/* add end of list marker for the returned list */
    100	set_page_private(page, 0);
    101	/* actual return value, and adjustment of head */
    102	page = *head;
    103	*head = tmp;
    104	return page;
    105}
    106
    107/* may be used outside of locks to find the tail of a (usually short)
    108 * "private" page chain, before adding it back to a global chain head
    109 * with page_chain_add() under a spinlock. */
    110static struct page *page_chain_tail(struct page *page, int *len)
    111{
    112	struct page *tmp;
    113	int i = 1;
    114	while ((tmp = page_chain_next(page))) {
    115		++i;
    116		page = tmp;
    117	}
    118	if (len)
    119		*len = i;
    120	return page;
    121}
    122
    123static int page_chain_free(struct page *page)
    124{
    125	struct page *tmp;
    126	int i = 0;
    127	page_chain_for_each_safe(page, tmp) {
    128		put_page(page);
    129		++i;
    130	}
    131	return i;
    132}
    133
    134static void page_chain_add(struct page **head,
    135		struct page *chain_first, struct page *chain_last)
    136{
    137#if 1
    138	struct page *tmp;
    139	tmp = page_chain_tail(chain_first, NULL);
    140	BUG_ON(tmp != chain_last);
    141#endif
    142
    143	/* add chain to head */
    144	set_page_private(chain_last, (unsigned long)*head);
    145	*head = chain_first;
    146}
    147
    148static struct page *__drbd_alloc_pages(struct drbd_device *device,
    149				       unsigned int number)
    150{
    151	struct page *page = NULL;
    152	struct page *tmp = NULL;
    153	unsigned int i = 0;
    154
    155	/* Yes, testing drbd_pp_vacant outside the lock is racy.
    156	 * So what. It saves a spin_lock. */
    157	if (drbd_pp_vacant >= number) {
    158		spin_lock(&drbd_pp_lock);
    159		page = page_chain_del(&drbd_pp_pool, number);
    160		if (page)
    161			drbd_pp_vacant -= number;
    162		spin_unlock(&drbd_pp_lock);
    163		if (page)
    164			return page;
    165	}
    166
    167	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
    168	 * "criss-cross" setup, that might cause write-out on some other DRBD,
    169	 * which in turn might block on the other node at this very place.  */
    170	for (i = 0; i < number; i++) {
    171		tmp = alloc_page(GFP_TRY);
    172		if (!tmp)
    173			break;
    174		set_page_private(tmp, (unsigned long)page);
    175		page = tmp;
    176	}
    177
    178	if (i == number)
    179		return page;
    180
    181	/* Not enough pages immediately available this time.
    182	 * No need to jump around here, drbd_alloc_pages will retry this
    183	 * function "soon". */
    184	if (page) {
    185		tmp = page_chain_tail(page, NULL);
    186		spin_lock(&drbd_pp_lock);
    187		page_chain_add(&drbd_pp_pool, page, tmp);
    188		drbd_pp_vacant += i;
    189		spin_unlock(&drbd_pp_lock);
    190	}
    191	return NULL;
    192}
    193
    194static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
    195					   struct list_head *to_be_freed)
    196{
    197	struct drbd_peer_request *peer_req, *tmp;
    198
     199	/* The EEs are always appended to the end of the list. Since
     200	   they are sent in order over the wire, they have to finish
     201	   in order. As soon as we see the first one that has not
     202	   finished, we can stop examining the list... */
    203
    204	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
    205		if (drbd_peer_req_has_active_page(peer_req))
    206			break;
    207		list_move(&peer_req->w.list, to_be_freed);
    208	}
    209}
    210
    211static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
    212{
    213	LIST_HEAD(reclaimed);
    214	struct drbd_peer_request *peer_req, *t;
    215
    216	spin_lock_irq(&device->resource->req_lock);
    217	reclaim_finished_net_peer_reqs(device, &reclaimed);
    218	spin_unlock_irq(&device->resource->req_lock);
    219	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
    220		drbd_free_net_peer_req(device, peer_req);
    221}
    222
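/*
 * Note (added for clarity, not in the original): the loop below uses the
 * common DRBD idiom of grabbing a kref on the device while under
 * rcu_read_lock(), dropping the RCU read lock to do work that may sleep,
 * and re-taking it before continuing the idr walk.  The same pattern
 * appears again in conn_connect() and drbd_flush() further down.
 */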
    223static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
    224{
    225	struct drbd_peer_device *peer_device;
    226	int vnr;
    227
    228	rcu_read_lock();
    229	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
    230		struct drbd_device *device = peer_device->device;
    231		if (!atomic_read(&device->pp_in_use_by_net))
    232			continue;
    233
    234		kref_get(&device->kref);
    235		rcu_read_unlock();
    236		drbd_reclaim_net_peer_reqs(device);
    237		kref_put(&device->kref, drbd_destroy_device);
    238		rcu_read_lock();
    239	}
    240	rcu_read_unlock();
    241}
    242
    243/**
    244 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
    245 * @peer_device:	DRBD device.
    246 * @number:		number of pages requested
    247 * @retry:		whether to retry, if not enough pages are available right now
    248 *
    249 * Tries to allocate number pages, first from our own page pool, then from
    250 * the kernel.
    251 * Possibly retry until DRBD frees sufficient pages somewhere else.
    252 *
    253 * If this allocation would exceed the max_buffers setting, we throttle
    254 * allocation (schedule_timeout) to give the system some room to breathe.
    255 *
     256 * We do not use max-buffers as a hard limit, because it could lead to
    257 * congestion and further to a distributed deadlock during online-verify or
    258 * (checksum based) resync, if the max-buffers, socket buffer sizes and
    259 * resync-rate settings are mis-configured.
    260 *
    261 * Returns a page chain linked via page->private.
    262 */
    263struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
    264			      bool retry)
    265{
    266	struct drbd_device *device = peer_device->device;
    267	struct page *page = NULL;
    268	struct net_conf *nc;
    269	DEFINE_WAIT(wait);
    270	unsigned int mxb;
    271
    272	rcu_read_lock();
    273	nc = rcu_dereference(peer_device->connection->net_conf);
    274	mxb = nc ? nc->max_buffers : 1000000;
    275	rcu_read_unlock();
    276
    277	if (atomic_read(&device->pp_in_use) < mxb)
    278		page = __drbd_alloc_pages(device, number);
    279
    280	/* Try to keep the fast path fast, but occasionally we need
     281	 * to reclaim the pages we have lent to the network stack. */
    282	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
    283		drbd_reclaim_net_peer_reqs(device);
    284
    285	while (page == NULL) {
    286		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
    287
    288		drbd_reclaim_net_peer_reqs(device);
    289
    290		if (atomic_read(&device->pp_in_use) < mxb) {
    291			page = __drbd_alloc_pages(device, number);
    292			if (page)
    293				break;
    294		}
    295
    296		if (!retry)
    297			break;
    298
    299		if (signal_pending(current)) {
    300			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
    301			break;
    302		}
    303
    304		if (schedule_timeout(HZ/10) == 0)
    305			mxb = UINT_MAX;
    306	}
    307	finish_wait(&drbd_pp_wait, &wait);
    308
    309	if (page)
    310		atomic_add(number, &device->pp_in_use);
    311	return page;
    312}
    313
    314/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
     315 * Is also used from inside another spin_lock_irq(&resource->req_lock);
    316 * Either links the page chain back to the global pool,
    317 * or returns all pages to the system. */
    318static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
    319{
    320	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
    321	int i;
    322
    323	if (page == NULL)
    324		return;
    325
    326	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
    327		i = page_chain_free(page);
    328	else {
    329		struct page *tmp;
    330		tmp = page_chain_tail(page, &i);
    331		spin_lock(&drbd_pp_lock);
    332		page_chain_add(&drbd_pp_pool, page, tmp);
    333		drbd_pp_vacant += i;
    334		spin_unlock(&drbd_pp_lock);
    335	}
    336	i = atomic_sub_return(i, a);
    337	if (i < 0)
    338		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
    339			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
    340	wake_up(&drbd_pp_wait);
    341}
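/*
 * Illustrative usage sketch (assumption, not part of the original file):
 * drbd_alloc_pages() and drbd_free_pages() are meant to be used as a pair,
 * with pp_in_use accounting for the pages currently handed out.  This is
 * essentially how drbd_alloc_peer_req()/__drbd_free_peer_req() below use them:
 *
 *	unsigned int nr_pages = PFN_UP(payload_size);
 *	struct page *chain = drbd_alloc_pages(peer_device, nr_pages, true);
 *	if (!chain)
 *		return NULL;	// interrupted, or retry == false
 *	// ... attach chain to a peer request, submit I/O ...
 *	drbd_free_pages(device, chain, 0);	// when the request is done
 */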
    342
    343/*
    344You need to hold the req_lock:
    345 _drbd_wait_ee_list_empty()
    346
    347You must not have the req_lock:
    348 drbd_free_peer_req()
    349 drbd_alloc_peer_req()
    350 drbd_free_peer_reqs()
    351 drbd_ee_fix_bhs()
    352 drbd_finish_peer_reqs()
    353 drbd_clear_done_ee()
    354 drbd_wait_ee_list_empty()
    355*/
    356
    357/* normal: payload_size == request size (bi_size)
    358 * w_same: payload_size == logical_block_size
    359 * trim: payload_size == 0 */
    360struct drbd_peer_request *
    361drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
    362		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
    363{
    364	struct drbd_device *device = peer_device->device;
    365	struct drbd_peer_request *peer_req;
    366	struct page *page = NULL;
    367	unsigned int nr_pages = PFN_UP(payload_size);
    368
    369	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
    370		return NULL;
    371
    372	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
    373	if (!peer_req) {
    374		if (!(gfp_mask & __GFP_NOWARN))
    375			drbd_err(device, "%s: allocation failed\n", __func__);
    376		return NULL;
    377	}
    378
    379	if (nr_pages) {
    380		page = drbd_alloc_pages(peer_device, nr_pages,
    381					gfpflags_allow_blocking(gfp_mask));
    382		if (!page)
    383			goto fail;
    384	}
    385
    386	memset(peer_req, 0, sizeof(*peer_req));
    387	INIT_LIST_HEAD(&peer_req->w.list);
    388	drbd_clear_interval(&peer_req->i);
    389	peer_req->i.size = request_size;
    390	peer_req->i.sector = sector;
    391	peer_req->submit_jif = jiffies;
    392	peer_req->peer_device = peer_device;
    393	peer_req->pages = page;
    394	/*
     395	 * The block_id is opaque to the receiver.  It is not converted for
     396	 * endianness and is sent back to the sender unchanged.
    397	 */
    398	peer_req->block_id = id;
    399
    400	return peer_req;
    401
    402 fail:
    403	mempool_free(peer_req, &drbd_ee_mempool);
    404	return NULL;
    405}
    406
    407void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
    408		       int is_net)
    409{
    410	might_sleep();
    411	if (peer_req->flags & EE_HAS_DIGEST)
    412		kfree(peer_req->digest);
    413	drbd_free_pages(device, peer_req->pages, is_net);
    414	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
    415	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
    416	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
    417		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
    418		drbd_al_complete_io(device, &peer_req->i);
    419	}
    420	mempool_free(peer_req, &drbd_ee_mempool);
    421}
    422
    423int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
    424{
    425	LIST_HEAD(work_list);
    426	struct drbd_peer_request *peer_req, *t;
    427	int count = 0;
    428	int is_net = list == &device->net_ee;
    429
    430	spin_lock_irq(&device->resource->req_lock);
    431	list_splice_init(list, &work_list);
    432	spin_unlock_irq(&device->resource->req_lock);
    433
    434	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
    435		__drbd_free_peer_req(device, peer_req, is_net);
    436		count++;
    437	}
    438	return count;
    439}
    440
    441/*
    442 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
    443 */
    444static int drbd_finish_peer_reqs(struct drbd_device *device)
    445{
    446	LIST_HEAD(work_list);
    447	LIST_HEAD(reclaimed);
    448	struct drbd_peer_request *peer_req, *t;
    449	int err = 0;
    450
    451	spin_lock_irq(&device->resource->req_lock);
    452	reclaim_finished_net_peer_reqs(device, &reclaimed);
    453	list_splice_init(&device->done_ee, &work_list);
    454	spin_unlock_irq(&device->resource->req_lock);
    455
    456	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
    457		drbd_free_net_peer_req(device, peer_req);
    458
    459	/* possible callbacks here:
     460	 * e_end_block, e_end_resync_block, and e_send_superseded.
    461	 * all ignore the last argument.
    462	 */
    463	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
    464		int err2;
    465
    466		/* list_del not necessary, next/prev members not touched */
    467		err2 = peer_req->w.cb(&peer_req->w, !!err);
    468		if (!err)
    469			err = err2;
    470		drbd_free_peer_req(device, peer_req);
    471	}
    472	wake_up(&device->ee_wait);
    473
    474	return err;
    475}
    476
    477static void _drbd_wait_ee_list_empty(struct drbd_device *device,
    478				     struct list_head *head)
    479{
    480	DEFINE_WAIT(wait);
    481
    482	/* avoids spin_lock/unlock
    483	 * and calling prepare_to_wait in the fast path */
    484	while (!list_empty(head)) {
    485		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
    486		spin_unlock_irq(&device->resource->req_lock);
    487		io_schedule();
    488		finish_wait(&device->ee_wait, &wait);
    489		spin_lock_irq(&device->resource->req_lock);
    490	}
    491}
    492
    493static void drbd_wait_ee_list_empty(struct drbd_device *device,
    494				    struct list_head *head)
    495{
    496	spin_lock_irq(&device->resource->req_lock);
    497	_drbd_wait_ee_list_empty(device, head);
    498	spin_unlock_irq(&device->resource->req_lock);
    499}
    500
    501static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
    502{
    503	struct kvec iov = {
    504		.iov_base = buf,
    505		.iov_len = size,
    506	};
    507	struct msghdr msg = {
    508		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
    509	};
    510	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
    511	return sock_recvmsg(sock, &msg, msg.msg_flags);
    512}
    513
    514static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
    515{
    516	int rv;
    517
    518	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
    519
    520	if (rv < 0) {
    521		if (rv == -ECONNRESET)
    522			drbd_info(connection, "sock was reset by peer\n");
    523		else if (rv != -ERESTARTSYS)
    524			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
    525	} else if (rv == 0) {
    526		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
    527			long t;
    528			rcu_read_lock();
    529			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
    530			rcu_read_unlock();
    531
    532			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
    533
    534			if (t)
    535				goto out;
    536		}
    537		drbd_info(connection, "sock was shut down by peer\n");
    538	}
    539
    540	if (rv != size)
    541		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
    542
    543out:
    544	return rv;
    545}
    546
    547static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
    548{
    549	int err;
    550
    551	err = drbd_recv(connection, buf, size);
    552	if (err != size) {
    553		if (err >= 0)
    554			err = -EIO;
    555	} else
    556		err = 0;
    557	return err;
    558}
    559
    560static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
    561{
    562	int err;
    563
    564	err = drbd_recv_all(connection, buf, size);
    565	if (err && !signal_pending(current))
    566		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
    567	return err;
    568}
    569
    570/* quoting tcp(7):
    571 *   On individual connections, the socket buffer size must be set prior to the
    572 *   listen(2) or connect(2) calls in order to have it take effect.
    573 * This is our wrapper to do so.
    574 */
    575static void drbd_setbufsize(struct socket *sock, unsigned int snd,
    576		unsigned int rcv)
    577{
    578	/* open coded SO_SNDBUF, SO_RCVBUF */
    579	if (snd) {
    580		sock->sk->sk_sndbuf = snd;
    581		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
    582	}
    583	if (rcv) {
    584		sock->sk->sk_rcvbuf = rcv;
    585		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
    586	}
    587}
    588
    589static struct socket *drbd_try_connect(struct drbd_connection *connection)
    590{
    591	const char *what;
    592	struct socket *sock;
    593	struct sockaddr_in6 src_in6;
    594	struct sockaddr_in6 peer_in6;
    595	struct net_conf *nc;
    596	int err, peer_addr_len, my_addr_len;
    597	int sndbuf_size, rcvbuf_size, connect_int;
    598	int disconnect_on_error = 1;
    599
    600	rcu_read_lock();
    601	nc = rcu_dereference(connection->net_conf);
    602	if (!nc) {
    603		rcu_read_unlock();
    604		return NULL;
    605	}
    606	sndbuf_size = nc->sndbuf_size;
    607	rcvbuf_size = nc->rcvbuf_size;
    608	connect_int = nc->connect_int;
    609	rcu_read_unlock();
    610
    611	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
    612	memcpy(&src_in6, &connection->my_addr, my_addr_len);
    613
    614	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
    615		src_in6.sin6_port = 0;
    616	else
    617		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
    618
    619	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
    620	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
    621
    622	what = "sock_create_kern";
    623	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
    624			       SOCK_STREAM, IPPROTO_TCP, &sock);
    625	if (err < 0) {
    626		sock = NULL;
    627		goto out;
    628	}
    629
    630	sock->sk->sk_rcvtimeo =
    631	sock->sk->sk_sndtimeo = connect_int * HZ;
    632	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
    633
     634	/* explicitly bind to the configured IP as source IP
     635	 * for the outgoing connections.
     636	 * This is needed for multihomed hosts and to be
     637	 * able to use lo: interfaces for drbd.
     638	 * Make sure to use 0 as port number, so linux selects
     639	 * a free one dynamically.
     640	 */
    641	what = "bind before connect";
    642	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
    643	if (err < 0)
    644		goto out;
    645
    646	/* connect may fail, peer not yet available.
    647	 * stay C_WF_CONNECTION, don't go Disconnecting! */
    648	disconnect_on_error = 0;
    649	what = "connect";
    650	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
    651
    652out:
    653	if (err < 0) {
    654		if (sock) {
    655			sock_release(sock);
    656			sock = NULL;
    657		}
    658		switch (-err) {
    659			/* timeout, busy, signal pending */
    660		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
    661		case EINTR: case ERESTARTSYS:
    662			/* peer not (yet) available, network problem */
    663		case ECONNREFUSED: case ENETUNREACH:
    664		case EHOSTDOWN:    case EHOSTUNREACH:
    665			disconnect_on_error = 0;
    666			break;
    667		default:
    668			drbd_err(connection, "%s failed, err = %d\n", what, err);
    669		}
    670		if (disconnect_on_error)
    671			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
    672	}
    673
    674	return sock;
    675}
    676
    677struct accept_wait_data {
    678	struct drbd_connection *connection;
    679	struct socket *s_listen;
    680	struct completion door_bell;
    681	void (*original_sk_state_change)(struct sock *sk);
    682
    683};
    684
    685static void drbd_incoming_connection(struct sock *sk)
    686{
    687	struct accept_wait_data *ad = sk->sk_user_data;
    688	void (*state_change)(struct sock *sk);
    689
    690	state_change = ad->original_sk_state_change;
    691	if (sk->sk_state == TCP_ESTABLISHED)
    692		complete(&ad->door_bell);
    693	state_change(sk);
    694}
    695
    696static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
    697{
    698	int err, sndbuf_size, rcvbuf_size, my_addr_len;
    699	struct sockaddr_in6 my_addr;
    700	struct socket *s_listen;
    701	struct net_conf *nc;
    702	const char *what;
    703
    704	rcu_read_lock();
    705	nc = rcu_dereference(connection->net_conf);
    706	if (!nc) {
    707		rcu_read_unlock();
    708		return -EIO;
    709	}
    710	sndbuf_size = nc->sndbuf_size;
    711	rcvbuf_size = nc->rcvbuf_size;
    712	rcu_read_unlock();
    713
    714	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
    715	memcpy(&my_addr, &connection->my_addr, my_addr_len);
    716
    717	what = "sock_create_kern";
    718	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
    719			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
    720	if (err) {
    721		s_listen = NULL;
    722		goto out;
    723	}
    724
    725	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
    726	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
    727
    728	what = "bind before listen";
    729	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
    730	if (err < 0)
    731		goto out;
    732
    733	ad->s_listen = s_listen;
    734	write_lock_bh(&s_listen->sk->sk_callback_lock);
    735	ad->original_sk_state_change = s_listen->sk->sk_state_change;
    736	s_listen->sk->sk_state_change = drbd_incoming_connection;
    737	s_listen->sk->sk_user_data = ad;
    738	write_unlock_bh(&s_listen->sk->sk_callback_lock);
    739
    740	what = "listen";
    741	err = s_listen->ops->listen(s_listen, 5);
    742	if (err < 0)
    743		goto out;
    744
    745	return 0;
    746out:
    747	if (s_listen)
    748		sock_release(s_listen);
    749	if (err < 0) {
    750		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
    751			drbd_err(connection, "%s failed, err = %d\n", what, err);
    752			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
    753		}
    754	}
    755
    756	return -EIO;
    757}
    758
    759static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
    760{
    761	write_lock_bh(&sk->sk_callback_lock);
    762	sk->sk_state_change = ad->original_sk_state_change;
    763	sk->sk_user_data = NULL;
    764	write_unlock_bh(&sk->sk_callback_lock);
    765}
    766
    767static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
    768{
    769	int timeo, connect_int, err = 0;
    770	struct socket *s_estab = NULL;
    771	struct net_conf *nc;
    772
    773	rcu_read_lock();
    774	nc = rcu_dereference(connection->net_conf);
    775	if (!nc) {
    776		rcu_read_unlock();
    777		return NULL;
    778	}
    779	connect_int = nc->connect_int;
    780	rcu_read_unlock();
    781
    782	timeo = connect_int * HZ;
    783	/* 28.5% random jitter */
    784	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
    785
    786	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
    787	if (err <= 0)
    788		return NULL;
    789
    790	err = kernel_accept(ad->s_listen, &s_estab, 0);
    791	if (err < 0) {
    792		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
    793			drbd_err(connection, "accept failed, err = %d\n", err);
    794			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
    795		}
    796	}
    797
    798	if (s_estab)
    799		unregister_state_change(s_estab->sk, ad);
    800
    801	return s_estab;
    802}
    803
    804static int decode_header(struct drbd_connection *, void *, struct packet_info *);
    805
    806static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
    807			     enum drbd_packet cmd)
    808{
    809	if (!conn_prepare_command(connection, sock))
    810		return -EIO;
    811	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
    812}
    813
    814static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
    815{
    816	unsigned int header_size = drbd_header_size(connection);
    817	struct packet_info pi;
    818	struct net_conf *nc;
    819	int err;
    820
    821	rcu_read_lock();
    822	nc = rcu_dereference(connection->net_conf);
    823	if (!nc) {
    824		rcu_read_unlock();
    825		return -EIO;
    826	}
    827	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
    828	rcu_read_unlock();
    829
    830	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
    831	if (err != header_size) {
    832		if (err >= 0)
    833			err = -EIO;
    834		return err;
    835	}
    836	err = decode_header(connection, connection->data.rbuf, &pi);
    837	if (err)
    838		return err;
    839	return pi.cmd;
    840}
    841
    842/**
    843 * drbd_socket_okay() - Free the socket if its connection is not okay
    844 * @sock:	pointer to the pointer to the socket.
    845 */
    846static bool drbd_socket_okay(struct socket **sock)
    847{
    848	int rr;
    849	char tb[4];
    850
    851	if (!*sock)
    852		return false;
    853
    854	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
    855
    856	if (rr > 0 || rr == -EAGAIN) {
    857		return true;
    858	} else {
    859		sock_release(*sock);
    860		*sock = NULL;
    861		return false;
    862	}
    863}
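/*
 * Note (added for clarity): the MSG_DONTWAIT | MSG_PEEK probe above does not
 * consume any data.  rr > 0 means data is already queued, -EAGAIN means the
 * connection is idle but alive; anything else (0 == orderly shutdown, or an
 * error) makes us drop and release the socket.
 */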
    864
    865static bool connection_established(struct drbd_connection *connection,
    866				   struct socket **sock1,
    867				   struct socket **sock2)
    868{
    869	struct net_conf *nc;
    870	int timeout;
    871	bool ok;
    872
    873	if (!*sock1 || !*sock2)
    874		return false;
    875
    876	rcu_read_lock();
    877	nc = rcu_dereference(connection->net_conf);
    878	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
    879	rcu_read_unlock();
    880	schedule_timeout_interruptible(timeout);
    881
    882	ok = drbd_socket_okay(sock1);
    883	ok = drbd_socket_okay(sock2) && ok;
    884
    885	return ok;
    886}
    887
    888/* Gets called if a connection is established, or if a new minor gets created
    889   in a connection */
    890int drbd_connected(struct drbd_peer_device *peer_device)
    891{
    892	struct drbd_device *device = peer_device->device;
    893	int err;
    894
    895	atomic_set(&device->packet_seq, 0);
    896	device->peer_seq = 0;
    897
    898	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
    899		&peer_device->connection->cstate_mutex :
    900		&device->own_state_mutex;
    901
    902	err = drbd_send_sync_param(peer_device);
    903	if (!err)
    904		err = drbd_send_sizes(peer_device, 0, 0);
    905	if (!err)
    906		err = drbd_send_uuids(peer_device);
    907	if (!err)
    908		err = drbd_send_current_state(peer_device);
    909	clear_bit(USE_DEGR_WFC_T, &device->flags);
    910	clear_bit(RESIZE_PENDING, &device->flags);
    911	atomic_set(&device->ap_in_flight, 0);
    912	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
    913	return err;
    914}
    915
    916/*
    917 * return values:
    918 *   1 yes, we have a valid connection
    919 *   0 oops, did not work out, please try again
    920 *  -1 peer talks different language,
    921 *     no point in trying again, please go standalone.
    922 *  -2 We do not have a network config...
    923 */
    924static int conn_connect(struct drbd_connection *connection)
    925{
    926	struct drbd_socket sock, msock;
    927	struct drbd_peer_device *peer_device;
    928	struct net_conf *nc;
    929	int vnr, timeout, h;
    930	bool discard_my_data, ok;
    931	enum drbd_state_rv rv;
    932	struct accept_wait_data ad = {
    933		.connection = connection,
    934		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
    935	};
    936
    937	clear_bit(DISCONNECT_SENT, &connection->flags);
    938	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
    939		return -2;
    940
    941	mutex_init(&sock.mutex);
    942	sock.sbuf = connection->data.sbuf;
    943	sock.rbuf = connection->data.rbuf;
    944	sock.socket = NULL;
    945	mutex_init(&msock.mutex);
    946	msock.sbuf = connection->meta.sbuf;
    947	msock.rbuf = connection->meta.rbuf;
    948	msock.socket = NULL;
    949
    950	/* Assume that the peer only understands protocol 80 until we know better.  */
    951	connection->agreed_pro_version = 80;
    952
    953	if (prepare_listen_socket(connection, &ad))
    954		return 0;
    955
    956	do {
    957		struct socket *s;
    958
    959		s = drbd_try_connect(connection);
    960		if (s) {
    961			if (!sock.socket) {
    962				sock.socket = s;
    963				send_first_packet(connection, &sock, P_INITIAL_DATA);
    964			} else if (!msock.socket) {
    965				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
    966				msock.socket = s;
    967				send_first_packet(connection, &msock, P_INITIAL_META);
    968			} else {
    969				drbd_err(connection, "Logic error in conn_connect()\n");
    970				goto out_release_sockets;
    971			}
    972		}
    973
    974		if (connection_established(connection, &sock.socket, &msock.socket))
    975			break;
    976
    977retry:
    978		s = drbd_wait_for_connect(connection, &ad);
    979		if (s) {
    980			int fp = receive_first_packet(connection, s);
    981			drbd_socket_okay(&sock.socket);
    982			drbd_socket_okay(&msock.socket);
    983			switch (fp) {
    984			case P_INITIAL_DATA:
    985				if (sock.socket) {
    986					drbd_warn(connection, "initial packet S crossed\n");
    987					sock_release(sock.socket);
    988					sock.socket = s;
    989					goto randomize;
    990				}
    991				sock.socket = s;
    992				break;
    993			case P_INITIAL_META:
    994				set_bit(RESOLVE_CONFLICTS, &connection->flags);
    995				if (msock.socket) {
    996					drbd_warn(connection, "initial packet M crossed\n");
    997					sock_release(msock.socket);
    998					msock.socket = s;
    999					goto randomize;
   1000				}
   1001				msock.socket = s;
   1002				break;
   1003			default:
   1004				drbd_warn(connection, "Error receiving initial packet\n");
   1005				sock_release(s);
   1006randomize:
   1007				if (prandom_u32() & 1)
   1008					goto retry;
   1009			}
   1010		}
   1011
   1012		if (connection->cstate <= C_DISCONNECTING)
   1013			goto out_release_sockets;
   1014		if (signal_pending(current)) {
   1015			flush_signals(current);
   1016			smp_rmb();
   1017			if (get_t_state(&connection->receiver) == EXITING)
   1018				goto out_release_sockets;
   1019		}
   1020
   1021		ok = connection_established(connection, &sock.socket, &msock.socket);
   1022	} while (!ok);
   1023
   1024	if (ad.s_listen)
   1025		sock_release(ad.s_listen);
   1026
   1027	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
   1028	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
   1029
   1030	sock.socket->sk->sk_allocation = GFP_NOIO;
   1031	msock.socket->sk->sk_allocation = GFP_NOIO;
   1032
   1033	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
   1034	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
   1035
   1036	/* NOT YET ...
   1037	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
   1038	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
   1039	 * first set it to the P_CONNECTION_FEATURES timeout,
   1040	 * which we set to 4x the configured ping_timeout. */
   1041	rcu_read_lock();
   1042	nc = rcu_dereference(connection->net_conf);
   1043
   1044	sock.socket->sk->sk_sndtimeo =
   1045	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
   1046
   1047	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
   1048	timeout = nc->timeout * HZ / 10;
   1049	discard_my_data = nc->discard_my_data;
   1050	rcu_read_unlock();
   1051
   1052	msock.socket->sk->sk_sndtimeo = timeout;
   1053
   1054	/* we don't want delays.
   1055	 * we use TCP_CORK where appropriate, though */
   1056	tcp_sock_set_nodelay(sock.socket->sk);
   1057	tcp_sock_set_nodelay(msock.socket->sk);
   1058
   1059	connection->data.socket = sock.socket;
   1060	connection->meta.socket = msock.socket;
   1061	connection->last_received = jiffies;
   1062
   1063	h = drbd_do_features(connection);
   1064	if (h <= 0)
   1065		return h;
   1066
   1067	if (connection->cram_hmac_tfm) {
   1068		/* drbd_request_state(device, NS(conn, WFAuth)); */
   1069		switch (drbd_do_auth(connection)) {
   1070		case -1:
   1071			drbd_err(connection, "Authentication of peer failed\n");
   1072			return -1;
   1073		case 0:
   1074			drbd_err(connection, "Authentication of peer failed, trying again.\n");
   1075			return 0;
   1076		}
   1077	}
   1078
   1079	connection->data.socket->sk->sk_sndtimeo = timeout;
   1080	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
   1081
   1082	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
   1083		return -1;
   1084
   1085	/* Prevent a race between resync-handshake and
   1086	 * being promoted to Primary.
   1087	 *
   1088	 * Grab and release the state mutex, so we know that any current
   1089	 * drbd_set_role() is finished, and any incoming drbd_set_role
   1090	 * will see the STATE_SENT flag, and wait for it to be cleared.
   1091	 */
   1092	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
   1093		mutex_lock(peer_device->device->state_mutex);
   1094
   1095	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
   1096	spin_lock_irq(&connection->resource->req_lock);
   1097	set_bit(STATE_SENT, &connection->flags);
   1098	spin_unlock_irq(&connection->resource->req_lock);
   1099
   1100	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
   1101		mutex_unlock(peer_device->device->state_mutex);
   1102
   1103	rcu_read_lock();
   1104	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   1105		struct drbd_device *device = peer_device->device;
   1106		kref_get(&device->kref);
   1107		rcu_read_unlock();
   1108
   1109		if (discard_my_data)
   1110			set_bit(DISCARD_MY_DATA, &device->flags);
   1111		else
   1112			clear_bit(DISCARD_MY_DATA, &device->flags);
   1113
   1114		drbd_connected(peer_device);
   1115		kref_put(&device->kref, drbd_destroy_device);
   1116		rcu_read_lock();
   1117	}
   1118	rcu_read_unlock();
   1119
   1120	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
   1121	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
   1122		clear_bit(STATE_SENT, &connection->flags);
   1123		return 0;
   1124	}
   1125
   1126	drbd_thread_start(&connection->ack_receiver);
   1127	/* opencoded create_singlethread_workqueue(),
   1128	 * to be able to use format string arguments */
   1129	connection->ack_sender =
   1130		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
   1131	if (!connection->ack_sender) {
   1132		drbd_err(connection, "Failed to create workqueue ack_sender\n");
   1133		return 0;
   1134	}
   1135
   1136	mutex_lock(&connection->resource->conf_update);
   1137	/* The discard_my_data flag is a single-shot modifier to the next
   1138	 * connection attempt, the handshake of which is now well underway.
   1139	 * No need for rcu style copying of the whole struct
   1140	 * just to clear a single value. */
   1141	connection->net_conf->discard_my_data = 0;
   1142	mutex_unlock(&connection->resource->conf_update);
   1143
   1144	return h;
   1145
   1146out_release_sockets:
   1147	if (ad.s_listen)
   1148		sock_release(ad.s_listen);
   1149	if (sock.socket)
   1150		sock_release(sock.socket);
   1151	if (msock.socket)
   1152		sock_release(msock.socket);
   1153	return -1;
   1154}
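/*
 * Illustrative sketch (assumption, not part of the original file): per the
 * return-value comment above conn_connect(), a caller is expected to treat
 * 0 as "retry", a negative value as "give up", and a positive value as
 * "connected", roughly:
 *
 *	int h;
 *	do {
 *		h = conn_connect(connection);	// possibly with a delay between attempts
 *	} while (h == 0);
 *	if (h > 0)
 *		;	// proceed to the main receive loop
 */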
   1155
   1156static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
   1157{
   1158	unsigned int header_size = drbd_header_size(connection);
   1159
   1160	if (header_size == sizeof(struct p_header100) &&
   1161	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
   1162		struct p_header100 *h = header;
   1163		if (h->pad != 0) {
   1164			drbd_err(connection, "Header padding is not zero\n");
   1165			return -EINVAL;
   1166		}
   1167		pi->vnr = be16_to_cpu(h->volume);
   1168		pi->cmd = be16_to_cpu(h->command);
   1169		pi->size = be32_to_cpu(h->length);
   1170	} else if (header_size == sizeof(struct p_header95) &&
   1171		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
   1172		struct p_header95 *h = header;
   1173		pi->cmd = be16_to_cpu(h->command);
   1174		pi->size = be32_to_cpu(h->length);
   1175		pi->vnr = 0;
   1176	} else if (header_size == sizeof(struct p_header80) &&
   1177		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
   1178		struct p_header80 *h = header;
   1179		pi->cmd = be16_to_cpu(h->command);
   1180		pi->size = be16_to_cpu(h->length);
   1181		pi->vnr = 0;
   1182	} else {
   1183		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
   1184			 be32_to_cpu(*(__be32 *)header),
   1185			 connection->agreed_pro_version);
   1186		return -EINVAL;
   1187	}
   1188	pi->data = header + header_size;
   1189	return 0;
   1190}
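/*
 * Summary of the on-wire header variants as parsed above (added for
 * clarity; the exact field layout lives in drbd_protocol.h, not here):
 *
 *   p_header100: 32-bit magic DRBD_MAGIC_100; carries a 16-bit volume,
 *                a 16-bit command, a 32-bit length and a pad that must be 0
 *   p_header95:  16-bit magic DRBD_MAGIC_BIG; 16-bit command, 32-bit length
 *   p_header80:  32-bit magic DRBD_MAGIC;     16-bit command, 16-bit length
 *
 * Only the protocol-100 header carries a volume number; for the older
 * headers pi->vnr is forced to 0.
 */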
   1191
   1192static void drbd_unplug_all_devices(struct drbd_connection *connection)
   1193{
   1194	if (current->plug == &connection->receiver_plug) {
   1195		blk_finish_plug(&connection->receiver_plug);
   1196		blk_start_plug(&connection->receiver_plug);
   1197	} /* else: maybe just schedule() ?? */
   1198}
   1199
   1200static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
   1201{
   1202	void *buffer = connection->data.rbuf;
   1203	int err;
   1204
   1205	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
   1206	if (err)
   1207		return err;
   1208
   1209	err = decode_header(connection, buffer, pi);
   1210	connection->last_received = jiffies;
   1211
   1212	return err;
   1213}
   1214
   1215static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
   1216{
   1217	void *buffer = connection->data.rbuf;
   1218	unsigned int size = drbd_header_size(connection);
   1219	int err;
   1220
   1221	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
   1222	if (err != size) {
   1223		/* If we have nothing in the receive buffer now, to reduce
   1224		 * application latency, try to drain the backend queues as
   1225		 * quickly as possible, and let remote TCP know what we have
   1226		 * received so far. */
   1227		if (err == -EAGAIN) {
   1228			tcp_sock_set_quickack(connection->data.socket->sk, 2);
   1229			drbd_unplug_all_devices(connection);
   1230		}
   1231		if (err > 0) {
   1232			buffer += err;
   1233			size -= err;
   1234		}
   1235		err = drbd_recv_all_warn(connection, buffer, size);
   1236		if (err)
   1237			return err;
   1238	}
   1239
   1240	err = decode_header(connection, connection->data.rbuf, pi);
   1241	connection->last_received = jiffies;
   1242
   1243	return err;
   1244}
   1245/* This is blkdev_issue_flush, but asynchronous.
   1246 * We want to submit to all component volumes in parallel,
   1247 * then wait for all completions.
   1248 */
   1249struct issue_flush_context {
   1250	atomic_t pending;
   1251	int error;
   1252	struct completion done;
   1253};
   1254struct one_flush_context {
   1255	struct drbd_device *device;
   1256	struct issue_flush_context *ctx;
   1257};
   1258
   1259static void one_flush_endio(struct bio *bio)
   1260{
   1261	struct one_flush_context *octx = bio->bi_private;
   1262	struct drbd_device *device = octx->device;
   1263	struct issue_flush_context *ctx = octx->ctx;
   1264
   1265	if (bio->bi_status) {
   1266		ctx->error = blk_status_to_errno(bio->bi_status);
   1267		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
   1268	}
   1269	kfree(octx);
   1270	bio_put(bio);
   1271
   1272	clear_bit(FLUSH_PENDING, &device->flags);
   1273	put_ldev(device);
   1274	kref_put(&device->kref, drbd_destroy_device);
   1275
   1276	if (atomic_dec_and_test(&ctx->pending))
   1277		complete(&ctx->done);
   1278}
   1279
   1280static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
   1281{
   1282	struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
   1283				    REQ_OP_FLUSH | REQ_PREFLUSH, GFP_NOIO);
   1284	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
   1285
   1286	if (!octx) {
    1287		drbd_warn(device, "Could not allocate an octx, CANNOT ISSUE FLUSH\n");
   1288		/* FIXME: what else can I do now?  disconnecting or detaching
   1289		 * really does not help to improve the state of the world, either.
   1290		 */
   1291		bio_put(bio);
   1292
   1293		ctx->error = -ENOMEM;
   1294		put_ldev(device);
   1295		kref_put(&device->kref, drbd_destroy_device);
   1296		return;
   1297	}
   1298
   1299	octx->device = device;
   1300	octx->ctx = ctx;
   1301	bio->bi_private = octx;
   1302	bio->bi_end_io = one_flush_endio;
   1303
   1304	device->flush_jif = jiffies;
   1305	set_bit(FLUSH_PENDING, &device->flags);
   1306	atomic_inc(&ctx->pending);
   1307	submit_bio(bio);
   1308}
   1309
   1310static void drbd_flush(struct drbd_connection *connection)
   1311{
   1312	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
   1313		struct drbd_peer_device *peer_device;
   1314		struct issue_flush_context ctx;
   1315		int vnr;
   1316
   1317		atomic_set(&ctx.pending, 1);
   1318		ctx.error = 0;
   1319		init_completion(&ctx.done);
   1320
   1321		rcu_read_lock();
   1322		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   1323			struct drbd_device *device = peer_device->device;
   1324
   1325			if (!get_ldev(device))
   1326				continue;
   1327			kref_get(&device->kref);
   1328			rcu_read_unlock();
   1329
   1330			submit_one_flush(device, &ctx);
   1331
   1332			rcu_read_lock();
   1333		}
   1334		rcu_read_unlock();
   1335
   1336		/* Do we want to add a timeout,
   1337		 * if disk-timeout is set? */
   1338		if (!atomic_dec_and_test(&ctx.pending))
   1339			wait_for_completion(&ctx.done);
   1340
   1341		if (ctx.error) {
   1342			/* would rather check on EOPNOTSUPP, but that is not reliable.
   1343			 * don't try again for ANY return value != 0
   1344			 * if (rv == -EOPNOTSUPP) */
   1345			/* Any error is already reported by bio_endio callback. */
   1346			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
   1347		}
   1348	}
   1349}
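/*
 * Note on the completion scheme above (added for clarity): ctx.pending
 * starts at 1 so that the final atomic_dec_and_test() in drbd_flush()
 * itself acts as the "all submissions issued" reference; each
 * submit_one_flush() adds one reference and each one_flush_endio()
 * drops one, so complete(&ctx.done) fires only after every flush bio
 * has ended *and* the submission loop has finished.
 */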
   1350
   1351/**
    1352 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, and possibly finishes it.
   1353 * @connection:	DRBD connection.
   1354 * @epoch:	Epoch object.
   1355 * @ev:		Epoch event.
   1356 */
   1357static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
   1358					       struct drbd_epoch *epoch,
   1359					       enum epoch_event ev)
   1360{
   1361	int epoch_size;
   1362	struct drbd_epoch *next_epoch;
   1363	enum finish_epoch rv = FE_STILL_LIVE;
   1364
   1365	spin_lock(&connection->epoch_lock);
   1366	do {
   1367		next_epoch = NULL;
   1368
   1369		epoch_size = atomic_read(&epoch->epoch_size);
   1370
   1371		switch (ev & ~EV_CLEANUP) {
   1372		case EV_PUT:
   1373			atomic_dec(&epoch->active);
   1374			break;
   1375		case EV_GOT_BARRIER_NR:
   1376			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
   1377			break;
   1378		case EV_BECAME_LAST:
   1379			/* nothing to do*/
   1380			break;
   1381		}
   1382
   1383		if (epoch_size != 0 &&
   1384		    atomic_read(&epoch->active) == 0 &&
   1385		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
   1386			if (!(ev & EV_CLEANUP)) {
   1387				spin_unlock(&connection->epoch_lock);
   1388				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
   1389				spin_lock(&connection->epoch_lock);
   1390			}
   1391#if 0
   1392			/* FIXME: dec unacked on connection, once we have
   1393			 * something to count pending connection packets in. */
   1394			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
   1395				dec_unacked(epoch->connection);
   1396#endif
   1397
   1398			if (connection->current_epoch != epoch) {
   1399				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
   1400				list_del(&epoch->list);
   1401				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
   1402				connection->epochs--;
   1403				kfree(epoch);
   1404
   1405				if (rv == FE_STILL_LIVE)
   1406					rv = FE_DESTROYED;
   1407			} else {
   1408				epoch->flags = 0;
   1409				atomic_set(&epoch->epoch_size, 0);
   1410				/* atomic_set(&epoch->active, 0); is already zero */
   1411				if (rv == FE_STILL_LIVE)
   1412					rv = FE_RECYCLED;
   1413			}
   1414		}
   1415
   1416		if (!next_epoch)
   1417			break;
   1418
   1419		epoch = next_epoch;
   1420	} while (1);
   1421
   1422	spin_unlock(&connection->epoch_lock);
   1423
   1424	return rv;
   1425}
   1426
   1427static enum write_ordering_e
   1428max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
   1429{
   1430	struct disk_conf *dc;
   1431
   1432	dc = rcu_dereference(bdev->disk_conf);
   1433
   1434	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
   1435		wo = WO_DRAIN_IO;
   1436	if (wo == WO_DRAIN_IO && !dc->disk_drain)
   1437		wo = WO_NONE;
   1438
   1439	return wo;
   1440}
   1441
   1442/*
    1443 * drbd_bump_write_ordering() - Fall back to another write ordering method
   1444 * @wo:		Write ordering method to try.
   1445 */
   1446void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
   1447			      enum write_ordering_e wo)
   1448{
   1449	struct drbd_device *device;
   1450	enum write_ordering_e pwo;
   1451	int vnr;
   1452	static char *write_ordering_str[] = {
   1453		[WO_NONE] = "none",
   1454		[WO_DRAIN_IO] = "drain",
   1455		[WO_BDEV_FLUSH] = "flush",
   1456	};
   1457
   1458	pwo = resource->write_ordering;
   1459	if (wo != WO_BDEV_FLUSH)
   1460		wo = min(pwo, wo);
   1461	rcu_read_lock();
   1462	idr_for_each_entry(&resource->devices, device, vnr) {
   1463		if (get_ldev(device)) {
   1464			wo = max_allowed_wo(device->ldev, wo);
   1465			if (device->ldev == bdev)
   1466				bdev = NULL;
   1467			put_ldev(device);
   1468		}
   1469	}
   1470
   1471	if (bdev)
   1472		wo = max_allowed_wo(bdev, wo);
   1473
   1474	rcu_read_unlock();
   1475
   1476	resource->write_ordering = wo;
   1477	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
   1478		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
   1479}
   1480
   1481/*
   1482 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
   1483 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
   1484 * will directly go to fallback mode, submitting normal writes, and
   1485 * never even try to UNMAP.
   1486 *
   1487 * And dm-thin does not do this (yet), mostly because in general it has
   1488 * to assume that "skip_block_zeroing" is set.  See also:
   1489 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
   1490 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
   1491 *
   1492 * We *may* ignore the discard-zeroes-data setting, if so configured.
   1493 *
   1494 * Assumption is that this "discard_zeroes_data=0" is only because the backend
   1495 * may ignore partial unaligned discards.
   1496 *
   1497 * LVM/DM thin as of at least
   1498 *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
   1499 *   Library version: 1.02.93-RHEL7 (2015-01-28)
   1500 *   Driver version:  4.29.0
   1501 * still behaves this way.
   1502 *
   1503 * For unaligned (wrt. alignment and granularity) or too small discards,
   1504 * we zero-out the initial (and/or) trailing unaligned partial chunks,
   1505 * but discard all the aligned full chunks.
   1506 *
   1507 * At least for LVM/DM thin, with skip_block_zeroing=false,
   1508 * the result is effectively "discard_zeroes_data=1".
   1509 */
   1510/* flags: EE_TRIM|EE_ZEROOUT */
   1511int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
   1512{
   1513	struct block_device *bdev = device->ldev->backing_bdev;
   1514	sector_t tmp, nr;
   1515	unsigned int max_discard_sectors, granularity;
   1516	int alignment;
   1517	int err = 0;
   1518
   1519	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
   1520		goto zero_out;
   1521
   1522	/* Zero-sector (unknown) and one-sector granularities are the same.  */
   1523	granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
   1524	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
   1525
   1526	max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
   1527	max_discard_sectors -= max_discard_sectors % granularity;
   1528	if (unlikely(!max_discard_sectors))
   1529		goto zero_out;
   1530
   1531	if (nr_sectors < granularity)
   1532		goto zero_out;
   1533
   1534	tmp = start;
   1535	if (sector_div(tmp, granularity) != alignment) {
   1536		if (nr_sectors < 2*granularity)
   1537			goto zero_out;
   1538		/* start + gran - (start + gran - align) % gran */
   1539		tmp = start + granularity - alignment;
   1540		tmp = start + granularity - sector_div(tmp, granularity);
   1541
   1542		nr = tmp - start;
   1543		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
   1544		 * layers are below us, some may have smaller granularity */
   1545		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
   1546		nr_sectors -= nr;
   1547		start = tmp;
   1548	}
   1549	while (nr_sectors >= max_discard_sectors) {
   1550		err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
   1551					    GFP_NOIO);
   1552		nr_sectors -= max_discard_sectors;
   1553		start += max_discard_sectors;
   1554	}
   1555	if (nr_sectors) {
   1556		/* max_discard_sectors is unsigned int (and a multiple of
   1557		 * granularity, we made sure of that above already);
   1558		 * nr is < max_discard_sectors;
   1559		 * I don't need sector_div here, even though nr is sector_t */
   1560		nr = nr_sectors;
   1561		nr -= (unsigned int)nr % granularity;
   1562		if (nr) {
   1563			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
   1564			nr_sectors -= nr;
   1565			start += nr;
   1566		}
   1567	}
   1568 zero_out:
   1569	if (nr_sectors) {
   1570		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
   1571				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
   1572	}
   1573	return err != 0;
   1574}
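/*
 * Worked example (added for clarity; the values are hypothetical): with
 * granularity = 8 sectors, alignment = 0, start = 5, nr_sectors = 27 and a
 * sufficiently large max_discard_sectors, the code above zeroes out the
 * unaligned head (sectors 5..7, i.e. tmp = 8, nr = 3), then discards the
 * aligned remainder (sectors 8..31, 24 sectors, a multiple of the
 * granularity), leaving no trailing partial chunk to zero out.
 */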
   1575
   1576static bool can_do_reliable_discards(struct drbd_device *device)
   1577{
   1578	struct disk_conf *dc;
   1579	bool can_do;
   1580
   1581	if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
   1582		return false;
   1583
   1584	rcu_read_lock();
   1585	dc = rcu_dereference(device->ldev->disk_conf);
   1586	can_do = dc->discard_zeroes_if_aligned;
   1587	rcu_read_unlock();
   1588	return can_do;
   1589}
   1590
   1591static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
   1592{
   1593	/* If the backend cannot discard, or does not guarantee
   1594	 * read-back zeroes in discarded ranges, we fall back to
   1595	 * zero-out.  Unless configuration specifically requested
   1596	 * otherwise. */
   1597	if (!can_do_reliable_discards(device))
   1598		peer_req->flags |= EE_ZEROOUT;
   1599
   1600	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
   1601	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
   1602		peer_req->flags |= EE_WAS_ERROR;
   1603	drbd_endio_write_sec_final(peer_req);
   1604}
   1605
   1606/**
    1607 * drbd_submit_peer_request() - submit a peer request as one or more bios
   1608 * @device:	DRBD device.
   1609 * @peer_req:	peer request
   1610 *
   1611 * May spread the pages to multiple bios,
   1612 * depending on bio_add_page restrictions.
   1613 *
   1614 * Returns 0 if all bios have been submitted,
   1615 * -ENOMEM if we could not allocate enough bios,
   1616 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
   1617 *  single page to an empty bio (which should never happen and likely indicates
   1618 *  that the lower level IO stack is in some way broken). This has been observed
   1619 *  on certain Xen deployments.
   1620 */
   1621/* TODO allocate from our own bio_set. */
   1622int drbd_submit_peer_request(struct drbd_device *device,
   1623			     struct drbd_peer_request *peer_req,
   1624			     const unsigned op, const unsigned op_flags,
   1625			     const int fault_type)
   1626{
   1627	struct bio *bios = NULL;
   1628	struct bio *bio;
   1629	struct page *page = peer_req->pages;
   1630	sector_t sector = peer_req->i.sector;
   1631	unsigned int data_size = peer_req->i.size;
   1632	unsigned int n_bios = 0;
   1633	unsigned int nr_pages = PFN_UP(data_size);
   1634
   1635	/* TRIM/DISCARD: for now, always use the helper function
   1636	 * blkdev_issue_zeroout(..., discard=true).
   1637	 * It's synchronous, but it does the right thing wrt. bio splitting.
   1638	 * Correctness first, performance later.  Next step is to code an
   1639	 * asynchronous variant of the same.
   1640	 */
   1641	if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
   1642		/* wait for all pending IO completions, before we start
   1643		 * zeroing things out. */
   1644		conn_wait_active_ee_empty(peer_req->peer_device->connection);
   1645		/* add it to the active list now,
   1646		 * so we can find it to present it in debugfs */
   1647		peer_req->submit_jif = jiffies;
   1648		peer_req->flags |= EE_SUBMITTED;
   1649
   1650		/* If this was a resync request from receive_rs_deallocated(),
   1651		 * it is already on the sync_ee list */
   1652		if (list_empty(&peer_req->w.list)) {
   1653			spin_lock_irq(&device->resource->req_lock);
   1654			list_add_tail(&peer_req->w.list, &device->active_ee);
   1655			spin_unlock_irq(&device->resource->req_lock);
   1656		}
   1657
   1658		drbd_issue_peer_discard_or_zero_out(device, peer_req);
   1659		return 0;
   1660	}
   1661
   1662	/* In most cases, we will only need one bio.  But in case the lower
   1663	 * level restrictions happen to be different at this offset on this
   1664	 * side than those of the sending peer, we may need to submit the
   1665	 * request in more than one bio.
   1666	 *
   1667	 * Plain bio_alloc is good enough here, this is no DRBD internally
   1668	 * generated bio, but a bio allocated on behalf of the peer.
   1669	 */
   1670next_bio:
   1671	bio = bio_alloc(device->ldev->backing_bdev, nr_pages, op | op_flags,
   1672			GFP_NOIO);
   1673	/* > peer_req->i.sector, unless this is the first bio */
   1674	bio->bi_iter.bi_sector = sector;
   1675	bio->bi_private = peer_req;
   1676	bio->bi_end_io = drbd_peer_request_endio;
   1677
   1678	bio->bi_next = bios;
   1679	bios = bio;
   1680	++n_bios;
   1681
   1682	page_chain_for_each(page) {
   1683		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
   1684		if (!bio_add_page(bio, page, len, 0))
   1685			goto next_bio;
   1686		data_size -= len;
   1687		sector += len >> 9;
   1688		--nr_pages;
   1689	}
   1690	D_ASSERT(device, data_size == 0);
   1691	D_ASSERT(device, page == NULL);
   1692
   1693	atomic_set(&peer_req->pending_bios, n_bios);
   1694	/* for debugfs: update timestamp, mark as submitted */
   1695	peer_req->submit_jif = jiffies;
   1696	peer_req->flags |= EE_SUBMITTED;
   1697	do {
   1698		bio = bios;
   1699		bios = bios->bi_next;
   1700		bio->bi_next = NULL;
   1701
   1702		drbd_submit_bio_noacct(device, fault_type, bio);
   1703	} while (bios);
   1704	return 0;
   1705}
   1706
   1707static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
   1708					     struct drbd_peer_request *peer_req)
   1709{
   1710	struct drbd_interval *i = &peer_req->i;
   1711
   1712	drbd_remove_interval(&device->write_requests, i);
   1713	drbd_clear_interval(i);
   1714
   1715	/* Wake up any processes waiting for this peer request to complete.  */
   1716	if (i->waiting)
   1717		wake_up(&device->misc_wait);
   1718}
   1719
   1720static void conn_wait_active_ee_empty(struct drbd_connection *connection)
   1721{
   1722	struct drbd_peer_device *peer_device;
   1723	int vnr;
   1724
   1725	rcu_read_lock();
   1726	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   1727		struct drbd_device *device = peer_device->device;
   1728
   1729		kref_get(&device->kref);
   1730		rcu_read_unlock();
   1731		drbd_wait_ee_list_empty(device, &device->active_ee);
   1732		kref_put(&device->kref, drbd_destroy_device);
   1733		rcu_read_lock();
   1734	}
   1735	rcu_read_unlock();
   1736}
   1737
   1738static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
   1739{
   1740	int rv;
   1741	struct p_barrier *p = pi->data;
   1742	struct drbd_epoch *epoch;
   1743
   1744	/* FIXME these are unacked on connection,
   1745	 * not a specific (peer)device.
   1746	 */
   1747	connection->current_epoch->barrier_nr = p->barrier;
   1748	connection->current_epoch->connection = connection;
   1749	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
   1750
   1751	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
   1752	 * the activity log, which means it would not be resynced in case the
   1753	 * R_PRIMARY crashes now.
   1754	 * Therefore we must send the barrier_ack after the barrier request was
   1755	 * completed. */
   1756	switch (connection->resource->write_ordering) {
   1757	case WO_NONE:
   1758		if (rv == FE_RECYCLED)
   1759			return 0;
   1760
   1761		/* receiver context, in the writeout path of the other node.
   1762		 * avoid potential distributed deadlock */
   1763		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
   1764		if (epoch)
   1765			break;
   1766		else
   1767			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
   1768		fallthrough;
   1769
   1770	case WO_BDEV_FLUSH:
   1771	case WO_DRAIN_IO:
   1772		conn_wait_active_ee_empty(connection);
   1773		drbd_flush(connection);
   1774
   1775		if (atomic_read(&connection->current_epoch->epoch_size)) {
   1776			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
   1777			if (epoch)
   1778				break;
   1779		}
   1780
   1781		return 0;
   1782	default:
   1783		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
   1784			 connection->resource->write_ordering);
   1785		return -EIO;
   1786	}
   1787
   1788	epoch->flags = 0;
   1789	atomic_set(&epoch->epoch_size, 0);
   1790	atomic_set(&epoch->active, 0);
   1791
   1792	spin_lock(&connection->epoch_lock);
   1793	if (atomic_read(&connection->current_epoch->epoch_size)) {
   1794		list_add(&epoch->list, &connection->current_epoch->list);
   1795		connection->current_epoch = epoch;
   1796		connection->epochs++;
   1797	} else {
   1798		/* The current_epoch got recycled while we allocated this one... */
   1799		kfree(epoch);
   1800	}
   1801	spin_unlock(&connection->epoch_lock);
   1802
   1803	return 0;
   1804}
   1805
   1806/* quick wrapper in case payload size != request_size (write same) */
   1807static void drbd_csum_ee_size(struct crypto_shash *h,
   1808			      struct drbd_peer_request *r, void *d,
   1809			      unsigned int payload_size)
   1810{
   1811	unsigned int tmp = r->i.size;
   1812	r->i.size = payload_size;
   1813	drbd_csum_ee(h, r, d);
   1814	r->i.size = tmp;
   1815}
   1816
   1817/* used from receive_RSDataReply (recv_resync_read)
   1818 * and from receive_Data.
   1819 * data_size: actual payload ("data in")
   1820 * 	for normal writes that is bi_size.
   1821 * 	for discards, that is zero.
   1822 * 	for write same, it is logical_block_size.
   1823 * both trim and write same have the bi_size ("data len to be affected")
   1824 * as extra argument in the packet header.
   1825 */
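/* For illustration (not normative): a plain 16 KiB write arrives as P_DATA
 * with pi->size == 16384 (plus the integrity digest, if one is configured),
 * so ds == data_size and the payload is received into the page chain below;
 * a 1 MiB discard arrives as P_TRIM with pi->size == 0 and
 * trim->size == 1048576, so ds describes the affected range and no payload
 * is read from the socket. */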
   1826static struct drbd_peer_request *
   1827read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
   1828	      struct packet_info *pi) __must_hold(local)
   1829{
   1830	struct drbd_device *device = peer_device->device;
   1831	const sector_t capacity = get_capacity(device->vdisk);
   1832	struct drbd_peer_request *peer_req;
   1833	struct page *page;
   1834	int digest_size, err;
   1835	unsigned int data_size = pi->size, ds;
   1836	void *dig_in = peer_device->connection->int_dig_in;
   1837	void *dig_vv = peer_device->connection->int_dig_vv;
   1838	unsigned long *data;
   1839	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
   1840	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
   1841
   1842	digest_size = 0;
   1843	if (!trim && peer_device->connection->peer_integrity_tfm) {
   1844		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
   1845		/*
   1846		 * FIXME: Receive the incoming digest into the receive buffer
   1847		 *	  here, together with its struct p_data?
   1848		 */
   1849		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
   1850		if (err)
   1851			return NULL;
   1852		data_size -= digest_size;
   1853	}
   1854
   1855	/* assume request_size == data_size, but special case trim. */
   1856	ds = data_size;
   1857	if (trim) {
   1858		if (!expect(data_size == 0))
   1859			return NULL;
   1860		ds = be32_to_cpu(trim->size);
   1861	} else if (zeroes) {
   1862		if (!expect(data_size == 0))
   1863			return NULL;
   1864		ds = be32_to_cpu(zeroes->size);
   1865	}
   1866
   1867	if (!expect(IS_ALIGNED(ds, 512)))
   1868		return NULL;
   1869	if (trim || zeroes) {
   1870		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
   1871			return NULL;
   1872	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
   1873		return NULL;
   1874
    1875	/* even though we trust our peer,
    1876	 * we sometimes have to double check. */
   1877	if (sector + (ds>>9) > capacity) {
   1878		drbd_err(device, "request from peer beyond end of local disk: "
   1879			"capacity: %llus < sector: %llus + size: %u\n",
   1880			(unsigned long long)capacity,
   1881			(unsigned long long)sector, ds);
   1882		return NULL;
   1883	}
   1884
   1885	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
   1886	 * "criss-cross" setup, that might cause write-out on some other DRBD,
   1887	 * which in turn might block on the other node at this very place.  */
   1888	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
   1889	if (!peer_req)
   1890		return NULL;
   1891
   1892	peer_req->flags |= EE_WRITE;
   1893	if (trim) {
   1894		peer_req->flags |= EE_TRIM;
   1895		return peer_req;
   1896	}
   1897	if (zeroes) {
   1898		peer_req->flags |= EE_ZEROOUT;
   1899		return peer_req;
   1900	}
   1901
   1902	/* receive payload size bytes into page chain */
   1903	ds = data_size;
   1904	page = peer_req->pages;
   1905	page_chain_for_each(page) {
   1906		unsigned len = min_t(int, ds, PAGE_SIZE);
   1907		data = kmap(page);
   1908		err = drbd_recv_all_warn(peer_device->connection, data, len);
   1909		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
   1910			drbd_err(device, "Fault injection: Corrupting data on receive\n");
   1911			data[0] = data[0] ^ (unsigned long)-1;
   1912		}
   1913		kunmap(page);
   1914		if (err) {
   1915			drbd_free_peer_req(device, peer_req);
   1916			return NULL;
   1917		}
   1918		ds -= len;
   1919	}
   1920
   1921	if (digest_size) {
   1922		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
   1923		if (memcmp(dig_in, dig_vv, digest_size)) {
   1924			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
   1925				(unsigned long long)sector, data_size);
   1926			drbd_free_peer_req(device, peer_req);
   1927			return NULL;
   1928		}
   1929	}
   1930	device->recv_cnt += data_size >> 9;
   1931	return peer_req;
   1932}
   1933
   1934/* drbd_drain_block() just takes a data block
   1935 * out of the socket input buffer, and discards it.
   1936 */
   1937static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
   1938{
   1939	struct page *page;
   1940	int err = 0;
   1941	void *data;
   1942
   1943	if (!data_size)
   1944		return 0;
   1945
   1946	page = drbd_alloc_pages(peer_device, 1, 1);
   1947
   1948	data = kmap(page);
   1949	while (data_size) {
   1950		unsigned int len = min_t(int, data_size, PAGE_SIZE);
   1951
   1952		err = drbd_recv_all_warn(peer_device->connection, data, len);
   1953		if (err)
   1954			break;
   1955		data_size -= len;
   1956	}
   1957	kunmap(page);
   1958	drbd_free_pages(peer_device->device, page, 0);
   1959	return err;
   1960}
   1961
   1962static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
   1963			   sector_t sector, int data_size)
   1964{
   1965	struct bio_vec bvec;
   1966	struct bvec_iter iter;
   1967	struct bio *bio;
   1968	int digest_size, err, expect;
   1969	void *dig_in = peer_device->connection->int_dig_in;
   1970	void *dig_vv = peer_device->connection->int_dig_vv;
   1971
   1972	digest_size = 0;
   1973	if (peer_device->connection->peer_integrity_tfm) {
   1974		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
   1975		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
   1976		if (err)
   1977			return err;
   1978		data_size -= digest_size;
   1979	}
   1980
   1981	/* optimistically update recv_cnt.  if receiving fails below,
   1982	 * we disconnect anyways, and counters will be reset. */
   1983	peer_device->device->recv_cnt += data_size>>9;
   1984
   1985	bio = req->master_bio;
   1986	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
   1987
   1988	bio_for_each_segment(bvec, bio, iter) {
   1989		void *mapped = bvec_kmap_local(&bvec);
   1990		expect = min_t(int, data_size, bvec.bv_len);
   1991		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
   1992		kunmap_local(mapped);
   1993		if (err)
   1994			return err;
   1995		data_size -= expect;
   1996	}
   1997
   1998	if (digest_size) {
   1999		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
   2000		if (memcmp(dig_in, dig_vv, digest_size)) {
   2001			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
   2002			return -EINVAL;
   2003		}
   2004	}
   2005
   2006	D_ASSERT(peer_device->device, data_size == 0);
   2007	return 0;
   2008}
   2009
   2010/*
   2011 * e_end_resync_block() is called in ack_sender context via
   2012 * drbd_finish_peer_reqs().
   2013 */
   2014static int e_end_resync_block(struct drbd_work *w, int unused)
   2015{
   2016	struct drbd_peer_request *peer_req =
   2017		container_of(w, struct drbd_peer_request, w);
   2018	struct drbd_peer_device *peer_device = peer_req->peer_device;
   2019	struct drbd_device *device = peer_device->device;
   2020	sector_t sector = peer_req->i.sector;
   2021	int err;
   2022
   2023	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
   2024
   2025	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
   2026		drbd_set_in_sync(device, sector, peer_req->i.size);
   2027		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
   2028	} else {
   2029		/* Record failure to sync */
   2030		drbd_rs_failed_io(device, sector, peer_req->i.size);
   2031
   2032		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
   2033	}
   2034	dec_unacked(device);
   2035
   2036	return err;
   2037}
   2038
   2039static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
   2040			    struct packet_info *pi) __releases(local)
   2041{
   2042	struct drbd_device *device = peer_device->device;
   2043	struct drbd_peer_request *peer_req;
   2044
   2045	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
   2046	if (!peer_req)
   2047		goto fail;
   2048
   2049	dec_rs_pending(device);
   2050
   2051	inc_unacked(device);
    2052	/* corresponding dec_unacked() in e_end_resync_block(),
    2053	 * or in _drbd_clear_done_ee, respectively */
   2054
   2055	peer_req->w.cb = e_end_resync_block;
   2056	peer_req->submit_jif = jiffies;
   2057
   2058	spin_lock_irq(&device->resource->req_lock);
   2059	list_add_tail(&peer_req->w.list, &device->sync_ee);
   2060	spin_unlock_irq(&device->resource->req_lock);
   2061
   2062	atomic_add(pi->size >> 9, &device->rs_sect_ev);
   2063	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
   2064				     DRBD_FAULT_RS_WR) == 0)
   2065		return 0;
   2066
   2067	/* don't care for the reason here */
   2068	drbd_err(device, "submit failed, triggering re-connect\n");
   2069	spin_lock_irq(&device->resource->req_lock);
   2070	list_del(&peer_req->w.list);
   2071	spin_unlock_irq(&device->resource->req_lock);
   2072
   2073	drbd_free_peer_req(device, peer_req);
   2074fail:
   2075	put_ldev(device);
   2076	return -EIO;
   2077}
   2078
   2079static struct drbd_request *
   2080find_request(struct drbd_device *device, struct rb_root *root, u64 id,
   2081	     sector_t sector, bool missing_ok, const char *func)
   2082{
   2083	struct drbd_request *req;
   2084
   2085	/* Request object according to our peer */
   2086	req = (struct drbd_request *)(unsigned long)id;
   2087	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
   2088		return req;
   2089	if (!missing_ok) {
   2090		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
   2091			(unsigned long)id, (unsigned long long)sector);
   2092	}
   2093	return NULL;
   2094}
   2095
   2096static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
   2097{
   2098	struct drbd_peer_device *peer_device;
   2099	struct drbd_device *device;
   2100	struct drbd_request *req;
   2101	sector_t sector;
   2102	int err;
   2103	struct p_data *p = pi->data;
   2104
   2105	peer_device = conn_peer_device(connection, pi->vnr);
   2106	if (!peer_device)
   2107		return -EIO;
   2108	device = peer_device->device;
   2109
   2110	sector = be64_to_cpu(p->sector);
   2111
   2112	spin_lock_irq(&device->resource->req_lock);
   2113	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
   2114	spin_unlock_irq(&device->resource->req_lock);
   2115	if (unlikely(!req))
   2116		return -EIO;
   2117
   2118	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
   2119	 * special casing it there for the various failure cases.
   2120	 * still no race with drbd_fail_pending_reads */
   2121	err = recv_dless_read(peer_device, req, sector, pi->size);
   2122	if (!err)
   2123		req_mod(req, DATA_RECEIVED);
   2124	/* else: nothing. handled from drbd_disconnect...
   2125	 * I don't think we may complete this just yet
   2126	 * in case we are "on-disconnect: freeze" */
   2127
   2128	return err;
   2129}
   2130
   2131static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
   2132{
   2133	struct drbd_peer_device *peer_device;
   2134	struct drbd_device *device;
   2135	sector_t sector;
   2136	int err;
   2137	struct p_data *p = pi->data;
   2138
   2139	peer_device = conn_peer_device(connection, pi->vnr);
   2140	if (!peer_device)
   2141		return -EIO;
   2142	device = peer_device->device;
   2143
   2144	sector = be64_to_cpu(p->sector);
   2145	D_ASSERT(device, p->block_id == ID_SYNCER);
   2146
   2147	if (get_ldev(device)) {
   2148		/* data is submitted to disk within recv_resync_read.
   2149		 * corresponding put_ldev done below on error,
   2150		 * or in drbd_peer_request_endio. */
   2151		err = recv_resync_read(peer_device, sector, pi);
   2152	} else {
   2153		if (__ratelimit(&drbd_ratelimit_state))
   2154			drbd_err(device, "Can not write resync data to local disk.\n");
   2155
   2156		err = drbd_drain_block(peer_device, pi->size);
   2157
   2158		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
   2159	}
   2160
   2161	atomic_add(pi->size >> 9, &device->rs_sect_in);
   2162
   2163	return err;
   2164}
   2165
   2166static void restart_conflicting_writes(struct drbd_device *device,
   2167				       sector_t sector, int size)
   2168{
   2169	struct drbd_interval *i;
   2170	struct drbd_request *req;
   2171
   2172	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
   2173		if (!i->local)
   2174			continue;
   2175		req = container_of(i, struct drbd_request, i);
   2176		if (req->rq_state & RQ_LOCAL_PENDING ||
   2177		    !(req->rq_state & RQ_POSTPONED))
   2178			continue;
   2179		/* as it is RQ_POSTPONED, this will cause it to
   2180		 * be queued on the retry workqueue. */
   2181		__req_mod(req, CONFLICT_RESOLVED, NULL);
   2182	}
   2183}
   2184
   2185/*
   2186 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
   2187 */
   2188static int e_end_block(struct drbd_work *w, int cancel)
   2189{
   2190	struct drbd_peer_request *peer_req =
   2191		container_of(w, struct drbd_peer_request, w);
   2192	struct drbd_peer_device *peer_device = peer_req->peer_device;
   2193	struct drbd_device *device = peer_device->device;
   2194	sector_t sector = peer_req->i.sector;
   2195	int err = 0, pcmd;
   2196
   2197	if (peer_req->flags & EE_SEND_WRITE_ACK) {
   2198		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
   2199			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
   2200				device->state.conn <= C_PAUSED_SYNC_T &&
   2201				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
   2202				P_RS_WRITE_ACK : P_WRITE_ACK;
   2203			err = drbd_send_ack(peer_device, pcmd, peer_req);
   2204			if (pcmd == P_RS_WRITE_ACK)
   2205				drbd_set_in_sync(device, sector, peer_req->i.size);
   2206		} else {
   2207			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
   2208			/* we expect it to be marked out of sync anyways...
   2209			 * maybe assert this?  */
   2210		}
   2211		dec_unacked(device);
   2212	}
   2213
   2214	/* we delete from the conflict detection hash _after_ we sent out the
   2215	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
   2216	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
   2217		spin_lock_irq(&device->resource->req_lock);
   2218		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
   2219		drbd_remove_epoch_entry_interval(device, peer_req);
   2220		if (peer_req->flags & EE_RESTART_REQUESTS)
   2221			restart_conflicting_writes(device, sector, peer_req->i.size);
   2222		spin_unlock_irq(&device->resource->req_lock);
   2223	} else
   2224		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
   2225
   2226	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
   2227
   2228	return err;
   2229}
   2230
   2231static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
   2232{
   2233	struct drbd_peer_request *peer_req =
   2234		container_of(w, struct drbd_peer_request, w);
   2235	struct drbd_peer_device *peer_device = peer_req->peer_device;
   2236	int err;
   2237
   2238	err = drbd_send_ack(peer_device, ack, peer_req);
   2239	dec_unacked(peer_device->device);
   2240
   2241	return err;
   2242}
   2243
   2244static int e_send_superseded(struct drbd_work *w, int unused)
   2245{
   2246	return e_send_ack(w, P_SUPERSEDED);
   2247}
   2248
   2249static int e_send_retry_write(struct drbd_work *w, int unused)
   2250{
   2251	struct drbd_peer_request *peer_req =
   2252		container_of(w, struct drbd_peer_request, w);
   2253	struct drbd_connection *connection = peer_req->peer_device->connection;
   2254
   2255	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
   2256			     P_RETRY_WRITE : P_SUPERSEDED);
   2257}
   2258
   2259static bool seq_greater(u32 a, u32 b)
   2260{
   2261	/*
   2262	 * We assume 32-bit wrap-around here.
   2263	 * For 24-bit wrap-around, we would have to shift:
   2264	 *  a <<= 8; b <<= 8;
   2265	 */
   2266	return (s32)a - (s32)b > 0;
   2267}
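/* Illustrative sketch (not part of DRBD): the signed subtraction above makes
 * wrap-around harmless, e.g.
 *	seq_greater(0x00000002, 0xfffffffe) is true:  (s32)2 - (s32)-2 ==  4
 *	seq_greater(0xfffffffe, 0x00000002) is false: (s32)-2 - (s32)2 == -4
 * so a sequence number that has just wrapped still compares as "newer". */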
   2268
   2269static u32 seq_max(u32 a, u32 b)
   2270{
   2271	return seq_greater(a, b) ? a : b;
   2272}
   2273
   2274static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
   2275{
   2276	struct drbd_device *device = peer_device->device;
   2277	unsigned int newest_peer_seq;
   2278
   2279	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
   2280		spin_lock(&device->peer_seq_lock);
   2281		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
   2282		device->peer_seq = newest_peer_seq;
   2283		spin_unlock(&device->peer_seq_lock);
   2284		/* wake up only if we actually changed device->peer_seq */
   2285		if (peer_seq == newest_peer_seq)
   2286			wake_up(&device->seq_wait);
   2287	}
   2288}
   2289
   2290static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
   2291{
   2292	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
   2293}
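/* Illustrative sketch (not part of DRBD): sectors are 512-byte units while
 * the lengths are in bytes, so e.g.
 *	overlaps(0, 4096, 7, 512) == 1	(ranges [0,8) and [7,8) share sector 7)
 *	overlaps(0, 4096, 8, 512) == 0	(ranges [0,8) and [8,9) merely touch)
 * i.e. end-to-start adjacency does not count as an overlap. */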
   2294
   2295/* maybe change sync_ee into interval trees as well? */
   2296static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
   2297{
   2298	struct drbd_peer_request *rs_req;
   2299	bool rv = false;
   2300
   2301	spin_lock_irq(&device->resource->req_lock);
   2302	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
   2303		if (overlaps(peer_req->i.sector, peer_req->i.size,
   2304			     rs_req->i.sector, rs_req->i.size)) {
   2305			rv = true;
   2306			break;
   2307		}
   2308	}
   2309	spin_unlock_irq(&device->resource->req_lock);
   2310
   2311	return rv;
   2312}
   2313
   2314/* Called from receive_Data.
   2315 * Synchronize packets on sock with packets on msock.
   2316 *
   2317 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
   2318 * packet traveling on msock, they are still processed in the order they have
   2319 * been sent.
   2320 *
   2321 * Note: we don't care for Ack packets overtaking P_DATA packets.
   2322 *
   2323 * In case packet_seq is larger than device->peer_seq number, there are
   2324 * outstanding packets on the msock. We wait for them to arrive.
   2325 * In case we are the logically next packet, we update device->peer_seq
   2326 * ourselves. Correctly handles 32bit wrap around.
   2327 *
   2328 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
   2329 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
   2330 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
   2331 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
   2332 *
   2333 * returns 0 if we may process the packet,
   2334 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
   2335static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
   2336{
   2337	struct drbd_device *device = peer_device->device;
   2338	DEFINE_WAIT(wait);
   2339	long timeout;
   2340	int ret = 0, tp;
   2341
   2342	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
   2343		return 0;
   2344
   2345	spin_lock(&device->peer_seq_lock);
   2346	for (;;) {
   2347		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
   2348			device->peer_seq = seq_max(device->peer_seq, peer_seq);
   2349			break;
   2350		}
   2351
   2352		if (signal_pending(current)) {
   2353			ret = -ERESTARTSYS;
   2354			break;
   2355		}
   2356
   2357		rcu_read_lock();
   2358		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
   2359		rcu_read_unlock();
   2360
   2361		if (!tp)
   2362			break;
   2363
   2364		/* Only need to wait if two_primaries is enabled */
   2365		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
   2366		spin_unlock(&device->peer_seq_lock);
   2367		rcu_read_lock();
   2368		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
   2369		rcu_read_unlock();
   2370		timeout = schedule_timeout(timeout);
   2371		spin_lock(&device->peer_seq_lock);
   2372		if (!timeout) {
   2373			ret = -ETIMEDOUT;
   2374			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
   2375			break;
   2376		}
   2377	}
   2378	spin_unlock(&device->peer_seq_lock);
   2379	finish_wait(&device->seq_wait, &wait);
   2380	return ret;
   2381}
   2382
   2383/* see also bio_flags_to_wire()
   2384 * DRBD_REQ_*, because we need to semantically map the flags to data packet
   2385 * flags and back. We may replicate to other kernel versions. */
   2386static unsigned long wire_flags_to_bio_flags(u32 dpf)
   2387{
   2388	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
   2389		(dpf & DP_FUA ? REQ_FUA : 0) |
   2390		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
   2391}
   2392
   2393static unsigned long wire_flags_to_bio_op(u32 dpf)
   2394{
   2395	if (dpf & DP_ZEROES)
   2396		return REQ_OP_WRITE_ZEROES;
   2397	if (dpf & DP_DISCARD)
   2398		return REQ_OP_DISCARD;
   2399	else
   2400		return REQ_OP_WRITE;
   2401}
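/* Illustrative sketch (not part of DRBD): for a FUA + flush write the peer
 * sends dp_flags = DP_FUA | DP_FLUSH, which maps to
 *	wire_flags_to_bio_op(dp_flags)    == REQ_OP_WRITE
 *	wire_flags_to_bio_flags(dp_flags) == REQ_FUA | REQ_PREFLUSH
 * while DP_ZEROES and DP_DISCARD select REQ_OP_WRITE_ZEROES and
 * REQ_OP_DISCARD, respectively, instead of a plain write. */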
   2402
   2403static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
   2404				    unsigned int size)
   2405{
   2406	struct drbd_interval *i;
   2407
   2408    repeat:
   2409	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
   2410		struct drbd_request *req;
   2411		struct bio_and_error m;
   2412
   2413		if (!i->local)
   2414			continue;
   2415		req = container_of(i, struct drbd_request, i);
   2416		if (!(req->rq_state & RQ_POSTPONED))
   2417			continue;
   2418		req->rq_state &= ~RQ_POSTPONED;
   2419		__req_mod(req, NEG_ACKED, &m);
   2420		spin_unlock_irq(&device->resource->req_lock);
   2421		if (m.bio)
   2422			complete_master_bio(device, &m);
   2423		spin_lock_irq(&device->resource->req_lock);
   2424		goto repeat;
   2425	}
   2426}
   2427
   2428static int handle_write_conflicts(struct drbd_device *device,
   2429				  struct drbd_peer_request *peer_req)
   2430{
   2431	struct drbd_connection *connection = peer_req->peer_device->connection;
   2432	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
   2433	sector_t sector = peer_req->i.sector;
   2434	const unsigned int size = peer_req->i.size;
   2435	struct drbd_interval *i;
   2436	bool equal;
   2437	int err;
   2438
   2439	/*
   2440	 * Inserting the peer request into the write_requests tree will prevent
   2441	 * new conflicting local requests from being added.
   2442	 */
   2443	drbd_insert_interval(&device->write_requests, &peer_req->i);
   2444
   2445    repeat:
   2446	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
   2447		if (i == &peer_req->i)
   2448			continue;
   2449		if (i->completed)
   2450			continue;
   2451
   2452		if (!i->local) {
   2453			/*
   2454			 * Our peer has sent a conflicting remote request; this
   2455			 * should not happen in a two-node setup.  Wait for the
   2456			 * earlier peer request to complete.
   2457			 */
   2458			err = drbd_wait_misc(device, i);
   2459			if (err)
   2460				goto out;
   2461			goto repeat;
   2462		}
   2463
   2464		equal = i->sector == sector && i->size == size;
   2465		if (resolve_conflicts) {
   2466			/*
   2467			 * If the peer request is fully contained within the
   2468			 * overlapping request, it can be considered overwritten
   2469			 * and thus superseded; otherwise, it will be retried
   2470			 * once all overlapping requests have completed.
   2471			 */
   2472			bool superseded = i->sector <= sector && i->sector +
   2473				       (i->size >> 9) >= sector + (size >> 9);
   2474
   2475			if (!equal)
   2476				drbd_alert(device, "Concurrent writes detected: "
   2477					       "local=%llus +%u, remote=%llus +%u, "
   2478					       "assuming %s came first\n",
   2479					  (unsigned long long)i->sector, i->size,
   2480					  (unsigned long long)sector, size,
   2481					  superseded ? "local" : "remote");
   2482
   2483			peer_req->w.cb = superseded ? e_send_superseded :
   2484						   e_send_retry_write;
   2485			list_add_tail(&peer_req->w.list, &device->done_ee);
   2486			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
   2487
   2488			err = -ENOENT;
   2489			goto out;
   2490		} else {
   2491			struct drbd_request *req =
   2492				container_of(i, struct drbd_request, i);
   2493
   2494			if (!equal)
   2495				drbd_alert(device, "Concurrent writes detected: "
   2496					       "local=%llus +%u, remote=%llus +%u\n",
   2497					  (unsigned long long)i->sector, i->size,
   2498					  (unsigned long long)sector, size);
   2499
   2500			if (req->rq_state & RQ_LOCAL_PENDING ||
   2501			    !(req->rq_state & RQ_POSTPONED)) {
   2502				/*
   2503				 * Wait for the node with the discard flag to
   2504				 * decide if this request has been superseded
   2505				 * or needs to be retried.
   2506				 * Requests that have been superseded will
   2507				 * disappear from the write_requests tree.
   2508				 *
   2509				 * In addition, wait for the conflicting
   2510				 * request to finish locally before submitting
   2511				 * the conflicting peer request.
   2512				 */
   2513				err = drbd_wait_misc(device, &req->i);
   2514				if (err) {
   2515					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
   2516					fail_postponed_requests(device, sector, size);
   2517					goto out;
   2518				}
   2519				goto repeat;
   2520			}
   2521			/*
   2522			 * Remember to restart the conflicting requests after
   2523			 * the new peer request has completed.
   2524			 */
   2525			peer_req->flags |= EE_RESTART_REQUESTS;
   2526		}
   2527	}
   2528	err = 0;
   2529
   2530    out:
   2531	if (err)
   2532		drbd_remove_epoch_entry_interval(device, peer_req);
   2533	return err;
   2534}
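/* Illustrative sketch (not part of DRBD): with RESOLVE_CONFLICTS set, a peer
 * write for sectors 8..11 (sector 8, size 2048) that collides with a local
 * request covering sectors 0..15 (sector 0, size 8192) is fully contained,
 * so "superseded" is true and the peer is answered with P_SUPERSEDED; a
 * merely partial overlap is answered with P_RETRY_WRITE (on protocol 100 and
 * later), telling the peer to retry once the overlapping requests have
 * completed. */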
   2535
   2536/* mirrored write */
   2537static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
   2538{
   2539	struct drbd_peer_device *peer_device;
   2540	struct drbd_device *device;
   2541	struct net_conf *nc;
   2542	sector_t sector;
   2543	struct drbd_peer_request *peer_req;
   2544	struct p_data *p = pi->data;
   2545	u32 peer_seq = be32_to_cpu(p->seq_num);
   2546	int op, op_flags;
   2547	u32 dp_flags;
   2548	int err, tp;
   2549
   2550	peer_device = conn_peer_device(connection, pi->vnr);
   2551	if (!peer_device)
   2552		return -EIO;
   2553	device = peer_device->device;
   2554
   2555	if (!get_ldev(device)) {
   2556		int err2;
   2557
   2558		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
   2559		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
   2560		atomic_inc(&connection->current_epoch->epoch_size);
   2561		err2 = drbd_drain_block(peer_device, pi->size);
   2562		if (!err)
   2563			err = err2;
   2564		return err;
   2565	}
   2566
   2567	/*
   2568	 * Corresponding put_ldev done either below (on various errors), or in
   2569	 * drbd_peer_request_endio, if we successfully submit the data at the
   2570	 * end of this function.
   2571	 */
   2572
   2573	sector = be64_to_cpu(p->sector);
   2574	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
   2575	if (!peer_req) {
   2576		put_ldev(device);
   2577		return -EIO;
   2578	}
   2579
   2580	peer_req->w.cb = e_end_block;
   2581	peer_req->submit_jif = jiffies;
   2582	peer_req->flags |= EE_APPLICATION;
   2583
   2584	dp_flags = be32_to_cpu(p->dp_flags);
   2585	op = wire_flags_to_bio_op(dp_flags);
   2586	op_flags = wire_flags_to_bio_flags(dp_flags);
   2587	if (pi->cmd == P_TRIM) {
   2588		D_ASSERT(peer_device, peer_req->i.size > 0);
   2589		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
   2590		D_ASSERT(peer_device, peer_req->pages == NULL);
   2591		/* need to play safe: an older DRBD sender
   2592		 * may mean zero-out while sending P_TRIM. */
   2593		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
   2594			peer_req->flags |= EE_ZEROOUT;
   2595	} else if (pi->cmd == P_ZEROES) {
   2596		D_ASSERT(peer_device, peer_req->i.size > 0);
   2597		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
   2598		D_ASSERT(peer_device, peer_req->pages == NULL);
   2599		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
   2600		if (dp_flags & DP_DISCARD)
   2601			peer_req->flags |= EE_TRIM;
   2602	} else if (peer_req->pages == NULL) {
   2603		D_ASSERT(device, peer_req->i.size == 0);
   2604		D_ASSERT(device, dp_flags & DP_FLUSH);
   2605	}
   2606
   2607	if (dp_flags & DP_MAY_SET_IN_SYNC)
   2608		peer_req->flags |= EE_MAY_SET_IN_SYNC;
   2609
   2610	spin_lock(&connection->epoch_lock);
   2611	peer_req->epoch = connection->current_epoch;
   2612	atomic_inc(&peer_req->epoch->epoch_size);
   2613	atomic_inc(&peer_req->epoch->active);
   2614	spin_unlock(&connection->epoch_lock);
   2615
   2616	rcu_read_lock();
   2617	nc = rcu_dereference(peer_device->connection->net_conf);
   2618	tp = nc->two_primaries;
   2619	if (peer_device->connection->agreed_pro_version < 100) {
   2620		switch (nc->wire_protocol) {
   2621		case DRBD_PROT_C:
   2622			dp_flags |= DP_SEND_WRITE_ACK;
   2623			break;
   2624		case DRBD_PROT_B:
   2625			dp_flags |= DP_SEND_RECEIVE_ACK;
   2626			break;
   2627		}
   2628	}
   2629	rcu_read_unlock();
   2630
   2631	if (dp_flags & DP_SEND_WRITE_ACK) {
   2632		peer_req->flags |= EE_SEND_WRITE_ACK;
   2633		inc_unacked(device);
    2634		/* corresponding dec_unacked() in e_end_block(),
    2635		 * or in _drbd_clear_done_ee, respectively */
   2636	}
   2637
   2638	if (dp_flags & DP_SEND_RECEIVE_ACK) {
   2639		/* I really don't like it that the receiver thread
   2640		 * sends on the msock, but anyways */
   2641		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
   2642	}
   2643
   2644	if (tp) {
   2645		/* two primaries implies protocol C */
   2646		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
   2647		peer_req->flags |= EE_IN_INTERVAL_TREE;
   2648		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
   2649		if (err)
   2650			goto out_interrupted;
   2651		spin_lock_irq(&device->resource->req_lock);
   2652		err = handle_write_conflicts(device, peer_req);
   2653		if (err) {
   2654			spin_unlock_irq(&device->resource->req_lock);
   2655			if (err == -ENOENT) {
   2656				put_ldev(device);
   2657				return 0;
   2658			}
   2659			goto out_interrupted;
   2660		}
   2661	} else {
   2662		update_peer_seq(peer_device, peer_seq);
   2663		spin_lock_irq(&device->resource->req_lock);
   2664	}
    2665	/* TRIM and ZEROOUT are processed synchronously:
    2666	 * we wait for all pending requests, i.e. wait for
    2667	 * active_ee to become empty in drbd_submit_peer_request();
    2668	 * better not add ourselves here. */
   2669	if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
   2670		list_add_tail(&peer_req->w.list, &device->active_ee);
   2671	spin_unlock_irq(&device->resource->req_lock);
   2672
   2673	if (device->state.conn == C_SYNC_TARGET)
   2674		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
   2675
   2676	if (device->state.pdsk < D_INCONSISTENT) {
   2677		/* In case we have the only disk of the cluster, */
   2678		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
   2679		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
   2680		drbd_al_begin_io(device, &peer_req->i);
   2681		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
   2682	}
   2683
   2684	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
   2685				       DRBD_FAULT_DT_WR);
   2686	if (!err)
   2687		return 0;
   2688
   2689	/* don't care for the reason here */
   2690	drbd_err(device, "submit failed, triggering re-connect\n");
   2691	spin_lock_irq(&device->resource->req_lock);
   2692	list_del(&peer_req->w.list);
   2693	drbd_remove_epoch_entry_interval(device, peer_req);
   2694	spin_unlock_irq(&device->resource->req_lock);
   2695	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
   2696		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
   2697		drbd_al_complete_io(device, &peer_req->i);
   2698	}
   2699
   2700out_interrupted:
   2701	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
   2702	put_ldev(device);
   2703	drbd_free_peer_req(device, peer_req);
   2704	return err;
   2705}
   2706
   2707/* We may throttle resync, if the lower device seems to be busy,
   2708 * and current sync rate is above c_min_rate.
   2709 *
   2710 * To decide whether or not the lower device is busy, we use a scheme similar
    2711 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
    2712 * amount (more than 64 sectors) of activity that we cannot account for with
    2713 * our own resync activity, it obviously is "busy".
   2714 *
   2715 * The current sync rate used here uses only the most recent two step marks,
   2716 * to have a short time average so we can react faster.
   2717 */
   2718bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
   2719		bool throttle_if_app_is_waiting)
   2720{
   2721	struct lc_element *tmp;
   2722	bool throttle = drbd_rs_c_min_rate_throttle(device);
   2723
   2724	if (!throttle || throttle_if_app_is_waiting)
   2725		return throttle;
   2726
   2727	spin_lock_irq(&device->al_lock);
   2728	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
   2729	if (tmp) {
   2730		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
   2731		if (test_bit(BME_PRIORITY, &bm_ext->flags))
   2732			throttle = false;
   2733		/* Do not slow down if app IO is already waiting for this extent,
   2734		 * and our progress is necessary for application IO to complete. */
   2735	}
   2736	spin_unlock_irq(&device->al_lock);
   2737
   2738	return throttle;
   2739}
   2740
   2741bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
   2742{
   2743	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
   2744	unsigned long db, dt, dbdt;
   2745	unsigned int c_min_rate;
   2746	int curr_events;
   2747
   2748	rcu_read_lock();
   2749	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
   2750	rcu_read_unlock();
   2751
   2752	/* feature disabled? */
   2753	if (c_min_rate == 0)
   2754		return false;
   2755
   2756	curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
   2757			atomic_read(&device->rs_sect_ev);
   2758
   2759	if (atomic_read(&device->ap_actlog_cnt)
   2760	    || curr_events - device->rs_last_events > 64) {
   2761		unsigned long rs_left;
   2762		int i;
   2763
   2764		device->rs_last_events = curr_events;
   2765
   2766		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
   2767		 * approx. */
   2768		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
   2769
   2770		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
   2771			rs_left = device->ov_left;
   2772		else
   2773			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
   2774
   2775		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
   2776		if (!dt)
   2777			dt++;
   2778		db = device->rs_mark_left[i] - rs_left;
   2779		dbdt = Bit2KB(db/dt);
   2780
   2781		if (dbdt > c_min_rate)
   2782			return true;
   2783	}
   2784	return false;
   2785}
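/* Worked example (a sketch; assumes the usual 4 KiB bitmap granularity, so
 * Bit2KB(1) == 4): once application activity is detected, if the selected
 * sync mark is dt = 2 seconds old and db = 2048 bitmap bits (8 MiB) have
 * been cleared since then,
 *	dbdt = Bit2KB(2048 / 2) == 4096 KiB/s
 * which exceeds a c_min_rate of, say, 250 KiB/s, so the function returns
 * true and the caller may throttle the resync. */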
   2786
   2787static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
   2788{
   2789	struct drbd_peer_device *peer_device;
   2790	struct drbd_device *device;
   2791	sector_t sector;
   2792	sector_t capacity;
   2793	struct drbd_peer_request *peer_req;
   2794	struct digest_info *di = NULL;
   2795	int size, verb;
   2796	unsigned int fault_type;
   2797	struct p_block_req *p =	pi->data;
   2798
   2799	peer_device = conn_peer_device(connection, pi->vnr);
   2800	if (!peer_device)
   2801		return -EIO;
   2802	device = peer_device->device;
   2803	capacity = get_capacity(device->vdisk);
   2804
   2805	sector = be64_to_cpu(p->sector);
   2806	size   = be32_to_cpu(p->blksize);
   2807
   2808	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
   2809		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
   2810				(unsigned long long)sector, size);
   2811		return -EINVAL;
   2812	}
   2813	if (sector + (size>>9) > capacity) {
   2814		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
   2815				(unsigned long long)sector, size);
   2816		return -EINVAL;
   2817	}
   2818
   2819	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
   2820		verb = 1;
   2821		switch (pi->cmd) {
   2822		case P_DATA_REQUEST:
   2823			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
   2824			break;
   2825		case P_RS_THIN_REQ:
   2826		case P_RS_DATA_REQUEST:
   2827		case P_CSUM_RS_REQUEST:
   2828		case P_OV_REQUEST:
   2829			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
   2830			break;
   2831		case P_OV_REPLY:
   2832			verb = 0;
   2833			dec_rs_pending(device);
   2834			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
   2835			break;
   2836		default:
   2837			BUG();
   2838		}
   2839		if (verb && __ratelimit(&drbd_ratelimit_state))
   2840			drbd_err(device, "Can not satisfy peer's read request, "
   2841			    "no local data.\n");
   2842
   2843		/* drain possibly payload */
   2844		return drbd_drain_block(peer_device, pi->size);
   2845	}
   2846
   2847	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
   2848	 * "criss-cross" setup, that might cause write-out on some other DRBD,
   2849	 * which in turn might block on the other node at this very place.  */
   2850	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
   2851			size, GFP_NOIO);
   2852	if (!peer_req) {
   2853		put_ldev(device);
   2854		return -ENOMEM;
   2855	}
   2856
   2857	switch (pi->cmd) {
   2858	case P_DATA_REQUEST:
   2859		peer_req->w.cb = w_e_end_data_req;
   2860		fault_type = DRBD_FAULT_DT_RD;
   2861		/* application IO, don't drbd_rs_begin_io */
   2862		peer_req->flags |= EE_APPLICATION;
   2863		goto submit;
   2864
   2865	case P_RS_THIN_REQ:
   2866		/* If at some point in the future we have a smart way to
   2867		   find out if this data block is completely deallocated,
   2868		   then we would do something smarter here than reading
   2869		   the block... */
   2870		peer_req->flags |= EE_RS_THIN_REQ;
   2871		fallthrough;
   2872	case P_RS_DATA_REQUEST:
   2873		peer_req->w.cb = w_e_end_rsdata_req;
   2874		fault_type = DRBD_FAULT_RS_RD;
   2875		/* used in the sector offset progress display */
   2876		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
   2877		break;
   2878
   2879	case P_OV_REPLY:
   2880	case P_CSUM_RS_REQUEST:
   2881		fault_type = DRBD_FAULT_RS_RD;
   2882		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
   2883		if (!di)
   2884			goto out_free_e;
   2885
   2886		di->digest_size = pi->size;
   2887		di->digest = (((char *)di)+sizeof(struct digest_info));
   2888
   2889		peer_req->digest = di;
   2890		peer_req->flags |= EE_HAS_DIGEST;
   2891
   2892		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
   2893			goto out_free_e;
   2894
   2895		if (pi->cmd == P_CSUM_RS_REQUEST) {
   2896			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
   2897			peer_req->w.cb = w_e_end_csum_rs_req;
   2898			/* used in the sector offset progress display */
   2899			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
   2900			/* remember to report stats in drbd_resync_finished */
   2901			device->use_csums = true;
   2902		} else if (pi->cmd == P_OV_REPLY) {
   2903			/* track progress, we may need to throttle */
   2904			atomic_add(size >> 9, &device->rs_sect_in);
   2905			peer_req->w.cb = w_e_end_ov_reply;
   2906			dec_rs_pending(device);
   2907			/* drbd_rs_begin_io done when we sent this request,
   2908			 * but accounting still needs to be done. */
   2909			goto submit_for_resync;
   2910		}
   2911		break;
   2912
   2913	case P_OV_REQUEST:
   2914		if (device->ov_start_sector == ~(sector_t)0 &&
   2915		    peer_device->connection->agreed_pro_version >= 90) {
   2916			unsigned long now = jiffies;
   2917			int i;
   2918			device->ov_start_sector = sector;
   2919			device->ov_position = sector;
   2920			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
   2921			device->rs_total = device->ov_left;
   2922			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
   2923				device->rs_mark_left[i] = device->ov_left;
   2924				device->rs_mark_time[i] = now;
   2925			}
   2926			drbd_info(device, "Online Verify start sector: %llu\n",
   2927					(unsigned long long)sector);
   2928		}
   2929		peer_req->w.cb = w_e_end_ov_req;
   2930		fault_type = DRBD_FAULT_RS_RD;
   2931		break;
   2932
   2933	default:
   2934		BUG();
   2935	}
   2936
   2937	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
   2938	 * wrt the receiver, but it is not as straightforward as it may seem.
   2939	 * Various places in the resync start and stop logic assume resync
   2940	 * requests are processed in order, requeuing this on the worker thread
   2941	 * introduces a bunch of new code for synchronization between threads.
   2942	 *
   2943	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
   2944	 * "forever", throttling after drbd_rs_begin_io will lock that extent
   2945	 * for application writes for the same time.  For now, just throttle
   2946	 * here, where the rest of the code expects the receiver to sleep for
   2947	 * a while, anyways.
   2948	 */
   2949
   2950	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
   2951	 * this defers syncer requests for some time, before letting at least
    2952	 * one request through.  The resync controller on the receiving side
   2953	 * will adapt to the incoming rate accordingly.
   2954	 *
   2955	 * We cannot throttle here if remote is Primary/SyncTarget:
   2956	 * we would also throttle its application reads.
   2957	 * In that case, throttling is done on the SyncTarget only.
   2958	 */
   2959
   2960	/* Even though this may be a resync request, we do add to "read_ee";
   2961	 * "sync_ee" is only used for resync WRITEs.
   2962	 * Add to list early, so debugfs can find this request
   2963	 * even if we have to sleep below. */
   2964	spin_lock_irq(&device->resource->req_lock);
   2965	list_add_tail(&peer_req->w.list, &device->read_ee);
   2966	spin_unlock_irq(&device->resource->req_lock);
   2967
   2968	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
   2969	if (device->state.peer != R_PRIMARY
   2970	&& drbd_rs_should_slow_down(device, sector, false))
   2971		schedule_timeout_uninterruptible(HZ/10);
   2972	update_receiver_timing_details(connection, drbd_rs_begin_io);
   2973	if (drbd_rs_begin_io(device, sector))
   2974		goto out_free_e;
   2975
   2976submit_for_resync:
   2977	atomic_add(size >> 9, &device->rs_sect_ev);
   2978
   2979submit:
   2980	update_receiver_timing_details(connection, drbd_submit_peer_request);
   2981	inc_unacked(device);
   2982	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
   2983				     fault_type) == 0)
   2984		return 0;
   2985
   2986	/* don't care for the reason here */
   2987	drbd_err(device, "submit failed, triggering re-connect\n");
   2988
   2989out_free_e:
   2990	spin_lock_irq(&device->resource->req_lock);
   2991	list_del(&peer_req->w.list);
   2992	spin_unlock_irq(&device->resource->req_lock);
   2993	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
   2994
   2995	put_ldev(device);
   2996	drbd_free_peer_req(device, peer_req);
   2997	return -EIO;
   2998}
   2999
   3000/*
   3001 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
   3002 */
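/* Return value convention (shared by drbd_asb_recover_1p()/_2p() below, cf.
 * the value table before drbd_uuid_compare()): a positive value means keep
 * the local data and discard the peer's changes, a negative value means
 * discard the local changes, and -100 means the after-sb policy could not
 * resolve the split brain automatically. */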
   3003static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
   3004{
   3005	struct drbd_device *device = peer_device->device;
   3006	int self, peer, rv = -100;
   3007	unsigned long ch_self, ch_peer;
   3008	enum drbd_after_sb_p after_sb_0p;
   3009
   3010	self = device->ldev->md.uuid[UI_BITMAP] & 1;
   3011	peer = device->p_uuid[UI_BITMAP] & 1;
   3012
   3013	ch_peer = device->p_uuid[UI_SIZE];
   3014	ch_self = device->comm_bm_set;
   3015
   3016	rcu_read_lock();
   3017	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
   3018	rcu_read_unlock();
   3019	switch (after_sb_0p) {
   3020	case ASB_CONSENSUS:
   3021	case ASB_DISCARD_SECONDARY:
   3022	case ASB_CALL_HELPER:
   3023	case ASB_VIOLENTLY:
   3024		drbd_err(device, "Configuration error.\n");
   3025		break;
   3026	case ASB_DISCONNECT:
   3027		break;
   3028	case ASB_DISCARD_YOUNGER_PRI:
   3029		if (self == 0 && peer == 1) {
   3030			rv = -1;
   3031			break;
   3032		}
   3033		if (self == 1 && peer == 0) {
   3034			rv =  1;
   3035			break;
   3036		}
   3037		fallthrough;	/* to one of the other strategies */
   3038	case ASB_DISCARD_OLDER_PRI:
   3039		if (self == 0 && peer == 1) {
   3040			rv = 1;
   3041			break;
   3042		}
   3043		if (self == 1 && peer == 0) {
   3044			rv = -1;
   3045			break;
   3046		}
   3047		/* Else fall through to one of the other strategies... */
   3048		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
   3049		     "Using discard-least-changes instead\n");
   3050		fallthrough;
   3051	case ASB_DISCARD_ZERO_CHG:
   3052		if (ch_peer == 0 && ch_self == 0) {
   3053			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
   3054				? -1 : 1;
   3055			break;
   3056		} else {
   3057			if (ch_peer == 0) { rv =  1; break; }
   3058			if (ch_self == 0) { rv = -1; break; }
   3059		}
   3060		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
   3061			break;
   3062		fallthrough;
   3063	case ASB_DISCARD_LEAST_CHG:
   3064		if	(ch_self < ch_peer)
   3065			rv = -1;
   3066		else if (ch_self > ch_peer)
   3067			rv =  1;
   3068		else /* ( ch_self == ch_peer ) */
   3069		     /* Well, then use something else. */
   3070			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
   3071				? -1 : 1;
   3072		break;
   3073	case ASB_DISCARD_LOCAL:
   3074		rv = -1;
   3075		break;
   3076	case ASB_DISCARD_REMOTE:
   3077		rv =  1;
   3078	}
   3079
   3080	return rv;
   3081}
   3082
   3083/*
   3084 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
   3085 */
   3086static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
   3087{
   3088	struct drbd_device *device = peer_device->device;
   3089	int hg, rv = -100;
   3090	enum drbd_after_sb_p after_sb_1p;
   3091
   3092	rcu_read_lock();
   3093	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
   3094	rcu_read_unlock();
   3095	switch (after_sb_1p) {
   3096	case ASB_DISCARD_YOUNGER_PRI:
   3097	case ASB_DISCARD_OLDER_PRI:
   3098	case ASB_DISCARD_LEAST_CHG:
   3099	case ASB_DISCARD_LOCAL:
   3100	case ASB_DISCARD_REMOTE:
   3101	case ASB_DISCARD_ZERO_CHG:
   3102		drbd_err(device, "Configuration error.\n");
   3103		break;
   3104	case ASB_DISCONNECT:
   3105		break;
   3106	case ASB_CONSENSUS:
   3107		hg = drbd_asb_recover_0p(peer_device);
   3108		if (hg == -1 && device->state.role == R_SECONDARY)
   3109			rv = hg;
   3110		if (hg == 1  && device->state.role == R_PRIMARY)
   3111			rv = hg;
   3112		break;
   3113	case ASB_VIOLENTLY:
   3114		rv = drbd_asb_recover_0p(peer_device);
   3115		break;
   3116	case ASB_DISCARD_SECONDARY:
   3117		return device->state.role == R_PRIMARY ? 1 : -1;
   3118	case ASB_CALL_HELPER:
   3119		hg = drbd_asb_recover_0p(peer_device);
   3120		if (hg == -1 && device->state.role == R_PRIMARY) {
   3121			enum drbd_state_rv rv2;
   3122
   3123			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
   3124			  * we might be here in C_WF_REPORT_PARAMS which is transient.
   3125			  * we do not need to wait for the after state change work either. */
   3126			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
   3127			if (rv2 != SS_SUCCESS) {
   3128				drbd_khelper(device, "pri-lost-after-sb");
   3129			} else {
   3130				drbd_warn(device, "Successfully gave up primary role.\n");
   3131				rv = hg;
   3132			}
   3133		} else
   3134			rv = hg;
   3135	}
   3136
   3137	return rv;
   3138}
   3139
   3140/*
   3141 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
   3142 */
   3143static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
   3144{
   3145	struct drbd_device *device = peer_device->device;
   3146	int hg, rv = -100;
   3147	enum drbd_after_sb_p after_sb_2p;
   3148
   3149	rcu_read_lock();
   3150	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
   3151	rcu_read_unlock();
   3152	switch (after_sb_2p) {
   3153	case ASB_DISCARD_YOUNGER_PRI:
   3154	case ASB_DISCARD_OLDER_PRI:
   3155	case ASB_DISCARD_LEAST_CHG:
   3156	case ASB_DISCARD_LOCAL:
   3157	case ASB_DISCARD_REMOTE:
   3158	case ASB_CONSENSUS:
   3159	case ASB_DISCARD_SECONDARY:
   3160	case ASB_DISCARD_ZERO_CHG:
   3161		drbd_err(device, "Configuration error.\n");
   3162		break;
   3163	case ASB_VIOLENTLY:
   3164		rv = drbd_asb_recover_0p(peer_device);
   3165		break;
   3166	case ASB_DISCONNECT:
   3167		break;
   3168	case ASB_CALL_HELPER:
   3169		hg = drbd_asb_recover_0p(peer_device);
   3170		if (hg == -1) {
   3171			enum drbd_state_rv rv2;
   3172
   3173			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
   3174			  * we might be here in C_WF_REPORT_PARAMS which is transient.
   3175			  * we do not need to wait for the after state change work either. */
   3176			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
   3177			if (rv2 != SS_SUCCESS) {
   3178				drbd_khelper(device, "pri-lost-after-sb");
   3179			} else {
   3180				drbd_warn(device, "Successfully gave up primary role.\n");
   3181				rv = hg;
   3182			}
   3183		} else
   3184			rv = hg;
   3185	}
   3186
   3187	return rv;
   3188}
   3189
   3190static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
   3191			   u64 bits, u64 flags)
   3192{
   3193	if (!uuid) {
   3194		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
   3195		return;
   3196	}
   3197	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
   3198	     text,
   3199	     (unsigned long long)uuid[UI_CURRENT],
   3200	     (unsigned long long)uuid[UI_BITMAP],
   3201	     (unsigned long long)uuid[UI_HISTORY_START],
   3202	     (unsigned long long)uuid[UI_HISTORY_END],
   3203	     (unsigned long long)bits,
   3204	     (unsigned long long)flags);
   3205}
   3206
   3207/*
   3208  100	after split brain try auto recover
   3209    2	C_SYNC_SOURCE set BitMap
   3210    1	C_SYNC_SOURCE use BitMap
   3211    0	no Sync
   3212   -1	C_SYNC_TARGET use BitMap
   3213   -2	C_SYNC_TARGET set BitMap
   3214 -100	after split brain, disconnect
   3215-1000	unrelated data
   3216-1091   requires proto 91
   3217-1096   requires proto 96
   3218 */
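/*
 * Illustrative sketch only (not part of DRBD): how a caller may decode the
 * special negative return values listed above.  -1091 and -1096 simply mean
 * "the peer must speak at least protocol 91/96"; values below -0x10000
 * additionally encode required feature flags, as produced for rule 41 below
 * and decoded in drbd_sync_handshake().
 */
static inline void drbd_example_decode_hg(int hg, int *min_proto, int *feature_flags)
{
	/* assumes hg < -0x10000, i.e. the "protocol + feature flags" encoding */
	hg = -hg;
	*min_proto = hg & 0xff;			/* e.g. PRO_VERSION_MAX */
	*feature_flags = (hg >> 8) & 0xff;	/* e.g. DRBD_FF_WSAME */
}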
   3219
   3220static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
   3221{
   3222	struct drbd_peer_device *const peer_device = first_peer_device(device);
   3223	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
   3224	u64 self, peer;
   3225	int i, j;
   3226
   3227	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
   3228	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
   3229
   3230	*rule_nr = 10;
   3231	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
   3232		return 0;
   3233
   3234	*rule_nr = 20;
   3235	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
   3236	     peer != UUID_JUST_CREATED)
   3237		return -2;
   3238
   3239	*rule_nr = 30;
   3240	if (self != UUID_JUST_CREATED &&
   3241	    (peer == UUID_JUST_CREATED || peer == (u64)0))
   3242		return 2;
   3243
   3244	if (self == peer) {
   3245		int rct, dc; /* roles at crash time */
   3246
   3247		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
   3248
   3249			if (connection->agreed_pro_version < 91)
   3250				return -1091;
   3251
   3252			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
   3253			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
   3254				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
   3255				drbd_uuid_move_history(device);
   3256				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
   3257				device->ldev->md.uuid[UI_BITMAP] = 0;
   3258
   3259				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
   3260					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
   3261				*rule_nr = 34;
   3262			} else {
   3263				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
   3264				*rule_nr = 36;
   3265			}
   3266
   3267			return 1;
   3268		}
   3269
   3270		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
   3271
   3272			if (connection->agreed_pro_version < 91)
   3273				return -1091;
   3274
   3275			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
   3276			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
   3277				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
   3278
   3279				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
   3280				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
   3281				device->p_uuid[UI_BITMAP] = 0UL;
   3282
   3283				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
   3284				*rule_nr = 35;
   3285			} else {
   3286				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
   3287				*rule_nr = 37;
   3288			}
   3289
   3290			return -1;
   3291		}
   3292
   3293		/* Common power [off|failure] */
   3294		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
   3295			(device->p_uuid[UI_FLAGS] & 2);
   3296		/* lowest bit is set when we were primary,
   3297		 * next bit (weight 2) is set when peer was primary */
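		/* Example: CRASHED_PRIMARY set locally plus bit 1 set in the peer's
		 * UI_FLAGS gives rct == 3; if both nodes are Secondary by now, the
		 * RESOLVE_CONFLICTS flag arbitrates in the switch further below. */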
   3298		*rule_nr = 40;
   3299
   3300		/* Neither has the "crashed primary" flag set,
    3301		 * only a replication link hiccup. */
   3302		if (rct == 0)
   3303			return 0;
   3304
   3305		/* Current UUID equal and no bitmap uuid; does not necessarily
   3306		 * mean this was a "simultaneous hard crash", maybe IO was
   3307		 * frozen, so no UUID-bump happened.
   3308		 * This is a protocol change, overload DRBD_FF_WSAME as flag
   3309		 * for "new-enough" peer DRBD version. */
   3310		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
   3311			*rule_nr = 41;
   3312			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
   3313				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
   3314				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
   3315			}
   3316			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
   3317				/* At least one has the "crashed primary" bit set,
   3318				 * both are primary now, but neither has rotated its UUIDs?
   3319				 * "Can not happen." */
   3320				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
   3321				return -100;
   3322			}
   3323			if (device->state.role == R_PRIMARY)
   3324				return 1;
   3325			return -1;
   3326		}
   3327
   3328		/* Both are secondary.
   3329		 * Really looks like recovery from simultaneous hard crash.
   3330		 * Check which had been primary before, and arbitrate. */
   3331		switch (rct) {
   3332		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
   3333		case 1: /*  self_pri && !peer_pri */ return 1;
   3334		case 2: /* !self_pri &&  peer_pri */ return -1;
   3335		case 3: /*  self_pri &&  peer_pri */
   3336			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
   3337			return dc ? -1 : 1;
   3338		}
   3339	}
   3340
   3341	*rule_nr = 50;
   3342	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
   3343	if (self == peer)
   3344		return -1;
   3345
   3346	*rule_nr = 51;
   3347	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
   3348	if (self == peer) {
   3349		if (connection->agreed_pro_version < 96 ?
   3350		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
   3351		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
   3352		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
    3353			/* The last P_SYNC_UUID did not get through. Undo the modifications
    3354			   to the peer's UUIDs from its last start of resync as sync source. */
   3355
   3356			if (connection->agreed_pro_version < 91)
   3357				return -1091;
   3358
   3359			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
   3360			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
   3361
   3362			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
   3363			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
   3364
   3365			return -1;
   3366		}
   3367	}
   3368
   3369	*rule_nr = 60;
   3370	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
   3371	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
   3372		peer = device->p_uuid[i] & ~((u64)1);
   3373		if (self == peer)
   3374			return -2;
   3375	}
   3376
   3377	*rule_nr = 70;
   3378	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
   3379	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
   3380	if (self == peer)
   3381		return 1;
   3382
   3383	*rule_nr = 71;
   3384	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
   3385	if (self == peer) {
   3386		if (connection->agreed_pro_version < 96 ?
   3387		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
   3388		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
   3389		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
    3390			/* The last P_SYNC_UUID did not get through. Undo the modifications
    3391			   to our UUIDs from our last start of resync as sync source. */
   3392
   3393			if (connection->agreed_pro_version < 91)
   3394				return -1091;
   3395
   3396			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
   3397			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
   3398
   3399			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
   3400			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
   3401				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
   3402
   3403			return 1;
   3404		}
   3405	}
   3406
   3407
   3408	*rule_nr = 80;
   3409	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
   3410	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
   3411		self = device->ldev->md.uuid[i] & ~((u64)1);
   3412		if (self == peer)
   3413			return 2;
   3414	}
   3415
   3416	*rule_nr = 90;
   3417	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
   3418	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
   3419	if (self == peer && self != ((u64)0))
   3420		return 100;
   3421
   3422	*rule_nr = 100;
   3423	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
   3424		self = device->ldev->md.uuid[i] & ~((u64)1);
   3425		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
   3426			peer = device->p_uuid[j] & ~((u64)1);
   3427			if (self == peer)
   3428				return -100;
   3429		}
   3430	}
   3431
   3432	return -1000;
   3433}
   3434
   3435/* drbd_sync_handshake() returns the new conn state on success, or
    3436   C_MASK on failure.
   3437 */
   3438static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
   3439					   enum drbd_role peer_role,
   3440					   enum drbd_disk_state peer_disk) __must_hold(local)
   3441{
   3442	struct drbd_device *device = peer_device->device;
   3443	enum drbd_conns rv = C_MASK;
   3444	enum drbd_disk_state mydisk;
   3445	struct net_conf *nc;
   3446	int hg, rule_nr, rr_conflict, tentative, always_asbp;
   3447
   3448	mydisk = device->state.disk;
   3449	if (mydisk == D_NEGOTIATING)
   3450		mydisk = device->new_state_tmp.disk;
   3451
   3452	drbd_info(device, "drbd_sync_handshake:\n");
   3453
   3454	spin_lock_irq(&device->ldev->md.uuid_lock);
   3455	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
   3456	drbd_uuid_dump(device, "peer", device->p_uuid,
   3457		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
   3458
   3459	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
   3460	spin_unlock_irq(&device->ldev->md.uuid_lock);
   3461
   3462	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
   3463
   3464	if (hg == -1000) {
   3465		drbd_alert(device, "Unrelated data, aborting!\n");
   3466		return C_MASK;
   3467	}
   3468	if (hg < -0x10000) {
   3469		int proto, fflags;
   3470		hg = -hg;
   3471		proto = hg & 0xff;
   3472		fflags = (hg >> 8) & 0xff;
   3473		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
   3474					proto, fflags);
   3475		return C_MASK;
   3476	}
   3477	if (hg < -1000) {
   3478		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
   3479		return C_MASK;
   3480	}
   3481
   3482	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
   3483	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
   3484		int f = (hg == -100) || abs(hg) == 2;
   3485		hg = mydisk > D_INCONSISTENT ? 1 : -1;
   3486		if (f)
   3487			hg = hg*2;
   3488		drbd_info(device, "Becoming sync %s due to disk states.\n",
   3489		     hg > 0 ? "source" : "target");
   3490	}
   3491
   3492	if (abs(hg) == 100)
   3493		drbd_khelper(device, "initial-split-brain");
   3494
   3495	rcu_read_lock();
   3496	nc = rcu_dereference(peer_device->connection->net_conf);
   3497	always_asbp = nc->always_asbp;
   3498	rr_conflict = nc->rr_conflict;
   3499	tentative = nc->tentative;
   3500	rcu_read_unlock();
   3501
   3502	if (hg == 100 || (hg == -100 && always_asbp)) {
   3503		int pcount = (device->state.role == R_PRIMARY)
   3504			   + (peer_role == R_PRIMARY);
   3505		int forced = (hg == -100);
   3506
   3507		switch (pcount) {
   3508		case 0:
   3509			hg = drbd_asb_recover_0p(peer_device);
   3510			break;
   3511		case 1:
   3512			hg = drbd_asb_recover_1p(peer_device);
   3513			break;
   3514		case 2:
   3515			hg = drbd_asb_recover_2p(peer_device);
   3516			break;
   3517		}
   3518		if (abs(hg) < 100) {
   3519			drbd_warn(device, "Split-Brain detected, %d primaries, "
   3520			     "automatically solved. Sync from %s node\n",
   3521			     pcount, (hg < 0) ? "peer" : "this");
   3522			if (forced) {
   3523				drbd_warn(device, "Doing a full sync, since"
    3524				     " UUIDs were ambiguous.\n");
   3525				hg = hg*2;
   3526			}
   3527		}
   3528	}
   3529
   3530	if (hg == -100) {
   3531		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
   3532			hg = -1;
   3533		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
   3534			hg = 1;
   3535
   3536		if (abs(hg) < 100)
   3537			drbd_warn(device, "Split-Brain detected, manually solved. "
   3538			     "Sync from %s node\n",
   3539			     (hg < 0) ? "peer" : "this");
   3540	}
   3541
   3542	if (hg == -100) {
   3543		/* FIXME this log message is not correct if we end up here
   3544		 * after an attempted attach on a diskless node.
   3545		 * We just refuse to attach -- well, we drop the "connection"
   3546		 * to that disk, in a way... */
   3547		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
   3548		drbd_khelper(device, "split-brain");
   3549		return C_MASK;
   3550	}
   3551
   3552	if (hg > 0 && mydisk <= D_INCONSISTENT) {
   3553		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
   3554		return C_MASK;
   3555	}
   3556
   3557	if (hg < 0 && /* by intention we do not use mydisk here. */
   3558	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
   3559		switch (rr_conflict) {
   3560		case ASB_CALL_HELPER:
   3561			drbd_khelper(device, "pri-lost");
   3562			fallthrough;
   3563		case ASB_DISCONNECT:
   3564			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
   3565			return C_MASK;
   3566		case ASB_VIOLENTLY:
   3567			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
    3568			     " assumption\n");
   3569		}
   3570	}
   3571
   3572	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
   3573		if (hg == 0)
   3574			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
   3575		else
   3576			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
   3577				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
   3578				 abs(hg) >= 2 ? "full" : "bit-map based");
   3579		return C_MASK;
   3580	}
   3581
   3582	if (abs(hg) >= 2) {
   3583		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
   3584		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
   3585					BM_LOCKED_SET_ALLOWED))
   3586			return C_MASK;
   3587	}
   3588
   3589	if (hg > 0) { /* become sync source. */
   3590		rv = C_WF_BITMAP_S;
   3591	} else if (hg < 0) { /* become sync target */
   3592		rv = C_WF_BITMAP_T;
   3593	} else {
   3594		rv = C_CONNECTED;
   3595		if (drbd_bm_total_weight(device)) {
   3596			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
   3597			     drbd_bm_total_weight(device));
   3598		}
   3599	}
   3600
   3601	return rv;
   3602}
   3603
   3604static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
   3605{
   3606	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
   3607	if (peer == ASB_DISCARD_REMOTE)
   3608		return ASB_DISCARD_LOCAL;
   3609
   3610	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
   3611	if (peer == ASB_DISCARD_LOCAL)
   3612		return ASB_DISCARD_REMOTE;
   3613
   3614	/* everything else is valid if they are equal on both sides. */
   3615	return peer;
   3616}
   3617
   3618static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
   3619{
   3620	struct p_protocol *p = pi->data;
   3621	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
   3622	int p_proto, p_discard_my_data, p_two_primaries, cf;
   3623	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
   3624	char integrity_alg[SHARED_SECRET_MAX] = "";
   3625	struct crypto_shash *peer_integrity_tfm = NULL;
   3626	void *int_dig_in = NULL, *int_dig_vv = NULL;
   3627
   3628	p_proto		= be32_to_cpu(p->protocol);
   3629	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
   3630	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
   3631	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
   3632	p_two_primaries = be32_to_cpu(p->two_primaries);
   3633	cf		= be32_to_cpu(p->conn_flags);
   3634	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
   3635
   3636	if (connection->agreed_pro_version >= 87) {
   3637		int err;
   3638
   3639		if (pi->size > sizeof(integrity_alg))
   3640			return -EIO;
   3641		err = drbd_recv_all(connection, integrity_alg, pi->size);
   3642		if (err)
   3643			return err;
   3644		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
   3645	}
   3646
   3647	if (pi->cmd != P_PROTOCOL_UPDATE) {
   3648		clear_bit(CONN_DRY_RUN, &connection->flags);
   3649
   3650		if (cf & CF_DRY_RUN)
   3651			set_bit(CONN_DRY_RUN, &connection->flags);
   3652
   3653		rcu_read_lock();
   3654		nc = rcu_dereference(connection->net_conf);
   3655
   3656		if (p_proto != nc->wire_protocol) {
   3657			drbd_err(connection, "incompatible %s settings\n", "protocol");
   3658			goto disconnect_rcu_unlock;
   3659		}
   3660
   3661		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
   3662			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
   3663			goto disconnect_rcu_unlock;
   3664		}
   3665
   3666		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
   3667			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
   3668			goto disconnect_rcu_unlock;
   3669		}
   3670
   3671		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
   3672			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
   3673			goto disconnect_rcu_unlock;
   3674		}
   3675
   3676		if (p_discard_my_data && nc->discard_my_data) {
   3677			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
   3678			goto disconnect_rcu_unlock;
   3679		}
   3680
   3681		if (p_two_primaries != nc->two_primaries) {
   3682			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
   3683			goto disconnect_rcu_unlock;
   3684		}
   3685
   3686		if (strcmp(integrity_alg, nc->integrity_alg)) {
   3687			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
   3688			goto disconnect_rcu_unlock;
   3689		}
   3690
   3691		rcu_read_unlock();
   3692	}
   3693
   3694	if (integrity_alg[0]) {
   3695		int hash_size;
   3696
   3697		/*
   3698		 * We can only change the peer data integrity algorithm
   3699		 * here.  Changing our own data integrity algorithm
   3700		 * requires that we send a P_PROTOCOL_UPDATE packet at
   3701		 * the same time; otherwise, the peer has no way to
   3702		 * tell between which packets the algorithm should
   3703		 * change.
   3704		 */
   3705
   3706		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
   3707		if (IS_ERR(peer_integrity_tfm)) {
   3708			peer_integrity_tfm = NULL;
   3709			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
   3710				 integrity_alg);
   3711			goto disconnect;
   3712		}
   3713
   3714		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
   3715		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
   3716		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
   3717		if (!(int_dig_in && int_dig_vv)) {
   3718			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
   3719			goto disconnect;
   3720		}
   3721	}
   3722
   3723	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
   3724	if (!new_net_conf)
   3725		goto disconnect;
   3726
   3727	mutex_lock(&connection->data.mutex);
   3728	mutex_lock(&connection->resource->conf_update);
   3729	old_net_conf = connection->net_conf;
   3730	*new_net_conf = *old_net_conf;
   3731
   3732	new_net_conf->wire_protocol = p_proto;
   3733	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
   3734	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
   3735	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
   3736	new_net_conf->two_primaries = p_two_primaries;
   3737
   3738	rcu_assign_pointer(connection->net_conf, new_net_conf);
   3739	mutex_unlock(&connection->resource->conf_update);
   3740	mutex_unlock(&connection->data.mutex);
   3741
   3742	crypto_free_shash(connection->peer_integrity_tfm);
   3743	kfree(connection->int_dig_in);
   3744	kfree(connection->int_dig_vv);
   3745	connection->peer_integrity_tfm = peer_integrity_tfm;
   3746	connection->int_dig_in = int_dig_in;
   3747	connection->int_dig_vv = int_dig_vv;
   3748
   3749	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
   3750		drbd_info(connection, "peer data-integrity-alg: %s\n",
   3751			  integrity_alg[0] ? integrity_alg : "(none)");
   3752
   3753	kvfree_rcu(old_net_conf);
   3754	return 0;
   3755
   3756disconnect_rcu_unlock:
   3757	rcu_read_unlock();
   3758disconnect:
   3759	crypto_free_shash(peer_integrity_tfm);
   3760	kfree(int_dig_in);
   3761	kfree(int_dig_vv);
   3762	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
   3763	return -EIO;
   3764}
   3765
   3766/* helper function
   3767 * input: alg name, feature name
   3768 * return: NULL (alg name was "")
   3769 *         ERR_PTR(error) if something goes wrong
   3770 *         or the crypto hash ptr, if it worked out ok. */
   3771static struct crypto_shash *drbd_crypto_alloc_digest_safe(
   3772		const struct drbd_device *device,
   3773		const char *alg, const char *name)
   3774{
   3775	struct crypto_shash *tfm;
   3776
   3777	if (!alg[0])
   3778		return NULL;
   3779
   3780	tfm = crypto_alloc_shash(alg, 0, 0);
   3781	if (IS_ERR(tfm)) {
   3782		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
   3783			alg, name, PTR_ERR(tfm));
   3784		return tfm;
   3785	}
   3786	return tfm;
   3787}
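/*
 * Minimal usage sketch for the helper above (it mirrors the call sites in
 * receive_SyncParam() further down, but is not itself part of DRBD):
 *
 *	tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;	// allocation failed, error already logged
 *	// tfm == NULL only means the algorithm name was the empty string
 */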
   3788
   3789static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
   3790{
   3791	void *buffer = connection->data.rbuf;
   3792	int size = pi->size;
   3793
   3794	while (size) {
   3795		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
   3796		s = drbd_recv(connection, buffer, s);
   3797		if (s <= 0) {
   3798			if (s < 0)
   3799				return s;
   3800			break;
   3801		}
   3802		size -= s;
   3803	}
   3804	if (size)
   3805		return -EIO;
   3806	return 0;
   3807}
   3808
   3809/*
   3810 * config_unknown_volume  -  device configuration command for unknown volume
   3811 *
   3812 * When a device is added to an existing connection, the node on which the
   3813 * device is added first will send configuration commands to its peer but the
   3814 * peer will not know about the device yet.  It will warn and ignore these
   3815 * commands.  Once the device is added on the second node, the second node will
   3816 * send the same device configuration commands, but in the other direction.
   3817 *
   3818 * (We can also end up here if drbd is misconfigured.)
   3819 */
   3820static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
   3821{
   3822	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
   3823		  cmdname(pi->cmd), pi->vnr);
   3824	return ignore_remaining_packet(connection, pi);
   3825}
   3826
   3827static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
   3828{
   3829	struct drbd_peer_device *peer_device;
   3830	struct drbd_device *device;
   3831	struct p_rs_param_95 *p;
   3832	unsigned int header_size, data_size, exp_max_sz;
   3833	struct crypto_shash *verify_tfm = NULL;
   3834	struct crypto_shash *csums_tfm = NULL;
   3835	struct net_conf *old_net_conf, *new_net_conf = NULL;
   3836	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
   3837	const int apv = connection->agreed_pro_version;
   3838	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
   3839	unsigned int fifo_size = 0;
   3840	int err;
   3841
   3842	peer_device = conn_peer_device(connection, pi->vnr);
   3843	if (!peer_device)
   3844		return config_unknown_volume(connection, pi);
   3845	device = peer_device->device;
   3846
   3847	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
   3848		    : apv == 88 ? sizeof(struct p_rs_param)
   3849					+ SHARED_SECRET_MAX
   3850		    : apv <= 94 ? sizeof(struct p_rs_param_89)
   3851		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
   3852
   3853	if (pi->size > exp_max_sz) {
   3854		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
   3855		    pi->size, exp_max_sz);
   3856		return -EIO;
   3857	}
   3858
   3859	if (apv <= 88) {
   3860		header_size = sizeof(struct p_rs_param);
   3861		data_size = pi->size - header_size;
   3862	} else if (apv <= 94) {
   3863		header_size = sizeof(struct p_rs_param_89);
   3864		data_size = pi->size - header_size;
   3865		D_ASSERT(device, data_size == 0);
   3866	} else {
   3867		header_size = sizeof(struct p_rs_param_95);
   3868		data_size = pi->size - header_size;
   3869		D_ASSERT(device, data_size == 0);
   3870	}
   3871
   3872	/* initialize verify_alg and csums_alg */
   3873	p = pi->data;
   3874	BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
   3875	memset(&p->algs, 0, sizeof(p->algs));
   3876
   3877	err = drbd_recv_all(peer_device->connection, p, header_size);
   3878	if (err)
   3879		return err;
   3880
   3881	mutex_lock(&connection->resource->conf_update);
   3882	old_net_conf = peer_device->connection->net_conf;
   3883	if (get_ldev(device)) {
   3884		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
   3885		if (!new_disk_conf) {
   3886			put_ldev(device);
   3887			mutex_unlock(&connection->resource->conf_update);
   3888			drbd_err(device, "Allocation of new disk_conf failed\n");
   3889			return -ENOMEM;
   3890		}
   3891
   3892		old_disk_conf = device->ldev->disk_conf;
   3893		*new_disk_conf = *old_disk_conf;
   3894
   3895		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
   3896	}
   3897
   3898	if (apv >= 88) {
   3899		if (apv == 88) {
   3900			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
   3901				drbd_err(device, "verify-alg of wrong size, "
    3902					"peer wants %u, accepting only up to %u bytes\n",
   3903					data_size, SHARED_SECRET_MAX);
   3904				goto reconnect;
   3905			}
   3906
   3907			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
   3908			if (err)
   3909				goto reconnect;
   3910			/* we expect NUL terminated string */
   3911			/* but just in case someone tries to be evil */
   3912			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
   3913			p->verify_alg[data_size-1] = 0;
   3914
   3915		} else /* apv >= 89 */ {
   3916			/* we still expect NUL terminated strings */
   3917			/* but just in case someone tries to be evil */
   3918			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
   3919			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
   3920			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
   3921			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
   3922		}
   3923
   3924		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
   3925			if (device->state.conn == C_WF_REPORT_PARAMS) {
   3926				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
   3927				    old_net_conf->verify_alg, p->verify_alg);
   3928				goto disconnect;
   3929			}
   3930			verify_tfm = drbd_crypto_alloc_digest_safe(device,
   3931					p->verify_alg, "verify-alg");
   3932			if (IS_ERR(verify_tfm)) {
   3933				verify_tfm = NULL;
   3934				goto disconnect;
   3935			}
   3936		}
   3937
   3938		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
   3939			if (device->state.conn == C_WF_REPORT_PARAMS) {
   3940				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
   3941				    old_net_conf->csums_alg, p->csums_alg);
   3942				goto disconnect;
   3943			}
   3944			csums_tfm = drbd_crypto_alloc_digest_safe(device,
   3945					p->csums_alg, "csums-alg");
   3946			if (IS_ERR(csums_tfm)) {
   3947				csums_tfm = NULL;
   3948				goto disconnect;
   3949			}
   3950		}
   3951
   3952		if (apv > 94 && new_disk_conf) {
   3953			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
   3954			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
   3955			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
   3956			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
   3957
   3958			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
   3959			if (fifo_size != device->rs_plan_s->size) {
   3960				new_plan = fifo_alloc(fifo_size);
   3961				if (!new_plan) {
    3962					drbd_err(device, "kmalloc of fifo_buffer failed\n");
   3963					put_ldev(device);
   3964					goto disconnect;
   3965				}
   3966			}
   3967		}
   3968
   3969		if (verify_tfm || csums_tfm) {
   3970			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
   3971			if (!new_net_conf)
   3972				goto disconnect;
   3973
   3974			*new_net_conf = *old_net_conf;
   3975
   3976			if (verify_tfm) {
   3977				strcpy(new_net_conf->verify_alg, p->verify_alg);
   3978				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
   3979				crypto_free_shash(peer_device->connection->verify_tfm);
   3980				peer_device->connection->verify_tfm = verify_tfm;
   3981				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
   3982			}
   3983			if (csums_tfm) {
   3984				strcpy(new_net_conf->csums_alg, p->csums_alg);
   3985				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
   3986				crypto_free_shash(peer_device->connection->csums_tfm);
   3987				peer_device->connection->csums_tfm = csums_tfm;
   3988				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
   3989			}
   3990			rcu_assign_pointer(connection->net_conf, new_net_conf);
   3991		}
   3992	}
   3993
   3994	if (new_disk_conf) {
   3995		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
   3996		put_ldev(device);
   3997	}
   3998
   3999	if (new_plan) {
   4000		old_plan = device->rs_plan_s;
   4001		rcu_assign_pointer(device->rs_plan_s, new_plan);
   4002	}
   4003
   4004	mutex_unlock(&connection->resource->conf_update);
   4005	synchronize_rcu();
   4006	if (new_net_conf)
   4007		kfree(old_net_conf);
   4008	kfree(old_disk_conf);
   4009	kfree(old_plan);
   4010
   4011	return 0;
   4012
   4013reconnect:
   4014	if (new_disk_conf) {
   4015		put_ldev(device);
   4016		kfree(new_disk_conf);
   4017	}
   4018	mutex_unlock(&connection->resource->conf_update);
   4019	return -EIO;
   4020
   4021disconnect:
   4022	kfree(new_plan);
   4023	if (new_disk_conf) {
   4024		put_ldev(device);
   4025		kfree(new_disk_conf);
   4026	}
   4027	mutex_unlock(&connection->resource->conf_update);
   4028	/* just for completeness: actually not needed,
   4029	 * as this is not reached if csums_tfm was ok. */
   4030	crypto_free_shash(csums_tfm);
   4031	/* but free the verify_tfm again, if csums_tfm did not work out */
   4032	crypto_free_shash(verify_tfm);
   4033	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
   4034	return -EIO;
   4035}
   4036
   4037/* warn if the arguments differ by more than 12.5% */
   4038static void warn_if_differ_considerably(struct drbd_device *device,
   4039	const char *s, sector_t a, sector_t b)
   4040{
   4041	sector_t d;
   4042	if (a == 0 || b == 0)
   4043		return;
   4044	d = (a > b) ? (a - b) : (b - a);
   4045	if (d > (a>>3) || d > (b>>3))
   4046		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
   4047		     (unsigned long long)a, (unsigned long long)b);
   4048}
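/*
 * Worked example for the 12.5% check above: a>>3 is a/8, i.e. 12.5% of a.
 * With a = 1000 and b = 880 sectors, d = 120, which is not above a>>3 = 125
 * but does exceed b>>3 = 110, so the warning is emitted.  This only
 * illustrates the shift arithmetic; it adds no behaviour.
 */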
   4049
   4050static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
   4051{
   4052	struct drbd_peer_device *peer_device;
   4053	struct drbd_device *device;
   4054	struct p_sizes *p = pi->data;
   4055	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
   4056	enum determine_dev_size dd = DS_UNCHANGED;
   4057	sector_t p_size, p_usize, p_csize, my_usize;
   4058	sector_t new_size, cur_size;
   4059	int ldsc = 0; /* local disk size changed */
   4060	enum dds_flags ddsf;
   4061
   4062	peer_device = conn_peer_device(connection, pi->vnr);
   4063	if (!peer_device)
   4064		return config_unknown_volume(connection, pi);
   4065	device = peer_device->device;
   4066	cur_size = get_capacity(device->vdisk);
   4067
   4068	p_size = be64_to_cpu(p->d_size);
   4069	p_usize = be64_to_cpu(p->u_size);
   4070	p_csize = be64_to_cpu(p->c_size);
   4071
   4072	/* just store the peer's disk size for now.
   4073	 * we still need to figure out whether we accept that. */
   4074	device->p_size = p_size;
   4075
   4076	if (get_ldev(device)) {
   4077		rcu_read_lock();
   4078		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
   4079		rcu_read_unlock();
   4080
   4081		warn_if_differ_considerably(device, "lower level device sizes",
   4082			   p_size, drbd_get_max_capacity(device->ldev));
   4083		warn_if_differ_considerably(device, "user requested size",
   4084					    p_usize, my_usize);
   4085
   4086		/* if this is the first connect, or an otherwise expected
   4087		 * param exchange, choose the minimum */
   4088		if (device->state.conn == C_WF_REPORT_PARAMS)
   4089			p_usize = min_not_zero(my_usize, p_usize);
   4090
   4091		/* Never shrink a device with usable data during connect,
   4092		 * or "attach" on the peer.
   4093		 * But allow online shrinking if we are connected. */
   4094		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
   4095		if (new_size < cur_size &&
   4096		    device->state.disk >= D_OUTDATED &&
   4097		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
   4098			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
   4099					(unsigned long long)new_size, (unsigned long long)cur_size);
   4100			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
   4101			put_ldev(device);
   4102			return -EIO;
   4103		}
   4104
   4105		if (my_usize != p_usize) {
   4106			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
   4107
   4108			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
   4109			if (!new_disk_conf) {
   4110				put_ldev(device);
   4111				return -ENOMEM;
   4112			}
   4113
   4114			mutex_lock(&connection->resource->conf_update);
   4115			old_disk_conf = device->ldev->disk_conf;
   4116			*new_disk_conf = *old_disk_conf;
   4117			new_disk_conf->disk_size = p_usize;
   4118
   4119			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
   4120			mutex_unlock(&connection->resource->conf_update);
   4121			kvfree_rcu(old_disk_conf);
   4122
   4123			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
   4124				 (unsigned long)p_usize, (unsigned long)my_usize);
   4125		}
   4126
   4127		put_ldev(device);
   4128	}
   4129
   4130	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
   4131	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
   4132	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
   4133	   drbd_reconsider_queue_parameters(), we can be sure that after
   4134	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
   4135
   4136	ddsf = be16_to_cpu(p->dds_flags);
   4137	if (get_ldev(device)) {
   4138		drbd_reconsider_queue_parameters(device, device->ldev, o);
   4139		dd = drbd_determine_dev_size(device, ddsf, NULL);
   4140		put_ldev(device);
   4141		if (dd == DS_ERROR)
   4142			return -EIO;
   4143		drbd_md_sync(device);
   4144	} else {
   4145		/*
   4146		 * I am diskless, need to accept the peer's *current* size.
   4147		 * I must NOT accept the peers backing disk size,
   4148		 * it may have been larger than mine all along...
   4149		 *
   4150		 * At this point, the peer knows more about my disk, or at
   4151		 * least about what we last agreed upon, than myself.
   4152		 * So if his c_size is less than his d_size, the most likely
   4153		 * reason is that *my* d_size was smaller last time we checked.
   4154		 *
   4155		 * However, if he sends a zero current size,
   4156		 * take his (user-capped or) backing disk size anyways.
   4157		 *
   4158		 * Unless of course he does not have a disk himself.
   4159		 * In which case we ignore this completely.
   4160		 */
   4161		sector_t new_size = p_csize ?: p_usize ?: p_size;
   4162		drbd_reconsider_queue_parameters(device, NULL, o);
   4163		if (new_size == 0) {
    4164			/* Ignore, the peer does not know anything yet. */
   4165		} else if (new_size == cur_size) {
   4166			/* nothing to do */
   4167		} else if (cur_size != 0 && p_size == 0) {
   4168			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
   4169					(unsigned long long)new_size, (unsigned long long)cur_size);
   4170		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
   4171			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
   4172					(unsigned long long)new_size, (unsigned long long)cur_size);
   4173			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
   4174			return -EIO;
   4175		} else {
   4176			/* I believe the peer, if
   4177			 *  - I don't have a current size myself
   4178			 *  - we agree on the size anyways
   4179			 *  - I do have a current size, am Secondary,
   4180			 *    and he has the only disk
   4181			 *  - I do have a current size, am Primary,
   4182			 *    and he has the only disk,
   4183			 *    which is larger than my current size
   4184			 */
   4185			drbd_set_my_capacity(device, new_size);
   4186		}
   4187	}
   4188
   4189	if (get_ldev(device)) {
   4190		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
   4191			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
   4192			ldsc = 1;
   4193		}
   4194
   4195		put_ldev(device);
   4196	}
   4197
   4198	if (device->state.conn > C_WF_REPORT_PARAMS) {
   4199		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
   4200		    ldsc) {
   4201			/* we have different sizes, probably peer
   4202			 * needs to know my new size... */
   4203			drbd_send_sizes(peer_device, 0, ddsf);
   4204		}
   4205		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
   4206		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
   4207			if (device->state.pdsk >= D_INCONSISTENT &&
   4208			    device->state.disk >= D_INCONSISTENT) {
   4209				if (ddsf & DDSF_NO_RESYNC)
   4210					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
   4211				else
   4212					resync_after_online_grow(device);
   4213			} else
   4214				set_bit(RESYNC_AFTER_NEG, &device->flags);
   4215		}
   4216	}
   4217
   4218	return 0;
   4219}
   4220
   4221static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
   4222{
   4223	struct drbd_peer_device *peer_device;
   4224	struct drbd_device *device;
   4225	struct p_uuids *p = pi->data;
   4226	u64 *p_uuid;
   4227	int i, updated_uuids = 0;
   4228
   4229	peer_device = conn_peer_device(connection, pi->vnr);
   4230	if (!peer_device)
   4231		return config_unknown_volume(connection, pi);
   4232	device = peer_device->device;
   4233
   4234	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
   4235	if (!p_uuid)
    4236		return -ENOMEM;
   4237
   4238	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
   4239		p_uuid[i] = be64_to_cpu(p->uuid[i]);
   4240
   4241	kfree(device->p_uuid);
   4242	device->p_uuid = p_uuid;
   4243
   4244	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
   4245	    device->state.disk < D_INCONSISTENT &&
   4246	    device->state.role == R_PRIMARY &&
   4247	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
   4248		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
   4249		    (unsigned long long)device->ed_uuid);
   4250		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
   4251		return -EIO;
   4252	}
   4253
   4254	if (get_ldev(device)) {
   4255		int skip_initial_sync =
   4256			device->state.conn == C_CONNECTED &&
   4257			peer_device->connection->agreed_pro_version >= 90 &&
   4258			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
   4259			(p_uuid[UI_FLAGS] & 8);
   4260		if (skip_initial_sync) {
   4261			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
   4262			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
   4263					"clear_n_write from receive_uuids",
   4264					BM_LOCKED_TEST_ALLOWED);
   4265			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
   4266			_drbd_uuid_set(device, UI_BITMAP, 0);
   4267			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
   4268					CS_VERBOSE, NULL);
   4269			drbd_md_sync(device);
   4270			updated_uuids = 1;
   4271		}
   4272		put_ldev(device);
   4273	} else if (device->state.disk < D_INCONSISTENT &&
   4274		   device->state.role == R_PRIMARY) {
   4275		/* I am a diskless primary, the peer just created a new current UUID
   4276		   for me. */
   4277		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
   4278	}
   4279
    4280	/* Before we test for the disk state, we should wait until a possibly
   4281	   ongoing cluster wide state change is finished. That is important if
   4282	   we are primary and are detaching from our disk. We need to see the
   4283	   new disk state... */
   4284	mutex_lock(device->state_mutex);
   4285	mutex_unlock(device->state_mutex);
   4286	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
   4287		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
   4288
   4289	if (updated_uuids)
   4290		drbd_print_uuids(device, "receiver updated UUIDs to");
   4291
   4292	return 0;
   4293}
   4294
   4295/**
   4296 * convert_state() - Converts the peer's view of the cluster state to our point of view
   4297 * @ps:		The state as seen by the peer.
   4298 */
   4299static union drbd_state convert_state(union drbd_state ps)
   4300{
   4301	union drbd_state ms;
   4302
   4303	static enum drbd_conns c_tab[] = {
   4304		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
   4305		[C_CONNECTED] = C_CONNECTED,
   4306
   4307		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
   4308		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
   4309		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
   4310		[C_VERIFY_S]       = C_VERIFY_T,
   4311		[C_MASK]   = C_MASK,
   4312	};
   4313
   4314	ms.i = ps.i;
   4315
   4316	ms.conn = c_tab[ps.conn];
   4317	ms.peer = ps.role;
   4318	ms.role = ps.peer;
   4319	ms.pdsk = ps.disk;
   4320	ms.disk = ps.pdsk;
   4321	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
   4322
   4323	return ms;
   4324}
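/*
 * Illustrative example with assumed values (not taken from a real peer): if
 * the peer reports { role = R_PRIMARY, peer = R_SECONDARY, disk = D_UP_TO_DATE,
 * pdsk = D_INCONSISTENT, conn = C_STARTING_SYNC_S }, convert_state() yields
 * { role = R_SECONDARY, peer = R_PRIMARY, disk = D_INCONSISTENT,
 *   pdsk = D_UP_TO_DATE, conn = C_STARTING_SYNC_T } from our point of view.
 */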
   4325
   4326static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
   4327{
   4328	struct drbd_peer_device *peer_device;
   4329	struct drbd_device *device;
   4330	struct p_req_state *p = pi->data;
   4331	union drbd_state mask, val;
   4332	enum drbd_state_rv rv;
   4333
   4334	peer_device = conn_peer_device(connection, pi->vnr);
   4335	if (!peer_device)
   4336		return -EIO;
   4337	device = peer_device->device;
   4338
   4339	mask.i = be32_to_cpu(p->mask);
   4340	val.i = be32_to_cpu(p->val);
   4341
   4342	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
   4343	    mutex_is_locked(device->state_mutex)) {
   4344		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
   4345		return 0;
   4346	}
   4347
   4348	mask = convert_state(mask);
   4349	val = convert_state(val);
   4350
   4351	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
   4352	drbd_send_sr_reply(peer_device, rv);
   4353
   4354	drbd_md_sync(device);
   4355
   4356	return 0;
   4357}
   4358
   4359static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
   4360{
   4361	struct p_req_state *p = pi->data;
   4362	union drbd_state mask, val;
   4363	enum drbd_state_rv rv;
   4364
   4365	mask.i = be32_to_cpu(p->mask);
   4366	val.i = be32_to_cpu(p->val);
   4367
   4368	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
   4369	    mutex_is_locked(&connection->cstate_mutex)) {
   4370		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
   4371		return 0;
   4372	}
   4373
   4374	mask = convert_state(mask);
   4375	val = convert_state(val);
   4376
   4377	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
   4378	conn_send_sr_reply(connection, rv);
   4379
   4380	return 0;
   4381}
   4382
   4383static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
   4384{
   4385	struct drbd_peer_device *peer_device;
   4386	struct drbd_device *device;
   4387	struct p_state *p = pi->data;
   4388	union drbd_state os, ns, peer_state;
   4389	enum drbd_disk_state real_peer_disk;
   4390	enum chg_state_flags cs_flags;
   4391	int rv;
   4392
   4393	peer_device = conn_peer_device(connection, pi->vnr);
   4394	if (!peer_device)
   4395		return config_unknown_volume(connection, pi);
   4396	device = peer_device->device;
   4397
   4398	peer_state.i = be32_to_cpu(p->state);
   4399
   4400	real_peer_disk = peer_state.disk;
   4401	if (peer_state.disk == D_NEGOTIATING) {
   4402		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
   4403		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
   4404	}
   4405
   4406	spin_lock_irq(&device->resource->req_lock);
   4407 retry:
   4408	os = ns = drbd_read_state(device);
   4409	spin_unlock_irq(&device->resource->req_lock);
   4410
   4411	/* If some other part of the code (ack_receiver thread, timeout)
   4412	 * already decided to close the connection again,
   4413	 * we must not "re-establish" it here. */
   4414	if (os.conn <= C_TEAR_DOWN)
   4415		return -ECONNRESET;
   4416
   4417	/* If this is the "end of sync" confirmation, usually the peer disk
   4418	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
   4419	 * set) resync started in PausedSyncT, or if the timing of pause-/
   4420	 * unpause-sync events has been "just right", the peer disk may
   4421	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
   4422	 */
   4423	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
   4424	    real_peer_disk == D_UP_TO_DATE &&
   4425	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
   4426		/* If we are (becoming) SyncSource, but peer is still in sync
   4427		 * preparation, ignore its uptodate-ness to avoid flapping, it
   4428		 * will change to inconsistent once the peer reaches active
   4429		 * syncing states.
   4430		 * It may have changed syncer-paused flags, however, so we
   4431		 * cannot ignore this completely. */
   4432		if (peer_state.conn > C_CONNECTED &&
   4433		    peer_state.conn < C_SYNC_SOURCE)
   4434			real_peer_disk = D_INCONSISTENT;
   4435
   4436		/* if peer_state changes to connected at the same time,
   4437		 * it explicitly notifies us that it finished resync.
   4438		 * Maybe we should finish it up, too? */
   4439		else if (os.conn >= C_SYNC_SOURCE &&
   4440			 peer_state.conn == C_CONNECTED) {
   4441			if (drbd_bm_total_weight(device) <= device->rs_failed)
   4442				drbd_resync_finished(device);
   4443			return 0;
   4444		}
   4445	}
   4446
   4447	/* explicit verify finished notification, stop sector reached. */
   4448	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
   4449	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
   4450		ov_out_of_sync_print(device);
   4451		drbd_resync_finished(device);
   4452		return 0;
   4453	}
   4454
   4455	/* peer says his disk is inconsistent, while we think it is uptodate,
   4456	 * and this happens while the peer still thinks we have a sync going on,
   4457	 * but we think we are already done with the sync.
   4458	 * We ignore this to avoid flapping pdsk.
   4459	 * This should not happen, if the peer is a recent version of drbd. */
   4460	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
   4461	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
   4462		real_peer_disk = D_UP_TO_DATE;
   4463
   4464	if (ns.conn == C_WF_REPORT_PARAMS)
   4465		ns.conn = C_CONNECTED;
   4466
   4467	if (peer_state.conn == C_AHEAD)
   4468		ns.conn = C_BEHIND;
   4469
   4470	/* TODO:
   4471	 * if (primary and diskless and peer uuid != effective uuid)
   4472	 *     abort attach on peer;
   4473	 *
   4474	 * If this node does not have good data, was already connected, but
   4475	 * the peer did a late attach only now, trying to "negotiate" with me,
   4476	 * AND I am currently Primary, possibly frozen, with some specific
   4477	 * "effective" uuid, this should never be reached, really, because
   4478	 * we first send the uuids, then the current state.
   4479	 *
   4480	 * In this scenario, we already dropped the connection hard
    4481	 * when we received the unsuitable uuids (receive_uuids()).
   4482	 *
   4483	 * Should we want to change this, that is: not drop the connection in
   4484	 * receive_uuids() already, then we would need to add a branch here
   4485	 * that aborts the attach of "unsuitable uuids" on the peer in case
   4486	 * this node is currently Diskless Primary.
   4487	 */
   4488
   4489	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
   4490	    get_ldev_if_state(device, D_NEGOTIATING)) {
   4491		int cr; /* consider resync */
   4492
   4493		/* if we established a new connection */
   4494		cr  = (os.conn < C_CONNECTED);
   4495		/* if we had an established connection
   4496		 * and one of the nodes newly attaches a disk */
   4497		cr |= (os.conn == C_CONNECTED &&
   4498		       (peer_state.disk == D_NEGOTIATING ||
   4499			os.disk == D_NEGOTIATING));
   4500		/* if we have both been inconsistent, and the peer has been
   4501		 * forced to be UpToDate with --force */
   4502		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
   4503		/* if we had been plain connected, and the admin requested to
   4504		 * start a sync by "invalidate" or "invalidate-remote" */
   4505		cr |= (os.conn == C_CONNECTED &&
   4506				(peer_state.conn >= C_STARTING_SYNC_S &&
   4507				 peer_state.conn <= C_WF_BITMAP_T));
   4508
   4509		if (cr)
   4510			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
   4511
   4512		put_ldev(device);
   4513		if (ns.conn == C_MASK) {
   4514			ns.conn = C_CONNECTED;
   4515			if (device->state.disk == D_NEGOTIATING) {
   4516				drbd_force_state(device, NS(disk, D_FAILED));
   4517			} else if (peer_state.disk == D_NEGOTIATING) {
   4518				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
   4519				peer_state.disk = D_DISKLESS;
   4520				real_peer_disk = D_DISKLESS;
   4521			} else {
   4522				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
   4523					return -EIO;
   4524				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
   4525				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
   4526				return -EIO;
   4527			}
   4528		}
   4529	}
   4530
   4531	spin_lock_irq(&device->resource->req_lock);
   4532	if (os.i != drbd_read_state(device).i)
   4533		goto retry;
   4534	clear_bit(CONSIDER_RESYNC, &device->flags);
   4535	ns.peer = peer_state.role;
   4536	ns.pdsk = real_peer_disk;
   4537	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
   4538	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
   4539		ns.disk = device->new_state_tmp.disk;
   4540	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
   4541	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
   4542	    test_bit(NEW_CUR_UUID, &device->flags)) {
   4543		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
    4544		   for temporary network outages! */
   4545		spin_unlock_irq(&device->resource->req_lock);
   4546		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
   4547		tl_clear(peer_device->connection);
   4548		drbd_uuid_new_current(device);
   4549		clear_bit(NEW_CUR_UUID, &device->flags);
   4550		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
   4551		return -EIO;
   4552	}
   4553	rv = _drbd_set_state(device, ns, cs_flags, NULL);
   4554	ns = drbd_read_state(device);
   4555	spin_unlock_irq(&device->resource->req_lock);
   4556
   4557	if (rv < SS_SUCCESS) {
   4558		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
   4559		return -EIO;
   4560	}
   4561
   4562	if (os.conn > C_WF_REPORT_PARAMS) {
   4563		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
   4564		    peer_state.disk != D_NEGOTIATING ) {
   4565			/* we want resync, peer has not yet decided to sync... */
   4566			/* Nowadays only used when forcing a node into primary role and
   4567			   setting its disk to UpToDate with that */
   4568			drbd_send_uuids(peer_device);
   4569			drbd_send_current_state(peer_device);
   4570		}
   4571	}
   4572
   4573	clear_bit(DISCARD_MY_DATA, &device->flags);
   4574
   4575	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
   4576
   4577	return 0;
   4578}
   4579
   4580static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
   4581{
   4582	struct drbd_peer_device *peer_device;
   4583	struct drbd_device *device;
   4584	struct p_rs_uuid *p = pi->data;
   4585
   4586	peer_device = conn_peer_device(connection, pi->vnr);
   4587	if (!peer_device)
   4588		return -EIO;
   4589	device = peer_device->device;
   4590
   4591	wait_event(device->misc_wait,
   4592		   device->state.conn == C_WF_SYNC_UUID ||
   4593		   device->state.conn == C_BEHIND ||
   4594		   device->state.conn < C_CONNECTED ||
   4595		   device->state.disk < D_NEGOTIATING);
   4596
   4597	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
   4598
   4599	/* Here the _drbd_uuid_ functions are right, current should
   4600	   _not_ be rotated into the history */
   4601	if (get_ldev_if_state(device, D_NEGOTIATING)) {
   4602		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
   4603		_drbd_uuid_set(device, UI_BITMAP, 0UL);
   4604
   4605		drbd_print_uuids(device, "updated sync uuid");
   4606		drbd_start_resync(device, C_SYNC_TARGET);
   4607
   4608		put_ldev(device);
   4609	} else
   4610		drbd_err(device, "Ignoring SyncUUID packet!\n");
   4611
   4612	return 0;
   4613}
   4614
   4615/*
   4616 * receive_bitmap_plain
   4617 *
   4618 * Return 0 when done, 1 when another iteration is needed, and a negative error
   4619 * code upon failure.
   4620 */
   4621static int
   4622receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
   4623		     unsigned long *p, struct bm_xfer_ctx *c)
   4624{
   4625	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
   4626				 drbd_header_size(peer_device->connection);
   4627	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
   4628				       c->bm_words - c->word_offset);
   4629	unsigned int want = num_words * sizeof(*p);
   4630	int err;
   4631
   4632	if (want != size) {
   4633		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
   4634		return -EIO;
   4635	}
   4636	if (want == 0)
   4637		return 0;
   4638	err = drbd_recv_all(peer_device->connection, p, want);
   4639	if (err)
   4640		return err;
   4641
   4642	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
   4643
   4644	c->word_offset += num_words;
   4645	c->bit_offset = c->word_offset * BITS_PER_LONG;
   4646	if (c->bit_offset > c->bm_bits)
   4647		c->bit_offset = c->bm_bits;
   4648
   4649	return 1;
   4650}
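
/*
 * Note: each plain bitmap packet can carry at most
 * (DRBD_SOCKET_BUFFER_SIZE - header_size) / sizeof(long) bitmap words;
 * the sender is expected to use the same chunking, which is why a
 * mismatch between "want" and the received size above is treated as a
 * protocol error.
 */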
   4651
   4652static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
   4653{
   4654	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
   4655}
   4656
   4657static int dcbp_get_start(struct p_compressed_bm *p)
   4658{
   4659	return (p->encoding & 0x80) != 0;
   4660}
   4661
   4662static int dcbp_get_pad_bits(struct p_compressed_bm *p)
   4663{
   4664	return (p->encoding >> 4) & 0x7;
   4665}
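
/*
 * Layout of p->encoding as implied by the accessors above (informal
 * summary derived from these helpers, not a protocol definition):
 *   bit 7     - initial toggle value for the RLE decoder (dcbp_get_start)
 *   bits 6..4 - number of pad bits in the last byte (dcbp_get_pad_bits)
 *   bits 3..0 - bitmap encoding variant (dcbp_get_code), e.g. RLE_VLI_Bits
 */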
   4666
   4667/*
   4668 * recv_bm_rle_bits
   4669 *
   4670 * Return 0 when done, 1 when another iteration is needed, and a negative error
   4671 * code upon failure.
   4672 */
   4673static int
   4674recv_bm_rle_bits(struct drbd_peer_device *peer_device,
   4675		struct p_compressed_bm *p,
   4676		 struct bm_xfer_ctx *c,
   4677		 unsigned int len)
   4678{
   4679	struct bitstream bs;
   4680	u64 look_ahead;
   4681	u64 rl;
   4682	u64 tmp;
   4683	unsigned long s = c->bit_offset;
   4684	unsigned long e;
   4685	int toggle = dcbp_get_start(p);
   4686	int have;
   4687	int bits;
   4688
   4689	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
   4690
   4691	bits = bitstream_get_bits(&bs, &look_ahead, 64);
   4692	if (bits < 0)
   4693		return -EIO;
   4694
   4695	for (have = bits; have > 0; s += rl, toggle = !toggle) {
   4696		bits = vli_decode_bits(&rl, look_ahead);
   4697		if (bits <= 0)
   4698			return -EIO;
   4699
   4700		if (toggle) {
    4701			e = s + rl - 1;
   4702			if (e >= c->bm_bits) {
   4703				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
   4704				return -EIO;
   4705			}
   4706			_drbd_bm_set_bits(peer_device->device, s, e);
   4707		}
   4708
   4709		if (have < bits) {
   4710			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
   4711				have, bits, look_ahead,
   4712				(unsigned int)(bs.cur.b - p->code),
   4713				(unsigned int)bs.buf_len);
   4714			return -EIO;
   4715		}
   4716		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
   4717		if (likely(bits < 64))
   4718			look_ahead >>= bits;
   4719		else
   4720			look_ahead = 0;
   4721		have -= bits;
   4722
   4723		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
   4724		if (bits < 0)
   4725			return -EIO;
   4726		look_ahead |= tmp << have;
   4727		have += bits;
   4728	}
   4729
   4730	c->bit_offset = s;
   4731	bm_xfer_ctx_bit_to_word_offset(c);
   4732
   4733	return (s != c->bm_bits);
   4734}
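
/*
 * Illustration of the decoder above (informal, example numbers only):
 * the VLI stream is a sequence of run lengths; runs alternate between
 * clear and set bits, starting with the polarity given by
 * dcbp_get_start().  With start = 0, bit_offset = 0 and decoded runs
 * 5, 3, 7 the decoder skips bits 0..4, sets bits 5..7, skips bits
 * 8..14, and leaves bit_offset at 15.
 */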
   4735
   4736/*
   4737 * decode_bitmap_c
   4738 *
   4739 * Return 0 when done, 1 when another iteration is needed, and a negative error
   4740 * code upon failure.
   4741 */
   4742static int
   4743decode_bitmap_c(struct drbd_peer_device *peer_device,
   4744		struct p_compressed_bm *p,
   4745		struct bm_xfer_ctx *c,
   4746		unsigned int len)
   4747{
   4748	if (dcbp_get_code(p) == RLE_VLI_Bits)
   4749		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
   4750
   4751	/* other variants had been implemented for evaluation,
   4752	 * but have been dropped as this one turned out to be "best"
   4753	 * during all our tests. */
   4754
   4755	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
   4756	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
   4757	return -EIO;
   4758}
   4759
   4760void INFO_bm_xfer_stats(struct drbd_device *device,
   4761		const char *direction, struct bm_xfer_ctx *c)
   4762{
   4763	/* what would it take to transfer it "plaintext" */
   4764	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
   4765	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
   4766	unsigned int plain =
   4767		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
   4768		c->bm_words * sizeof(unsigned long);
   4769	unsigned int total = c->bytes[0] + c->bytes[1];
   4770	unsigned int r;
   4771
    4772	/* total cannot be zero, but just in case: */
   4773	if (total == 0)
   4774		return;
   4775
   4776	/* don't report if not compressed */
   4777	if (total >= plain)
   4778		return;
   4779
   4780	/* total < plain. check for overflow, still */
   4781	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
   4782		                    : (1000 * total / plain);
   4783
   4784	if (r > 1000)
   4785		r = 1000;
   4786
   4787	r = 1000 - r;
   4788	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
   4789	     "total %u; compression: %u.%u%%\n",
   4790			direction,
   4791			c->bytes[1], c->packets[1],
   4792			c->bytes[0], c->packets[0],
   4793			total, r/10, r % 10);
   4794}
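
/*
 * Worked example for the ratio above (illustrative numbers only): with
 * plain = 100000 bytes and total = 2500 bytes actually transferred,
 * r = 1000 - (1000 * 2500 / 100000) = 975, which is reported as
 * "compression: 97.5%".
 */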
   4795
    4796/* Since we are processing the bitfield from lower addresses to higher,
    4797   it does not matter whether we process it in 32 bit or 64 bit
    4798   chunks, as long as it is little endian. (Understand it as a byte
    4799   stream, beginning with the lowest byte...) If we used big endian,
    4800   we would need to process it from the highest address to the lowest
    4801   in order to be agnostic to the 32 vs 64 bit issue.
    4802
    4803   Returns 0 on success, a negative error code otherwise. */
   4804static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
   4805{
   4806	struct drbd_peer_device *peer_device;
   4807	struct drbd_device *device;
   4808	struct bm_xfer_ctx c;
   4809	int err;
   4810
   4811	peer_device = conn_peer_device(connection, pi->vnr);
   4812	if (!peer_device)
   4813		return -EIO;
   4814	device = peer_device->device;
   4815
   4816	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
   4817	/* you are supposed to send additional out-of-sync information
   4818	 * if you actually set bits during this phase */
   4819
   4820	c = (struct bm_xfer_ctx) {
   4821		.bm_bits = drbd_bm_bits(device),
   4822		.bm_words = drbd_bm_words(device),
   4823	};
   4824
    4825	for (;;) {
   4826		if (pi->cmd == P_BITMAP)
   4827			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
   4828		else if (pi->cmd == P_COMPRESSED_BITMAP) {
   4829			/* MAYBE: sanity check that we speak proto >= 90,
   4830			 * and the feature is enabled! */
   4831			struct p_compressed_bm *p = pi->data;
   4832
   4833			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
   4834				drbd_err(device, "ReportCBitmap packet too large\n");
   4835				err = -EIO;
   4836				goto out;
   4837			}
   4838			if (pi->size <= sizeof(*p)) {
   4839				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
   4840				err = -EIO;
   4841				goto out;
   4842			}
   4843			err = drbd_recv_all(peer_device->connection, p, pi->size);
   4844			if (err)
    4845				goto out;
   4846			err = decode_bitmap_c(peer_device, p, &c, pi->size);
   4847		} else {
    4848			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
   4849			err = -EIO;
   4850			goto out;
   4851		}
   4852
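		/* index 1 counts plain P_BITMAP packets, index 0 compressed ones;
		 * INFO_bm_xfer_stats() reports them in the same order */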
   4853		c.packets[pi->cmd == P_BITMAP]++;
   4854		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
   4855
   4856		if (err <= 0) {
   4857			if (err < 0)
   4858				goto out;
   4859			break;
   4860		}
   4861		err = drbd_recv_header(peer_device->connection, pi);
   4862		if (err)
   4863			goto out;
   4864	}
   4865
   4866	INFO_bm_xfer_stats(device, "receive", &c);
   4867
   4868	if (device->state.conn == C_WF_BITMAP_T) {
   4869		enum drbd_state_rv rv;
   4870
   4871		err = drbd_send_bitmap(device);
   4872		if (err)
   4873			goto out;
   4874		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
   4875		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
   4876		D_ASSERT(device, rv == SS_SUCCESS);
   4877	} else if (device->state.conn != C_WF_BITMAP_S) {
   4878		/* admin may have requested C_DISCONNECTING,
   4879		 * other threads may have noticed network errors */
   4880		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
   4881		    drbd_conn_str(device->state.conn));
   4882	}
   4883	err = 0;
   4884
   4885 out:
   4886	drbd_bm_unlock(device);
   4887	if (!err && device->state.conn == C_WF_BITMAP_S)
   4888		drbd_start_resync(device, C_SYNC_SOURCE);
   4889	return err;
   4890}
   4891
   4892static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
   4893{
   4894	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
   4895		 pi->cmd, pi->size);
   4896
   4897	return ignore_remaining_packet(connection, pi);
   4898}
   4899
   4900static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
   4901{
   4902	/* Make sure we've acked all the TCP data associated
   4903	 * with the data requests being unplugged */
   4904	tcp_sock_set_quickack(connection->data.socket->sk, 2);
   4905	return 0;
   4906}
   4907
   4908static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
   4909{
   4910	struct drbd_peer_device *peer_device;
   4911	struct drbd_device *device;
   4912	struct p_block_desc *p = pi->data;
   4913
   4914	peer_device = conn_peer_device(connection, pi->vnr);
   4915	if (!peer_device)
   4916		return -EIO;
   4917	device = peer_device->device;
   4918
   4919	switch (device->state.conn) {
   4920	case C_WF_SYNC_UUID:
   4921	case C_WF_BITMAP_T:
   4922	case C_BEHIND:
   4923			break;
   4924	default:
   4925		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
   4926				drbd_conn_str(device->state.conn));
   4927	}
   4928
   4929	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
   4930
   4931	return 0;
   4932}
   4933
   4934static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
   4935{
   4936	struct drbd_peer_device *peer_device;
   4937	struct p_block_desc *p = pi->data;
   4938	struct drbd_device *device;
   4939	sector_t sector;
   4940	int size, err = 0;
   4941
   4942	peer_device = conn_peer_device(connection, pi->vnr);
   4943	if (!peer_device)
   4944		return -EIO;
   4945	device = peer_device->device;
   4946
   4947	sector = be64_to_cpu(p->sector);
   4948	size = be32_to_cpu(p->blksize);
   4949
   4950	dec_rs_pending(device);
   4951
   4952	if (get_ldev(device)) {
   4953		struct drbd_peer_request *peer_req;
   4954		const int op = REQ_OP_WRITE_ZEROES;
   4955
   4956		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
   4957					       size, 0, GFP_NOIO);
   4958		if (!peer_req) {
   4959			put_ldev(device);
   4960			return -ENOMEM;
   4961		}
   4962
   4963		peer_req->w.cb = e_end_resync_block;
   4964		peer_req->submit_jif = jiffies;
   4965		peer_req->flags |= EE_TRIM;
   4966
   4967		spin_lock_irq(&device->resource->req_lock);
   4968		list_add_tail(&peer_req->w.list, &device->sync_ee);
   4969		spin_unlock_irq(&device->resource->req_lock);
   4970
   4971		atomic_add(pi->size >> 9, &device->rs_sect_ev);
   4972		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
   4973
   4974		if (err) {
   4975			spin_lock_irq(&device->resource->req_lock);
   4976			list_del(&peer_req->w.list);
   4977			spin_unlock_irq(&device->resource->req_lock);
   4978
   4979			drbd_free_peer_req(device, peer_req);
   4980			put_ldev(device);
   4981			err = 0;
   4982			goto fail;
   4983		}
   4984
   4985		inc_unacked(device);
   4986
   4987		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
   4988		   as well as drbd_rs_complete_io() */
   4989	} else {
   4990	fail:
   4991		drbd_rs_complete_io(device, sector);
   4992		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
   4993	}
   4994
   4995	atomic_add(size >> 9, &device->rs_sect_in);
   4996
   4997	return err;
   4998}
   4999
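/*
 * Dispatch table entry for the data socket (see drbdd() below):
 * pkt_size is the fixed sub-header that is read before the handler runs,
 * expect_payload says whether additional payload beyond that sub-header
 * is acceptable for the packet type.
 */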
   5000struct data_cmd {
   5001	int expect_payload;
   5002	unsigned int pkt_size;
   5003	int (*fn)(struct drbd_connection *, struct packet_info *);
   5004};
   5005
   5006static struct data_cmd drbd_cmd_handler[] = {
   5007	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
   5008	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
   5009	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
   5010	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
   5011	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
   5012	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
   5013	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
   5014	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
   5015	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
   5016	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
   5017	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
   5018	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
   5019	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
   5020	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
   5021	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
   5022	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
   5023	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
   5024	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
   5025	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
   5026	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
   5027	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
   5028	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
   5029	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
   5030	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
   5031	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
   5032	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
   5033	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
   5034	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
   5035};
   5036
   5037static void drbdd(struct drbd_connection *connection)
   5038{
   5039	struct packet_info pi;
   5040	size_t shs; /* sub header size */
   5041	int err;
   5042
   5043	while (get_t_state(&connection->receiver) == RUNNING) {
   5044		struct data_cmd const *cmd;
   5045
   5046		drbd_thread_current_set_cpu(&connection->receiver);
   5047		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
   5048		if (drbd_recv_header_maybe_unplug(connection, &pi))
   5049			goto err_out;
   5050
   5051		cmd = &drbd_cmd_handler[pi.cmd];
   5052		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
    5053			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
   5054				 cmdname(pi.cmd), pi.cmd);
   5055			goto err_out;
   5056		}
   5057
   5058		shs = cmd->pkt_size;
   5059		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
   5060			shs += sizeof(struct o_qlim);
   5061		if (pi.size > shs && !cmd->expect_payload) {
   5062			drbd_err(connection, "No payload expected %s l:%d\n",
   5063				 cmdname(pi.cmd), pi.size);
   5064			goto err_out;
   5065		}
   5066		if (pi.size < shs) {
   5067			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
   5068				 cmdname(pi.cmd), (int)shs, pi.size);
   5069			goto err_out;
   5070		}
   5071
   5072		if (shs) {
   5073			update_receiver_timing_details(connection, drbd_recv_all_warn);
   5074			err = drbd_recv_all_warn(connection, pi.data, shs);
   5075			if (err)
   5076				goto err_out;
   5077			pi.size -= shs;
   5078		}
   5079
   5080		update_receiver_timing_details(connection, cmd->fn);
   5081		err = cmd->fn(connection, &pi);
   5082		if (err) {
   5083			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
   5084				 cmdname(pi.cmd), err, pi.size);
   5085			goto err_out;
   5086		}
   5087	}
   5088	return;
   5089
   5090    err_out:
   5091	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
   5092}
   5093
   5094static void conn_disconnect(struct drbd_connection *connection)
   5095{
   5096	struct drbd_peer_device *peer_device;
   5097	enum drbd_conns oc;
   5098	int vnr;
   5099
   5100	if (connection->cstate == C_STANDALONE)
   5101		return;
   5102
   5103	/* We are about to start the cleanup after connection loss.
   5104	 * Make sure drbd_make_request knows about that.
   5105	 * Usually we should be in some network failure state already,
   5106	 * but just in case we are not, we fix it up here.
   5107	 */
   5108	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
   5109
   5110	/* ack_receiver does not clean up anything. it must not interfere, either */
   5111	drbd_thread_stop(&connection->ack_receiver);
   5112	if (connection->ack_sender) {
   5113		destroy_workqueue(connection->ack_sender);
   5114		connection->ack_sender = NULL;
   5115	}
   5116	drbd_free_sock(connection);
   5117
   5118	rcu_read_lock();
   5119	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   5120		struct drbd_device *device = peer_device->device;
   5121		kref_get(&device->kref);
   5122		rcu_read_unlock();
   5123		drbd_disconnected(peer_device);
   5124		kref_put(&device->kref, drbd_destroy_device);
   5125		rcu_read_lock();
   5126	}
   5127	rcu_read_unlock();
   5128
   5129	if (!list_empty(&connection->current_epoch->list))
   5130		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
   5131	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
   5132	atomic_set(&connection->current_epoch->epoch_size, 0);
   5133	connection->send.seen_any_write_yet = false;
   5134
   5135	drbd_info(connection, "Connection closed\n");
   5136
   5137	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
   5138		conn_try_outdate_peer_async(connection);
   5139
   5140	spin_lock_irq(&connection->resource->req_lock);
   5141	oc = connection->cstate;
   5142	if (oc >= C_UNCONNECTED)
   5143		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
   5144
   5145	spin_unlock_irq(&connection->resource->req_lock);
   5146
   5147	if (oc == C_DISCONNECTING)
   5148		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
   5149}
   5150
   5151static int drbd_disconnected(struct drbd_peer_device *peer_device)
   5152{
   5153	struct drbd_device *device = peer_device->device;
   5154	unsigned int i;
   5155
   5156	/* wait for current activity to cease. */
   5157	spin_lock_irq(&device->resource->req_lock);
   5158	_drbd_wait_ee_list_empty(device, &device->active_ee);
   5159	_drbd_wait_ee_list_empty(device, &device->sync_ee);
   5160	_drbd_wait_ee_list_empty(device, &device->read_ee);
   5161	spin_unlock_irq(&device->resource->req_lock);
   5162
   5163	/* We do not have data structures that would allow us to
   5164	 * get the rs_pending_cnt down to 0 again.
   5165	 *  * On C_SYNC_TARGET we do not have any data structures describing
   5166	 *    the pending RSDataRequest's we have sent.
   5167	 *  * On C_SYNC_SOURCE there is no data structure that tracks
   5168	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
   5169	 *  And no, it is not the sum of the reference counts in the
   5170	 *  resync_LRU. The resync_LRU tracks the whole operation including
   5171	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
   5172	 *  on the fly. */
   5173	drbd_rs_cancel_all(device);
   5174	device->rs_total = 0;
   5175	device->rs_failed = 0;
   5176	atomic_set(&device->rs_pending_cnt, 0);
   5177	wake_up(&device->misc_wait);
   5178
   5179	del_timer_sync(&device->resync_timer);
   5180	resync_timer_fn(&device->resync_timer);
   5181
   5182	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
   5183	 * w_make_resync_request etc. which may still be on the worker queue
   5184	 * to be "canceled" */
   5185	drbd_flush_workqueue(&peer_device->connection->sender_work);
   5186
   5187	drbd_finish_peer_reqs(device);
   5188
    5189	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
    5190	   might have queued work again. The flush before drbd_finish_peer_reqs() is
    5191	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
   5192	drbd_flush_workqueue(&peer_device->connection->sender_work);
   5193
   5194	/* need to do it again, drbd_finish_peer_reqs() may have populated it
   5195	 * again via drbd_try_clear_on_disk_bm(). */
   5196	drbd_rs_cancel_all(device);
   5197
   5198	kfree(device->p_uuid);
   5199	device->p_uuid = NULL;
   5200
   5201	if (!drbd_suspended(device))
   5202		tl_clear(peer_device->connection);
   5203
   5204	drbd_md_sync(device);
   5205
   5206	if (get_ldev(device)) {
   5207		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
   5208				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
   5209		put_ldev(device);
   5210	}
   5211
   5212	/* tcp_close and release of sendpage pages can be deferred.  I don't
   5213	 * want to use SO_LINGER, because apparently it can be deferred for
   5214	 * more than 20 seconds (longest time I checked).
   5215	 *
   5216	 * Actually we don't care for exactly when the network stack does its
   5217	 * put_page(), but release our reference on these pages right here.
   5218	 */
   5219	i = drbd_free_peer_reqs(device, &device->net_ee);
   5220	if (i)
   5221		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
   5222	i = atomic_read(&device->pp_in_use_by_net);
   5223	if (i)
   5224		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
   5225	i = atomic_read(&device->pp_in_use);
   5226	if (i)
   5227		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
   5228
   5229	D_ASSERT(device, list_empty(&device->read_ee));
   5230	D_ASSERT(device, list_empty(&device->active_ee));
   5231	D_ASSERT(device, list_empty(&device->sync_ee));
   5232	D_ASSERT(device, list_empty(&device->done_ee));
   5233
   5234	return 0;
   5235}
   5236
   5237/*
   5238 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
   5239 * we can agree on is stored in agreed_pro_version.
   5240 *
   5241 * feature flags and the reserved array should be enough room for future
   5242 * enhancements of the handshake protocol, and possible plugins...
   5243 *
   5244 * for now, they are expected to be zero, but ignored.
   5245 */
   5246static int drbd_send_features(struct drbd_connection *connection)
   5247{
   5248	struct drbd_socket *sock;
   5249	struct p_connection_features *p;
   5250
   5251	sock = &connection->data;
   5252	p = conn_prepare_command(connection, sock);
   5253	if (!p)
   5254		return -EIO;
   5255	memset(p, 0, sizeof(*p));
   5256	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
   5257	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
   5258	p->feature_flags = cpu_to_be32(PRO_FEATURES);
   5259	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
   5260}
   5261
   5262/*
   5263 * return values:
   5264 *   1 yes, we have a valid connection
   5265 *   0 oops, did not work out, please try again
   5266 *  -1 peer talks different language,
   5267 *     no point in trying again, please go standalone.
   5268 */
   5269static int drbd_do_features(struct drbd_connection *connection)
   5270{
   5271	/* ASSERT current == connection->receiver ... */
   5272	struct p_connection_features *p;
   5273	const int expect = sizeof(struct p_connection_features);
   5274	struct packet_info pi;
   5275	int err;
   5276
   5277	err = drbd_send_features(connection);
   5278	if (err)
   5279		return 0;
   5280
   5281	err = drbd_recv_header(connection, &pi);
   5282	if (err)
   5283		return 0;
   5284
   5285	if (pi.cmd != P_CONNECTION_FEATURES) {
   5286		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
   5287			 cmdname(pi.cmd), pi.cmd);
   5288		return -1;
   5289	}
   5290
   5291	if (pi.size != expect) {
   5292		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
   5293		     expect, pi.size);
   5294		return -1;
   5295	}
   5296
   5297	p = pi.data;
   5298	err = drbd_recv_all_warn(connection, p, expect);
   5299	if (err)
   5300		return 0;
   5301
   5302	p->protocol_min = be32_to_cpu(p->protocol_min);
   5303	p->protocol_max = be32_to_cpu(p->protocol_max);
   5304	if (p->protocol_max == 0)
   5305		p->protocol_max = p->protocol_min;
   5306
   5307	if (PRO_VERSION_MAX < p->protocol_min ||
   5308	    PRO_VERSION_MIN > p->protocol_max)
   5309		goto incompat;
   5310
   5311	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
   5312	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
   5313
   5314	drbd_info(connection, "Handshake successful: "
   5315	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
   5316
   5317	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
   5318		  connection->agreed_features,
   5319		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
   5320		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
   5321		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
   5322		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
   5323		  connection->agreed_features ? "" : " none");
   5324
   5325	return 1;
   5326
   5327 incompat:
   5328	drbd_err(connection, "incompatible DRBD dialects: "
   5329	    "I support %d-%d, peer supports %d-%d\n",
   5330	    PRO_VERSION_MIN, PRO_VERSION_MAX,
   5331	    p->protocol_min, p->protocol_max);
   5332	return -1;
   5333}
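
/*
 * Negotiation example (hypothetical version numbers): if we support
 * protocol versions 86..101 and the peer advertises 90..96, the ranges
 * overlap, the handshake succeeds, and agreed_pro_version becomes
 * min(101, 96) = 96.  Disjoint ranges take the "incompat" path above.
 */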
   5334
   5335#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
   5336static int drbd_do_auth(struct drbd_connection *connection)
   5337{
    5338	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
   5339	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
   5340	return -1;
   5341}
   5342#else
   5343#define CHALLENGE_LEN 64
   5344
   5345/* Return value:
   5346	1 - auth succeeded,
   5347	0 - failed, try again (network error),
   5348	-1 - auth failed, don't try again.
   5349*/
   5350
   5351static int drbd_do_auth(struct drbd_connection *connection)
   5352{
   5353	struct drbd_socket *sock;
   5354	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
   5355	char *response = NULL;
   5356	char *right_response = NULL;
   5357	char *peers_ch = NULL;
   5358	unsigned int key_len;
   5359	char secret[SHARED_SECRET_MAX]; /* 64 byte */
   5360	unsigned int resp_size;
   5361	struct shash_desc *desc;
   5362	struct packet_info pi;
   5363	struct net_conf *nc;
   5364	int err, rv;
   5365
   5366	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
   5367
   5368	rcu_read_lock();
   5369	nc = rcu_dereference(connection->net_conf);
   5370	key_len = strlen(nc->shared_secret);
   5371	memcpy(secret, nc->shared_secret, key_len);
   5372	rcu_read_unlock();
   5373
   5374	desc = kmalloc(sizeof(struct shash_desc) +
   5375		       crypto_shash_descsize(connection->cram_hmac_tfm),
   5376		       GFP_KERNEL);
   5377	if (!desc) {
   5378		rv = -1;
   5379		goto fail;
   5380	}
   5381	desc->tfm = connection->cram_hmac_tfm;
   5382
   5383	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
   5384	if (rv) {
   5385		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
   5386		rv = -1;
   5387		goto fail;
   5388	}
   5389
   5390	get_random_bytes(my_challenge, CHALLENGE_LEN);
   5391
   5392	sock = &connection->data;
   5393	if (!conn_prepare_command(connection, sock)) {
   5394		rv = 0;
   5395		goto fail;
   5396	}
   5397	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
   5398				my_challenge, CHALLENGE_LEN);
   5399	if (!rv)
   5400		goto fail;
   5401
   5402	err = drbd_recv_header(connection, &pi);
   5403	if (err) {
   5404		rv = 0;
   5405		goto fail;
   5406	}
   5407
   5408	if (pi.cmd != P_AUTH_CHALLENGE) {
   5409		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
   5410			 cmdname(pi.cmd), pi.cmd);
   5411		rv = -1;
   5412		goto fail;
   5413	}
   5414
   5415	if (pi.size > CHALLENGE_LEN * 2) {
    5416		drbd_err(connection, "AuthChallenge payload too big.\n");
   5417		rv = -1;
   5418		goto fail;
   5419	}
   5420
   5421	if (pi.size < CHALLENGE_LEN) {
   5422		drbd_err(connection, "AuthChallenge payload too small.\n");
   5423		rv = -1;
   5424		goto fail;
   5425	}
   5426
   5427	peers_ch = kmalloc(pi.size, GFP_NOIO);
   5428	if (!peers_ch) {
   5429		rv = -1;
   5430		goto fail;
   5431	}
   5432
   5433	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
   5434	if (err) {
   5435		rv = 0;
   5436		goto fail;
   5437	}
   5438
   5439	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
   5440		drbd_err(connection, "Peer presented the same challenge!\n");
   5441		rv = -1;
   5442		goto fail;
   5443	}
   5444
   5445	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
   5446	response = kmalloc(resp_size, GFP_NOIO);
   5447	if (!response) {
   5448		rv = -1;
   5449		goto fail;
   5450	}
   5451
   5452	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
   5453	if (rv) {
   5454		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
   5455		rv = -1;
   5456		goto fail;
   5457	}
   5458
   5459	if (!conn_prepare_command(connection, sock)) {
   5460		rv = 0;
   5461		goto fail;
   5462	}
   5463	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
   5464				response, resp_size);
   5465	if (!rv)
   5466		goto fail;
   5467
   5468	err = drbd_recv_header(connection, &pi);
   5469	if (err) {
   5470		rv = 0;
   5471		goto fail;
   5472	}
   5473
   5474	if (pi.cmd != P_AUTH_RESPONSE) {
   5475		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
   5476			 cmdname(pi.cmd), pi.cmd);
   5477		rv = 0;
   5478		goto fail;
   5479	}
   5480
   5481	if (pi.size != resp_size) {
    5482		drbd_err(connection, "AuthResponse payload of unexpected size\n");
   5483		rv = 0;
   5484		goto fail;
   5485	}
   5486
    5487	err = drbd_recv_all_warn(connection, response, resp_size);
   5488	if (err) {
   5489		rv = 0;
   5490		goto fail;
   5491	}
   5492
   5493	right_response = kmalloc(resp_size, GFP_NOIO);
   5494	if (!right_response) {
   5495		rv = -1;
   5496		goto fail;
   5497	}
   5498
   5499	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
   5500				 right_response);
   5501	if (rv) {
   5502		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
   5503		rv = -1;
   5504		goto fail;
   5505	}
   5506
   5507	rv = !memcmp(response, right_response, resp_size);
   5508
   5509	if (rv)
   5510		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
   5511		     resp_size);
   5512	else
   5513		rv = -1;
   5514
   5515 fail:
   5516	kfree(peers_ch);
   5517	kfree(response);
   5518	kfree(right_response);
   5519	if (desc) {
   5520		shash_desc_zero(desc);
   5521		kfree(desc);
   5522	}
   5523
   5524	return rv;
   5525}
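
/*
 * Summary of the exchange above: each side sends a random challenge
 * (CHALLENGE_LEN bytes on our end), answers the peer's challenge with
 * an HMAC keyed by the shared secret, and accepts the connection only
 * if the peer's response matches the HMAC computed locally over the
 * challenge we sent.
 */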
   5526#endif
   5527
   5528int drbd_receiver(struct drbd_thread *thi)
   5529{
   5530	struct drbd_connection *connection = thi->connection;
   5531	int h;
   5532
   5533	drbd_info(connection, "receiver (re)started\n");
   5534
   5535	do {
   5536		h = conn_connect(connection);
   5537		if (h == 0) {
   5538			conn_disconnect(connection);
   5539			schedule_timeout_interruptible(HZ);
   5540		}
   5541		if (h == -1) {
   5542			drbd_warn(connection, "Discarding network configuration.\n");
   5543			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
   5544		}
   5545	} while (h == 0);
   5546
   5547	if (h > 0) {
   5548		blk_start_plug(&connection->receiver_plug);
   5549		drbdd(connection);
   5550		blk_finish_plug(&connection->receiver_plug);
   5551	}
   5552
   5553	conn_disconnect(connection);
   5554
   5555	drbd_info(connection, "receiver terminated\n");
   5556	return 0;
   5557}
   5558
   5559/* ********* acknowledge sender ******** */
   5560
   5561static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
   5562{
   5563	struct p_req_state_reply *p = pi->data;
   5564	int retcode = be32_to_cpu(p->retcode);
   5565
   5566	if (retcode >= SS_SUCCESS) {
   5567		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
   5568	} else {
   5569		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
   5570		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
   5571			 drbd_set_st_err_str(retcode), retcode);
   5572	}
   5573	wake_up(&connection->ping_wait);
   5574
   5575	return 0;
   5576}
   5577
   5578static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
   5579{
   5580	struct drbd_peer_device *peer_device;
   5581	struct drbd_device *device;
   5582	struct p_req_state_reply *p = pi->data;
   5583	int retcode = be32_to_cpu(p->retcode);
   5584
   5585	peer_device = conn_peer_device(connection, pi->vnr);
   5586	if (!peer_device)
   5587		return -EIO;
   5588	device = peer_device->device;
   5589
   5590	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
   5591		D_ASSERT(device, connection->agreed_pro_version < 100);
   5592		return got_conn_RqSReply(connection, pi);
   5593	}
   5594
   5595	if (retcode >= SS_SUCCESS) {
   5596		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
   5597	} else {
   5598		set_bit(CL_ST_CHG_FAIL, &device->flags);
   5599		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
   5600			drbd_set_st_err_str(retcode), retcode);
   5601	}
   5602	wake_up(&device->state_wait);
   5603
   5604	return 0;
   5605}
   5606
   5607static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
   5608{
   5609	return drbd_send_ping_ack(connection);
   5610
   5611}
   5612
   5613static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
   5614{
   5615	/* restore idle timeout */
   5616	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
   5617	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
   5618		wake_up(&connection->ping_wait);
   5619
   5620	return 0;
   5621}
   5622
   5623static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
   5624{
   5625	struct drbd_peer_device *peer_device;
   5626	struct drbd_device *device;
   5627	struct p_block_ack *p = pi->data;
   5628	sector_t sector = be64_to_cpu(p->sector);
   5629	int blksize = be32_to_cpu(p->blksize);
   5630
   5631	peer_device = conn_peer_device(connection, pi->vnr);
   5632	if (!peer_device)
   5633		return -EIO;
   5634	device = peer_device->device;
   5635
   5636	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
   5637
   5638	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
   5639
   5640	if (get_ldev(device)) {
   5641		drbd_rs_complete_io(device, sector);
   5642		drbd_set_in_sync(device, sector, blksize);
   5643		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
   5644		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
   5645		put_ldev(device);
   5646	}
   5647	dec_rs_pending(device);
   5648	atomic_add(blksize >> 9, &device->rs_sect_in);
   5649
   5650	return 0;
   5651}
   5652
   5653static int
   5654validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
   5655			      struct rb_root *root, const char *func,
   5656			      enum drbd_req_event what, bool missing_ok)
   5657{
   5658	struct drbd_request *req;
   5659	struct bio_and_error m;
   5660
   5661	spin_lock_irq(&device->resource->req_lock);
   5662	req = find_request(device, root, id, sector, missing_ok, func);
   5663	if (unlikely(!req)) {
   5664		spin_unlock_irq(&device->resource->req_lock);
   5665		return -EIO;
   5666	}
   5667	__req_mod(req, what, &m);
   5668	spin_unlock_irq(&device->resource->req_lock);
   5669
   5670	if (m.bio)
   5671		complete_master_bio(device, &m);
   5672	return 0;
   5673}
   5674
   5675static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
   5676{
   5677	struct drbd_peer_device *peer_device;
   5678	struct drbd_device *device;
   5679	struct p_block_ack *p = pi->data;
   5680	sector_t sector = be64_to_cpu(p->sector);
   5681	int blksize = be32_to_cpu(p->blksize);
   5682	enum drbd_req_event what;
   5683
   5684	peer_device = conn_peer_device(connection, pi->vnr);
   5685	if (!peer_device)
   5686		return -EIO;
   5687	device = peer_device->device;
   5688
   5689	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
   5690
   5691	if (p->block_id == ID_SYNCER) {
   5692		drbd_set_in_sync(device, sector, blksize);
   5693		dec_rs_pending(device);
   5694		return 0;
   5695	}
   5696	switch (pi->cmd) {
   5697	case P_RS_WRITE_ACK:
   5698		what = WRITE_ACKED_BY_PEER_AND_SIS;
   5699		break;
   5700	case P_WRITE_ACK:
   5701		what = WRITE_ACKED_BY_PEER;
   5702		break;
   5703	case P_RECV_ACK:
   5704		what = RECV_ACKED_BY_PEER;
   5705		break;
   5706	case P_SUPERSEDED:
   5707		what = CONFLICT_RESOLVED;
   5708		break;
   5709	case P_RETRY_WRITE:
   5710		what = POSTPONE_WRITE;
   5711		break;
   5712	default:
   5713		BUG();
   5714	}
   5715
   5716	return validate_req_change_req_state(device, p->block_id, sector,
   5717					     &device->write_requests, __func__,
   5718					     what, false);
   5719}
   5720
   5721static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
   5722{
   5723	struct drbd_peer_device *peer_device;
   5724	struct drbd_device *device;
   5725	struct p_block_ack *p = pi->data;
   5726	sector_t sector = be64_to_cpu(p->sector);
   5727	int size = be32_to_cpu(p->blksize);
   5728	int err;
   5729
   5730	peer_device = conn_peer_device(connection, pi->vnr);
   5731	if (!peer_device)
   5732		return -EIO;
   5733	device = peer_device->device;
   5734
   5735	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
   5736
   5737	if (p->block_id == ID_SYNCER) {
   5738		dec_rs_pending(device);
   5739		drbd_rs_failed_io(device, sector, size);
   5740		return 0;
   5741	}
   5742
   5743	err = validate_req_change_req_state(device, p->block_id, sector,
   5744					    &device->write_requests, __func__,
   5745					    NEG_ACKED, true);
   5746	if (err) {
   5747		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
   5748		   The master bio might already be completed, therefore the
   5749		   request is no longer in the collision hash. */
   5750		/* In Protocol B we might already have got a P_RECV_ACK
   5751		   but then get a P_NEG_ACK afterwards. */
   5752		drbd_set_out_of_sync(device, sector, size);
   5753	}
   5754	return 0;
   5755}
   5756
   5757static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
   5758{
   5759	struct drbd_peer_device *peer_device;
   5760	struct drbd_device *device;
   5761	struct p_block_ack *p = pi->data;
   5762	sector_t sector = be64_to_cpu(p->sector);
   5763
   5764	peer_device = conn_peer_device(connection, pi->vnr);
   5765	if (!peer_device)
   5766		return -EIO;
   5767	device = peer_device->device;
   5768
   5769	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
   5770
   5771	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
   5772	    (unsigned long long)sector, be32_to_cpu(p->blksize));
   5773
   5774	return validate_req_change_req_state(device, p->block_id, sector,
   5775					     &device->read_requests, __func__,
   5776					     NEG_ACKED, false);
   5777}
   5778
   5779static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
   5780{
   5781	struct drbd_peer_device *peer_device;
   5782	struct drbd_device *device;
   5783	sector_t sector;
   5784	int size;
   5785	struct p_block_ack *p = pi->data;
   5786
   5787	peer_device = conn_peer_device(connection, pi->vnr);
   5788	if (!peer_device)
   5789		return -EIO;
   5790	device = peer_device->device;
   5791
   5792	sector = be64_to_cpu(p->sector);
   5793	size = be32_to_cpu(p->blksize);
   5794
   5795	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
   5796
   5797	dec_rs_pending(device);
   5798
   5799	if (get_ldev_if_state(device, D_FAILED)) {
   5800		drbd_rs_complete_io(device, sector);
   5801		switch (pi->cmd) {
   5802		case P_NEG_RS_DREPLY:
   5803			drbd_rs_failed_io(device, sector, size);
   5804			break;
   5805		case P_RS_CANCEL:
   5806			break;
   5807		default:
   5808			BUG();
   5809		}
   5810		put_ldev(device);
   5811	}
   5812
   5813	return 0;
   5814}
   5815
   5816static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
   5817{
   5818	struct p_barrier_ack *p = pi->data;
   5819	struct drbd_peer_device *peer_device;
   5820	int vnr;
   5821
   5822	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
   5823
   5824	rcu_read_lock();
   5825	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
   5826		struct drbd_device *device = peer_device->device;
   5827
   5828		if (device->state.conn == C_AHEAD &&
   5829		    atomic_read(&device->ap_in_flight) == 0 &&
   5830		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
   5831			device->start_resync_timer.expires = jiffies + HZ;
   5832			add_timer(&device->start_resync_timer);
   5833		}
   5834	}
   5835	rcu_read_unlock();
   5836
   5837	return 0;
   5838}
   5839
   5840static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
   5841{
   5842	struct drbd_peer_device *peer_device;
   5843	struct drbd_device *device;
   5844	struct p_block_ack *p = pi->data;
   5845	struct drbd_device_work *dw;
   5846	sector_t sector;
   5847	int size;
   5848
   5849	peer_device = conn_peer_device(connection, pi->vnr);
   5850	if (!peer_device)
   5851		return -EIO;
   5852	device = peer_device->device;
   5853
   5854	sector = be64_to_cpu(p->sector);
   5855	size = be32_to_cpu(p->blksize);
   5856
   5857	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
   5858
   5859	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
   5860		drbd_ov_out_of_sync_found(device, sector, size);
   5861	else
   5862		ov_out_of_sync_print(device);
   5863
   5864	if (!get_ldev(device))
   5865		return 0;
   5866
   5867	drbd_rs_complete_io(device, sector);
   5868	dec_rs_pending(device);
   5869
   5870	--device->ov_left;
   5871
   5872	/* let's advance progress step marks only for every other megabyte */
   5873	if ((device->ov_left & 0x200) == 0x200)
   5874		drbd_advance_rs_marks(device, device->ov_left);
   5875
   5876	if (device->ov_left == 0) {
   5877		dw = kmalloc(sizeof(*dw), GFP_NOIO);
   5878		if (dw) {
   5879			dw->w.cb = w_ov_finished;
   5880			dw->device = device;
   5881			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
   5882		} else {
    5883			drbd_err(device, "kmalloc(dw) failed.\n");
   5884			ov_out_of_sync_print(device);
   5885			drbd_resync_finished(device);
   5886		}
   5887	}
   5888	put_ldev(device);
   5889	return 0;
   5890}
   5891
   5892static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
   5893{
   5894	return 0;
   5895}
   5896
   5897struct meta_sock_cmd {
   5898	size_t pkt_size;
   5899	int (*fn)(struct drbd_connection *connection, struct packet_info *);
   5900};
   5901
   5902static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
   5903{
   5904	long t;
   5905	struct net_conf *nc;
   5906
   5907	rcu_read_lock();
   5908	nc = rcu_dereference(connection->net_conf);
   5909	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
   5910	rcu_read_unlock();
   5911
   5912	t *= HZ;
   5913	if (ping_timeout)
   5914		t /= 10;
   5915
   5916	connection->meta.socket->sk->sk_rcvtimeo = t;
   5917}
   5918
   5919static void set_ping_timeout(struct drbd_connection *connection)
   5920{
   5921	set_rcvtimeo(connection, 1);
   5922}
   5923
   5924static void set_idle_timeout(struct drbd_connection *connection)
   5925{
   5926	set_rcvtimeo(connection, 0);
   5927}
   5928
   5929static struct meta_sock_cmd ack_receiver_tbl[] = {
   5930	[P_PING]	    = { 0, got_Ping },
   5931	[P_PING_ACK]	    = { 0, got_PingAck },
   5932	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
   5933	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
   5934	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
   5935	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
   5936	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
   5937	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
   5938	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
   5939	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
   5940	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
   5941	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
   5942	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
   5943	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
   5944	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
    5945	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
   5946	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
   5947};
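
/*
 * Note on the table above: the packet command is used directly as the
 * array index, and pkt_size excludes the packet header;
 * drbd_ack_receiver() therefore expects header_size + pkt_size bytes
 * per packet.
 */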
   5948
   5949int drbd_ack_receiver(struct drbd_thread *thi)
   5950{
   5951	struct drbd_connection *connection = thi->connection;
   5952	struct meta_sock_cmd *cmd = NULL;
   5953	struct packet_info pi;
   5954	unsigned long pre_recv_jif;
   5955	int rv;
   5956	void *buf    = connection->meta.rbuf;
   5957	int received = 0;
   5958	unsigned int header_size = drbd_header_size(connection);
   5959	int expect   = header_size;
   5960	bool ping_timeout_active = false;
   5961
   5962	sched_set_fifo_low(current);
   5963
   5964	while (get_t_state(thi) == RUNNING) {
   5965		drbd_thread_current_set_cpu(thi);
   5966
   5967		conn_reclaim_net_peer_reqs(connection);
   5968
   5969		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
   5970			if (drbd_send_ping(connection)) {
   5971				drbd_err(connection, "drbd_send_ping has failed\n");
   5972				goto reconnect;
   5973			}
   5974			set_ping_timeout(connection);
   5975			ping_timeout_active = true;
   5976		}
   5977
   5978		pre_recv_jif = jiffies;
   5979		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
   5980
   5981		/* Note:
   5982		 * -EINTR	 (on meta) we got a signal
   5983		 * -EAGAIN	 (on meta) rcvtimeo expired
   5984		 * -ECONNRESET	 other side closed the connection
   5985		 * -ERESTARTSYS  (on data) we got a signal
   5986		 * rv <  0	 other than above: unexpected error!
   5987		 * rv == expected: full header or command
   5988		 * rv <  expected: "woken" by signal during receive
   5989		 * rv == 0	 : "connection shut down by peer"
   5990		 */
   5991		if (likely(rv > 0)) {
   5992			received += rv;
   5993			buf	 += rv;
   5994		} else if (rv == 0) {
   5995			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
   5996				long t;
   5997				rcu_read_lock();
   5998				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
   5999				rcu_read_unlock();
   6000
   6001				t = wait_event_timeout(connection->ping_wait,
   6002						       connection->cstate < C_WF_REPORT_PARAMS,
   6003						       t);
   6004				if (t)
   6005					break;
   6006			}
   6007			drbd_err(connection, "meta connection shut down by peer.\n");
   6008			goto reconnect;
   6009		} else if (rv == -EAGAIN) {
   6010			/* If the data socket received something meanwhile,
   6011			 * that is good enough: peer is still alive. */
   6012			if (time_after(connection->last_received, pre_recv_jif))
   6013				continue;
   6014			if (ping_timeout_active) {
   6015				drbd_err(connection, "PingAck did not arrive in time.\n");
   6016				goto reconnect;
   6017			}
   6018			set_bit(SEND_PING, &connection->flags);
   6019			continue;
   6020		} else if (rv == -EINTR) {
   6021			/* maybe drbd_thread_stop(): the while condition will notice.
   6022			 * maybe woken for send_ping: we'll send a ping above,
   6023			 * and change the rcvtimeo */
   6024			flush_signals(current);
   6025			continue;
   6026		} else {
   6027			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
   6028			goto reconnect;
   6029		}
   6030
   6031		if (received == expect && cmd == NULL) {
   6032			if (decode_header(connection, connection->meta.rbuf, &pi))
   6033				goto reconnect;
   6034			cmd = &ack_receiver_tbl[pi.cmd];
   6035			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
   6036				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
   6037					 cmdname(pi.cmd), pi.cmd);
   6038				goto disconnect;
   6039			}
   6040			expect = header_size + cmd->pkt_size;
   6041			if (pi.size != expect - header_size) {
   6042				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
   6043					pi.cmd, pi.size);
   6044				goto reconnect;
   6045			}
   6046		}
   6047		if (received == expect) {
   6048			bool err;
   6049
   6050			err = cmd->fn(connection, &pi);
   6051			if (err) {
   6052				drbd_err(connection, "%ps failed\n", cmd->fn);
   6053				goto reconnect;
   6054			}
   6055
   6056			connection->last_received = jiffies;
   6057
   6058			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
   6059				set_idle_timeout(connection);
   6060				ping_timeout_active = false;
   6061			}
   6062
   6063			buf	 = connection->meta.rbuf;
   6064			received = 0;
   6065			expect	 = header_size;
   6066			cmd	 = NULL;
   6067		}
   6068	}
   6069
   6070	if (0) {
   6071reconnect:
   6072		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
   6073		conn_md_sync(connection);
   6074	}
   6075	if (0) {
   6076disconnect:
   6077		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
   6078	}
   6079
   6080	drbd_info(connection, "ack_receiver terminated\n");
   6081
   6082	return 0;
   6083}
   6084
   6085void drbd_send_acks_wf(struct work_struct *ws)
   6086{
   6087	struct drbd_peer_device *peer_device =
   6088		container_of(ws, struct drbd_peer_device, send_acks_work);
   6089	struct drbd_connection *connection = peer_device->connection;
   6090	struct drbd_device *device = peer_device->device;
   6091	struct net_conf *nc;
   6092	int tcp_cork, err;
   6093
   6094	rcu_read_lock();
   6095	nc = rcu_dereference(connection->net_conf);
   6096	tcp_cork = nc->tcp_cork;
   6097	rcu_read_unlock();
   6098
   6099	if (tcp_cork)
   6100		tcp_sock_set_cork(connection->meta.socket->sk, true);
   6101
   6102	err = drbd_finish_peer_reqs(device);
   6103	kref_put(&device->kref, drbd_destroy_device);
   6104	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
   6105	   struct work_struct send_acks_work alive, which is in the peer_device object */
   6106
   6107	if (err) {
   6108		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
   6109		return;
   6110	}
   6111
   6112	if (tcp_cork)
   6113		tcp_sock_set_cork(connection->meta.socket->sk, false);
   6114
   6115	return;
   6116}