cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

lowcomms.c (48383B)


      1// SPDX-License-Identifier: GPL-2.0-only
      2/******************************************************************************
      3*******************************************************************************
      4**
      5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
      6**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
      7**
      8**
      9*******************************************************************************
     10******************************************************************************/
     11
     12/*
     13 * lowcomms.c
     14 *
     15 * This is the "low-level" comms layer.
     16 *
     17 * It is responsible for sending/receiving messages
     18 * from other nodes in the cluster.
     19 *
      20 * Cluster nodes are referred to by their nodeids. nodeids are
      21 * simply 32 bit numbers to the locking module - if they need to
      22 * be expanded for the cluster infrastructure then that is its
      23 * responsibility. It is this layer's responsibility to resolve
      24 * these into IP addresses or whatever else it needs for
      25 * inter-node communication.
     26 *
      27 * The comms level consists of two kernel threads: one that deals
      28 * mainly with receiving messages from other nodes and passing them
      29 * up to the mid-level comms layer (which understands the message
      30 * format) for execution by the locking core, and a send thread
      31 * which does all the setting up of connections to remote nodes
      32 * and the sending of data. Other threads are not allowed to send
      33 * their own data because it may cause them to wait in times of high
      34 * load. Also, this way the sending thread can collect together
      35 * messages bound for one node and send them in one block.
     36 *
     37 * lowcomms will choose to use either TCP or SCTP as its transport layer
     38 * depending on the configuration variable 'protocol'. This should be set
     39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
     40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
     41 * for the DLM to function.
     42 *
     43 */
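        /*
         * Illustration (an assumption about the usual setup, not taken
         * from this file): the 'protocol' variable is exposed through
         * dlm's configfs tree and is typically set by the cluster
         * manager before any lockspace is created, e.g.:
         *
         *   echo 1 > /sys/kernel/config/dlm/cluster/protocol   # SCTP
         *
         * dlm_lowcomms_start() below reads it back as
         * dlm_config.ci_protocol and selects dlm_tcp_ops or
         * dlm_sctp_ops accordingly.
         */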
     44
     45#include <asm/ioctls.h>
     46#include <net/sock.h>
     47#include <net/tcp.h>
     48#include <linux/pagemap.h>
     49#include <linux/file.h>
     50#include <linux/mutex.h>
     51#include <linux/sctp.h>
     52#include <linux/slab.h>
     53#include <net/sctp/sctp.h>
     54#include <net/ipv6.h>
     55
     56#include <trace/events/dlm.h>
     57
     58#include "dlm_internal.h"
     59#include "lowcomms.h"
     60#include "midcomms.h"
     61#include "memory.h"
     62#include "config.h"
     63
     64#define NEEDED_RMEM (4*1024*1024)
     65
     66/* Number of messages to send before rescheduling */
     67#define MAX_SEND_MSG_COUNT 25
     68#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
     69
     70struct connection {
     71	struct socket *sock;	/* NULL if not connected */
     72	uint32_t nodeid;	/* So we know who we are in the list */
     73	struct mutex sock_mutex;
     74	unsigned long flags;
     75#define CF_READ_PENDING 1
     76#define CF_WRITE_PENDING 2
     77#define CF_INIT_PENDING 4
     78#define CF_IS_OTHERCON 5
     79#define CF_CLOSE 6
     80#define CF_APP_LIMITED 7
     81#define CF_CLOSING 8
     82#define CF_SHUTDOWN 9
     83#define CF_CONNECTED 10
     84#define CF_RECONNECT 11
     85#define CF_DELAY_CONNECT 12
     86#define CF_EOF 13
     87	struct list_head writequeue;  /* List of outgoing writequeue_entries */
     88	spinlock_t writequeue_lock;
     89	atomic_t writequeue_cnt;
     90	int retries;
     91#define MAX_CONNECT_RETRIES 3
     92	struct hlist_node list;
     93	struct connection *othercon;
     94	struct connection *sendcon;
     95	struct work_struct rwork; /* Receive workqueue */
     96	struct work_struct swork; /* Send workqueue */
     97	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
     98	unsigned char *rx_buf;
     99	int rx_buflen;
    100	int rx_leftover;
    101	struct rcu_head rcu;
    102};
    103#define sock2con(x) ((struct connection *)(x)->sk_user_data)
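        /*
         * The CF_* values above are bit numbers, not masks; they are only
         * ever used with the atomic bitops on 'flags', e.g.
         * test_and_set_bit(CF_READ_PENDING, &con->flags).
         */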
    104
    105struct listen_connection {
    106	struct socket *sock;
    107	struct work_struct rwork;
    108};
    109
    110#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
    111#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
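        /*
         * Worked example: with a PAGE_SIZE of 4096 and an entry with
         * offset 0 and end 1024, DLM_WQ_LENGTH_BYTES(e) is 1024 (bytes
         * committed and waiting to be sent) and DLM_WQ_REMAIN_BYTES(e)
         * is 3072 (page space still available for new messages).
         */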
    112
    113/* An entry waiting to be sent */
    114struct writequeue_entry {
    115	struct list_head list;
    116	struct page *page;
    117	int offset;
    118	int len;
    119	int end;
    120	int users;
    121	bool dirty;
    122	struct connection *con;
    123	struct list_head msgs;
    124	struct kref ref;
    125};
    126
    127struct dlm_msg {
    128	struct writequeue_entry *entry;
    129	struct dlm_msg *orig_msg;
    130	bool retransmit;
    131	void *ppc;
    132	int len;
    133	int idx; /* new()/commit() idx exchange */
    134
    135	struct list_head list;
    136	struct kref ref;
    137};
    138
    139struct dlm_node_addr {
    140	struct list_head list;
    141	int nodeid;
    142	int mark;
    143	int addr_count;
    144	int curr_addr_index;
    145	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
    146};
    147
    148struct dlm_proto_ops {
    149	bool try_new_addr;
    150	const char *name;
    151	int proto;
    152
    153	int (*connect)(struct connection *con, struct socket *sock,
    154		       struct sockaddr *addr, int addr_len);
    155	void (*sockopts)(struct socket *sock);
    156	int (*bind)(struct socket *sock);
    157	int (*listen_validate)(void);
    158	void (*listen_sockopts)(struct socket *sock);
    159	int (*listen_bind)(struct socket *sock);
    160	/* What to do to shutdown */
    161	void (*shutdown_action)(struct connection *con);
    162	/* What to do to eof check */
    163	bool (*eof_condition)(struct connection *con);
    164};
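        /*
         * Two implementations of these ops exist, dlm_tcp_ops and
         * dlm_sctp_ops, defined near the bottom of this file;
         * dlm_lowcomms_start() selects one based on dlm_config.ci_protocol.
         */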
    165
    166static struct listen_sock_callbacks {
    167	void (*sk_error_report)(struct sock *);
    168	void (*sk_data_ready)(struct sock *);
    169	void (*sk_state_change)(struct sock *);
    170	void (*sk_write_space)(struct sock *);
    171} listen_sock;
    172
    173static LIST_HEAD(dlm_node_addrs);
    174static DEFINE_SPINLOCK(dlm_node_addrs_spin);
    175
    176static struct listen_connection listen_con;
    177static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
    178static int dlm_local_count;
    179int dlm_allow_conn;
    180
    181/* Work queues */
    182static struct workqueue_struct *recv_workqueue;
    183static struct workqueue_struct *send_workqueue;
    184
    185static struct hlist_head connection_hash[CONN_HASH_SIZE];
    186static DEFINE_SPINLOCK(connections_lock);
    187DEFINE_STATIC_SRCU(connections_srcu);
    188
    189static const struct dlm_proto_ops *dlm_proto_ops;
    190
    191static void process_recv_sockets(struct work_struct *work);
    192static void process_send_sockets(struct work_struct *work);
    193
    194static void writequeue_entry_ctor(void *data)
    195{
    196	struct writequeue_entry *entry = data;
    197
    198	INIT_LIST_HEAD(&entry->msgs);
    199}
    200
    201struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
    202{
    203	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
    204				 0, 0, writequeue_entry_ctor);
    205}
    206
    207struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
    208{
    209	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
    210}
    211
     212/* must be called with writequeue_lock held */
    213static struct writequeue_entry *con_next_wq(struct connection *con)
    214{
    215	struct writequeue_entry *e;
    216
    217	if (list_empty(&con->writequeue))
    218		return NULL;
    219
    220	e = list_first_entry(&con->writequeue, struct writequeue_entry,
    221			     list);
     222	/* if len is zero there is nothing to send; if users are still
     223	 * filling buffers we wait until they are done so we can send more.
     224	 */
    225	if (e->users || e->len == 0)
    226		return NULL;
    227
    228	return e;
    229}
    230
    231static struct connection *__find_con(int nodeid, int r)
    232{
    233	struct connection *con;
    234
    235	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
    236		if (con->nodeid == nodeid)
    237			return con;
    238	}
    239
    240	return NULL;
    241}
    242
    243static bool tcp_eof_condition(struct connection *con)
    244{
    245	return atomic_read(&con->writequeue_cnt);
    246}
    247
    248static int dlm_con_init(struct connection *con, int nodeid)
    249{
    250	con->rx_buflen = dlm_config.ci_buffer_size;
    251	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
    252	if (!con->rx_buf)
    253		return -ENOMEM;
    254
    255	con->nodeid = nodeid;
    256	mutex_init(&con->sock_mutex);
    257	INIT_LIST_HEAD(&con->writequeue);
    258	spin_lock_init(&con->writequeue_lock);
    259	atomic_set(&con->writequeue_cnt, 0);
    260	INIT_WORK(&con->swork, process_send_sockets);
    261	INIT_WORK(&con->rwork, process_recv_sockets);
    262	init_waitqueue_head(&con->shutdown_wait);
    263
    264	return 0;
    265}
    266
     267/*
     268 * If 'alloc' is zero then we don't attempt to create a new
     269 * connection structure for this node.
     270 */
    271static struct connection *nodeid2con(int nodeid, gfp_t alloc)
    272{
    273	struct connection *con, *tmp;
    274	int r, ret;
    275
    276	r = nodeid_hash(nodeid);
    277	con = __find_con(nodeid, r);
    278	if (con || !alloc)
    279		return con;
    280
    281	con = kzalloc(sizeof(*con), alloc);
    282	if (!con)
    283		return NULL;
    284
    285	ret = dlm_con_init(con, nodeid);
    286	if (ret) {
    287		kfree(con);
    288		return NULL;
    289	}
    290
    291	spin_lock(&connections_lock);
     292	/* Because multiple workqueues/threads call this function it can
     293	 * race on multiple cpus. Instead of locking the hot path
     294	 * __find_con() we just check again, in the rare case of recently
     295	 * added nodes, under protection of connections_lock. If we find
     296	 * an existing connection we abort our creation and return it.
     297	 */
    298	tmp = __find_con(nodeid, r);
    299	if (tmp) {
    300		spin_unlock(&connections_lock);
    301		kfree(con->rx_buf);
    302		kfree(con);
    303		return tmp;
    304	}
    305
    306	hlist_add_head_rcu(&con->list, &connection_hash[r]);
    307	spin_unlock(&connections_lock);
    308
    309	return con;
    310}
    311
    312/* Loop round all connections */
    313static void foreach_conn(void (*conn_func)(struct connection *c))
    314{
    315	int i;
    316	struct connection *con;
    317
    318	for (i = 0; i < CONN_HASH_SIZE; i++) {
    319		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
    320			conn_func(con);
    321	}
    322}
    323
    324static struct dlm_node_addr *find_node_addr(int nodeid)
    325{
    326	struct dlm_node_addr *na;
    327
    328	list_for_each_entry(na, &dlm_node_addrs, list) {
    329		if (na->nodeid == nodeid)
    330			return na;
    331	}
    332	return NULL;
    333}
    334
    335static int addr_compare(const struct sockaddr_storage *x,
    336			const struct sockaddr_storage *y)
    337{
    338	switch (x->ss_family) {
    339	case AF_INET: {
    340		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
    341		struct sockaddr_in *siny = (struct sockaddr_in *)y;
    342		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
    343			return 0;
    344		if (sinx->sin_port != siny->sin_port)
    345			return 0;
    346		break;
    347	}
    348	case AF_INET6: {
    349		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
    350		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
    351		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
    352			return 0;
    353		if (sinx->sin6_port != siny->sin6_port)
    354			return 0;
    355		break;
    356	}
    357	default:
    358		return 0;
    359	}
    360	return 1;
    361}
    362
    363static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
    364			  struct sockaddr *sa_out, bool try_new_addr,
    365			  unsigned int *mark)
    366{
    367	struct sockaddr_storage sas;
    368	struct dlm_node_addr *na;
    369
    370	if (!dlm_local_count)
    371		return -1;
    372
    373	spin_lock(&dlm_node_addrs_spin);
    374	na = find_node_addr(nodeid);
    375	if (na && na->addr_count) {
    376		memcpy(&sas, na->addr[na->curr_addr_index],
    377		       sizeof(struct sockaddr_storage));
    378
    379		if (try_new_addr) {
    380			na->curr_addr_index++;
    381			if (na->curr_addr_index == na->addr_count)
    382				na->curr_addr_index = 0;
    383		}
    384	}
    385	spin_unlock(&dlm_node_addrs_spin);
    386
    387	if (!na)
    388		return -EEXIST;
    389
    390	if (!na->addr_count)
    391		return -ENOENT;
    392
    393	*mark = na->mark;
    394
    395	if (sas_out)
    396		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
    397
    398	if (!sa_out)
    399		return 0;
    400
    401	if (dlm_local_addr[0]->ss_family == AF_INET) {
    402		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
    403		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
    404		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
    405	} else {
    406		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
    407		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
    408		ret6->sin6_addr = in6->sin6_addr;
    409	}
    410
    411	return 0;
    412}
    413
    414static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
    415			  unsigned int *mark)
    416{
    417	struct dlm_node_addr *na;
    418	int rv = -EEXIST;
    419	int addr_i;
    420
    421	spin_lock(&dlm_node_addrs_spin);
    422	list_for_each_entry(na, &dlm_node_addrs, list) {
    423		if (!na->addr_count)
    424			continue;
    425
    426		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
    427			if (addr_compare(na->addr[addr_i], addr)) {
    428				*nodeid = na->nodeid;
    429				*mark = na->mark;
    430				rv = 0;
    431				goto unlock;
    432			}
    433		}
    434	}
    435unlock:
    436	spin_unlock(&dlm_node_addrs_spin);
    437	return rv;
    438}
    439
     440/* caller must hold the dlm_node_addrs_spin lock */
    441static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
    442				     const struct sockaddr_storage *addr)
    443{
    444	int i;
    445
    446	for (i = 0; i < na->addr_count; i++) {
    447		if (addr_compare(na->addr[i], addr))
    448			return true;
    449	}
    450
    451	return false;
    452}
    453
    454int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
    455{
    456	struct sockaddr_storage *new_addr;
    457	struct dlm_node_addr *new_node, *na;
    458	bool ret;
    459
    460	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
    461	if (!new_node)
    462		return -ENOMEM;
    463
    464	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
    465	if (!new_addr) {
    466		kfree(new_node);
    467		return -ENOMEM;
    468	}
    469
    470	memcpy(new_addr, addr, len);
    471
    472	spin_lock(&dlm_node_addrs_spin);
    473	na = find_node_addr(nodeid);
    474	if (!na) {
    475		new_node->nodeid = nodeid;
    476		new_node->addr[0] = new_addr;
    477		new_node->addr_count = 1;
    478		new_node->mark = dlm_config.ci_mark;
    479		list_add(&new_node->list, &dlm_node_addrs);
    480		spin_unlock(&dlm_node_addrs_spin);
    481		return 0;
    482	}
    483
    484	ret = dlm_lowcomms_na_has_addr(na, addr);
    485	if (ret) {
    486		spin_unlock(&dlm_node_addrs_spin);
    487		kfree(new_addr);
    488		kfree(new_node);
    489		return -EEXIST;
    490	}
    491
    492	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
    493		spin_unlock(&dlm_node_addrs_spin);
    494		kfree(new_addr);
    495		kfree(new_node);
    496		return -ENOSPC;
    497	}
    498
    499	na->addr[na->addr_count++] = new_addr;
    500	spin_unlock(&dlm_node_addrs_spin);
    501	kfree(new_node);
    502	return 0;
    503}
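        /*
         * A node may accumulate up to DLM_MAX_ADDR_COUNT addresses this
         * way. On the connecting side the extra addresses only matter for
         * SCTP: dlm_sctp_ops sets try_new_addr, so nodeid_to_addr()
         * rotates through them on reconnect attempts, while TCP keeps
         * using the first one. addr_to_nodeid() matches incoming peers
         * against all of them.
         */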
    504
     505/* Data available on a socket, or the listen socket received a connect */
    506static void lowcomms_data_ready(struct sock *sk)
    507{
    508	struct connection *con;
    509
    510	con = sock2con(sk);
    511	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
    512		queue_work(recv_workqueue, &con->rwork);
    513}
    514
    515static void lowcomms_listen_data_ready(struct sock *sk)
    516{
    517	if (!dlm_allow_conn)
    518		return;
    519
    520	queue_work(recv_workqueue, &listen_con.rwork);
    521}
    522
    523static void lowcomms_write_space(struct sock *sk)
    524{
    525	struct connection *con;
    526
    527	con = sock2con(sk);
    528	if (!con)
    529		return;
    530
    531	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
     532		log_print("successfully connected to node %d", con->nodeid);
    533		queue_work(send_workqueue, &con->swork);
    534		return;
    535	}
    536
    537	clear_bit(SOCK_NOSPACE, &con->sock->flags);
    538
    539	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
    540		con->sock->sk->sk_write_pending--;
    541		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
    542	}
    543
    544	queue_work(send_workqueue, &con->swork);
    545}
    546
    547static inline void lowcomms_connect_sock(struct connection *con)
    548{
    549	if (test_bit(CF_CLOSE, &con->flags))
    550		return;
    551	queue_work(send_workqueue, &con->swork);
    552	cond_resched();
    553}
    554
    555static void lowcomms_state_change(struct sock *sk)
    556{
    557	/* SCTP layer is not calling sk_data_ready when the connection
    558	 * is done, so we catch the signal through here. Also, it
    559	 * doesn't switch socket state when entering shutdown, so we
    560	 * skip the write in that case.
    561	 */
    562	if (sk->sk_shutdown) {
    563		if (sk->sk_shutdown == RCV_SHUTDOWN)
    564			lowcomms_data_ready(sk);
    565	} else if (sk->sk_state == TCP_ESTABLISHED) {
    566		lowcomms_write_space(sk);
    567	}
    568}
    569
    570int dlm_lowcomms_connect_node(int nodeid)
    571{
    572	struct connection *con;
    573	int idx;
    574
    575	if (nodeid == dlm_our_nodeid())
    576		return 0;
    577
    578	idx = srcu_read_lock(&connections_srcu);
    579	con = nodeid2con(nodeid, GFP_NOFS);
    580	if (!con) {
    581		srcu_read_unlock(&connections_srcu, idx);
    582		return -ENOMEM;
    583	}
    584
    585	lowcomms_connect_sock(con);
    586	srcu_read_unlock(&connections_srcu, idx);
    587
    588	return 0;
    589}
    590
    591int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
    592{
    593	struct dlm_node_addr *na;
    594
    595	spin_lock(&dlm_node_addrs_spin);
    596	na = find_node_addr(nodeid);
    597	if (!na) {
    598		spin_unlock(&dlm_node_addrs_spin);
    599		return -ENOENT;
    600	}
    601
    602	na->mark = mark;
    603	spin_unlock(&dlm_node_addrs_spin);
    604
    605	return 0;
    606}
    607
    608static void lowcomms_error_report(struct sock *sk)
    609{
    610	struct connection *con;
    611	void (*orig_report)(struct sock *) = NULL;
    612	struct inet_sock *inet;
    613
    614	con = sock2con(sk);
    615	if (con == NULL)
    616		goto out;
    617
    618	orig_report = listen_sock.sk_error_report;
    619
    620	inet = inet_sk(sk);
    621	switch (sk->sk_family) {
    622	case AF_INET:
    623		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
    624				   "sending to node %d at %pI4, dport %d, "
    625				   "sk_err=%d/%d\n", dlm_our_nodeid(),
    626				   con->nodeid, &inet->inet_daddr,
    627				   ntohs(inet->inet_dport), sk->sk_err,
    628				   sk->sk_err_soft);
    629		break;
    630#if IS_ENABLED(CONFIG_IPV6)
    631	case AF_INET6:
    632		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
    633				   "sending to node %d at %pI6c, "
    634				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
    635				   con->nodeid, &sk->sk_v6_daddr,
    636				   ntohs(inet->inet_dport), sk->sk_err,
    637				   sk->sk_err_soft);
    638		break;
    639#endif
    640	default:
    641		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
    642				   "invalid socket family %d set, "
    643				   "sk_err=%d/%d\n", dlm_our_nodeid(),
    644				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
    645		goto out;
    646	}
    647
     648	/* the handling below applies to the sendcon only */
    649	if (test_bit(CF_IS_OTHERCON, &con->flags))
    650		con = con->sendcon;
    651
    652	switch (sk->sk_err) {
    653	case ECONNREFUSED:
    654		set_bit(CF_DELAY_CONNECT, &con->flags);
    655		break;
    656	default:
    657		break;
    658	}
    659
    660	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
    661		queue_work(send_workqueue, &con->swork);
    662
    663out:
    664	if (orig_report)
    665		orig_report(sk);
    666}
    667
    668/* Note: sk_callback_lock must be locked before calling this function. */
    669static void save_listen_callbacks(struct socket *sock)
    670{
    671	struct sock *sk = sock->sk;
    672
    673	listen_sock.sk_data_ready = sk->sk_data_ready;
    674	listen_sock.sk_state_change = sk->sk_state_change;
    675	listen_sock.sk_write_space = sk->sk_write_space;
    676	listen_sock.sk_error_report = sk->sk_error_report;
    677}
    678
    679static void restore_callbacks(struct socket *sock)
    680{
    681	struct sock *sk = sock->sk;
    682
    683	lock_sock(sk);
    684	sk->sk_user_data = NULL;
    685	sk->sk_data_ready = listen_sock.sk_data_ready;
    686	sk->sk_state_change = listen_sock.sk_state_change;
    687	sk->sk_write_space = listen_sock.sk_write_space;
    688	sk->sk_error_report = listen_sock.sk_error_report;
    689	release_sock(sk);
    690}
    691
    692static void add_listen_sock(struct socket *sock, struct listen_connection *con)
    693{
    694	struct sock *sk = sock->sk;
    695
    696	lock_sock(sk);
    697	save_listen_callbacks(sock);
    698	con->sock = sock;
    699
    700	sk->sk_user_data = con;
    701	sk->sk_allocation = GFP_NOFS;
    702	/* Install a data_ready callback */
    703	sk->sk_data_ready = lowcomms_listen_data_ready;
    704	release_sock(sk);
    705}
    706
    707/* Make a socket active */
    708static void add_sock(struct socket *sock, struct connection *con)
    709{
    710	struct sock *sk = sock->sk;
    711
    712	lock_sock(sk);
    713	con->sock = sock;
    714
    715	sk->sk_user_data = con;
    716	/* Install a data_ready callback */
    717	sk->sk_data_ready = lowcomms_data_ready;
    718	sk->sk_write_space = lowcomms_write_space;
    719	sk->sk_state_change = lowcomms_state_change;
    720	sk->sk_allocation = GFP_NOFS;
    721	sk->sk_error_report = lowcomms_error_report;
    722	release_sock(sk);
    723}
    724
    725/* Add the port number to an IPv6 or 4 sockaddr and return the address
    726   length */
    727static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
    728			  int *addr_len)
    729{
    730	saddr->ss_family =  dlm_local_addr[0]->ss_family;
    731	if (saddr->ss_family == AF_INET) {
    732		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
    733		in4_addr->sin_port = cpu_to_be16(port);
    734		*addr_len = sizeof(struct sockaddr_in);
    735		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
    736	} else {
    737		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
    738		in6_addr->sin6_port = cpu_to_be16(port);
    739		*addr_len = sizeof(struct sockaddr_in6);
    740	}
    741	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
    742}
    743
    744static void dlm_page_release(struct kref *kref)
    745{
    746	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
    747						  ref);
    748
    749	__free_page(e->page);
    750	dlm_free_writequeue(e);
    751}
    752
    753static void dlm_msg_release(struct kref *kref)
    754{
    755	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
    756
    757	kref_put(&msg->entry->ref, dlm_page_release);
    758	dlm_free_msg(msg);
    759}
    760
    761static void free_entry(struct writequeue_entry *e)
    762{
    763	struct dlm_msg *msg, *tmp;
    764
    765	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
    766		if (msg->orig_msg) {
    767			msg->orig_msg->retransmit = false;
    768			kref_put(&msg->orig_msg->ref, dlm_msg_release);
    769		}
    770
    771		list_del(&msg->list);
    772		kref_put(&msg->ref, dlm_msg_release);
    773	}
    774
    775	list_del(&e->list);
    776	atomic_dec(&e->con->writequeue_cnt);
    777	kref_put(&e->ref, dlm_page_release);
    778}
    779
    780static void dlm_close_sock(struct socket **sock)
    781{
    782	if (*sock) {
    783		restore_callbacks(*sock);
    784		sock_release(*sock);
    785		*sock = NULL;
    786	}
    787}
    788
    789/* Close a remote connection and tidy up */
    790static void close_connection(struct connection *con, bool and_other,
    791			     bool tx, bool rx)
    792{
    793	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
    794	struct writequeue_entry *e;
    795
    796	if (tx && !closing && cancel_work_sync(&con->swork)) {
    797		log_print("canceled swork for node %d", con->nodeid);
    798		clear_bit(CF_WRITE_PENDING, &con->flags);
    799	}
    800	if (rx && !closing && cancel_work_sync(&con->rwork)) {
    801		log_print("canceled rwork for node %d", con->nodeid);
    802		clear_bit(CF_READ_PENDING, &con->flags);
    803	}
    804
    805	mutex_lock(&con->sock_mutex);
    806	dlm_close_sock(&con->sock);
    807
    808	if (con->othercon && and_other) {
    809		/* Will only re-enter once. */
    810		close_connection(con->othercon, false, tx, rx);
    811	}
    812
     813	/* if we sent a writequeue entry only partially, we drop the
     814	 * whole entry, because after a reconnect we must not resume in
     815	 * the middle of a msg, which would confuse the other end.
     816	 *
     817	 * we can always drop messages because of retransmits, but what we
     818	 * cannot allow is to transmit half a message, which may get
     819	 * processed at the other side.
     820	 *
     821	 * our policy is to start from a clean state on disconnect; we don't
     822	 * know what was sent/received at the transport layer in this case.
     823	 */
    824	spin_lock(&con->writequeue_lock);
    825	if (!list_empty(&con->writequeue)) {
    826		e = list_first_entry(&con->writequeue, struct writequeue_entry,
    827				     list);
    828		if (e->dirty)
    829			free_entry(e);
    830	}
    831	spin_unlock(&con->writequeue_lock);
    832
    833	con->rx_leftover = 0;
    834	con->retries = 0;
    835	clear_bit(CF_APP_LIMITED, &con->flags);
    836	clear_bit(CF_CONNECTED, &con->flags);
    837	clear_bit(CF_DELAY_CONNECT, &con->flags);
    838	clear_bit(CF_RECONNECT, &con->flags);
    839	clear_bit(CF_EOF, &con->flags);
    840	mutex_unlock(&con->sock_mutex);
    841	clear_bit(CF_CLOSING, &con->flags);
    842}
    843
    844static void shutdown_connection(struct connection *con)
    845{
    846	int ret;
    847
    848	flush_work(&con->swork);
    849
    850	mutex_lock(&con->sock_mutex);
    851	/* nothing to shutdown */
    852	if (!con->sock) {
    853		mutex_unlock(&con->sock_mutex);
    854		return;
    855	}
    856
    857	set_bit(CF_SHUTDOWN, &con->flags);
    858	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
    859	mutex_unlock(&con->sock_mutex);
    860	if (ret) {
    861		log_print("Connection %p failed to shutdown: %d will force close",
    862			  con, ret);
    863		goto force_close;
    864	} else {
    865		ret = wait_event_timeout(con->shutdown_wait,
    866					 !test_bit(CF_SHUTDOWN, &con->flags),
    867					 DLM_SHUTDOWN_WAIT_TIMEOUT);
    868		if (ret == 0) {
    869			log_print("Connection %p shutdown timed out, will force close",
    870				  con);
    871			goto force_close;
    872		}
    873	}
    874
    875	return;
    876
    877force_close:
    878	clear_bit(CF_SHUTDOWN, &con->flags);
    879	close_connection(con, false, true, true);
    880}
    881
    882static void dlm_tcp_shutdown(struct connection *con)
    883{
    884	if (con->othercon)
    885		shutdown_connection(con->othercon);
    886	shutdown_connection(con);
    887}
    888
    889static int con_realloc_receive_buf(struct connection *con, int newlen)
    890{
    891	unsigned char *newbuf;
    892
    893	newbuf = kmalloc(newlen, GFP_NOFS);
    894	if (!newbuf)
    895		return -ENOMEM;
    896
    897	/* copy any leftover from last receive */
    898	if (con->rx_leftover)
    899		memmove(newbuf, con->rx_buf, con->rx_leftover);
    900
    901	/* swap to new buffer space */
    902	kfree(con->rx_buf);
    903	con->rx_buflen = newlen;
    904	con->rx_buf = newbuf;
    905
    906	return 0;
    907}
    908
    909/* Data received from remote end */
    910static int receive_from_sock(struct connection *con)
    911{
    912	struct msghdr msg;
    913	struct kvec iov;
    914	int ret, buflen;
    915
    916	mutex_lock(&con->sock_mutex);
    917
    918	if (con->sock == NULL) {
    919		ret = -EAGAIN;
    920		goto out_close;
    921	}
    922
     923	/* realloc if a new receive buffer size was configured */
    924	buflen = dlm_config.ci_buffer_size;
    925	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
    926		ret = con_realloc_receive_buf(con, buflen);
    927		if (ret < 0)
    928			goto out_resched;
    929	}
    930
    931	for (;;) {
     932		/* calculate new buffer parameters, accounting for possible
     933		 * leftover bytes from the last receive
     934		 */
    935		iov.iov_base = con->rx_buf + con->rx_leftover;
    936		iov.iov_len = con->rx_buflen - con->rx_leftover;
    937
    938		memset(&msg, 0, sizeof(msg));
    939		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
    940		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
    941				     msg.msg_flags);
    942		trace_dlm_recv(con->nodeid, ret);
    943		if (ret == -EAGAIN)
    944			break;
    945		else if (ret <= 0)
    946			goto out_close;
    947
     948		/* new buflen according to the bytes read plus leftover from the last receive */
    949		buflen = ret + con->rx_leftover;
    950		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
    951		if (ret < 0)
    952			goto out_close;
    953
     954		/* calculate the bytes left over after processing and move them
     955		 * to the beginning of the receive buffer, so that on the next
     956		 * receive the full message sits at the start of the buffer.
     957		 */
    958		con->rx_leftover = buflen - ret;
    959		if (con->rx_leftover) {
    960			memmove(con->rx_buf, con->rx_buf + ret,
    961				con->rx_leftover);
    962		}
    963	}
    964
    965	dlm_midcomms_receive_done(con->nodeid);
    966	mutex_unlock(&con->sock_mutex);
    967	return 0;
    968
    969out_resched:
    970	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
    971		queue_work(recv_workqueue, &con->rwork);
    972	mutex_unlock(&con->sock_mutex);
    973	return -EAGAIN;
    974
    975out_close:
    976	if (ret == 0) {
    977		log_print("connection %p got EOF from %d",
    978			  con, con->nodeid);
    979
    980		if (dlm_proto_ops->eof_condition &&
    981		    dlm_proto_ops->eof_condition(con)) {
    982			set_bit(CF_EOF, &con->flags);
    983			mutex_unlock(&con->sock_mutex);
    984		} else {
    985			mutex_unlock(&con->sock_mutex);
    986			close_connection(con, false, true, false);
    987
    988			/* handling for tcp shutdown */
    989			clear_bit(CF_SHUTDOWN, &con->flags);
    990			wake_up(&con->shutdown_wait);
    991		}
    992
     993		/* signal the receive worker to break out */
    994		ret = -1;
    995	} else {
    996		mutex_unlock(&con->sock_mutex);
    997	}
    998	return ret;
    999}
   1000
   1001/* Listening socket is busy, accept a connection */
   1002static int accept_from_sock(struct listen_connection *con)
   1003{
   1004	int result;
   1005	struct sockaddr_storage peeraddr;
   1006	struct socket *newsock;
   1007	int len, idx;
   1008	int nodeid;
   1009	struct connection *newcon;
   1010	struct connection *addcon;
   1011	unsigned int mark;
   1012
   1013	if (!con->sock)
   1014		return -ENOTCONN;
   1015
   1016	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
   1017	if (result < 0)
   1018		goto accept_err;
   1019
   1020	/* Get the connected socket's peer */
   1021	memset(&peeraddr, 0, sizeof(peeraddr));
   1022	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
   1023	if (len < 0) {
   1024		result = -ECONNABORTED;
   1025		goto accept_err;
   1026	}
   1027
   1028	/* Get the new node's NODEID */
   1029	make_sockaddr(&peeraddr, 0, &len);
   1030	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
   1031		switch (peeraddr.ss_family) {
   1032		case AF_INET: {
   1033			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
   1034
   1035			log_print("connect from non cluster IPv4 node %pI4",
   1036				  &sin->sin_addr);
   1037			break;
   1038		}
   1039#if IS_ENABLED(CONFIG_IPV6)
   1040		case AF_INET6: {
   1041			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
   1042
   1043			log_print("connect from non cluster IPv6 node %pI6c",
   1044				  &sin6->sin6_addr);
   1045			break;
   1046		}
   1047#endif
   1048		default:
   1049			log_print("invalid family from non cluster node");
   1050			break;
   1051		}
   1052
   1053		sock_release(newsock);
   1054		return -1;
   1055	}
   1056
   1057	log_print("got connection from %d", nodeid);
   1058
   1059	/*  Check to see if we already have a connection to this node. This
   1060	 *  could happen if the two nodes initiate a connection at roughly
   1061	 *  the same time and the connections cross on the wire.
   1062	 *  In this case we store the incoming one in "othercon"
   1063	 */
   1064	idx = srcu_read_lock(&connections_srcu);
   1065	newcon = nodeid2con(nodeid, GFP_NOFS);
   1066	if (!newcon) {
   1067		srcu_read_unlock(&connections_srcu, idx);
   1068		result = -ENOMEM;
   1069		goto accept_err;
   1070	}
   1071
   1072	sock_set_mark(newsock->sk, mark);
   1073
   1074	mutex_lock(&newcon->sock_mutex);
   1075	if (newcon->sock) {
   1076		struct connection *othercon = newcon->othercon;
   1077
   1078		if (!othercon) {
   1079			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
   1080			if (!othercon) {
   1081				log_print("failed to allocate incoming socket");
   1082				mutex_unlock(&newcon->sock_mutex);
   1083				srcu_read_unlock(&connections_srcu, idx);
   1084				result = -ENOMEM;
   1085				goto accept_err;
   1086			}
   1087
   1088			result = dlm_con_init(othercon, nodeid);
   1089			if (result < 0) {
   1090				kfree(othercon);
   1091				mutex_unlock(&newcon->sock_mutex);
   1092				srcu_read_unlock(&connections_srcu, idx);
   1093				goto accept_err;
   1094			}
   1095
   1096			lockdep_set_subclass(&othercon->sock_mutex, 1);
   1097			set_bit(CF_IS_OTHERCON, &othercon->flags);
   1098			newcon->othercon = othercon;
   1099			othercon->sendcon = newcon;
   1100		} else {
   1101			/* close other sock con if we have something new */
   1102			close_connection(othercon, false, true, false);
   1103		}
   1104
   1105		mutex_lock(&othercon->sock_mutex);
   1106		add_sock(newsock, othercon);
   1107		addcon = othercon;
   1108		mutex_unlock(&othercon->sock_mutex);
   1109	}
   1110	else {
   1111		/* accept copies the sk after we've saved the callbacks, so we
   1112		   don't want to save them a second time or comm errors will
   1113		   result in calling sk_error_report recursively. */
   1114		add_sock(newsock, newcon);
   1115		addcon = newcon;
   1116	}
   1117
   1118	set_bit(CF_CONNECTED, &addcon->flags);
   1119	mutex_unlock(&newcon->sock_mutex);
   1120
    1121	/*
    1122	 * Add it to the active queue in case we got data
    1123	 * between processing the accept and adding the socket
    1124	 * to the read_sockets list
    1125	 */
   1126	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
   1127		queue_work(recv_workqueue, &addcon->rwork);
   1128
   1129	srcu_read_unlock(&connections_srcu, idx);
   1130
   1131	return 0;
   1132
   1133accept_err:
   1134	if (newsock)
   1135		sock_release(newsock);
   1136
   1137	if (result != -EAGAIN)
   1138		log_print("error accepting connection from node: %d", result);
   1139	return result;
   1140}
   1141
   1142/*
   1143 * writequeue_entry_complete - try to delete and free write queue entry
   1144 * @e: write queue entry to try to delete
   1145 * @completed: bytes completed
   1146 *
   1147 * writequeue_lock must be held.
   1148 */
   1149static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
   1150{
   1151	e->offset += completed;
   1152	e->len -= completed;
    1153	/* signal that the page was partially transmitted */
   1154	e->dirty = true;
   1155
   1156	if (e->len == 0 && e->users == 0)
   1157		free_entry(e);
   1158}
   1159
   1160/*
   1161 * sctp_bind_addrs - bind a SCTP socket to all our addresses
   1162 */
   1163static int sctp_bind_addrs(struct socket *sock, uint16_t port)
   1164{
   1165	struct sockaddr_storage localaddr;
   1166	struct sockaddr *addr = (struct sockaddr *)&localaddr;
   1167	int i, addr_len, result = 0;
   1168
   1169	for (i = 0; i < dlm_local_count; i++) {
   1170		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
   1171		make_sockaddr(&localaddr, port, &addr_len);
   1172
   1173		if (!i)
   1174			result = kernel_bind(sock, addr, addr_len);
   1175		else
   1176			result = sock_bind_add(sock->sk, addr, addr_len);
   1177
   1178		if (result < 0) {
   1179			log_print("Can't bind to %d addr number %d, %d.\n",
   1180				  port, i + 1, result);
   1181			break;
   1182		}
   1183	}
   1184	return result;
   1185}
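        /*
         * Note that the first local address is attached with kernel_bind()
         * and every further one is added to the same socket with
         * sock_bind_add(), which is what gives the SCTP association its
         * multi-homing.
         */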
   1186
   1187/* Get local addresses */
   1188static void init_local(void)
   1189{
   1190	struct sockaddr_storage sas, *addr;
   1191	int i;
   1192
   1193	dlm_local_count = 0;
   1194	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
   1195		if (dlm_our_addr(&sas, i))
   1196			break;
   1197
   1198		addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
   1199		if (!addr)
   1200			break;
   1201		dlm_local_addr[dlm_local_count++] = addr;
   1202	}
   1203}
   1204
   1205static void deinit_local(void)
   1206{
   1207	int i;
   1208
   1209	for (i = 0; i < dlm_local_count; i++)
   1210		kfree(dlm_local_addr[i]);
   1211}
   1212
   1213static struct writequeue_entry *new_writequeue_entry(struct connection *con)
   1214{
   1215	struct writequeue_entry *entry;
   1216
   1217	entry = dlm_allocate_writequeue();
   1218	if (!entry)
   1219		return NULL;
   1220
   1221	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
   1222	if (!entry->page) {
   1223		dlm_free_writequeue(entry);
   1224		return NULL;
   1225	}
   1226
   1227	entry->offset = 0;
   1228	entry->len = 0;
   1229	entry->end = 0;
   1230	entry->dirty = false;
   1231	entry->con = con;
   1232	entry->users = 1;
   1233	kref_init(&entry->ref);
   1234	return entry;
   1235}
   1236
   1237static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
   1238					     char **ppc, void (*cb)(void *data),
   1239					     void *data)
   1240{
   1241	struct writequeue_entry *e;
   1242
   1243	spin_lock(&con->writequeue_lock);
   1244	if (!list_empty(&con->writequeue)) {
   1245		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
   1246		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
   1247			kref_get(&e->ref);
   1248
   1249			*ppc = page_address(e->page) + e->end;
   1250			if (cb)
   1251				cb(data);
   1252
   1253			e->end += len;
   1254			e->users++;
   1255			goto out;
   1256		}
   1257	}
   1258
   1259	e = new_writequeue_entry(con);
   1260	if (!e)
   1261		goto out;
   1262
   1263	kref_get(&e->ref);
   1264	*ppc = page_address(e->page);
   1265	e->end += len;
   1266	atomic_inc(&con->writequeue_cnt);
   1267	if (cb)
   1268		cb(data);
   1269
   1270	list_add_tail(&e->list, &con->writequeue);
   1271
   1272out:
   1273	spin_unlock(&con->writequeue_lock);
   1274	return e;
    1275}
   1276
   1277static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
   1278						gfp_t allocation, char **ppc,
   1279						void (*cb)(void *data),
   1280						void *data)
   1281{
   1282	struct writequeue_entry *e;
   1283	struct dlm_msg *msg;
   1284
   1285	msg = dlm_allocate_msg(allocation);
   1286	if (!msg)
   1287		return NULL;
   1288
   1289	kref_init(&msg->ref);
   1290
   1291	e = new_wq_entry(con, len, ppc, cb, data);
   1292	if (!e) {
   1293		dlm_free_msg(msg);
   1294		return NULL;
   1295	}
   1296
   1297	msg->retransmit = false;
   1298	msg->orig_msg = NULL;
   1299	msg->ppc = *ppc;
   1300	msg->len = len;
   1301	msg->entry = e;
   1302
   1303	return msg;
   1304}
   1305
    1306/* avoid a false positive for nodes_srcu; the unlock happens in
    1307 * dlm_lowcomms_commit_msg, which must be called on success
    1308 */
   1309#ifndef __CHECKER__
   1310struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
   1311				     char **ppc, void (*cb)(void *data),
   1312				     void *data)
   1313{
   1314	struct connection *con;
   1315	struct dlm_msg *msg;
   1316	int idx;
   1317
   1318	if (len > DLM_MAX_SOCKET_BUFSIZE ||
   1319	    len < sizeof(struct dlm_header)) {
   1320		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
   1321		log_print("failed to allocate a buffer of size %d", len);
   1322		WARN_ON(1);
   1323		return NULL;
   1324	}
   1325
   1326	idx = srcu_read_lock(&connections_srcu);
   1327	con = nodeid2con(nodeid, allocation);
   1328	if (!con) {
   1329		srcu_read_unlock(&connections_srcu, idx);
   1330		return NULL;
   1331	}
   1332
   1333	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
   1334	if (!msg) {
   1335		srcu_read_unlock(&connections_srcu, idx);
   1336		return NULL;
   1337	}
   1338
    1339	/* we assume that on success commit will be called */
   1340	msg->idx = idx;
   1341	return msg;
   1342}
   1343#endif
   1344
   1345static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
   1346{
   1347	struct writequeue_entry *e = msg->entry;
   1348	struct connection *con = e->con;
   1349	int users;
   1350
   1351	spin_lock(&con->writequeue_lock);
   1352	kref_get(&msg->ref);
   1353	list_add(&msg->list, &e->msgs);
   1354
   1355	users = --e->users;
   1356	if (users)
   1357		goto out;
   1358
   1359	e->len = DLM_WQ_LENGTH_BYTES(e);
   1360	spin_unlock(&con->writequeue_lock);
   1361
   1362	queue_work(send_workqueue, &con->swork);
   1363	return;
   1364
   1365out:
   1366	spin_unlock(&con->writequeue_lock);
   1367	return;
   1368}
   1369
    1370/* avoid a false positive for nodes_srcu; the lock was taken in
    1371 * dlm_lowcomms_new_msg
    1372 */
   1373#ifndef __CHECKER__
   1374void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
   1375{
   1376	_dlm_lowcomms_commit_msg(msg);
   1377	srcu_read_unlock(&connections_srcu, msg->idx);
   1378}
   1379#endif
   1380
   1381void dlm_lowcomms_put_msg(struct dlm_msg *msg)
   1382{
   1383	kref_put(&msg->ref, dlm_msg_release);
   1384}
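        /*
         * Sketch of the caller-side lifecycle of the three calls above
         * (the real caller is midcomms; the local names here are
         * illustrative only):
         *
         *   msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc,
         *                              NULL, NULL);
         *   if (!msg)
         *           return -ENOMEM;
         *   memcpy(ppc, data, len);       // fill reserved writequeue space
         *   dlm_lowcomms_commit_msg(msg); // queue for sending, drops srcu
         *   dlm_lowcomms_put_msg(msg);    // drop the caller's reference
         */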
   1385
    1386/* does not hold connections_srcu; for use from the workqueue only */
   1387int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
   1388{
   1389	struct dlm_msg *msg_resend;
   1390	char *ppc;
   1391
   1392	if (msg->retransmit)
   1393		return 1;
   1394
   1395	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
   1396					      GFP_ATOMIC, &ppc, NULL, NULL);
   1397	if (!msg_resend)
   1398		return -ENOMEM;
   1399
   1400	msg->retransmit = true;
   1401	kref_get(&msg->ref);
   1402	msg_resend->orig_msg = msg;
   1403
   1404	memcpy(ppc, msg->ppc, msg->len);
   1405	_dlm_lowcomms_commit_msg(msg_resend);
   1406	dlm_lowcomms_put_msg(msg_resend);
   1407
   1408	return 0;
   1409}
   1410
   1411/* Send a message */
   1412static void send_to_sock(struct connection *con)
   1413{
   1414	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
   1415	struct writequeue_entry *e;
   1416	int len, offset, ret;
   1417	int count = 0;
   1418
   1419	mutex_lock(&con->sock_mutex);
   1420	if (con->sock == NULL)
   1421		goto out_connect;
   1422
   1423	spin_lock(&con->writequeue_lock);
   1424	for (;;) {
   1425		e = con_next_wq(con);
   1426		if (!e)
   1427			break;
   1428
   1429		len = e->len;
   1430		offset = e->offset;
   1431		BUG_ON(len == 0 && e->users == 0);
   1432		spin_unlock(&con->writequeue_lock);
   1433
   1434		ret = kernel_sendpage(con->sock, e->page, offset, len,
   1435				      msg_flags);
   1436		trace_dlm_send(con->nodeid, ret);
   1437		if (ret == -EAGAIN || ret == 0) {
   1438			if (ret == -EAGAIN &&
   1439			    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
   1440			    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
   1441				/* Notify TCP that we're limited by the
   1442				 * application window size.
   1443				 */
   1444				set_bit(SOCK_NOSPACE, &con->sock->flags);
   1445				con->sock->sk->sk_write_pending++;
   1446			}
   1447			cond_resched();
   1448			goto out;
   1449		} else if (ret < 0)
   1450			goto out;
   1451
   1452		/* Don't starve people filling buffers */
   1453		if (++count >= MAX_SEND_MSG_COUNT) {
   1454			cond_resched();
   1455			count = 0;
   1456		}
   1457
   1458		spin_lock(&con->writequeue_lock);
   1459		writequeue_entry_complete(e, ret);
   1460	}
   1461	spin_unlock(&con->writequeue_lock);
   1462
   1463	/* close if we got EOF */
   1464	if (test_and_clear_bit(CF_EOF, &con->flags)) {
   1465		mutex_unlock(&con->sock_mutex);
   1466		close_connection(con, false, false, true);
   1467
   1468		/* handling for tcp shutdown */
   1469		clear_bit(CF_SHUTDOWN, &con->flags);
   1470		wake_up(&con->shutdown_wait);
   1471	} else {
   1472		mutex_unlock(&con->sock_mutex);
   1473	}
   1474
   1475	return;
   1476
   1477out:
   1478	mutex_unlock(&con->sock_mutex);
   1479	return;
   1480
   1481out_connect:
   1482	mutex_unlock(&con->sock_mutex);
   1483	queue_work(send_workqueue, &con->swork);
   1484	cond_resched();
   1485}
   1486
   1487static void clean_one_writequeue(struct connection *con)
   1488{
   1489	struct writequeue_entry *e, *safe;
   1490
   1491	spin_lock(&con->writequeue_lock);
   1492	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
   1493		free_entry(e);
   1494	}
   1495	spin_unlock(&con->writequeue_lock);
   1496}
   1497
   1498/* Called from recovery when it knows that a node has
   1499   left the cluster */
   1500int dlm_lowcomms_close(int nodeid)
   1501{
   1502	struct connection *con;
   1503	struct dlm_node_addr *na;
   1504	int idx;
   1505
   1506	log_print("closing connection to node %d", nodeid);
   1507	idx = srcu_read_lock(&connections_srcu);
   1508	con = nodeid2con(nodeid, 0);
   1509	if (con) {
   1510		set_bit(CF_CLOSE, &con->flags);
   1511		close_connection(con, true, true, true);
   1512		clean_one_writequeue(con);
   1513		if (con->othercon)
   1514			clean_one_writequeue(con->othercon);
   1515	}
   1516	srcu_read_unlock(&connections_srcu, idx);
   1517
   1518	spin_lock(&dlm_node_addrs_spin);
   1519	na = find_node_addr(nodeid);
   1520	if (na) {
   1521		list_del(&na->list);
   1522		while (na->addr_count--)
   1523			kfree(na->addr[na->addr_count]);
   1524		kfree(na);
   1525	}
   1526	spin_unlock(&dlm_node_addrs_spin);
   1527
   1528	return 0;
   1529}
   1530
   1531/* Receive workqueue function */
   1532static void process_recv_sockets(struct work_struct *work)
   1533{
   1534	struct connection *con = container_of(work, struct connection, rwork);
   1535
   1536	clear_bit(CF_READ_PENDING, &con->flags);
   1537	receive_from_sock(con);
   1538}
   1539
   1540static void process_listen_recv_socket(struct work_struct *work)
   1541{
   1542	accept_from_sock(&listen_con);
   1543}
   1544
   1545static void dlm_connect(struct connection *con)
   1546{
   1547	struct sockaddr_storage addr;
   1548	int result, addr_len;
   1549	struct socket *sock;
   1550	unsigned int mark;
   1551
   1552	/* Some odd races can cause double-connects, ignore them */
   1553	if (con->retries++ > MAX_CONNECT_RETRIES)
   1554		return;
   1555
   1556	if (con->sock) {
   1557		log_print("node %d already connected.", con->nodeid);
   1558		return;
   1559	}
   1560
   1561	memset(&addr, 0, sizeof(addr));
   1562	result = nodeid_to_addr(con->nodeid, &addr, NULL,
   1563				dlm_proto_ops->try_new_addr, &mark);
   1564	if (result < 0) {
   1565		log_print("no address for nodeid %d", con->nodeid);
   1566		return;
   1567	}
   1568
   1569	/* Create a socket to communicate with */
   1570	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
   1571				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
   1572	if (result < 0)
   1573		goto socket_err;
   1574
   1575	sock_set_mark(sock->sk, mark);
   1576	dlm_proto_ops->sockopts(sock);
   1577
   1578	add_sock(sock, con);
   1579
   1580	result = dlm_proto_ops->bind(sock);
   1581	if (result < 0)
   1582		goto add_sock_err;
   1583
   1584	log_print_ratelimited("connecting to %d", con->nodeid);
   1585	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
   1586	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
   1587					addr_len);
   1588	if (result < 0)
   1589		goto add_sock_err;
   1590
   1591	return;
   1592
   1593add_sock_err:
   1594	dlm_close_sock(&con->sock);
   1595
   1596socket_err:
   1597	/*
   1598	 * Some errors are fatal and this list might need adjusting. For other
   1599	 * errors we try again until the max number of retries is reached.
   1600	 */
   1601	if (result != -EHOSTUNREACH &&
   1602	    result != -ENETUNREACH &&
   1603	    result != -ENETDOWN &&
   1604	    result != -EINVAL &&
   1605	    result != -EPROTONOSUPPORT) {
   1606		log_print("connect %d try %d error %d", con->nodeid,
   1607			  con->retries, result);
   1608		msleep(1000);
   1609		lowcomms_connect_sock(con);
   1610	}
   1611}
   1612
   1613/* Send workqueue function */
   1614static void process_send_sockets(struct work_struct *work)
   1615{
   1616	struct connection *con = container_of(work, struct connection, swork);
   1617
   1618	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
   1619
   1620	clear_bit(CF_WRITE_PENDING, &con->flags);
   1621
   1622	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
   1623		close_connection(con, false, false, true);
   1624		dlm_midcomms_unack_msg_resend(con->nodeid);
   1625	}
   1626
   1627	if (con->sock == NULL) {
   1628		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
   1629			msleep(1000);
   1630
   1631		mutex_lock(&con->sock_mutex);
   1632		dlm_connect(con);
   1633		mutex_unlock(&con->sock_mutex);
   1634	}
   1635
   1636	if (!list_empty(&con->writequeue))
   1637		send_to_sock(con);
   1638}
   1639
   1640static void work_stop(void)
   1641{
   1642	if (recv_workqueue) {
   1643		destroy_workqueue(recv_workqueue);
   1644		recv_workqueue = NULL;
   1645	}
   1646
   1647	if (send_workqueue) {
   1648		destroy_workqueue(send_workqueue);
   1649		send_workqueue = NULL;
   1650	}
   1651}
   1652
   1653static int work_start(void)
   1654{
   1655	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
   1656	if (!recv_workqueue) {
   1657		log_print("can't start dlm_recv");
   1658		return -ENOMEM;
   1659	}
   1660
   1661	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
   1662	if (!send_workqueue) {
   1663		log_print("can't start dlm_send");
   1664		destroy_workqueue(recv_workqueue);
   1665		recv_workqueue = NULL;
   1666		return -ENOMEM;
   1667	}
   1668
   1669	return 0;
   1670}
   1671
   1672static void shutdown_conn(struct connection *con)
   1673{
   1674	if (dlm_proto_ops->shutdown_action)
   1675		dlm_proto_ops->shutdown_action(con);
   1676}
   1677
   1678void dlm_lowcomms_shutdown(void)
   1679{
   1680	int idx;
   1681
   1682	/* Set all the flags to prevent any
   1683	 * socket activity.
   1684	 */
   1685	dlm_allow_conn = 0;
   1686
   1687	if (recv_workqueue)
   1688		flush_workqueue(recv_workqueue);
   1689	if (send_workqueue)
   1690		flush_workqueue(send_workqueue);
   1691
   1692	dlm_close_sock(&listen_con.sock);
   1693
   1694	idx = srcu_read_lock(&connections_srcu);
   1695	foreach_conn(shutdown_conn);
   1696	srcu_read_unlock(&connections_srcu, idx);
   1697}
   1698
   1699static void _stop_conn(struct connection *con, bool and_other)
   1700{
   1701	mutex_lock(&con->sock_mutex);
   1702	set_bit(CF_CLOSE, &con->flags);
   1703	set_bit(CF_READ_PENDING, &con->flags);
   1704	set_bit(CF_WRITE_PENDING, &con->flags);
   1705	if (con->sock && con->sock->sk) {
   1706		lock_sock(con->sock->sk);
   1707		con->sock->sk->sk_user_data = NULL;
   1708		release_sock(con->sock->sk);
   1709	}
   1710	if (con->othercon && and_other)
   1711		_stop_conn(con->othercon, false);
   1712	mutex_unlock(&con->sock_mutex);
   1713}
   1714
   1715static void stop_conn(struct connection *con)
   1716{
   1717	_stop_conn(con, true);
   1718}
   1719
   1720static void connection_release(struct rcu_head *rcu)
   1721{
   1722	struct connection *con = container_of(rcu, struct connection, rcu);
   1723
   1724	kfree(con->rx_buf);
   1725	kfree(con);
   1726}
   1727
   1728static void free_conn(struct connection *con)
   1729{
   1730	close_connection(con, true, true, true);
   1731	spin_lock(&connections_lock);
   1732	hlist_del_rcu(&con->list);
   1733	spin_unlock(&connections_lock);
   1734	if (con->othercon) {
   1735		clean_one_writequeue(con->othercon);
   1736		call_srcu(&connections_srcu, &con->othercon->rcu,
   1737			  connection_release);
   1738	}
   1739	clean_one_writequeue(con);
   1740	call_srcu(&connections_srcu, &con->rcu, connection_release);
   1741}
   1742
   1743static void work_flush(void)
   1744{
   1745	int ok;
   1746	int i;
   1747	struct connection *con;
   1748
   1749	do {
   1750		ok = 1;
   1751		foreach_conn(stop_conn);
   1752		if (recv_workqueue)
   1753			flush_workqueue(recv_workqueue);
   1754		if (send_workqueue)
   1755			flush_workqueue(send_workqueue);
   1756		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
   1757			hlist_for_each_entry_rcu(con, &connection_hash[i],
   1758						 list) {
   1759				ok &= test_bit(CF_READ_PENDING, &con->flags);
   1760				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
   1761				if (con->othercon) {
   1762					ok &= test_bit(CF_READ_PENDING,
   1763						       &con->othercon->flags);
   1764					ok &= test_bit(CF_WRITE_PENDING,
   1765						       &con->othercon->flags);
   1766				}
   1767			}
   1768		}
   1769	} while (!ok);
   1770}
   1771
   1772void dlm_lowcomms_stop(void)
   1773{
   1774	int idx;
   1775
   1776	idx = srcu_read_lock(&connections_srcu);
   1777	work_flush();
   1778	foreach_conn(free_conn);
   1779	srcu_read_unlock(&connections_srcu, idx);
   1780	work_stop();
   1781	deinit_local();
   1782
   1783	dlm_proto_ops = NULL;
   1784}
   1785
   1786static int dlm_listen_for_all(void)
   1787{
   1788	struct socket *sock;
   1789	int result;
   1790
   1791	log_print("Using %s for communications",
   1792		  dlm_proto_ops->name);
   1793
   1794	result = dlm_proto_ops->listen_validate();
   1795	if (result < 0)
   1796		return result;
   1797
   1798	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
   1799				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
   1800	if (result < 0) {
   1801		log_print("Can't create comms socket: %d", result);
   1802		return result;
   1803	}
   1804
   1805	sock_set_mark(sock->sk, dlm_config.ci_mark);
   1806	dlm_proto_ops->listen_sockopts(sock);
   1807
   1808	result = dlm_proto_ops->listen_bind(sock);
   1809	if (result < 0)
   1810		goto out;
   1811
   1812	save_listen_callbacks(sock);
   1813	add_listen_sock(sock, &listen_con);
   1814
   1815	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
   1816	result = sock->ops->listen(sock, 5);
   1817	if (result < 0) {
   1818		dlm_close_sock(&listen_con.sock);
   1819		goto out;
   1820	}
   1821
   1822	return 0;
   1823
   1824out:
   1825	sock_release(sock);
   1826	return result;
   1827}
   1828
   1829static int dlm_tcp_bind(struct socket *sock)
   1830{
   1831	struct sockaddr_storage src_addr;
   1832	int result, addr_len;
   1833
    1834	/* Bind to our cluster-known address when connecting, to
    1835	 * avoid routing problems.
    1836	 */
   1837	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
   1838	make_sockaddr(&src_addr, 0, &addr_len);
   1839
   1840	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
   1841				 addr_len);
   1842	if (result < 0) {
   1843		/* This *may* not indicate a critical error */
   1844		log_print("could not bind for connect: %d", result);
   1845	}
   1846
   1847	return 0;
   1848}
   1849
   1850static int dlm_tcp_connect(struct connection *con, struct socket *sock,
   1851			   struct sockaddr *addr, int addr_len)
   1852{
   1853	int ret;
   1854
   1855	ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
   1856	switch (ret) {
   1857	case -EINPROGRESS:
   1858		fallthrough;
   1859	case 0:
   1860		return 0;
   1861	}
   1862
   1863	return ret;
   1864}
   1865
   1866static int dlm_tcp_listen_validate(void)
   1867{
   1868	/* We don't support multi-homed hosts */
   1869	if (dlm_local_count > 1) {
   1870		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
   1871		return -EINVAL;
   1872	}
   1873
   1874	return 0;
   1875}
   1876
   1877static void dlm_tcp_sockopts(struct socket *sock)
   1878{
   1879	/* Turn off Nagle's algorithm */
   1880	tcp_sock_set_nodelay(sock->sk);
   1881}
   1882
   1883static void dlm_tcp_listen_sockopts(struct socket *sock)
   1884{
   1885	dlm_tcp_sockopts(sock);
   1886	sock_set_reuseaddr(sock->sk);
   1887}
   1888
   1889static int dlm_tcp_listen_bind(struct socket *sock)
   1890{
   1891	int addr_len;
   1892
   1893	/* Bind to our port */
   1894	make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
   1895	return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
   1896			       addr_len);
   1897}
   1898
   1899static const struct dlm_proto_ops dlm_tcp_ops = {
   1900	.name = "TCP",
   1901	.proto = IPPROTO_TCP,
   1902	.connect = dlm_tcp_connect,
   1903	.sockopts = dlm_tcp_sockopts,
   1904	.bind = dlm_tcp_bind,
   1905	.listen_validate = dlm_tcp_listen_validate,
   1906	.listen_sockopts = dlm_tcp_listen_sockopts,
   1907	.listen_bind = dlm_tcp_listen_bind,
   1908	.shutdown_action = dlm_tcp_shutdown,
   1909	.eof_condition = tcp_eof_condition,
   1910};
   1911
   1912static int dlm_sctp_bind(struct socket *sock)
   1913{
   1914	return sctp_bind_addrs(sock, 0);
   1915}
   1916
   1917static int dlm_sctp_connect(struct connection *con, struct socket *sock,
   1918			    struct sockaddr *addr, int addr_len)
   1919{
   1920	int ret;
   1921
    1922	/*
    1923	 * Make the sock->ops->connect() call return within the specified
    1924	 * time, since the O_NONBLOCK argument to connect() does not work
    1925	 * here; afterwards, restore the default value of this attribute.
    1926	 */
   1927	sock_set_sndtimeo(sock->sk, 5);
   1928	ret = sock->ops->connect(sock, addr, addr_len, 0);
   1929	sock_set_sndtimeo(sock->sk, 0);
   1930	if (ret < 0)
   1931		return ret;
   1932
   1933	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
    1934		log_print("successfully connected to node %d", con->nodeid);
   1935
   1936	return 0;
   1937}
   1938
   1939static int dlm_sctp_listen_validate(void)
   1940{
   1941	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
   1942		log_print("SCTP is not enabled by this kernel");
   1943		return -EOPNOTSUPP;
   1944	}
   1945
   1946	request_module("sctp");
   1947	return 0;
   1948}
   1949
   1950static int dlm_sctp_bind_listen(struct socket *sock)
   1951{
   1952	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
   1953}
   1954
   1955static void dlm_sctp_sockopts(struct socket *sock)
   1956{
   1957	/* Turn off Nagle's algorithm */
   1958	sctp_sock_set_nodelay(sock->sk);
   1959	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
   1960}
   1961
   1962static const struct dlm_proto_ops dlm_sctp_ops = {
   1963	.name = "SCTP",
   1964	.proto = IPPROTO_SCTP,
   1965	.try_new_addr = true,
   1966	.connect = dlm_sctp_connect,
   1967	.sockopts = dlm_sctp_sockopts,
   1968	.bind = dlm_sctp_bind,
   1969	.listen_validate = dlm_sctp_listen_validate,
   1970	.listen_sockopts = dlm_sctp_sockopts,
   1971	.listen_bind = dlm_sctp_bind_listen,
   1972};
   1973
   1974int dlm_lowcomms_start(void)
   1975{
   1976	int error = -EINVAL;
   1977	int i;
   1978
   1979	for (i = 0; i < CONN_HASH_SIZE; i++)
   1980		INIT_HLIST_HEAD(&connection_hash[i]);
   1981
   1982	init_local();
   1983	if (!dlm_local_count) {
   1984		error = -ENOTCONN;
   1985		log_print("no local IP address has been set");
   1986		goto fail;
   1987	}
   1988
   1989	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
   1990
   1991	error = work_start();
   1992	if (error)
   1993		goto fail_local;
   1994
   1995	dlm_allow_conn = 1;
   1996
   1997	/* Start listening */
   1998	switch (dlm_config.ci_protocol) {
   1999	case DLM_PROTO_TCP:
   2000		dlm_proto_ops = &dlm_tcp_ops;
   2001		break;
   2002	case DLM_PROTO_SCTP:
   2003		dlm_proto_ops = &dlm_sctp_ops;
   2004		break;
   2005	default:
   2006		log_print("Invalid protocol identifier %d set",
   2007			  dlm_config.ci_protocol);
   2008		error = -EINVAL;
   2009		goto fail_proto_ops;
   2010	}
   2011
   2012	error = dlm_listen_for_all();
   2013	if (error)
   2014		goto fail_listen;
   2015
   2016	return 0;
   2017
   2018fail_listen:
   2019	dlm_proto_ops = NULL;
   2020fail_proto_ops:
   2021	dlm_allow_conn = 0;
   2022	dlm_close_sock(&listen_con.sock);
   2023	work_stop();
   2024fail_local:
   2025	deinit_local();
   2026fail:
   2027	return error;
   2028}
   2029
   2030void dlm_lowcomms_exit(void)
   2031{
   2032	struct dlm_node_addr *na, *safe;
   2033
   2034	spin_lock(&dlm_node_addrs_spin);
   2035	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
   2036		list_del(&na->list);
   2037		while (na->addr_count--)
   2038			kfree(na->addr[na->addr_count]);
   2039		kfree(na);
   2040	}
   2041	spin_unlock(&dlm_node_addrs_spin);
   2042}