cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

socket.c (107219B)


      1/*
      2 * net/tipc/socket.c: TIPC socket API
      3 *
      4 * Copyright (c) 2001-2007, 2012-2019, Ericsson AB
      5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
      6 * Copyright (c) 2020-2021, Red Hat Inc
      7 * All rights reserved.
      8 *
      9 * Redistribution and use in source and binary forms, with or without
     10 * modification, are permitted provided that the following conditions are met:
     11 *
     12 * 1. Redistributions of source code must retain the above copyright
     13 *    notice, this list of conditions and the following disclaimer.
     14 * 2. Redistributions in binary form must reproduce the above copyright
     15 *    notice, this list of conditions and the following disclaimer in the
     16 *    documentation and/or other materials provided with the distribution.
     17 * 3. Neither the names of the copyright holders nor the names of its
     18 *    contributors may be used to endorse or promote products derived from
     19 *    this software without specific prior written permission.
     20 *
     21 * Alternatively, this software may be distributed under the terms of the
     22 * GNU General Public License ("GPL") version 2 as published by the Free
     23 * Software Foundation.
     24 *
     25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
     26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
     29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     35 * POSSIBILITY OF SUCH DAMAGE.
     36 */
     37
     38#include <linux/rhashtable.h>
     39#include <linux/sched/signal.h>
     40
     41#include "core.h"
     42#include "name_table.h"
     43#include "node.h"
     44#include "link.h"
     45#include "name_distr.h"
     46#include "socket.h"
     47#include "bcast.h"
     48#include "netlink.h"
     49#include "group.h"
     50#include "trace.h"
     51
/* Nagle tuning. NAGLE_START_INIT is the initial value of
 * tipc_sock::nagle_start (set in tipc_sk_create()); NAGLE_START_MAX is
 * presumably its upper bound - its use is outside this part of the file.
 */
#define NAGLE_START_INIT	4
#define NAGLE_START_MAX		1024
/* Connection setup timeout in ms; also bounds the wait in __tipc_shutdown() */
#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
#define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
/* Port id limits; NOTE(review): presumably the range used when allocating
 * tipc_sock::portid - the allocation code is not visible here.
 */
#define TIPC_MAX_PORT		0xffffffff
#define TIPC_MIN_PORT		1
#define TIPC_ACK_RATE		4       /* ACK at 1/4 of rcv window size */
     59
/* TIPC socket states, defined as aliases of TCP state constants.
 * These are the values stored in, and compared against, sk->sk_state;
 * legal transitions are enforced by tipc_set_sk_state().
 */
enum {
	TIPC_LISTEN = TCP_LISTEN,
	TIPC_ESTABLISHED = TCP_ESTABLISHED,
	TIPC_OPEN = TCP_CLOSE,			/* state set by tipc_sk_create() */
	TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
	TIPC_CONNECTING = TCP_SYN_SENT,
};
     67
/**
 * struct sockaddr_pair - two TIPC socket addresses kept together
 * @sock: first address of the pair
 * @member: second address of the pair
 *
 * NOTE(review): presumably used to report a sender's socket address together
 * with its group member address; the users are outside this part of the file.
 */
struct sockaddr_pair {
	struct sockaddr_tipc sock;
	struct sockaddr_tipc member;
};
     72
/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API;
 *	must stay the first member's container target for tipc_sk()
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @maxnagle: maximum size of msg which can be subject to nagle (0 = disabled)
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @cong_links: list of congested links
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @conn_timeout: the time we can wait for an unresponded setup request [ms]
 * @probe_unacked: probe has not received ack yet
 * @cong_link_cnt: number of congested links
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window size
 * @peer_caps: peer capabilities mask
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window size
 * @peer: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @mc_method: cookie for use between socket and broadcast layer
 * @rcu: rcu struct for tipc_sock
 * @group: TIPC communications group
 * @oneway: message count in one direction (FIXME)
 * @nagle_start: current nagle value
 * @snd_backlog: send backlog count
 * @msg_acc: messages accepted; used in managing backlog and nagle
 * @pkt_cnt: TIPC socket packet count
 * @expect_ack: whether this TIPC socket is expecting an ack
 * @nodelay: setsockopt() TIPC_NODELAY setting
 * @group_is_open: TIPC socket group is fully open (FIXME)
 * @published: true if port has one or more associated names
 * @conn_addrtype: address type used when establishing connection
 */
struct tipc_sock {
	struct sock sk;
	u32 max_pkt;
	u32 maxnagle;
	u32 portid;
	struct tipc_msg phdr;
	struct list_head cong_links;
	struct list_head publications;
	u32 pub_count;
	atomic_t dupl_rcvcnt;
	u16 conn_timeout;
	bool probe_unacked;
	u16 cong_link_cnt;
	u16 snt_unacked;
	u16 snd_win;
	u16 peer_caps;
	u16 rcv_unacked;
	u16 rcv_win;
	struct sockaddr_tipc peer;
	struct rhash_head node;
	struct tipc_mc_method mc_method;
	struct rcu_head rcu;
	struct tipc_group *group;
	u32 oneway;
	u32 nagle_start;
	u16 snd_backlog;
	u16 msg_acc;
	u16 pkt_cnt;
	bool expect_ack;
	bool nodelay;
	bool group_is_open;
	bool published;
	u8 conn_addrtype;
};
    143
/* Forward declarations: these functions are referenced (e.g. installed as
 * socket callbacks in tipc_sk_create()) before their definitions appear.
 */
static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
		       bool kern);
static void tipc_sk_timeout(struct timer_list *t);
static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_leave(struct tipc_sock *tsk);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p);

/* Tentative declarations of the per-socket-type operation tables, the
 * protocol descriptor and the hash table parameters; tipc_sk_create() selects
 * one of the proto_ops tables based on the socket type.
 */
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;
    168
    169static u32 tsk_own_node(struct tipc_sock *tsk)
    170{
    171	return msg_prevnode(&tsk->phdr);
    172}
    173
    174static u32 tsk_peer_node(struct tipc_sock *tsk)
    175{
    176	return msg_destnode(&tsk->phdr);
    177}
    178
    179static u32 tsk_peer_port(struct tipc_sock *tsk)
    180{
    181	return msg_destport(&tsk->phdr);
    182}
    183
    184static  bool tsk_unreliable(struct tipc_sock *tsk)
    185{
    186	return msg_src_droppable(&tsk->phdr) != 0;
    187}
    188
    189static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
    190{
    191	msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
    192}
    193
    194static bool tsk_unreturnable(struct tipc_sock *tsk)
    195{
    196	return msg_dest_droppable(&tsk->phdr) != 0;
    197}
    198
    199static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
    200{
    201	msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
    202}
    203
    204static int tsk_importance(struct tipc_sock *tsk)
    205{
    206	return msg_importance(&tsk->phdr);
    207}
    208
    209static struct tipc_sock *tipc_sk(const struct sock *sk)
    210{
    211	return container_of(sk, struct tipc_sock, sk);
    212}
    213
    214int tsk_set_importance(struct sock *sk, int imp)
    215{
    216	if (imp > TIPC_CRITICAL_IMPORTANCE)
    217		return -EINVAL;
    218	msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
    219	return 0;
    220}
    221
    222static bool tsk_conn_cong(struct tipc_sock *tsk)
    223{
    224	return tsk->snt_unacked > tsk->snd_win;
    225}
    226
    227static u16 tsk_blocks(int len)
    228{
    229	return ((len / FLOWCTL_BLK_SZ) + 1);
    230}
    231
    232/* tsk_blocks(): translate a buffer size in bytes to number of
    233 * advertisable blocks, taking into account the ratio truesize(len)/len
    234 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
    235 */
    236static u16 tsk_adv_blocks(int len)
    237{
    238	return len / FLOWCTL_BLK_SZ / 4;
    239}
    240
    241/* tsk_inc(): increment counter for sent or received data
    242 * - If block based flow control is not supported by peer we
    243 *   fall back to message based ditto, incrementing the counter
    244 */
    245static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
    246{
    247	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
    248		return ((msglen / FLOWCTL_BLK_SZ) + 1);
    249	return 1;
    250}
    251
    252/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
    253 */
    254static void tsk_set_nagle(struct tipc_sock *tsk)
    255{
    256	struct sock *sk = &tsk->sk;
    257
    258	tsk->maxnagle = 0;
    259	if (sk->sk_type != SOCK_STREAM)
    260		return;
    261	if (tsk->nodelay)
    262		return;
    263	if (!(tsk->peer_caps & TIPC_NAGLE))
    264		return;
    265	/* Limit node local buffer size to avoid receive queue overflow */
    266	if (tsk->max_pkt == MAX_MSG_SIZE)
    267		tsk->maxnagle = 1500;
    268	else
    269		tsk->maxnagle = tsk->max_pkt;
    270}
    271
    272/**
    273 * tsk_advance_rx_queue - discard first buffer in socket receive queue
    274 * @sk: network socket
    275 *
    276 * Caller must hold socket lock
    277 */
    278static void tsk_advance_rx_queue(struct sock *sk)
    279{
    280	trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
    281	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
    282}
    283
    284/* tipc_sk_respond() : send response message back to sender
    285 */
    286static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
    287{
    288	u32 selector;
    289	u32 dnode;
    290	u32 onode = tipc_own_addr(sock_net(sk));
    291
    292	if (!tipc_msg_reverse(onode, &skb, err))
    293		return;
    294
    295	trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
    296	dnode = msg_destnode(buf_msg(skb));
    297	selector = msg_origport(buf_msg(skb));
    298	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
    299}
    300
    301/**
    302 * tsk_rej_rx_queue - reject all buffers in socket receive queue
    303 * @sk: network socket
    304 * @error: response error code
    305 *
    306 * Caller must hold socket lock
    307 */
    308static void tsk_rej_rx_queue(struct sock *sk, int error)
    309{
    310	struct sk_buff *skb;
    311
    312	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
    313		tipc_sk_respond(sk, skb, error);
    314}
    315
    316static bool tipc_sk_connected(struct sock *sk)
    317{
    318	return sk->sk_state == TIPC_ESTABLISHED;
    319}
    320
    321/* tipc_sk_type_connectionless - check if the socket is datagram socket
    322 * @sk: socket
    323 *
    324 * Returns true if connection less, false otherwise
    325 */
    326static bool tipc_sk_type_connectionless(struct sock *sk)
    327{
    328	return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
    329}
    330
    331/* tsk_peer_msg - verify if message was sent by connected port's peer
    332 *
    333 * Handles cases where the node's network address has changed from
    334 * the default of <0.0.0> to its configured setting.
    335 */
    336static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
    337{
    338	struct sock *sk = &tsk->sk;
    339	u32 self = tipc_own_addr(sock_net(sk));
    340	u32 peer_port = tsk_peer_port(tsk);
    341	u32 orig_node, peer_node;
    342
    343	if (unlikely(!tipc_sk_connected(sk)))
    344		return false;
    345
    346	if (unlikely(msg_origport(msg) != peer_port))
    347		return false;
    348
    349	orig_node = msg_orignode(msg);
    350	peer_node = tsk_peer_node(tsk);
    351
    352	if (likely(orig_node == peer_node))
    353		return true;
    354
    355	if (!orig_node && peer_node == self)
    356		return true;
    357
    358	if (!peer_node && orig_node == self)
    359		return true;
    360
    361	return false;
    362}
    363
    364/* tipc_set_sk_state - set the sk_state of the socket
    365 * @sk: socket
    366 *
    367 * Caller must hold socket lock
    368 *
    369 * Returns 0 on success, errno otherwise
    370 */
    371static int tipc_set_sk_state(struct sock *sk, int state)
    372{
    373	int oldsk_state = sk->sk_state;
    374	int res = -EINVAL;
    375
    376	switch (state) {
    377	case TIPC_OPEN:
    378		res = 0;
    379		break;
    380	case TIPC_LISTEN:
    381	case TIPC_CONNECTING:
    382		if (oldsk_state == TIPC_OPEN)
    383			res = 0;
    384		break;
    385	case TIPC_ESTABLISHED:
    386		if (oldsk_state == TIPC_CONNECTING ||
    387		    oldsk_state == TIPC_OPEN)
    388			res = 0;
    389		break;
    390	case TIPC_DISCONNECTING:
    391		if (oldsk_state == TIPC_CONNECTING ||
    392		    oldsk_state == TIPC_ESTABLISHED)
    393			res = 0;
    394		break;
    395	}
    396
    397	if (!res)
    398		sk->sk_state = state;
    399
    400	return res;
    401}
    402
    403static int tipc_sk_sock_err(struct socket *sock, long *timeout)
    404{
    405	struct sock *sk = sock->sk;
    406	int err = sock_error(sk);
    407	int typ = sock->type;
    408
    409	if (err)
    410		return err;
    411	if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
    412		if (sk->sk_state == TIPC_DISCONNECTING)
    413			return -EPIPE;
    414		else if (!tipc_sk_connected(sk))
    415			return -ENOTCONN;
    416	}
    417	if (!*timeout)
    418		return -EAGAIN;
    419	if (signal_pending(current))
    420		return sock_intr_errno(*timeout);
    421
    422	return 0;
    423}
    424
/* tipc_wait_for_cond - sleep until a condition holds or waiting must stop
 * @sock_: socket (struct socket *) to wait on
 * @timeo_: pointer to the remaining timeout; updated after every sleep
 * @condition_: expression re-evaluated before every sleep
 *
 * Evaluates to 0 once @condition_ is true, otherwise to the non-zero
 * result of tipc_sk_sock_err() that ended the wait.
 *
 * The socket lock is dropped with release_sock() around each sleep and
 * re-taken with lock_sock() afterwards, so the caller must hold the socket
 * lock, and @condition_ is always evaluated with the lock held. Note that
 * @condition_ and @sock_ are evaluated more than once.
 */
#define tipc_wait_for_cond(sock_, timeo_, condition_)			       \
({                                                                             \
	DEFINE_WAIT_FUNC(wait_, woken_wake_function);                          \
	struct sock *sk_;						       \
	int rc_;							       \
									       \
	while ((rc_ = !(condition_))) {					       \
		/* coupled with smp_wmb() in tipc_sk_proto_rcv() */            \
		smp_rmb();                                                     \
		sk_ = (sock_)->sk;					       \
		rc_ = tipc_sk_sock_err((sock_), timeo_);		       \
		if (rc_)						       \
			break;						       \
		add_wait_queue(sk_sleep(sk_), &wait_);                         \
		release_sock(sk_);					       \
		*(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
		sched_annotate_sleep();				               \
		lock_sock(sk_);						       \
		remove_wait_queue(sk_sleep(sk_), &wait_);		       \
	}								       \
	rc_;								       \
})
    447
/**
 * tipc_sk_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Return: 0 on success, errno otherwise:
 * -EPROTONOSUPPORT if @protocol is non-zero, -EPROTOTYPE for an unsupported
 * socket type, -ENOMEM if the sock cannot be allocated, -EINVAL if the
 * socket cannot be inserted into the socket table.
 */
static int tipc_sk_create(struct net *net, struct socket *sock,
			  int protocol, int kern)
{
	const struct proto_ops *ops;
	struct sock *sk;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;

	/* Validate arguments */
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	/* Pick the operations table matching the socket type */
	switch (sock->type) {
	case SOCK_STREAM:
		ops = &stream_ops;
		break;
	case SOCK_SEQPACKET:
		ops = &packet_ops;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		ops = &msg_ops;
		break;
	default:
		return -EPROTOTYPE;
	}

	/* Allocate socket's protocol area */
	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
	if (sk == NULL)
		return -ENOMEM;

	tsk = tipc_sk(sk);
	tsk->max_pkt = MAX_PKT_DEFAULT;
	tsk->maxnagle = 0;
	tsk->nagle_start = NAGLE_START_INIT;
	INIT_LIST_HEAD(&tsk->publications);
	INIT_LIST_HEAD(&tsk->cong_links);
	msg = &tsk->phdr;

	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock_init_data(sock, sk);
	tipc_set_sk_state(sk, TIPC_OPEN);
	if (tipc_sk_insert(tsk)) {
		sk_free(sk);
		pr_warn("Socket create failed; port number exhausted\n");
		return -EINVAL;
	}

	/* Ensure tsk is visible before we read own_addr. */
	smp_mb();

	/* Preformat the header reused for every message sent by this socket */
	tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
		      TIPC_NAMED_MSG, NAMED_H_SIZE, 0);

	msg_set_origport(msg, tsk->portid);
	timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
	sk->sk_shutdown = 0;
	/* Install the TIPC specific socket callbacks */
	sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
	sk->sk_destruct = tipc_sock_destruct;
	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
	tsk->group_is_open = true;
	atomic_set(&tsk->dupl_rcvcnt, 0);

	/* Start out with safe limits until we receive an advertised window */
	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
	tsk->rcv_win = tsk->snd_win;

	/* RDM and DGRAM sockets are "unreturnable"; DGRAM is also "unreliable" */
	if (tipc_sk_type_connectionless(sk)) {
		tsk_set_unreturnable(tsk, true);
		if (sock->type == SOCK_DGRAM)
			tsk_set_unreliable(tsk, true);
	}
	__skb_queue_head_init(&tsk->mc_method.deferredq);
	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
	return 0;
}
    541
    542static void tipc_sk_callback(struct rcu_head *head)
    543{
    544	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
    545
    546	sock_put(&tsk->sk);
    547}
    548
/* __tipc_shutdown - flush queues and terminate any connection of a socket
 * @sock: socket to shut down
 * @error: TIPC error code used when rejecting messages / notifying the peer
 *
 * Waits (bounded by CONN_TIMEOUT_DEFAULT ms) for link and connection
 * congestion to clear, pushes out the send backlog, purges the write queue
 * and then disposes of the receive queue according to socket type and state.
 *
 * Caller should hold socket lock for the socket.
 */
static void __tipc_shutdown(struct socket *sock, int error)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
	u32 dnode = tsk_peer_node(tsk);
	struct sk_buff *skb;

	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
	/* The result is ignored: shutdown proceeds even if the wait failed */
	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
					    !tsk_conn_cong(tsk)));

	/* Push out delayed messages if in Nagle mode */
	tipc_sk_push_backlog(tsk, false);
	/* Remove pending SYN */
	__skb_queue_purge(&sk->sk_write_queue);

	/* Remove partially received buffer if any */
	skb = skb_peek(&sk->sk_receive_queue);
	if (skb && TIPC_SKB_CB(skb)->bytes_read) {
		__skb_unlink(skb, &sk->sk_receive_queue);
		kfree_skb(skb);
	}

	/* Reject all unreceived messages if connectionless */
	if (tipc_sk_type_connectionless(sk)) {
		tsk_rej_rx_queue(sk, error);
		return;
	}

	switch (sk->sk_state) {
	case TIPC_CONNECTING:
	case TIPC_ESTABLISHED:
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(net, dnode, tsk->portid);
		/* Send a FIN+/- to its peer */
		/* If a message is still queued, reject it (that serves as the
		 * notification) and drop the rest of the queue
		 */
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb) {
			__skb_queue_purge(&sk->sk_receive_queue);
			tipc_sk_respond(sk, skb, error);
			break;
		}
		/* Otherwise build an explicit connection message carrying
		 * the error code; silently skipped if allocation fails
		 */
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
				      tsk_own_node(tsk), tsk_peer_port(tsk),
				      tsk->portid, error);
		if (skb)
			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
		break;
	case TIPC_LISTEN:
		/* Reject all SYN messages */
		tsk_rej_rx_queue(sk, error);
		break;
	default:
		__skb_queue_purge(&sk->sk_receive_queue);
		break;
	}
}
    609
/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Return: 0 on success, errno otherwise (this implementation always
 * returns 0)
 */
static int tipc_release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk;

	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
		return 0;

	tsk = tipc_sk(sk);
	lock_sock(sk);

	trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
	sk->sk_shutdown = SHUTDOWN_MASK;
	/* Leave any group, withdraw all names, stop the timer and unhash */
	tipc_sk_leave(tsk);
	tipc_sk_withdraw(tsk, NULL);
	__skb_queue_purge(&tsk->mc_method.deferredq);
	sk_stop_timer(sk, &sk->sk_timer);
	tipc_sk_remove(tsk);

	sock_orphan(sk);
	/* Reject any messages that accumulated in backlog queue */
	release_sock(sk);
	tipc_dest_list_purge(&tsk->cong_links);
	tsk->cong_link_cnt = 0;
	/* Defer dropping the socket reference to tipc_sk_callback() via RCU */
	call_rcu(&tsk->rcu, tipc_sk_callback);
	sock->sk = NULL;

	return 0;
}
    660
    661/**
    662 * __tipc_bind - associate or disassocate TIPC name(s) with a socket
    663 * @sock: socket structure
    664 * @skaddr: socket address describing name(s) and desired operation
    665 * @alen: size of socket address data structure
    666 *
    667 * Name and name sequence binding are indicated using a positive scope value;
    668 * a negative scope value unbinds the specified name.  Specifying no name
    669 * (i.e. a socket address length of 0) unbinds all names from the socket.
    670 *
    671 * Return: 0 on success, errno otherwise
    672 *
    673 * NOTE: This routine doesn't need to take the socket lock since it doesn't
    674 *       access any non-constant socket information.
    675 */
    676static int __tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
    677{
    678	struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
    679	struct tipc_sock *tsk = tipc_sk(sock->sk);
    680	bool unbind = false;
    681
    682	if (unlikely(!alen))
    683		return tipc_sk_withdraw(tsk, NULL);
    684
    685	if (ua->addrtype == TIPC_SERVICE_ADDR) {
    686		ua->addrtype = TIPC_SERVICE_RANGE;
    687		ua->sr.upper = ua->sr.lower;
    688	}
    689	if (ua->scope < 0) {
    690		unbind = true;
    691		ua->scope = -ua->scope;
    692	}
    693	/* Users may still use deprecated TIPC_ZONE_SCOPE */
    694	if (ua->scope != TIPC_NODE_SCOPE)
    695		ua->scope = TIPC_CLUSTER_SCOPE;
    696
    697	if (tsk->group)
    698		return -EACCES;
    699
    700	if (unbind)
    701		return tipc_sk_withdraw(tsk, ua);
    702	return tipc_sk_publish(tsk, ua);
    703}
    704
    705int tipc_sk_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
    706{
    707	int res;
    708
    709	lock_sock(sock->sk);
    710	res = __tipc_bind(sock, skaddr, alen);
    711	release_sock(sock->sk);
    712	return res;
    713}
    714
    715static int tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
    716{
    717	struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
    718	u32 atype = ua->addrtype;
    719
    720	if (alen) {
    721		if (!tipc_uaddr_valid(ua, alen))
    722			return -EINVAL;
    723		if (atype == TIPC_SOCKET_ADDR)
    724			return -EAFNOSUPPORT;
    725		if (ua->sr.type < TIPC_RESERVED_TYPES) {
    726			pr_warn_once("Can't bind to reserved service type %u\n",
    727				     ua->sr.type);
    728			return -EACCES;
    729		}
    730	}
    731	return tipc_sk_bind(sock, skaddr, alen);
    732}
    733
    734/**
    735 * tipc_getname - get port ID of socket or peer socket
    736 * @sock: socket structure
    737 * @uaddr: area for returned socket address
    738 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
    739 *
    740 * Return: 0 on success, errno otherwise
    741 *
    742 * NOTE: This routine doesn't need to take the socket lock since it only
    743 *       accesses socket information that is unchanging (or which changes in
    744 *       a completely predictable manner).
    745 */
    746static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
    747			int peer)
    748{
    749	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
    750	struct sock *sk = sock->sk;
    751	struct tipc_sock *tsk = tipc_sk(sk);
    752
    753	memset(addr, 0, sizeof(*addr));
    754	if (peer) {
    755		if ((!tipc_sk_connected(sk)) &&
    756		    ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
    757			return -ENOTCONN;
    758		addr->addr.id.ref = tsk_peer_port(tsk);
    759		addr->addr.id.node = tsk_peer_node(tsk);
    760	} else {
    761		addr->addr.id.ref = tsk->portid;
    762		addr->addr.id.node = tipc_own_addr(sock_net(sk));
    763	}
    764
    765	addr->addrtype = TIPC_SOCKET_ADDR;
    766	addr->family = AF_TIPC;
    767	addr->scope = 0;
    768	addr->addr.name.domain = 0;
    769
    770	return sizeof(*addr);
    771}
    772
/**
 * tipc_poll - read and possibly block on pollmask
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: poll table, handed on to sock_poll_wait()
 *
 * Return: pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
 */
static __poll_t tipc_poll(struct file *file, struct socket *sock,
			      poll_table *wait)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	__poll_t revents = 0;

	sock_poll_wait(file, sock, wait);
	trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");

	if (sk->sk_shutdown & RCV_SHUTDOWN)
		revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		revents |= EPOLLHUP;

	switch (sk->sk_state) {
	case TIPC_ESTABLISHED:
		/* Writable only when neither a link nor the connection is
		 * congested
		 */
		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
			revents |= EPOLLOUT;
		fallthrough;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
			revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_OPEN:
		if (tsk->group_is_open && !tsk->cong_link_cnt)
			revents |= EPOLLOUT;
		/* Only connectionless sockets report readability in this state */
		if (!tipc_sk_type_connectionless(sk))
			break;
		if (skb_queue_empty_lockless(&sk->sk_receive_queue))
			break;
		revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_DISCONNECTING:
		/* Overwrites (does not OR into) the bits collected above */
		revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
		break;
	}
	return revents;
}
    831
    832/**
    833 * tipc_sendmcast - send multicast message
    834 * @sock: socket structure
    835 * @ua: destination address struct
    836 * @msg: message to send
    837 * @dlen: length of data to send
    838 * @timeout: timeout to wait for wakeup
    839 *
    840 * Called from function tipc_sendmsg(), which has done all sanity checks
    841 * Return: the number of bytes sent on success, or errno
    842 */
    843static int tipc_sendmcast(struct  socket *sock, struct tipc_uaddr *ua,
    844			  struct msghdr *msg, size_t dlen, long timeout)
    845{
    846	struct sock *sk = sock->sk;
    847	struct tipc_sock *tsk = tipc_sk(sk);
    848	struct tipc_msg *hdr = &tsk->phdr;
    849	struct net *net = sock_net(sk);
    850	int mtu = tipc_bcast_get_mtu(net);
    851	struct sk_buff_head pkts;
    852	struct tipc_nlist dsts;
    853	int rc;
    854
    855	if (tsk->group)
    856		return -EACCES;
    857
    858	/* Block or return if any destination link is congested */
    859	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
    860	if (unlikely(rc))
    861		return rc;
    862
    863	/* Lookup destination nodes */
    864	tipc_nlist_init(&dsts, tipc_own_addr(net));
    865	tipc_nametbl_lookup_mcast_nodes(net, ua, &dsts);
    866	if (!dsts.local && !dsts.remote)
    867		return -EHOSTUNREACH;
    868
    869	/* Build message header */
    870	msg_set_type(hdr, TIPC_MCAST_MSG);
    871	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
    872	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
    873	msg_set_destport(hdr, 0);
    874	msg_set_destnode(hdr, 0);
    875	msg_set_nametype(hdr, ua->sr.type);
    876	msg_set_namelower(hdr, ua->sr.lower);
    877	msg_set_nameupper(hdr, ua->sr.upper);
    878
    879	/* Build message as chain of buffers */
    880	__skb_queue_head_init(&pkts);
    881	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
    882
    883	/* Send message if build was successful */
    884	if (unlikely(rc == dlen)) {
    885		trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
    886					TIPC_DUMP_SK_SNDQ, " ");
    887		rc = tipc_mcast_xmit(net, &pkts, &tsk->mc_method, &dsts,
    888				     &tsk->cong_link_cnt);
    889	}
    890
    891	tipc_nlist_purge(&dsts);
    892
    893	return rc ? rc : dlen;
    894}
    895
     896/**
     897 * tipc_send_group_msg - send a message to a member in the group
     898 * @net: network namespace
     899 * @tsk: tipc socket
     900 * @m: message to send
     901 * @mb: group member
     902 * @dnode: destination node
     903 * @dport: destination port
     904 * @dlen: total length of message data
         *
         * Return: @dlen once the message has been built and passed to
         * tipc_node_xmit(); otherwise the value returned by tipc_msg_build()
     905 */
     906static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
     907			       struct msghdr *m, struct tipc_member *mb,
     908			       u32 dnode, u32 dport, int dlen)
     909{
     910	u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
     911	struct tipc_mc_method *method = &tsk->mc_method;
     912	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
     913	struct tipc_msg *hdr = &tsk->phdr;
     914	struct sk_buff_head pkts;
     915	int mtu, rc;
     916
     917	/* Complete message header */
     918	msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
     919	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
     920	msg_set_destport(hdr, dport);
     921	msg_set_destnode(hdr, dnode);
     922	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
     923
     924	/* Build message as chain of buffers */
     925	__skb_queue_head_init(&pkts);
     926	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
     927	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
     928	if (unlikely(rc != dlen))
     929		return rc;
     930
     931	/* Send message */
     932	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
        	/* Record the congested destination node in the socket's congestion list */
     933	if (unlikely(rc == -ELINKCONG)) {
     934		tipc_dest_push(&tsk->cong_links, dnode, 0);
     935		tsk->cong_link_cnt++;
     936	}
        	/* NOTE: the xmit result is only checked for -ELINKCONG and is not
        	 * returned; dlen is reported to the caller below in every case
        	 */
     937
     938	/* Update send window */
     939	tipc_group_update_member(mb, blks);
     940
     941	/* A broadcast sent within next EXPIRE period must follow same path */
     942	method->rcast = true;
     943	method->mandatory = true;
     944	return dlen;
     945}
    946
     947/**
     948 * tipc_send_group_unicast - send message to a member in the group
     949 * @sock: socket structure
     950 * @m: message to send
     951 * @dlen: total length of message data
     952 * @timeout: timeout to wait for wakeup
     953 *
     954 * Called from function tipc_sendmsg(), which has done all sanity checks
         *
         * The destination node and port are read from the address in @m->msg_name.
         * Fails with -EHOSTUNREACH if both are zero, or if no group member was
         * stored in the local member pointer by the congestion check.
         *
     955 * Return: the number of bytes sent on success, or errno
     956 */
     957static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
     958				   int dlen, long timeout)
     959{
     960	struct sock *sk = sock->sk;
     961	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
     962	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
     963	struct tipc_sock *tsk = tipc_sk(sk);
     964	struct net *net = sock_net(sk);
     965	struct tipc_member *mb = NULL;
     966	u32 node, port;
     967	int rc;
     968
     969	node = ua->sk.node;
     970	port = ua->sk.ref;
     971	if (!port && !node)
     972		return -EHOSTUNREACH;
     973
     974	/* Block or return if destination link or member is congested */
        	/* tipc_group_cong() is handed &mb, so evaluating the wait condition
        	 * is also what may set mb; presumably to the member found for
        	 * node/port (verify against tipc_group_cong())
        	 */
     975	rc = tipc_wait_for_cond(sock, &timeout,
     976				!tipc_dest_find(&tsk->cong_links, node, 0) &&
     977				tsk->group &&
     978				!tipc_group_cong(tsk->group, node, port, blks,
     979						 &mb));
     980	if (unlikely(rc))
     981		return rc;
     982
        	/* No member was stored in mb: treat the destination as unreachable */
     983	if (unlikely(!mb))
     984		return -EHOSTUNREACH;
     985
     986	rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
     987
        	/* A zero rc is reported as dlen; any other value is returned as is */
     988	return rc ? rc : dlen;
     989}
    990
     991/**
     992 * tipc_send_group_anycast - send message to any member with given identity
     993 * @sock: socket structure
     994 * @m: message to send
     995 * @dlen: total length of message data
     996 * @timeout: timeout to wait for wakeup
     997 *
     998 * Called from function tipc_sendmsg(), which has done all sanity checks
         *
         * The address in @m->msg_name is modified: its type and scope are
         * overwritten with the values held in the socket's header (tsk->phdr).
         * At most three lookup rounds are made before giving up with
         * -EHOSTUNREACH.
         *
     999 * Return: the number of bytes sent on success, or errno
    1000 */
    1001static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
    1002				   int dlen, long timeout)
    1003{
    1004	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
    1005	struct sock *sk = sock->sk;
    1006	struct tipc_sock *tsk = tipc_sk(sk);
    1007	struct list_head *cong_links = &tsk->cong_links;
    1008	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
    1009	struct tipc_msg *hdr = &tsk->phdr;
    1010	struct tipc_member *first = NULL;
    1011	struct tipc_member *mbr = NULL;
    1012	struct net *net = sock_net(sk);
    1013	u32 node, port, exclude;
    1014	struct list_head dsts;
    1015	int lookups = 0;
    1016	int dstcnt, rc;
    1017	bool cong;
    1018
    1019	INIT_LIST_HEAD(&dsts);
    1020	ua->sa.type = msg_nametype(hdr);
    1021	ua->scope = msg_lookup_scope(hdr);
    1022
        	/* lookups is pre-incremented, so the body runs at most three times */
    1023	while (++lookups < 4) {
    1024		exclude = tipc_group_exclude(tsk->group);
    1025
    1026		first = NULL;
    1027
    1028		/* Look for a non-congested destination member, if any */
        		/* The scan ends on the first non-congested destination, or
        		 * when the congestion check yields the same member pointer
        		 * that was recorded in 'first'
        		 */
    1029		while (1) {
    1030			if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt,
    1031						       exclude, false))
    1032				return -EHOSTUNREACH;
    1033			tipc_dest_pop(&dsts, &node, &port);
    1034			cong = tipc_group_cong(tsk->group, node, port, blks,
    1035					       &mbr);
    1036			if (!cong)
    1037				break;
    1038			if (mbr == first)
    1039				break;
    1040			if (!first)
    1041				first = mbr;
    1042		}
    1043
    1044		/* Start over if destination was not in member list */
    1045		if (unlikely(!mbr))
    1046			continue;
    1047
        		/* Member and its link are both free: go ahead and send */
    1048		if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
    1049			break;
    1050
    1051		/* Block or return if destination link or member is congested */
        		/* &mbr is passed again, so mbr may be changed while waiting */
    1052		rc = tipc_wait_for_cond(sock, &timeout,
    1053					!tipc_dest_find(cong_links, node, 0) &&
    1054					tsk->group &&
    1055					!tipc_group_cong(tsk->group, node, port,
    1056							 blks, &mbr));
    1057		if (unlikely(rc))
    1058			return rc;
    1059
    1060		/* Send, unless destination disappeared while waiting */
    1061		if (likely(mbr))
    1062			break;
    1063	}
    1064
        	/* The loop ran out of rounds without settling on a destination */
    1065	if (unlikely(lookups >= 4))
    1066		return -EHOSTUNREACH;
    1067
    1068	rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);
    1069
        	/* A zero rc is reported as dlen; any other value is returned as is */
    1070	return rc ? rc : dlen;
    1071}
   1072
   1073/**
   1074 * tipc_send_group_bcast - send message to all members in communication group
   1075 * @sock: socket structure
   1076 * @m: message to send
   1077 * @dlen: total length of message data
   1078 * @timeout: timeout to wait for wakeup
   1079 *
   1080 * Called from function tipc_sendmsg(), which has done all sanity checks
   1081 * Return: the number of bytes sent on success, or errno
   1082 */
   1083static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
   1084				 int dlen, long timeout)
   1085{
   1086	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
   1087	struct sock *sk = sock->sk;
   1088	struct net *net = sock_net(sk);
   1089	struct tipc_sock *tsk = tipc_sk(sk);
   1090	struct tipc_nlist *dsts;
   1091	struct tipc_mc_method *method = &tsk->mc_method;
   1092	bool ack = method->mandatory && method->rcast;
   1093	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
   1094	struct tipc_msg *hdr = &tsk->phdr;
   1095	int mtu = tipc_bcast_get_mtu(net);
   1096	struct sk_buff_head pkts;
   1097	int rc = -EHOSTUNREACH;
   1098
   1099	/* Block or return if any destination link or member is congested */
   1100	rc = tipc_wait_for_cond(sock, &timeout,
   1101				!tsk->cong_link_cnt && tsk->group &&
   1102				!tipc_group_bc_cong(tsk->group, blks));
   1103	if (unlikely(rc))
   1104		return rc;
   1105
   1106	dsts = tipc_group_dests(tsk->group);
   1107	if (!dsts->local && !dsts->remote)
   1108		return -EHOSTUNREACH;
   1109
   1110	/* Complete message header */
   1111	if (ua) {
   1112		msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
   1113		msg_set_nameinst(hdr, ua->sa.instance);
   1114	} else {
   1115		msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
   1116		msg_set_nameinst(hdr, 0);
   1117	}
   1118	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
   1119	msg_set_destport(hdr, 0);
   1120	msg_set_destnode(hdr, 0);
   1121	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));
   1122
   1123	/* Avoid getting stuck with repeated forced replicasts */
   1124	msg_set_grp_bc_ack_req(hdr, ack);
   1125
   1126	/* Build message as chain of buffers */
   1127	__skb_queue_head_init(&pkts);
   1128	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
   1129	if (unlikely(rc != dlen))
   1130		return rc;
   1131
   1132	/* Send message */
   1133	rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
   1134	if (unlikely(rc))
   1135		return rc;
   1136
   1137	/* Update broadcast sequence number and send windows */
   1138	tipc_group_update_bc_members(tsk->group, blks, ack);
   1139
   1140	/* Broadcast link is now free to choose method for next broadcast */
   1141	method->mandatory = false;
   1142	method->expires = jiffies;
   1143
   1144	return dlen;
   1145}
   1146
    1147/**
    1148 * tipc_send_group_mcast - send message to all members with given identity
    1149 * @sock: socket structure
    1150 * @m: message to send
    1151 * @dlen: total length of message data
    1152 * @timeout: timeout to wait for wakeup
    1153 *
    1154 * Called from function tipc_sendmsg(), which has done all sanity checks
         *
         * The address in @m->msg_name is modified: type and scope are taken from
         * the socket's header (tsk->phdr), and when exactly one destination is
         * found its node/port are written into the address before the message is
         * handed to tipc_send_group_unicast(). Otherwise the message is handed to
         * tipc_send_group_bcast().
         *
    1155 * Return: the number of bytes sent on success, or errno
    1156 */
    1157static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
    1158				 int dlen, long timeout)
    1159{
    1160	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
    1161	struct sock *sk = sock->sk;
    1162	struct tipc_sock *tsk = tipc_sk(sk);
    1163	struct tipc_group *grp = tsk->group;
    1164	struct tipc_msg *hdr = &tsk->phdr;
    1165	struct net *net = sock_net(sk);
    1166	struct list_head dsts;
    1167	u32 dstcnt, exclude;
    1168
    1169	INIT_LIST_HEAD(&dsts);
    1170	ua->sa.type = msg_nametype(hdr);
    1171	ua->scope = msg_lookup_scope(hdr);
    1172	exclude = tipc_group_exclude(grp);
    1173
    1174	if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt, exclude, true))
    1175		return -EHOSTUNREACH;
    1176
        	/* Single destination: pop it into the address and send as unicast */
    1177	if (dstcnt == 1) {
    1178		tipc_dest_pop(&dsts, &ua->sk.node, &ua->sk.ref);
    1179		return tipc_send_group_unicast(sock, m, dlen, timeout);
    1180	}
    1181
        	/* The looked-up list itself is not passed on to the broadcast path */
    1182	tipc_dest_list_purge(&dsts);
    1183	return tipc_send_group_bcast(sock, m, dlen, timeout);
    1184}
   1185
    1186/**
    1187 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
    1188 * @net: the associated network namespace
    1189 * @arrvq: queue with arriving messages, to be cloned after destination lookup
    1190 * @inputq: queue with cloned messages, delivered to socket after dest lookup
    1191 *
    1192 * Multi-threaded: parallel calls with reference to same queues may occur
         *
         * Both queues are manipulated under @inputq->lock. A buffer is only
         * dequeued from @arrvq if it is still the head of that queue when the lock
         * is taken. Finally @inputq is passed to tipc_sk_rcv().
    1193 */
    1194void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
    1195		       struct sk_buff_head *inputq)
    1196{
    1197	u32 self = tipc_own_addr(net);
    1198	struct sk_buff *skb, *_skb;
    1199	u32 portid, onode;
    1200	struct sk_buff_head tmpq;
    1201	struct list_head dports;
    1202	struct tipc_msg *hdr;
    1203	struct tipc_uaddr ua;
    1204	int user, mtyp, hlen;
    1205
    1206	__skb_queue_head_init(&tmpq);
    1207	INIT_LIST_HEAD(&dports);
    1208	ua.addrtype = TIPC_SERVICE_RANGE;
    1209
    1210	/* tipc_skb_peek() increments the head skb's reference counter */
    1211	skb = tipc_skb_peek(arrvq, &inputq->lock);
    1212	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
    1213		hdr = buf_msg(skb);
    1214		user = msg_user(hdr);
    1215		mtyp = msg_type(hdr);
        		/* Header length handed to __pskb_copy() for each clone below */
    1216		hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
    1217		onode = msg_orignode(hdr);
    1218		ua.sr.type = msg_nametype(hdr);
    1219		ua.sr.lower = msg_namelower(hdr);
    1220		ua.sr.upper = msg_nameupper(hdr);
        		/* Lookup scope depends on whether the message originated here */
    1221		if (onode == self)
    1222			ua.scope = TIPC_ANY_SCOPE;
    1223		else
    1224			ua.scope = TIPC_CLUSTER_SCOPE;
    1225
        		/* Group unicast and group protocol messages are not cloned:
        		 * the buffer itself is moved from arrvq to inputq
        		 */
    1226		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
    1227			spin_lock_bh(&inputq->lock);
    1228			if (skb_peek(arrvq) == skb) {
    1229				__skb_dequeue(arrvq);
    1230				__skb_queue_tail(inputq, skb);
    1231			}
        			/* Drop the reference taken by tipc_skb_peek() */
    1232			kfree_skb(skb);
    1233			spin_unlock_bh(&inputq->lock);
    1234			continue;
    1235		}
    1236
    1237		/* Group messages require exact scope match */
    1238		if (msg_in_group(hdr)) {
    1239			ua.sr.lower = 0;
    1240			ua.sr.upper = ~0;
    1241			ua.scope = msg_lookup_scope(hdr);
    1242		}
    1243
    1244		/* Create destination port list: */
    1245		tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);
    1246
    1247		/* Clone message per destination */
        		/* A failed copy is logged and that destination is skipped */
    1248		while (tipc_dest_pop(&dports, NULL, &portid)) {
    1249			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
    1250			if (_skb) {
    1251				msg_set_destport(buf_msg(_skb), portid);
    1252				__skb_queue_tail(&tmpq, _skb);
    1253				continue;
    1254			}
    1255			pr_warn("Failed to clone mcast rcv buffer\n");
    1256		}
    1257		/* Append clones to inputq only if skb is still head of arrvq */
    1258		spin_lock_bh(&inputq->lock);
    1259		if (skb_peek(arrvq) == skb) {
    1260			skb_queue_splice_tail_init(&tmpq, inputq);
    1261			/* Decrement the skb's refcnt */
    1262			kfree_skb(__skb_dequeue(arrvq));
    1263		}
    1264		spin_unlock_bh(&inputq->lock);
        		/* Free any clones still on tmpq, i.e. those not spliced above */
    1265		__skb_queue_purge(&tmpq);
        		/* Drop the reference taken by tipc_skb_peek() */
    1266		kfree_skb(skb);
    1267	}
    1268	tipc_sk_rcv(net, inputq);
    1269}
   1270
    1271/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
    1272 *                         when socket is in Nagle mode
         *
         * If nagle_ack is set, the accumulation counters (msg_acc, pkt_cnt) are
         * evaluated and reset first, and nagle_start/expect_ack are adjusted.
         * Nothing is transmitted if the write queue is empty, a link is marked
         * congested, or the queue tail is a SYN message.
    1273 */
    1274static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
    1275{
    1276	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
    1277	struct sk_buff *skb = skb_peek_tail(txq);
    1278	struct net *net = sock_net(&tsk->sk);
    1279	u32 dnode = tsk_peer_node(tsk);
    1280	int rc;
    1281
    1282	if (nagle_ack) {
    1283		tsk->pkt_cnt += skb_queue_len(txq);
        		/* No packets, or fewer than two accumulated messages per
        		 * packet (integer division): back off by doubling nagle_start
        		 * while it is below NAGLE_START_MAX
        		 */
    1284		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
    1285			tsk->oneway = 0;
    1286			if (tsk->nagle_start < NAGLE_START_MAX)
    1287				tsk->nagle_start *= 2;
    1288			tsk->expect_ack = false;
    1289			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
    1290				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
    1291				 tsk->nagle_start);
    1292		} else {
        			/* Otherwise reset nagle_start and, if the queue is not
        			 * empty, request an ack for its last buffer
        			 */
    1293			tsk->nagle_start = NAGLE_START_INIT;
    1294			if (skb) {
    1295				msg_set_ack_required(buf_msg(skb));
    1296				tsk->expect_ack = true;
    1297			} else {
    1298				tsk->expect_ack = false;
    1299			}
    1300		}
    1301		tsk->msg_acc = 0;
    1302		tsk->pkt_cnt = 0;
    1303	}
    1304
        	/* Nothing queued, or a link is currently marked congested */
    1305	if (!skb || tsk->cong_link_cnt)
    1306		return;
    1307
    1308	/* Do not send SYN again after congestion */
    1309	if (msg_is_syn(buf_msg(skb)))
    1310		return;
    1311
    1312	if (tsk->msg_acc)
    1313		tsk->pkt_cnt += skb_queue_len(txq);
        	/* The backlogged blocks now count as sent but not yet acknowledged */
    1314	tsk->snt_unacked += tsk->snd_backlog;
    1315	tsk->snd_backlog = 0;
    1316	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
        	/* Only link congestion is acted upon; other results are ignored */
    1317	if (rc == -ELINKCONG)
    1318		tsk->cong_link_cnt = 1;
    1319}
   1320
    1321/**
    1322 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
    1323 * @tsk: receiving socket
    1324 * @skb: pointer to message buffer.
    1325 * @inputq: buffer list containing the buffers
    1326 * @xmitq: output message area
         *
         * @skb is freed here unless it is re-used: an errored message is rewritten
         * into an abort message and appended to @inputq, and a CONN_PROBE is
         * turned into a CONN_PROBE_REPLY and appended to @xmitq.
    1327 */
    1328static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
    1329				   struct sk_buff_head *inputq,
    1330				   struct sk_buff_head *xmitq)
    1331{
    1332	struct tipc_msg *hdr = buf_msg(skb);
    1333	u32 onode = tsk_own_node(tsk);
    1334	struct sock *sk = &tsk->sk;
    1335	int mtyp = msg_type(hdr);
    1336	bool was_cong;
    1337
    1338	/* Ignore if connection cannot be validated: */
    1339	if (!tsk_peer_msg(tsk, hdr)) {
    1340		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
    1341		goto exit;
    1342	}
    1343
        	/* Peer reported an error: tear the connection down */
    1344	if (unlikely(msg_errcode(hdr))) {
    1345		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
    1346		tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
    1347				      tsk_peer_port(tsk));
    1348		sk->sk_state_change(sk);
    1349
    1350		/* State change is ignored if socket already awake,
    1351		 * - convert msg to abort msg and add to inqueue
    1352		 */
    1353		msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
    1354		msg_set_type(hdr, TIPC_CONN_MSG);
    1355		msg_set_size(hdr, BASIC_H_SIZE);
    1356		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
    1357		__skb_queue_tail(inputq, skb);
        		/* skb now sits on inputq, so it must not be freed here */
    1358		return;
    1359	}
    1360
    1361	tsk->probe_unacked = false;
    1362
    1363	if (mtyp == CONN_PROBE) {
    1364		msg_set_type(hdr, CONN_PROBE_REPLY);
        		/* NOTE(review): no kfree_skb() when tipc_msg_reverse() fails;
        		 * presumably it disposes of the buffer itself - confirm
        		 */
    1365		if (tipc_msg_reverse(onode, &skb, TIPC_OK))
    1366			__skb_queue_tail(xmitq, skb);
    1367		return;
    1368	} else if (mtyp == CONN_ACK) {
        		/* Sample congestion before the counters change, so that a
        		 * transition to non-congested can wake up writers
        		 */
    1369		was_cong = tsk_conn_cong(tsk);
    1370		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
    1371		tsk->snt_unacked -= msg_conn_ack(hdr);
    1372		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
    1373			tsk->snd_win = msg_adv_win(hdr);
    1374		if (was_cong && !tsk_conn_cong(tsk))
    1375			sk->sk_write_space(sk);
    1376	} else if (mtyp != CONN_PROBE_REPLY) {
    1377		pr_warn("Received unknown CONN_PROTO msg\n");
    1378	}
    1379exit:
    1380	kfree_skb(skb);
    1381}
   1382
    1383/**
    1384 * tipc_sendmsg - send message in connectionless manner
    1385 * @sock: socket structure
    1386 * @m: message to send
    1387 * @dsz: amount of user data to be sent
    1388 *
    1389 * Message must have a destination specified explicitly.
    1390 * Used for SOCK_RDM and SOCK_DGRAM messages,
    1391 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
    1392 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
         *
         * Takes the socket lock around __tipc_sendmsg(), which does the work.
    1393 *
    1394 * Return: the number of bytes sent on success, or errno otherwise
    1395 */
    1396static int tipc_sendmsg(struct socket *sock,
    1397			struct msghdr *m, size_t dsz)
    1398{
    1399	struct sock *sk = sock->sk;
    1400	int ret;
    1401
    1402	lock_sock(sk);
    1403	ret = __tipc_sendmsg(sock, m, dsz);
    1404	release_sock(sk);
    1405
    1406	return ret;
    1407}
    1408
        /* __tipc_sendmsg - body of tipc_sendmsg()
         *
         * tipc_sendmsg() calls this with the socket lock held; __tipc_sendstream()
         * also calls it for implicit connection setup.
         *
         * Dispatches on group membership and destination address type:
         * - socket in a group: one of the tipc_send_group_*() functions
         * - TIPC_SERVICE_RANGE: tipc_sendmcast()
         * - TIPC_SERVICE_ADDR: destination resolved by anycast lookup
         * - TIPC_SOCKET_ADDR: destination taken directly from the address
         * Without a destination in @m, the address stored in tsk->peer is used.
         *
         * Returns @dlen on success, or a negative errno.
         */
    1409static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
    1410{
    1411	struct sock *sk = sock->sk;
    1412	struct net *net = sock_net(sk);
    1413	struct tipc_sock *tsk = tipc_sk(sk);
    1414	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
    1415	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
    1416	struct list_head *clinks = &tsk->cong_links;
        	/* On a connection-oriented socket this message acts as a SYN */
    1417	bool syn = !tipc_sk_type_connectionless(sk);
    1418	struct tipc_group *grp = tsk->group;
    1419	struct tipc_msg *hdr = &tsk->phdr;
    1420	struct tipc_socket_addr skaddr;
    1421	struct sk_buff_head pkts;
    1422	int atype, mtu, rc;
    1423
    1424	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
    1425		return -EMSGSIZE;
    1426
    1427	if (ua) {
    1428		if (!tipc_uaddr_valid(ua, m->msg_namelen))
    1429			return -EINVAL;
    1430		atype = ua->addrtype;
    1431	}
    1432
    1433	/* If socket belongs to a communication group follow other paths */
    1434	if (grp) {
        		/* atype is only read below when ua is set, i.e. after the
        		 * assignment above
        		 */
    1435		if (!ua)
    1436			return tipc_send_group_bcast(sock, m, dlen, timeout);
    1437		if (atype == TIPC_SERVICE_ADDR)
    1438			return tipc_send_group_anycast(sock, m, dlen, timeout);
    1439		if (atype == TIPC_SOCKET_ADDR)
    1440			return tipc_send_group_unicast(sock, m, dlen, timeout);
    1441		if (atype == TIPC_SERVICE_RANGE)
    1442			return tipc_send_group_mcast(sock, m, dlen, timeout);
    1443		return -EINVAL;
    1444	}
    1445
        	/* No explicit destination: fall back to the stored peer address */
    1446	if (!ua) {
    1447		ua = (struct tipc_uaddr *)&tsk->peer;
    1448		if (!syn && ua->family != AF_TIPC)
    1449			return -EDESTADDRREQ;
    1450		atype = ua->addrtype;
    1451	}
    1452
        	/* A SYN is only accepted from a socket in state TIPC_OPEN that has
        	 * not been published
        	 */
    1453	if (unlikely(syn)) {
    1454		if (sk->sk_state == TIPC_LISTEN)
    1455			return -EPIPE;
    1456		if (sk->sk_state != TIPC_OPEN)
    1457			return -EISCONN;
    1458		if (tsk->published)
    1459			return -EOPNOTSUPP;
    1460		if (atype == TIPC_SERVICE_ADDR)
    1461			tsk->conn_addrtype = atype;
    1462		msg_set_syn(hdr, 1);
    1463	}
    1464
    1465	memset(&skaddr, 0, sizeof(skaddr));
    1466
    1467	/* Determine destination */
    1468	if (atype == TIPC_SERVICE_RANGE) {
    1469		return tipc_sendmcast(sock, ua, m, dlen, timeout);
    1470	} else if (atype == TIPC_SERVICE_ADDR) {
    1471		skaddr.node = ua->lookup_node;
    1472		ua->scope = tipc_node2scope(skaddr.node);
    1473		if (!tipc_nametbl_lookup_anycast(net, ua, &skaddr))
    1474			return -EHOSTUNREACH;
    1475	} else if (atype == TIPC_SOCKET_ADDR) {
    1476		skaddr = ua->sk;
    1477	} else {
    1478		return -EINVAL;
    1479	}
    1480
    1481	/* Block or return if destination link is congested */
    1482	rc = tipc_wait_for_cond(sock, &timeout,
    1483				!tipc_dest_find(clinks, skaddr.node, 0));
    1484	if (unlikely(rc))
    1485		return rc;
    1486
    1487	/* Finally build message header */
    1488	msg_set_destnode(hdr, skaddr.node);
    1489	msg_set_destport(hdr, skaddr.ref);
    1490	if (atype == TIPC_SERVICE_ADDR) {
    1491		msg_set_type(hdr, TIPC_NAMED_MSG);
    1492		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
    1493		msg_set_nametype(hdr, ua->sa.type);
    1494		msg_set_nameinst(hdr, ua->sa.instance);
    1495		msg_set_lookup_scope(hdr, ua->scope);
    1496	} else { /* TIPC_SOCKET_ADDR */
    1497		msg_set_type(hdr, TIPC_DIRECT_MSG);
    1498		msg_set_lookup_scope(hdr, 0);
    1499		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
    1500	}
    1501
    1502	/* Add message body */
    1503	__skb_queue_head_init(&pkts);
    1504	mtu = tipc_node_get_mtu(net, skaddr.node, tsk->portid, true);
    1505	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
    1506	if (unlikely(rc != dlen))
    1507		return rc;
        	/* For a SYN, a clone of the message is placed on the socket's write
        	 * queue; if cloning fails the built packets are dropped
        	 */
    1508	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
    1509		__skb_queue_purge(&pkts);
    1510		return -ENOMEM;
    1511	}
    1512
    1513	/* Send message */
    1514	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
    1515	rc = tipc_node_xmit(net, &pkts, skaddr.node, tsk->portid);
        	/* Link congestion is recorded for later senders but is not reported
        	 * as an error for this message
        	 */
    1516	if (unlikely(rc == -ELINKCONG)) {
    1517		tipc_dest_push(clinks, skaddr.node, 0);
    1518		tsk->cong_link_cnt++;
    1519		rc = 0;
    1520	}
    1521
    1522	if (unlikely(syn && !rc)) {
    1523		tipc_set_sk_state(sk, TIPC_CONNECTING);
        		/* Only a SYN carrying data on a blocking socket waits for
        		 * the connection to be set up
        		 */
    1524		if (dlen && timeout) {
        			/* NOTE(review): timeout was obtained from
        			 * sock_sndtimeo() above; confirm that it really is in
        			 * milliseconds before this conversion is applied
        			 */
    1525			timeout = msecs_to_jiffies(timeout);
    1526			tipc_wait_for_connect(sock, &timeout);
    1527		}
    1528	}
    1529
    1530	return rc ? rc : dlen;
    1531}
   1532
    1533/**
    1534 * tipc_sendstream - send stream-oriented data
    1535 * @sock: socket structure
    1536 * @m: data to send
    1537 * @dsz: total length of data to be transmitted
    1538 *
    1539 * Used for SOCK_STREAM data.
         *
         * Takes the socket lock around __tipc_sendstream(), which does the work.
    1540 *
    1541 * Return: the number of bytes sent on success (or partial success),
    1542 * or errno if no data sent
    1543 */
    1544static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
    1545{
    1546	struct sock *sk = sock->sk;
    1547	int ret;
    1548
    1549	lock_sock(sk);
    1550	ret = __tipc_sendstream(sock, m, dsz);
    1551	release_sock(sk);
    1552
    1553	return ret;
    1554}
    1555
        /* __tipc_sendstream - body of tipc_sendstream()
         *
         * tipc_sendstream() calls this with the socket lock held.
         *
         * A destination address on a socket in state TIPC_OPEN triggers implicit
         * connection setup through __tipc_sendmsg(). Otherwise the data is sent in
         * pieces of at most TIPC_MAX_USER_MSG_SIZE bytes; a piece is either
         * appended to the write queue (Nagle mode) or built as a message of its
         * own, and then passed to tipc_node_xmit().
         *
         * Returns the number of bytes accepted, or the error code if none were.
         */
    1556static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
    1557{
    1558	struct sock *sk = sock->sk;
    1559	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
    1560	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
    1561	struct sk_buff_head *txq = &sk->sk_write_queue;
    1562	struct tipc_sock *tsk = tipc_sk(sk);
    1563	struct tipc_msg *hdr = &tsk->phdr;
    1564	struct net *net = sock_net(sk);
    1565	struct sk_buff *skb;
    1566	u32 dnode = tsk_peer_node(tsk);
    1567	int maxnagle = tsk->maxnagle;
    1568	int maxpkt = tsk->max_pkt;
    1569	int send, sent = 0;
    1570	int blocks, rc = 0;
    1571
        	/* The byte count is returned as an int, so it must fit in one */
    1572	if (unlikely(dlen > INT_MAX))
    1573		return -EMSGSIZE;
    1574
    1575	/* Handle implicit connection setup */
    1576	if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
    1577		rc = __tipc_sendmsg(sock, m, dlen);
        		/* All of a non-empty payload was sent: account for it as
        		 * unacknowledged data
        		 */
    1578		if (dlen && dlen == rc) {
    1579			tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
    1580			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
    1581		}
    1582		return rc;
    1583	}
    1584
    1585	do {
        		/* Wait until no link is congested, the connection is not
        		 * congested and the socket is connected
        		 */
    1586		rc = tipc_wait_for_cond(sock, &timeout,
    1587					(!tsk->cong_link_cnt &&
    1588					 !tsk_conn_cong(tsk) &&
    1589					 tipc_sk_connected(sk)));
    1590		if (unlikely(rc))
    1591			break;
    1592		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
    1593		blocks = tsk->snd_backlog;
        		/* Nagle path: taken once the oneway counter has reached
        		 * nagle_start, maxnagle is non-zero and the piece fits in it
        		 */
    1594		if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
    1595		    send <= maxnagle) {
    1596			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
    1597			if (unlikely(rc < 0))
    1598				break;
    1599			blocks += rc;
    1600			tsk->msg_acc++;
        			/* Still within 64 blocks with an ack outstanding:
        			 * keep the data queued and leave without transmitting
        			 */
    1601			if (blocks <= 64 && tsk->expect_ack) {
    1602				tsk->snd_backlog = blocks;
    1603				sent += send;
    1604				break;
    1605			} else if (blocks > 64) {
    1606				tsk->pkt_cnt += skb_queue_len(txq);
    1607			} else {
        				/* No ack outstanding: request one for the
        				 * last queued buffer and restart accounting
        				 */
    1608				skb = skb_peek_tail(txq);
    1609				if (skb) {
    1610					msg_set_ack_required(buf_msg(skb));
    1611					tsk->expect_ack = true;
    1612				} else {
    1613					tsk->expect_ack = false;
    1614				}
    1615				tsk->msg_acc = 0;
    1616				tsk->pkt_cnt = 0;
    1617			}
    1618		} else {
        			/* Regular path: build a message from offset 'sent' */
    1619			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
    1620			if (unlikely(rc != send))
    1621				break;
    1622			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
    1623		}
    1624		trace_tipc_sk_sendstream(sk, skb_peek(txq),
    1625					 TIPC_DUMP_SK_SNDQ, " ");
    1626		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
        		/* Link congestion is flagged on the socket, not treated as
        		 * a failure of this piece
        		 */
    1627		if (unlikely(rc == -ELINKCONG)) {
    1628			tsk->cong_link_cnt = 1;
    1629			rc = 0;
    1630		}
    1631		if (likely(!rc)) {
    1632			tsk->snt_unacked += blocks;
    1633			tsk->snd_backlog = 0;
    1634			sent += send;
    1635		}
    1636	} while (sent < dlen && !rc);
    1637
        	/* A partial send reports the byte count rather than the error */
    1638	return sent ? sent : rc;
    1639}
   1640
    1641/**
    1642 * tipc_send_packet - send a connection-oriented message
    1643 * @sock: socket structure
    1644 * @m: message to send
    1645 * @dsz: length of data to be transmitted
    1646 *
    1647 * Used for SOCK_SEQPACKET messages.
         *
         * Rejects data larger than TIPC_MAX_USER_MSG_SIZE with -EMSGSIZE and
         * otherwise passes the message on to tipc_sendstream().
    1648 *
    1649 * Return: the number of bytes sent on success, or errno otherwise
    1650 */
    1651static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
    1652{
    1653	if (dsz > TIPC_MAX_USER_MSG_SIZE)
    1654		return -EMSGSIZE;
    1655
    1656	return tipc_sendstream(sock, m, dsz);
    1657}
   1658
    1659/* tipc_sk_finish_conn - complete the setup of a connection
         *
         * Points the socket's header (tsk->phdr) at the peer, starts the probing
         * timer, moves the socket to TIPC_ESTABLISHED, registers the connection
         * with the peer node and caches the peer's MTU and capabilities. The write
         * queue is purged. If the peer lacks TIPC_BLOCK_FLOWCTL, both windows are
         * set to FLOWCTL_MSG_WIN.
    1660 */
    1661static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
    1662				u32 peer_node)
    1663{
    1664	struct sock *sk = &tsk->sk;
    1665	struct net *net = sock_net(sk);
    1666	struct tipc_msg *msg = &tsk->phdr;
    1667
        	/* From now on the header describes a connection message to the peer */
    1668	msg_set_syn(msg, 0);
    1669	msg_set_destnode(msg, peer_node);
    1670	msg_set_destport(msg, peer_port);
    1671	msg_set_type(msg, TIPC_CONN_MSG);
    1672	msg_set_lookup_scope(msg, 0);
    1673	msg_set_hdr_sz(msg, SHORT_H_SIZE);
    1674
    1675	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
    1676	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
    1677	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
    1678	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
    1679	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
    1680	tsk_set_nagle(tsk);
    1681	__skb_queue_purge(&sk->sk_write_queue);
        	/* Peer supports block based flow control: keep the current windows */
    1682	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
    1683		return;
    1684
    1685	/* Fall back to message based flow control */
    1686	tsk->rcv_win = FLOWCTL_MSG_WIN;
    1687	tsk->snd_win = FLOWCTL_MSG_WIN;
    1688}
   1689
    1690/**
    1691 * tipc_sk_set_orig_addr - capture sender's address for received message
    1692 * @m: descriptor for message info
    1693 * @skb: received message
    1694 *
    1695 * Note: Address is not captured if not requested by receiver.
         *
         * The sender's socket address is always written and @m->msg_namelen is set
         * to the size of one struct sockaddr_tipc. For a group message the member
         * address is written as well and @m->msg_namelen is set to the size of
         * the whole address pair.
    1696 */
    1697static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
    1698{
    1699	DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
    1700	struct tipc_msg *hdr = buf_msg(skb);
    1701
        	/* Receiver did not supply an address buffer */
    1702	if (!srcaddr)
    1703		return;
    1704
    1705	srcaddr->sock.family = AF_TIPC;
    1706	srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
    1707	srcaddr->sock.scope = 0;
    1708	srcaddr->sock.addr.id.ref = msg_origport(hdr);
    1709	srcaddr->sock.addr.id.node = msg_orignode(hdr);
    1710	srcaddr->sock.addr.name.domain = 0;
    1711	m->msg_namelen = sizeof(struct sockaddr_tipc);
    1712
    1713	if (!msg_in_group(hdr))
    1714		return;
    1715
    1716	/* Group message users may also want to know sending member's id */
    1717	srcaddr->member.family = AF_TIPC;
    1718	srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
    1719	srcaddr->member.scope = 0;
    1720	srcaddr->member.addr.name.name.type = msg_nametype(hdr);
    1721	srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
    1722	srcaddr->member.addr.name.domain = 0;
    1723	m->msg_namelen = sizeof(*srcaddr);
    1724}
   1725
    1726/**
    1727 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
    1728 * @m: descriptor for message info
    1729 * @skb: received message buffer
    1730 * @tsk: TIPC port associated with message
    1731 *
    1732 * Note: Ancillary data is not captured if not requested by receiver.
         *
         * Up to three control messages are emitted at level SOL_TIPC:
         * TIPC_ERRINFO (error code and data length) and TIPC_RETDATA (the returned
         * data) for an errored message, and TIPC_DESTNAME (type, lower, upper) when
         * the message type carries a service destination.
    1733 *
    1734 * Return: 0 if successful, otherwise errno
    1735 */
    1736static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
    1737				 struct tipc_sock *tsk)
    1738{
    1739	struct tipc_msg *hdr;
        	/* Zero-initialised scratch area for the cmsg payloads */
    1740	u32 data[3] = {0,};
    1741	bool has_addr;
    1742	int dlen, rc;
    1743
        	/* Receiver supplied no control buffer */
    1744	if (likely(m->msg_controllen == 0))
    1745		return 0;
    1746
    1747	hdr = buf_msg(skb);
    1748	dlen = msg_data_sz(hdr);
    1749
    1750	/* Capture errored message object, if any */
    1751	if (msg_errcode(hdr)) {
    1752		if (skb_linearize(skb))
    1753			return -ENOMEM;
        		/* Fetch the header again after skb_linearize() */
    1754		hdr = buf_msg(skb);
    1755		data[0] = msg_errcode(hdr);
    1756		data[1] = dlen;
        		/* 8 bytes: data[0] and data[1] */
    1757		rc = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, data);
    1758		if (rc || !dlen)
    1759			return rc;
    1760		rc = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, dlen, msg_data(hdr));
    1761		if (rc)
    1762			return rc;
    1763	}
    1764
    1765	/* Capture TIPC_SERVICE_ADDR/RANGE destination address, if any */
    1766	switch (msg_type(hdr)) {
    1767	case TIPC_NAMED_MSG:
        		/* Single instance: reported as a range with lower == upper */
    1768		has_addr = true;
    1769		data[0] = msg_nametype(hdr);
    1770		data[1] = msg_namelower(hdr);
    1771		data[2] = data[1];
    1772		break;
    1773	case TIPC_MCAST_MSG:
    1774		has_addr = true;
    1775		data[0] = msg_nametype(hdr);
    1776		data[1] = msg_namelower(hdr);
    1777		data[2] = msg_nameupper(hdr);
    1778		break;
    1779	case TIPC_CONN_MSG:
        		/* Name is read from the socket's own header, and reported
        		 * only if conn_addrtype is set
        		 */
    1780		has_addr = !!tsk->conn_addrtype;
    1781		data[0] = msg_nametype(&tsk->phdr);
    1782		data[1] = msg_nameinst(&tsk->phdr);
    1783		data[2] = data[1];
    1784		break;
    1785	default:
    1786		has_addr = false;
    1787	}
    1788	if (!has_addr)
    1789		return 0;
        	/* 12 bytes: all three entries of data[] */
    1790	return put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, data);
    1791}
    1792
        /* tipc_sk_build_ack - create a CONN_ACK message for the peer
         *
         * The message acknowledges tsk->rcv_unacked, which is then reset to zero.
         * If the peer has TIPC_BLOCK_FLOWCTL, the receive window is recomputed
         * from sk_rcvbuf and advertised in the message.
         *
         * Returns the new buffer, or NULL if the socket is not connected or the
         * message could not be created.
         */
    1793static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
    1794{
    1795	struct sock *sk = &tsk->sk;
    1796	struct sk_buff *skb = NULL;
    1797	struct tipc_msg *msg;
    1798	u32 peer_port = tsk_peer_port(tsk);
    1799	u32 dnode = tsk_peer_node(tsk);
    1800
    1801	if (!tipc_sk_connected(sk))
    1802		return NULL;
    1803	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
    1804			      dnode, tsk_own_node(tsk), peer_port,
    1805			      tsk->portid, TIPC_OK);
    1806	if (!skb)
    1807		return NULL;
    1808	msg = buf_msg(skb);
    1809	msg_set_conn_ack(msg, tsk->rcv_unacked);
    1810	tsk->rcv_unacked = 0;
    1811
    1812	/* Adjust to and advertise the correct window limit */
    1813	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
    1814		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
    1815		msg_set_adv_win(msg, tsk->rcv_win);
    1816	}
    1817	return skb;
    1818}
   1819
   1820static void tipc_sk_send_ack(struct tipc_sock *tsk)
   1821{
   1822	struct sk_buff *skb;
   1823
   1824	skb = tipc_sk_build_ack(tsk);
   1825	if (!skb)
   1826		return;
   1827
   1828	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
   1829			   msg_link_selector(buf_msg(skb)));
   1830}
   1831
   1832static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
   1833{
   1834	struct sock *sk = sock->sk;
   1835	DEFINE_WAIT_FUNC(wait, woken_wake_function);
   1836	long timeo = *timeop;
   1837	int err = sock_error(sk);
   1838
   1839	if (err)
   1840		return err;
   1841
   1842	for (;;) {
   1843		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
   1844			if (sk->sk_shutdown & RCV_SHUTDOWN) {
   1845				err = -ENOTCONN;
   1846				break;
   1847			}
   1848			add_wait_queue(sk_sleep(sk), &wait);
   1849			release_sock(sk);
   1850			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
   1851			sched_annotate_sleep();
   1852			lock_sock(sk);
   1853			remove_wait_queue(sk_sleep(sk), &wait);
   1854		}
   1855		err = 0;
   1856		if (!skb_queue_empty(&sk->sk_receive_queue))
   1857			break;
   1858		err = -EAGAIN;
   1859		if (!timeo)
   1860			break;
   1861		err = sock_intr_errno(timeo);
   1862		if (signal_pending(current))
   1863			break;
   1864
   1865		err = sock_error(sk);
   1866		if (err)
   1867			break;
   1868	}
   1869	*timeop = timeo;
   1870	return err;
   1871}
   1872
/**
 * tipc_recvmsg - receive packet-oriented message
 * @sock: network socket
 * @m: descriptor for message info
 * @buflen: length of user buffer area; must be non-zero
 * @flags: receive flags; MSG_DONTWAIT, MSG_PEEK and MSG_EOR are examined here
 *
 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 * If the complete message doesn't fit in user area, truncate it and set
 * MSG_TRUNC, unless the caller passed MSG_EOR, in which case the read
 * position is remembered and the rest is delivered by subsequent calls.
 *
 * Return: size of returned message data (0 for errored messages and group
 * events), negative errno otherwise
 */
static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
			size_t buflen,	int flags)
{
	struct sock *sk = sock->sk;
	bool connected = !tipc_sk_type_connectionless(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	int rc, err, hlen, dlen, copy;
	struct tipc_skb_cb *skb_cb;
	struct sk_buff_head xmitq;
	struct tipc_msg *hdr;
	struct sk_buff *skb;
	bool grp_evt;
	long timeout;

	/* Catch invalid receive requests */
	if (unlikely(!buflen))
		return -EINVAL;

	lock_sock(sk);
	/* A connection-oriented socket still in TIPC_OPEN has no peer yet */
	if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
		rc = -ENOTCONN;
		goto exit;
	}
	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	/* Step rcv queue to first msg with data or error; wait if necessary */
	do {
		rc = tipc_wait_for_rcvmsg(sock, &timeout);
		if (unlikely(rc))
			goto exit;
		skb = skb_peek(&sk->sk_receive_queue);
		skb_cb = TIPC_SKB_CB(skb);
		hdr = buf_msg(skb);
		dlen = msg_data_sz(hdr);
		hlen = msg_hdr_sz(hdr);
		err = msg_errcode(hdr);
		grp_evt = msg_is_grp_evt(hdr);
		if (likely(dlen || err))
			break;
		/* Empty, non-errored message: drop it and look at the next */
		tsk_advance_rx_queue(sk);
	} while (1);

	/* Collect msg meta data, including error code and rejected data */
	tipc_sk_set_orig_addr(m, skb);
	rc = tipc_sk_anc_data_recv(m, skb, tsk);
	if (unlikely(rc))
		goto exit;
	/* tipc_sk_anc_data_recv() may linearize the skb; re-read the header */
	hdr = buf_msg(skb);

	/* Capture data if non-error msg, otherwise just set return value */
	if (likely(!err)) {
		/* Resume where a previous partial (MSG_EOR) read stopped */
		int offset = skb_cb->bytes_read;

		copy = min_t(int, dlen - offset, buflen);
		rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
		if (unlikely(rc))
			goto exit;
		if (unlikely(offset + copy < dlen)) {
			/* User buffer too small for the rest of the message */
			if (flags & MSG_EOR) {
				/* Keep the message; remember read position
				 * unless the caller is only peeking
				 */
				if (!(flags & MSG_PEEK))
					skb_cb->bytes_read = offset + copy;
			} else {
				/* Remainder is discarded with the message */
				m->msg_flags |= MSG_TRUNC;
				skb_cb->bytes_read = 0;
			}
		} else {
			/* Whole message (or its last part) was delivered */
			if (flags & MSG_EOR)
				m->msg_flags |= MSG_EOR;
			skb_cb->bytes_read = 0;
		}
	} else {
		copy = 0;
		rc = 0;
		/* On a connection-oriented socket, an error other than an
		 * orderly shutdown is reported as -ECONNRESET when the caller
		 * supplied no control buffer to receive the error info
		 */
		if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control) {
			rc = -ECONNRESET;
			goto exit;
		}
	}

	/* Mark message as group event if applicable */
	if (unlikely(grp_evt)) {
		if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
			m->msg_flags |= MSG_EOR;
		m->msg_flags |= MSG_OOB;
		/* Group events report a length of zero */
		copy = 0;
	}

	/* Capture of data or error code/rejected data was successful */
	if (unlikely(flags & MSG_PEEK))
		goto exit;

	/* Send group flow control advertisement when applicable */
	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
		__skb_queue_head_init(&xmitq);
		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
					  msg_orignode(hdr), msg_origport(hdr),
					  &xmitq);
		tipc_node_distr_xmit(sock_net(sk), &xmitq);
	}

	/* Partially read message stays at the head of the queue */
	if (skb_cb->bytes_read)
		goto exit;

	tsk_advance_rx_queue(sk);

	if (likely(!connected))
		goto exit;

	/* Send connection flow control advertisement when applicable */
	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
		tipc_sk_send_ack(tsk);
exit:
	release_sock(sk);
	/* copy is only read when rc == 0, i.e. after it has been assigned */
	return rc ? rc : copy;
}
   2001
/**
 * tipc_recvstream - receive stream-oriented data
 * @sock: network socket
 * @m: descriptor for message info
 * @buflen: total size of user buffer area; must be non-zero
 * @flags: receive flags; MSG_PEEK, MSG_WAITALL and MSG_DONTWAIT are examined
 *
 * Used for SOCK_STREAM messages only.  If not enough data is available
 * will optionally wait for more; never truncates data.
 *
 * Return: size of returned message data if any was copied, otherwise 0 or a
 * negative errno
 */
static int tipc_recvstream(struct socket *sock, struct msghdr *m,
			   size_t buflen, int flags)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct sk_buff *skb;
	struct tipc_msg *hdr;
	struct tipc_skb_cb *skb_cb;
	bool peek = flags & MSG_PEEK;
	int offset, required, copy, copied = 0;
	int hlen, dlen, err, rc;
	long timeout;

	/* Catch invalid receive attempts */
	if (unlikely(!buflen))
		return -EINVAL;

	lock_sock(sk);

	if (unlikely(sk->sk_state == TIPC_OPEN)) {
		rc = -ENOTCONN;
		goto exit;
	}
	/* Minimum amount of data to collect before returning */
	required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	do {
		/* Look at first msg in receive queue; wait if necessary */
		rc = tipc_wait_for_rcvmsg(sock, &timeout);
		if (unlikely(rc))
			break;
		skb = skb_peek(&sk->sk_receive_queue);
		skb_cb = TIPC_SKB_CB(skb);
		hdr = buf_msg(skb);
		dlen = msg_data_sz(hdr);
		hlen = msg_hdr_sz(hdr);
		err = msg_errcode(hdr);

		/* Discard any empty non-errored (SYN-) message */
		if (unlikely(!dlen && !err)) {
			tsk_advance_rx_queue(sk);
			continue;
		}

		/* Collect msg meta data, incl. error code and rejected data.
		 * Only done for the first message contributing to this call.
		 */
		if (!copied) {
			tipc_sk_set_orig_addr(m, skb);
			rc = tipc_sk_anc_data_recv(m, skb, tsk);
			if (rc)
				break;
			/* The skb may have been linearized; re-read header */
			hdr = buf_msg(skb);
		}

		/* Copy data if msg ok, otherwise return error/partial data */
		if (likely(!err)) {
			/* Continue after what earlier calls already consumed */
			offset = skb_cb->bytes_read;
			copy = min_t(int, dlen - offset, buflen - copied);
			rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
			if (unlikely(rc))
				break;
			copied += copy;
			offset += copy;
			/* User buffer is full but the message is not drained:
			 * keep it queued and remember the position (unless
			 * peeking)
			 */
			if (unlikely(offset < dlen)) {
				if (!peek)
					skb_cb->bytes_read = offset;
				break;
			}
		} else {
			rc = 0;
			/* Anything but an orderly shutdown is a reset, unless
			 * the caller supplied a control buffer for the error
			 */
			if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
				rc = -ECONNRESET;
			/* Return data copied so far first; the errored message
			 * stays queued for the next call
			 */
			if (copied || rc)
				break;
		}

		/* Peeking never consumes the message */
		if (unlikely(peek))
			break;

		tsk_advance_rx_queue(sk);

		/* Send connection flow control advertisement when applicable */
		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
		if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
			tipc_sk_send_ack(tsk);

		/* Exit if all requested data or FIN/error received */
		if (copied == buflen || err)
			break;

	} while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
exit:
	release_sock(sk);
	/* Data already copied takes precedence over a late error */
	return copied ? copied : rc;
}
   2108
   2109/**
   2110 * tipc_write_space - wake up thread if port congestion is released
   2111 * @sk: socket
   2112 */
   2113static void tipc_write_space(struct sock *sk)
   2114{
   2115	struct socket_wq *wq;
   2116
   2117	rcu_read_lock();
   2118	wq = rcu_dereference(sk->sk_wq);
   2119	if (skwq_has_sleeper(wq))
   2120		wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
   2121						EPOLLWRNORM | EPOLLWRBAND);
   2122	rcu_read_unlock();
   2123}
   2124
   2125/**
   2126 * tipc_data_ready - wake up threads to indicate messages have been received
   2127 * @sk: socket
   2128 */
   2129static void tipc_data_ready(struct sock *sk)
   2130{
   2131	struct socket_wq *wq;
   2132
   2133	rcu_read_lock();
   2134	wq = rcu_dereference(sk->sk_wq);
   2135	if (skwq_has_sleeper(wq))
   2136		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
   2137						EPOLLRDNORM | EPOLLRDBAND);
   2138	rcu_read_unlock();
   2139}
   2140
   2141static void tipc_sock_destruct(struct sock *sk)
   2142{
   2143	__skb_queue_purge(&sk->sk_receive_queue);
   2144}
   2145
   2146static void tipc_sk_proto_rcv(struct sock *sk,
   2147			      struct sk_buff_head *inputq,
   2148			      struct sk_buff_head *xmitq)
   2149{
   2150	struct sk_buff *skb = __skb_dequeue(inputq);
   2151	struct tipc_sock *tsk = tipc_sk(sk);
   2152	struct tipc_msg *hdr = buf_msg(skb);
   2153	struct tipc_group *grp = tsk->group;
   2154	bool wakeup = false;
   2155
   2156	switch (msg_user(hdr)) {
   2157	case CONN_MANAGER:
   2158		tipc_sk_conn_proto_rcv(tsk, skb, inputq, xmitq);
   2159		return;
   2160	case SOCK_WAKEUP:
   2161		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
   2162		/* coupled with smp_rmb() in tipc_wait_for_cond() */
   2163		smp_wmb();
   2164		tsk->cong_link_cnt--;
   2165		wakeup = true;
   2166		tipc_sk_push_backlog(tsk, false);
   2167		break;
   2168	case GROUP_PROTOCOL:
   2169		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
   2170		break;
   2171	case TOP_SRV:
   2172		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
   2173				      hdr, inputq, xmitq);
   2174		break;
   2175	default:
   2176		break;
   2177	}
   2178
   2179	if (wakeup)
   2180		sk->sk_write_space(sk);
   2181
   2182	kfree_skb(skb);
   2183}
   2184
   2185/**
   2186 * tipc_sk_filter_connect - check incoming message for a connection-based socket
   2187 * @tsk: TIPC socket
   2188 * @skb: pointer to message buffer.
   2189 * @xmitq: for Nagle ACK if any
   2190 * Return: true if message should be added to receive queue, false otherwise
   2191 */
   2192static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
   2193				   struct sk_buff_head *xmitq)
   2194{
   2195	struct sock *sk = &tsk->sk;
   2196	struct net *net = sock_net(sk);
   2197	struct tipc_msg *hdr = buf_msg(skb);
   2198	bool con_msg = msg_connected(hdr);
   2199	u32 pport = tsk_peer_port(tsk);
   2200	u32 pnode = tsk_peer_node(tsk);
   2201	u32 oport = msg_origport(hdr);
   2202	u32 onode = msg_orignode(hdr);
   2203	int err = msg_errcode(hdr);
   2204	unsigned long delay;
   2205
   2206	if (unlikely(msg_mcast(hdr)))
   2207		return false;
   2208	tsk->oneway = 0;
   2209
   2210	switch (sk->sk_state) {
   2211	case TIPC_CONNECTING:
   2212		/* Setup ACK */
   2213		if (likely(con_msg)) {
   2214			if (err)
   2215				break;
   2216			tipc_sk_finish_conn(tsk, oport, onode);
   2217			msg_set_importance(&tsk->phdr, msg_importance(hdr));
   2218			/* ACK+ message with data is added to receive queue */
   2219			if (msg_data_sz(hdr))
   2220				return true;
   2221			/* Empty ACK-, - wake up sleeping connect() and drop */
   2222			sk->sk_state_change(sk);
   2223			msg_set_dest_droppable(hdr, 1);
   2224			return false;
   2225		}
   2226		/* Ignore connectionless message if not from listening socket */
   2227		if (oport != pport || onode != pnode)
   2228			return false;
   2229
   2230		/* Rejected SYN */
   2231		if (err != TIPC_ERR_OVERLOAD)
   2232			break;
   2233
   2234		/* Prepare for new setup attempt if we have a SYN clone */
   2235		if (skb_queue_empty(&sk->sk_write_queue))
   2236			break;
   2237		get_random_bytes(&delay, 2);
   2238		delay %= (tsk->conn_timeout / 4);
   2239		delay = msecs_to_jiffies(delay + 100);
   2240		sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
   2241		return false;
   2242	case TIPC_OPEN:
   2243	case TIPC_DISCONNECTING:
   2244		return false;
   2245	case TIPC_LISTEN:
   2246		/* Accept only SYN message */
   2247		if (!msg_is_syn(hdr) &&
   2248		    tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
   2249			return false;
   2250		if (!con_msg && !err)
   2251			return true;
   2252		return false;
   2253	case TIPC_ESTABLISHED:
   2254		if (!skb_queue_empty(&sk->sk_write_queue))
   2255			tipc_sk_push_backlog(tsk, false);
   2256		/* Accept only connection-based messages sent by peer */
   2257		if (likely(con_msg && !err && pport == oport &&
   2258			   pnode == onode)) {
   2259			if (msg_ack_required(hdr)) {
   2260				struct sk_buff *skb;
   2261
   2262				skb = tipc_sk_build_ack(tsk);
   2263				if (skb) {
   2264					msg_set_nagle_ack(buf_msg(skb));
   2265					__skb_queue_tail(xmitq, skb);
   2266				}
   2267			}
   2268			return true;
   2269		}
   2270		if (!tsk_peer_msg(tsk, hdr))
   2271			return false;
   2272		if (!err)
   2273			return true;
   2274		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
   2275		tipc_node_remove_conn(net, pnode, tsk->portid);
   2276		sk->sk_state_change(sk);
   2277		return true;
   2278	default:
   2279		pr_err("Unknown sk_state %u\n", sk->sk_state);
   2280	}
   2281	/* Abort connection setup attempt */
   2282	tipc_set_sk_state(sk, TIPC_DISCONNECTING);
   2283	sk->sk_err = ECONNREFUSED;
   2284	sk->sk_state_change(sk);
   2285	return true;
   2286}
   2287
   2288/**
   2289 * rcvbuf_limit - get proper overload limit of socket receive queue
   2290 * @sk: socket
   2291 * @skb: message
   2292 *
   2293 * For connection oriented messages, irrespective of importance,
   2294 * default queue limit is 2 MB.
   2295 *
   2296 * For connectionless messages, queue limits are based on message
   2297 * importance as follows:
   2298 *
   2299 * TIPC_LOW_IMPORTANCE       (2 MB)
   2300 * TIPC_MEDIUM_IMPORTANCE    (4 MB)
   2301 * TIPC_HIGH_IMPORTANCE      (8 MB)
   2302 * TIPC_CRITICAL_IMPORTANCE  (16 MB)
   2303 *
   2304 * Return: overload limit according to corresponding message importance
   2305 */
   2306static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
   2307{
   2308	struct tipc_sock *tsk = tipc_sk(sk);
   2309	struct tipc_msg *hdr = buf_msg(skb);
   2310
   2311	if (unlikely(msg_in_group(hdr)))
   2312		return READ_ONCE(sk->sk_rcvbuf);
   2313
   2314	if (unlikely(!msg_connected(hdr)))
   2315		return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);
   2316
   2317	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
   2318		return READ_ONCE(sk->sk_rcvbuf);
   2319
   2320	return FLOWCTL_MSG_LIM;
   2321}
   2322
/**
 * tipc_sk_filter_rcv - validate incoming message
 * @sk: socket
 * @skb: pointer to message.
 * @xmitq: queue collecting messages to be sent in response, e.g. rejected
 *         messages returned to their sender
 *
 * Enqueues message on receive queue if acceptable; optionally handles
 * disconnect indication for a connected socket.  Messages that are not
 * acceptable are reversed with an error code and appended to @xmitq.
 *
 * Called with socket lock already taken
 */
static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
			       struct sk_buff_head *xmitq)
{
	bool sk_conn = !tipc_sk_type_connectionless(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = buf_msg(skb);
	struct net *net = sock_net(sk);
	struct sk_buff_head inputq;
	/* Sampled up front: the buffer may be consumed by the handlers below */
	int mtyp = msg_type(hdr);
	int limit, err = TIPC_OK;

	trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
	TIPC_SKB_CB(skb)->bytes_read = 0;
	/* Work on a private queue; the filters below may add or remove
	 * buffers before validation
	 */
	__skb_queue_head_init(&inputq);
	__skb_queue_tail(&inputq, skb);

	/* Protocol messages: tipc_sk_proto_rcv() dequeues the buffer itself */
	if (unlikely(!msg_isdata(hdr)))
		tipc_sk_proto_rcv(sk, &inputq, xmitq);

	if (unlikely(grp))
		tipc_group_filter_msg(grp, &inputq, xmitq);

	if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
		tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);

	/* Validate and add to receive buffer if there is space */
	while ((skb = __skb_dequeue(&inputq))) {
		hdr = buf_msg(skb);
		limit = rcvbuf_limit(sk, skb);
		/* Wrong kind of message for this socket => no such port.
		 * tipc_sk_filter_connect() is only called for
		 * connection-oriented sockets and has side effects.
		 */
		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
		    (!sk_conn && msg_connected(hdr)) ||
		    (!grp && msg_in_group(hdr)))
			err = TIPC_ERR_NO_PORT;
		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
			/* Receive queue would exceed its limit */
			trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
					   "err_overload2!");
			atomic_inc(&sk->sk_drops);
			err = TIPC_ERR_OVERLOAD;
		}

		if (unlikely(err)) {
			/* Return the message to its sender if it can be
			 * reversed
			 */
			if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
				trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
						      "@filter_rcv!");
				__skb_queue_tail(xmitq, skb);
			}
			/* Start clean for the next buffer */
			err = TIPC_OK;
			continue;
		}
		__skb_queue_tail(&sk->sk_receive_queue, skb);
		skb_set_owner_r(skb, sk);
		trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
					 "rcvq >90% allocated!");
		sk->sk_data_ready(sk);
	}
}
   2391
   2392/**
   2393 * tipc_sk_backlog_rcv - handle incoming message from backlog queue
   2394 * @sk: socket
   2395 * @skb: message
   2396 *
   2397 * Caller must hold socket lock
   2398 */
   2399static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
   2400{
   2401	unsigned int before = sk_rmem_alloc_get(sk);
   2402	struct sk_buff_head xmitq;
   2403	unsigned int added;
   2404
   2405	__skb_queue_head_init(&xmitq);
   2406
   2407	tipc_sk_filter_rcv(sk, skb, &xmitq);
   2408	added = sk_rmem_alloc_get(sk) - before;
   2409	atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
   2410
   2411	/* Send pending response/rejected messages, if any */
   2412	tipc_node_distr_xmit(sock_net(sk), &xmitq);
   2413	return 0;
   2414}
   2415
   2416/**
   2417 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
   2418 *                   inputq and try adding them to socket or backlog queue
   2419 * @inputq: list of incoming buffers with potentially different destinations
   2420 * @sk: socket where the buffers should be enqueued
   2421 * @dport: port number for the socket
   2422 * @xmitq: output queue
   2423 *
   2424 * Caller must hold socket lock
   2425 */
   2426static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
   2427			    u32 dport, struct sk_buff_head *xmitq)
   2428{
   2429	unsigned long time_limit = jiffies + usecs_to_jiffies(20000);
   2430	struct sk_buff *skb;
   2431	unsigned int lim;
   2432	atomic_t *dcnt;
   2433	u32 onode;
   2434
   2435	while (skb_queue_len(inputq)) {
   2436		if (unlikely(time_after_eq(jiffies, time_limit)))
   2437			return;
   2438
   2439		skb = tipc_skb_dequeue(inputq, dport);
   2440		if (unlikely(!skb))
   2441			return;
   2442
   2443		/* Add message directly to receive queue if possible */
   2444		if (!sock_owned_by_user(sk)) {
   2445			tipc_sk_filter_rcv(sk, skb, xmitq);
   2446			continue;
   2447		}
   2448
   2449		/* Try backlog, compensating for double-counted bytes */
   2450		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
   2451		if (!sk->sk_backlog.len)
   2452			atomic_set(dcnt, 0);
   2453		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
   2454		if (likely(!sk_add_backlog(sk, skb, lim))) {
   2455			trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
   2456						 "bklg & rcvq >90% allocated!");
   2457			continue;
   2458		}
   2459
   2460		trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
   2461		/* Overload => reject message back to sender */
   2462		onode = tipc_own_addr(sock_net(sk));
   2463		atomic_inc(&sk->sk_drops);
   2464		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
   2465			trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
   2466					      "@sk_enqueue!");
   2467			__skb_queue_tail(xmitq, skb);
   2468		}
   2469		break;
   2470	}
   2471}
   2472
   2473/**
   2474 * tipc_sk_rcv - handle a chain of incoming buffers
   2475 * @net: the associated network namespace
   2476 * @inputq: buffer list containing the buffers
   2477 * Consumes all buffers in list until inputq is empty
   2478 * Note: may be called in multiple threads referring to the same queue
   2479 */
   2480void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
   2481{
   2482	struct sk_buff_head xmitq;
   2483	u32 dnode, dport = 0;
   2484	int err;
   2485	struct tipc_sock *tsk;
   2486	struct sock *sk;
   2487	struct sk_buff *skb;
   2488
   2489	__skb_queue_head_init(&xmitq);
   2490	while (skb_queue_len(inputq)) {
   2491		dport = tipc_skb_peek_port(inputq, dport);
   2492		tsk = tipc_sk_lookup(net, dport);
   2493
   2494		if (likely(tsk)) {
   2495			sk = &tsk->sk;
   2496			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
   2497				tipc_sk_enqueue(inputq, sk, dport, &xmitq);
   2498				spin_unlock_bh(&sk->sk_lock.slock);
   2499			}
   2500			/* Send pending response/rejected messages, if any */
   2501			tipc_node_distr_xmit(sock_net(sk), &xmitq);
   2502			sock_put(sk);
   2503			continue;
   2504		}
   2505		/* No destination socket => dequeue skb if still there */
   2506		skb = tipc_skb_dequeue(inputq, dport);
   2507		if (!skb)
   2508			return;
   2509
   2510		/* Try secondary lookup if unresolved named message */
   2511		err = TIPC_ERR_NO_PORT;
   2512		if (tipc_msg_lookup_dest(net, skb, &err))
   2513			goto xmit;
   2514
   2515		/* Prepare for message rejection */
   2516		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
   2517			continue;
   2518
   2519		trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
   2520xmit:
   2521		dnode = msg_destnode(buf_msg(skb));
   2522		tipc_node_xmit_skb(net, skb, dnode, dport);
   2523	}
   2524}
   2525
   2526static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
   2527{
   2528	DEFINE_WAIT_FUNC(wait, woken_wake_function);
   2529	struct sock *sk = sock->sk;
   2530	int done;
   2531
   2532	do {
   2533		int err = sock_error(sk);
   2534		if (err)
   2535			return err;
   2536		if (!*timeo_p)
   2537			return -ETIMEDOUT;
   2538		if (signal_pending(current))
   2539			return sock_intr_errno(*timeo_p);
   2540		if (sk->sk_state == TIPC_DISCONNECTING)
   2541			break;
   2542
   2543		add_wait_queue(sk_sleep(sk), &wait);
   2544		done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
   2545				     &wait);
   2546		remove_wait_queue(sk_sleep(sk), &wait);
   2547	} while (!done);
   2548	return 0;
   2549}
   2550
   2551static bool tipc_sockaddr_is_sane(struct sockaddr_tipc *addr)
   2552{
   2553	if (addr->family != AF_TIPC)
   2554		return false;
   2555	if (addr->addrtype == TIPC_SERVICE_RANGE)
   2556		return (addr->addr.nameseq.lower <= addr->addr.nameseq.upper);
   2557	return (addr->addrtype == TIPC_SERVICE_ADDR ||
   2558		addr->addrtype == TIPC_SOCKET_ADDR);
   2559}
   2560
/**
 * tipc_connect - establish a connection to another TIPC port
 * @sock: socket structure
 * @dest: socket address for destination port
 * @destlen: size of socket address data structure; must equal
 *           sizeof(struct sockaddr_tipc)
 * @flags: file-related flags associated with socket; O_NONBLOCK makes the
 *         call return without waiting for the connection to be set up
 *
 * For connectionless sockets the destination is only stored as the default
 * peer address; an address of family AF_UNSPEC clears it again.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_connect(struct socket *sock, struct sockaddr *dest,
			int destlen, int flags)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
	struct msghdr m = {NULL,};
	/* In milliseconds here; converted to jiffies before waiting */
	long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
	int previous;
	int res = 0;

	if (destlen != sizeof(struct sockaddr_tipc))
		return -EINVAL;

	lock_sock(sk);

	/* Sockets that are group members cannot connect */
	if (tsk->group) {
		res = -EINVAL;
		goto exit;
	}

	/* AF_UNSPEC: forget the stored peer address.  This is only a valid
	 * request on connectionless sockets.
	 */
	if (dst->family == AF_UNSPEC) {
		memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
		if (!tipc_sk_type_connectionless(sk))
			res = -EINVAL;
		goto exit;
	}
	if (!tipc_sockaddr_is_sane(dst)) {
		res = -EINVAL;
		goto exit;
	}
	/* DGRAM/RDM connect(), just save the destaddr */
	if (tipc_sk_type_connectionless(sk)) {
		memcpy(&tsk->peer, dest, destlen);
		goto exit;
	} else if (dst->addrtype == TIPC_SERVICE_RANGE) {
		/* A connection cannot be set up towards a service range */
		res = -EINVAL;
		goto exit;
	}

	/* Remember the state on entry to tell a fresh attempt (EINPROGRESS)
	 * from a repeated call on an ongoing one (EALREADY)
	 */
	previous = sk->sk_state;

	switch (sk->sk_state) {
	case TIPC_OPEN:
		/* Send a 'SYN-' to destination */
		m.msg_name = dest;
		m.msg_namelen = destlen;

		/* If connect is in non-blocking case, set MSG_DONTWAIT to
		 * indicate send_msg() is never blocked.
		 */
		if (!timeout)
			m.msg_flags = MSG_DONTWAIT;

		res = __tipc_sendmsg(sock, &m, 0);
		if ((res < 0) && (res != -EWOULDBLOCK))
			goto exit;

		/* Just entered TIPC_CONNECTING state; the only
		 * difference is that return value in non-blocking
		 * case is EINPROGRESS, rather than EALREADY.
		 */
		res = -EINPROGRESS;
		fallthrough;
	case TIPC_CONNECTING:
		if (!timeout) {
			if (previous == TIPC_CONNECTING)
				res = -EALREADY;
			goto exit;
		}
		timeout = msecs_to_jiffies(timeout);
		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
		res = tipc_wait_for_connect(sock, &timeout);
		break;
	case TIPC_ESTABLISHED:
		res = -EISCONN;
		break;
	default:
		res = -EINVAL;
	}

exit:
	release_sock(sk);
	return res;
}
   2655
   2656/**
   2657 * tipc_listen - allow socket to listen for incoming connections
   2658 * @sock: socket structure
   2659 * @len: (unused)
   2660 *
   2661 * Return: 0 on success, errno otherwise
   2662 */
   2663static int tipc_listen(struct socket *sock, int len)
   2664{
   2665	struct sock *sk = sock->sk;
   2666	int res;
   2667
   2668	lock_sock(sk);
   2669	res = tipc_set_sk_state(sk, TIPC_LISTEN);
   2670	release_sock(sk);
   2671
   2672	return res;
   2673}
   2674
   2675static int tipc_wait_for_accept(struct socket *sock, long timeo)
   2676{
   2677	struct sock *sk = sock->sk;
   2678	DEFINE_WAIT_FUNC(wait, woken_wake_function);
   2679	int err;
   2680
   2681	/* True wake-one mechanism for incoming connections: only
   2682	 * one process gets woken up, not the 'whole herd'.
   2683	 * Since we do not 'race & poll' for established sockets
   2684	 * anymore, the common case will execute the loop only once.
   2685	*/
   2686	for (;;) {
   2687		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
   2688			add_wait_queue(sk_sleep(sk), &wait);
   2689			release_sock(sk);
   2690			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
   2691			lock_sock(sk);
   2692			remove_wait_queue(sk_sleep(sk), &wait);
   2693		}
   2694		err = 0;
   2695		if (!skb_queue_empty(&sk->sk_receive_queue))
   2696			break;
   2697		err = -EAGAIN;
   2698		if (!timeo)
   2699			break;
   2700		err = sock_intr_errno(timeo);
   2701		if (signal_pending(current))
   2702			break;
   2703	}
   2704	return err;
   2705}
   2706
/**
 * tipc_accept - wait for connection request
 * @sock: listening socket
 * @new_sock: new socket that is to be connected
 * @flags: file-related flags associated with socket; O_NONBLOCK disables
 *         waiting for a request
 * @kern: caused by kernel or by userspace?
 *
 * Takes the first queued connection request, creates a TIPC socket for
 * @new_sock, connects it to the requesting peer and acknowledges the
 * request.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
		       bool kern)
{
	struct sock *new_sk, *sk = sock->sk;
	struct tipc_sock *new_tsock;
	struct msghdr m = {NULL,};
	struct tipc_msg *msg;
	struct sk_buff *buf;
	long timeo;
	int res;

	lock_sock(sk);

	if (sk->sk_state != TIPC_LISTEN) {
		res = -EINVAL;
		goto exit;
	}
	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
	res = tipc_wait_for_accept(sock, timeo);
	if (res)
		goto exit;

	/* Only peek: the request stays queued if socket creation fails */
	buf = skb_peek(&sk->sk_receive_queue);

	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
	if (res)
		goto exit;
	security_sk_clone(sock->sk, new_sock->sk);

	new_sk = new_sock->sk;
	new_tsock = tipc_sk(new_sk);
	msg = buf_msg(buf);

	/* we lock on new_sk; but lockdep sees the lock on sk */
	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);

	/*
	 * Reject any stray messages received by new socket
	 * before the socket lock was taken (very, very unlikely)
	 */
	tsk_rej_rx_queue(new_sk, TIPC_ERR_NO_PORT);

	/* Connect new socket to its peer */
	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));

	/* Inherit the importance level of the request */
	tsk_set_importance(new_sk, msg_importance(msg));
	/* Record the service address the peer used to reach us, if any */
	if (msg_named(msg)) {
		new_tsock->conn_addrtype = TIPC_SERVICE_ADDR;
		msg_set_nametype(&new_tsock->phdr, msg_nametype(msg));
		msg_set_nameinst(&new_tsock->phdr, msg_nameinst(msg));
	}

	/*
	 * Respond to 'SYN-' by discarding it & returning 'ACK'.
	 * Respond to 'SYN+' by queuing it on new socket & returning 'ACK'.
	 */
	if (!msg_data_sz(msg)) {
		tsk_advance_rx_queue(sk);
	} else {
		/* Move the buffer, with its data, over to the new socket */
		__skb_dequeue(&sk->sk_receive_queue);
		__skb_queue_head(&new_sk->sk_receive_queue, buf);
		skb_set_owner_r(buf, new_sk);
	}
	/* Zero-length send on the new socket; its result is not checked */
	__tipc_sendstream(new_sock, &m, 0);
	release_sock(new_sk);
exit:
	release_sock(sk);
	return res;
}
   2785
   2786/**
   2787 * tipc_shutdown - shutdown socket connection
   2788 * @sock: socket structure
   2789 * @how: direction to close (must be SHUT_RDWR)
   2790 *
   2791 * Terminates connection (if necessary), then purges socket's receive queue.
   2792 *
   2793 * Return: 0 on success, errno otherwise
   2794 */
   2795static int tipc_shutdown(struct socket *sock, int how)
   2796{
   2797	struct sock *sk = sock->sk;
   2798	int res;
   2799
   2800	if (how != SHUT_RDWR)
   2801		return -EINVAL;
   2802
   2803	lock_sock(sk);
   2804
   2805	trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
   2806	__tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
   2807	sk->sk_shutdown = SHUTDOWN_MASK;
   2808
   2809	if (sk->sk_state == TIPC_DISCONNECTING) {
   2810		/* Discard any unreceived messages */
   2811		__skb_queue_purge(&sk->sk_receive_queue);
   2812
   2813		res = 0;
   2814	} else {
   2815		res = -ENOTCONN;
   2816	}
   2817	/* Wake up anyone sleeping in poll. */
   2818	sk->sk_state_change(sk);
   2819
   2820	release_sock(sk);
   2821	return res;
   2822}
   2823
   2824static void tipc_sk_check_probing_state(struct sock *sk,
   2825					struct sk_buff_head *list)
   2826{
   2827	struct tipc_sock *tsk = tipc_sk(sk);
   2828	u32 pnode = tsk_peer_node(tsk);
   2829	u32 pport = tsk_peer_port(tsk);
   2830	u32 self = tsk_own_node(tsk);
   2831	u32 oport = tsk->portid;
   2832	struct sk_buff *skb;
   2833
   2834	if (tsk->probe_unacked) {
   2835		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
   2836		sk->sk_err = ECONNABORTED;
   2837		tipc_node_remove_conn(sock_net(sk), pnode, pport);
   2838		sk->sk_state_change(sk);
   2839		return;
   2840	}
   2841	/* Prepare new probe */
   2842	skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
   2843			      pnode, self, pport, oport, TIPC_OK);
   2844	if (skb)
   2845		__skb_queue_tail(list, skb);
   2846	tsk->probe_unacked = true;
   2847	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
   2848}
   2849
   2850static void tipc_sk_retry_connect(struct sock *sk, struct sk_buff_head *list)
   2851{
   2852	struct tipc_sock *tsk = tipc_sk(sk);
   2853
   2854	/* Try again later if dest link is congested */
   2855	if (tsk->cong_link_cnt) {
   2856		sk_reset_timer(sk, &sk->sk_timer,
   2857			       jiffies + msecs_to_jiffies(100));
   2858		return;
   2859	}
   2860	/* Prepare SYN for retransmit */
   2861	tipc_msg_skb_clone(&sk->sk_write_queue, list);
   2862}
   2863
   2864static void tipc_sk_timeout(struct timer_list *t)
   2865{
   2866	struct sock *sk = from_timer(sk, t, sk_timer);
   2867	struct tipc_sock *tsk = tipc_sk(sk);
   2868	u32 pnode = tsk_peer_node(tsk);
   2869	struct sk_buff_head list;
   2870	int rc = 0;
   2871
   2872	__skb_queue_head_init(&list);
   2873	bh_lock_sock(sk);
   2874
   2875	/* Try again later if socket is busy */
   2876	if (sock_owned_by_user(sk)) {
   2877		sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
   2878		bh_unlock_sock(sk);
   2879		sock_put(sk);
   2880		return;
   2881	}
   2882
   2883	if (sk->sk_state == TIPC_ESTABLISHED)
   2884		tipc_sk_check_probing_state(sk, &list);
   2885	else if (sk->sk_state == TIPC_CONNECTING)
   2886		tipc_sk_retry_connect(sk, &list);
   2887
   2888	bh_unlock_sock(sk);
   2889
   2890	if (!skb_queue_empty(&list))
   2891		rc = tipc_node_xmit(sock_net(sk), &list, pnode, tsk->portid);
   2892
   2893	/* SYN messages may cause link congestion */
   2894	if (rc == -ELINKCONG) {
   2895		tipc_dest_push(&tsk->cong_links, pnode, 0);
   2896		tsk->cong_link_cnt = 1;
   2897	}
   2898	sock_put(sk);
   2899}
   2900
   2901static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua)
   2902{
   2903	struct sock *sk = &tsk->sk;
   2904	struct net *net = sock_net(sk);
   2905	struct tipc_socket_addr skaddr;
   2906	struct publication *p;
   2907	u32 key;
   2908
   2909	if (tipc_sk_connected(sk))
   2910		return -EINVAL;
   2911	key = tsk->portid + tsk->pub_count + 1;
   2912	if (key == tsk->portid)
   2913		return -EADDRINUSE;
   2914	skaddr.ref = tsk->portid;
   2915	skaddr.node = tipc_own_addr(net);
   2916	p = tipc_nametbl_publish(net, ua, &skaddr, key);
   2917	if (unlikely(!p))
   2918		return -EINVAL;
   2919
   2920	list_add(&p->binding_sock, &tsk->publications);
   2921	tsk->pub_count++;
   2922	tsk->published = true;
   2923	return 0;
   2924}
   2925
   2926static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua)
   2927{
   2928	struct net *net = sock_net(&tsk->sk);
   2929	struct publication *safe, *p;
   2930	struct tipc_uaddr _ua;
   2931	int rc = -EINVAL;
   2932
   2933	list_for_each_entry_safe(p, safe, &tsk->publications, binding_sock) {
   2934		if (!ua) {
   2935			tipc_uaddr(&_ua, TIPC_SERVICE_RANGE, p->scope,
   2936				   p->sr.type, p->sr.lower, p->sr.upper);
   2937			tipc_nametbl_withdraw(net, &_ua, &p->sk, p->key);
   2938			continue;
   2939		}
   2940		/* Unbind specific publication */
   2941		if (p->scope != ua->scope)
   2942			continue;
   2943		if (p->sr.type != ua->sr.type)
   2944			continue;
   2945		if (p->sr.lower != ua->sr.lower)
   2946			continue;
   2947		if (p->sr.upper != ua->sr.upper)
   2948			break;
   2949		tipc_nametbl_withdraw(net, ua, &p->sk, p->key);
   2950		rc = 0;
   2951		break;
   2952	}
   2953	if (list_empty(&tsk->publications)) {
   2954		tsk->published = 0;
   2955		rc = 0;
   2956	}
   2957	return rc;
   2958}
   2959
   2960/* tipc_sk_reinit: set non-zero address in all existing sockets
   2961 *                 when we go from standalone to network mode.
   2962 */
   2963void tipc_sk_reinit(struct net *net)
   2964{
   2965	struct tipc_net *tn = net_generic(net, tipc_net_id);
   2966	struct rhashtable_iter iter;
   2967	struct tipc_sock *tsk;
   2968	struct tipc_msg *msg;
   2969
   2970	rhashtable_walk_enter(&tn->sk_rht, &iter);
   2971
   2972	do {
   2973		rhashtable_walk_start(&iter);
   2974
   2975		while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
   2976			sock_hold(&tsk->sk);
   2977			rhashtable_walk_stop(&iter);
   2978			lock_sock(&tsk->sk);
   2979			msg = &tsk->phdr;
   2980			msg_set_prevnode(msg, tipc_own_addr(net));
   2981			msg_set_orignode(msg, tipc_own_addr(net));
   2982			release_sock(&tsk->sk);
   2983			rhashtable_walk_start(&iter);
   2984			sock_put(&tsk->sk);
   2985		}
   2986
   2987		rhashtable_walk_stop(&iter);
   2988	} while (tsk == ERR_PTR(-EAGAIN));
   2989
   2990	rhashtable_walk_exit(&iter);
   2991}
   2992
   2993static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
   2994{
   2995	struct tipc_net *tn = net_generic(net, tipc_net_id);
   2996	struct tipc_sock *tsk;
   2997
   2998	rcu_read_lock();
   2999	tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
   3000	if (tsk)
   3001		sock_hold(&tsk->sk);
   3002	rcu_read_unlock();
   3003
   3004	return tsk;
   3005}
   3006
   3007static int tipc_sk_insert(struct tipc_sock *tsk)
   3008{
   3009	struct sock *sk = &tsk->sk;
   3010	struct net *net = sock_net(sk);
   3011	struct tipc_net *tn = net_generic(net, tipc_net_id);
   3012	u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
   3013	u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
   3014
   3015	while (remaining--) {
   3016		portid++;
   3017		if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
   3018			portid = TIPC_MIN_PORT;
   3019		tsk->portid = portid;
   3020		sock_hold(&tsk->sk);
   3021		if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
   3022						   tsk_rht_params))
   3023			return 0;
   3024		sock_put(&tsk->sk);
   3025	}
   3026
   3027	return -1;
   3028}
   3029
   3030static void tipc_sk_remove(struct tipc_sock *tsk)
   3031{
   3032	struct sock *sk = &tsk->sk;
   3033	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
   3034
   3035	if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
   3036		WARN_ON(refcount_read(&sk->sk_refcnt) == 1);
   3037		__sock_put(sk);
   3038	}
   3039}
   3040
   3041static const struct rhashtable_params tsk_rht_params = {
   3042	.nelem_hint = 192,
   3043	.head_offset = offsetof(struct tipc_sock, node),
   3044	.key_offset = offsetof(struct tipc_sock, portid),
   3045	.key_len = sizeof(u32), /* portid */
   3046	.max_size = 1048576,
   3047	.min_size = 256,
   3048	.automatic_shrinking = true,
   3049};
   3050
   3051int tipc_sk_rht_init(struct net *net)
   3052{
   3053	struct tipc_net *tn = net_generic(net, tipc_net_id);
   3054
   3055	return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
   3056}
   3057
   3058void tipc_sk_rht_destroy(struct net *net)
   3059{
   3060	struct tipc_net *tn = net_generic(net, tipc_net_id);
   3061
   3062	/* Wait for socket readers to complete */
   3063	synchronize_net();
   3064
   3065	rhashtable_destroy(&tn->sk_rht);
   3066}
   3067
   3068static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
   3069{
   3070	struct net *net = sock_net(&tsk->sk);
   3071	struct tipc_group *grp = tsk->group;
   3072	struct tipc_msg *hdr = &tsk->phdr;
   3073	struct tipc_uaddr ua;
   3074	int rc;
   3075
   3076	if (mreq->type < TIPC_RESERVED_TYPES)
   3077		return -EACCES;
   3078	if (mreq->scope > TIPC_NODE_SCOPE)
   3079		return -EINVAL;
   3080	if (mreq->scope != TIPC_NODE_SCOPE)
   3081		mreq->scope = TIPC_CLUSTER_SCOPE;
   3082	if (grp)
   3083		return -EACCES;
   3084	grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);
   3085	if (!grp)
   3086		return -ENOMEM;
   3087	tsk->group = grp;
   3088	msg_set_lookup_scope(hdr, mreq->scope);
   3089	msg_set_nametype(hdr, mreq->type);
   3090	msg_set_dest_droppable(hdr, true);
   3091	tipc_uaddr(&ua, TIPC_SERVICE_RANGE, mreq->scope,
   3092		   mreq->type, mreq->instance, mreq->instance);
   3093	tipc_nametbl_build_group(net, grp, &ua);
   3094	rc = tipc_sk_publish(tsk, &ua);
   3095	if (rc) {
   3096		tipc_group_delete(net, grp);
   3097		tsk->group = NULL;
   3098		return rc;
   3099	}
   3100	/* Eliminate any risk that a broadcast overtakes sent JOINs */
   3101	tsk->mc_method.rcast = true;
   3102	tsk->mc_method.mandatory = true;
   3103	tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
   3104	return rc;
   3105}
   3106
   3107static int tipc_sk_leave(struct tipc_sock *tsk)
   3108{
   3109	struct net *net = sock_net(&tsk->sk);
   3110	struct tipc_group *grp = tsk->group;
   3111	struct tipc_uaddr ua;
   3112	int scope;
   3113
   3114	if (!grp)
   3115		return -EINVAL;
   3116	ua.addrtype = TIPC_SERVICE_RANGE;
   3117	tipc_group_self(grp, &ua.sr, &scope);
   3118	ua.scope = scope;
   3119	tipc_group_delete(net, grp);
   3120	tsk->group = NULL;
   3121	tipc_sk_withdraw(tsk, &ua);
   3122	return 0;
   3123}
   3124
   3125/**
   3126 * tipc_setsockopt - set socket option
   3127 * @sock: socket structure
   3128 * @lvl: option level
   3129 * @opt: option identifier
   3130 * @ov: pointer to new option value
   3131 * @ol: length of option value
   3132 *
   3133 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
   3134 * (to ease compatibility).
   3135 *
   3136 * Return: 0 on success, errno otherwise
   3137 */
   3138static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
   3139			   sockptr_t ov, unsigned int ol)
   3140{
   3141	struct sock *sk = sock->sk;
   3142	struct tipc_sock *tsk = tipc_sk(sk);
   3143	struct tipc_group_req mreq;
   3144	u32 value = 0;
   3145	int res = 0;
   3146
   3147	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
   3148		return 0;
   3149	if (lvl != SOL_TIPC)
   3150		return -ENOPROTOOPT;
   3151
   3152	switch (opt) {
   3153	case TIPC_IMPORTANCE:
   3154	case TIPC_SRC_DROPPABLE:
   3155	case TIPC_DEST_DROPPABLE:
   3156	case TIPC_CONN_TIMEOUT:
   3157	case TIPC_NODELAY:
   3158		if (ol < sizeof(value))
   3159			return -EINVAL;
   3160		if (copy_from_sockptr(&value, ov, sizeof(u32)))
   3161			return -EFAULT;
   3162		break;
   3163	case TIPC_GROUP_JOIN:
   3164		if (ol < sizeof(mreq))
   3165			return -EINVAL;
   3166		if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
   3167			return -EFAULT;
   3168		break;
   3169	default:
   3170		if (!sockptr_is_null(ov) || ol)
   3171			return -EINVAL;
   3172	}
   3173
   3174	lock_sock(sk);
   3175
   3176	switch (opt) {
   3177	case TIPC_IMPORTANCE:
   3178		res = tsk_set_importance(sk, value);
   3179		break;
   3180	case TIPC_SRC_DROPPABLE:
   3181		if (sock->type != SOCK_STREAM)
   3182			tsk_set_unreliable(tsk, value);
   3183		else
   3184			res = -ENOPROTOOPT;
   3185		break;
   3186	case TIPC_DEST_DROPPABLE:
   3187		tsk_set_unreturnable(tsk, value);
   3188		break;
   3189	case TIPC_CONN_TIMEOUT:
   3190		tipc_sk(sk)->conn_timeout = value;
   3191		break;
   3192	case TIPC_MCAST_BROADCAST:
   3193		tsk->mc_method.rcast = false;
   3194		tsk->mc_method.mandatory = true;
   3195		break;
   3196	case TIPC_MCAST_REPLICAST:
   3197		tsk->mc_method.rcast = true;
   3198		tsk->mc_method.mandatory = true;
   3199		break;
   3200	case TIPC_GROUP_JOIN:
   3201		res = tipc_sk_join(tsk, &mreq);
   3202		break;
   3203	case TIPC_GROUP_LEAVE:
   3204		res = tipc_sk_leave(tsk);
   3205		break;
   3206	case TIPC_NODELAY:
   3207		tsk->nodelay = !!value;
   3208		tsk_set_nagle(tsk);
   3209		break;
   3210	default:
   3211		res = -EINVAL;
   3212	}
   3213
   3214	release_sock(sk);
   3215
   3216	return res;
   3217}
   3218
   3219/**
   3220 * tipc_getsockopt - get socket option
   3221 * @sock: socket structure
   3222 * @lvl: option level
   3223 * @opt: option identifier
   3224 * @ov: receptacle for option value
   3225 * @ol: receptacle for length of option value
   3226 *
   3227 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
   3228 * (to ease compatibility).
   3229 *
   3230 * Return: 0 on success, errno otherwise
   3231 */
   3232static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
   3233			   char __user *ov, int __user *ol)
   3234{
   3235	struct sock *sk = sock->sk;
   3236	struct tipc_sock *tsk = tipc_sk(sk);
   3237	struct tipc_service_range seq;
   3238	int len, scope;
   3239	u32 value;
   3240	int res;
   3241
   3242	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
   3243		return put_user(0, ol);
   3244	if (lvl != SOL_TIPC)
   3245		return -ENOPROTOOPT;
   3246	res = get_user(len, ol);
   3247	if (res)
   3248		return res;
   3249
   3250	lock_sock(sk);
   3251
   3252	switch (opt) {
   3253	case TIPC_IMPORTANCE:
   3254		value = tsk_importance(tsk);
   3255		break;
   3256	case TIPC_SRC_DROPPABLE:
   3257		value = tsk_unreliable(tsk);
   3258		break;
   3259	case TIPC_DEST_DROPPABLE:
   3260		value = tsk_unreturnable(tsk);
   3261		break;
   3262	case TIPC_CONN_TIMEOUT:
   3263		value = tsk->conn_timeout;
   3264		/* no need to set "res", since already 0 at this point */
   3265		break;
   3266	case TIPC_NODE_RECVQ_DEPTH:
   3267		value = 0; /* was tipc_queue_size, now obsolete */
   3268		break;
   3269	case TIPC_SOCK_RECVQ_DEPTH:
   3270		value = skb_queue_len(&sk->sk_receive_queue);
   3271		break;
   3272	case TIPC_SOCK_RECVQ_USED:
   3273		value = sk_rmem_alloc_get(sk);
   3274		break;
   3275	case TIPC_GROUP_JOIN:
   3276		seq.type = 0;
   3277		if (tsk->group)
   3278			tipc_group_self(tsk->group, &seq, &scope);
   3279		value = seq.type;
   3280		break;
   3281	default:
   3282		res = -EINVAL;
   3283	}
   3284
   3285	release_sock(sk);
   3286
   3287	if (res)
   3288		return res;	/* "get" failed */
   3289
   3290	if (len < sizeof(value))
   3291		return -EINVAL;
   3292
   3293	if (copy_to_user(ov, &value, sizeof(value)))
   3294		return -EFAULT;
   3295
   3296	return put_user(sizeof(value), ol);
   3297}
   3298
   3299static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
   3300{
   3301	struct net *net = sock_net(sock->sk);
   3302	struct tipc_sioc_nodeid_req nr = {0};
   3303	struct tipc_sioc_ln_req lnr;
   3304	void __user *argp = (void __user *)arg;
   3305
   3306	switch (cmd) {
   3307	case SIOCGETLINKNAME:
   3308		if (copy_from_user(&lnr, argp, sizeof(lnr)))
   3309			return -EFAULT;
   3310		if (!tipc_node_get_linkname(net,
   3311					    lnr.bearer_id & 0xffff, lnr.peer,
   3312					    lnr.linkname, TIPC_MAX_LINK_NAME)) {
   3313			if (copy_to_user(argp, &lnr, sizeof(lnr)))
   3314				return -EFAULT;
   3315			return 0;
   3316		}
   3317		return -EADDRNOTAVAIL;
   3318	case SIOCGETNODEID:
   3319		if (copy_from_user(&nr, argp, sizeof(nr)))
   3320			return -EFAULT;
   3321		if (!tipc_node_get_id(net, nr.peer, nr.node_id))
   3322			return -EADDRNOTAVAIL;
   3323		if (copy_to_user(argp, &nr, sizeof(nr)))
   3324			return -EFAULT;
   3325		return 0;
   3326	default:
   3327		return -ENOIOCTLCMD;
   3328	}
   3329}
   3330
   3331static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
   3332{
   3333	struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
   3334	struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
   3335	u32 onode = tipc_own_addr(sock_net(sock1->sk));
   3336
   3337	tsk1->peer.family = AF_TIPC;
   3338	tsk1->peer.addrtype = TIPC_SOCKET_ADDR;
   3339	tsk1->peer.scope = TIPC_NODE_SCOPE;
   3340	tsk1->peer.addr.id.ref = tsk2->portid;
   3341	tsk1->peer.addr.id.node = onode;
   3342	tsk2->peer.family = AF_TIPC;
   3343	tsk2->peer.addrtype = TIPC_SOCKET_ADDR;
   3344	tsk2->peer.scope = TIPC_NODE_SCOPE;
   3345	tsk2->peer.addr.id.ref = tsk1->portid;
   3346	tsk2->peer.addr.id.node = onode;
   3347
   3348	tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
   3349	tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
   3350	return 0;
   3351}
   3352
   3353/* Protocol switches for the various types of TIPC sockets */
   3354
   3355static const struct proto_ops msg_ops = {
   3356	.owner		= THIS_MODULE,
   3357	.family		= AF_TIPC,
   3358	.release	= tipc_release,
   3359	.bind		= tipc_bind,
   3360	.connect	= tipc_connect,
   3361	.socketpair	= tipc_socketpair,
   3362	.accept		= sock_no_accept,
   3363	.getname	= tipc_getname,
   3364	.poll		= tipc_poll,
   3365	.ioctl		= tipc_ioctl,
   3366	.listen		= sock_no_listen,
   3367	.shutdown	= tipc_shutdown,
   3368	.setsockopt	= tipc_setsockopt,
   3369	.getsockopt	= tipc_getsockopt,
   3370	.sendmsg	= tipc_sendmsg,
   3371	.recvmsg	= tipc_recvmsg,
   3372	.mmap		= sock_no_mmap,
   3373	.sendpage	= sock_no_sendpage
   3374};
   3375
   3376static const struct proto_ops packet_ops = {
   3377	.owner		= THIS_MODULE,
   3378	.family		= AF_TIPC,
   3379	.release	= tipc_release,
   3380	.bind		= tipc_bind,
   3381	.connect	= tipc_connect,
   3382	.socketpair	= tipc_socketpair,
   3383	.accept		= tipc_accept,
   3384	.getname	= tipc_getname,
   3385	.poll		= tipc_poll,
   3386	.ioctl		= tipc_ioctl,
   3387	.listen		= tipc_listen,
   3388	.shutdown	= tipc_shutdown,
   3389	.setsockopt	= tipc_setsockopt,
   3390	.getsockopt	= tipc_getsockopt,
   3391	.sendmsg	= tipc_send_packet,
   3392	.recvmsg	= tipc_recvmsg,
   3393	.mmap		= sock_no_mmap,
   3394	.sendpage	= sock_no_sendpage
   3395};
   3396
   3397static const struct proto_ops stream_ops = {
   3398	.owner		= THIS_MODULE,
   3399	.family		= AF_TIPC,
   3400	.release	= tipc_release,
   3401	.bind		= tipc_bind,
   3402	.connect	= tipc_connect,
   3403	.socketpair	= tipc_socketpair,
   3404	.accept		= tipc_accept,
   3405	.getname	= tipc_getname,
   3406	.poll		= tipc_poll,
   3407	.ioctl		= tipc_ioctl,
   3408	.listen		= tipc_listen,
   3409	.shutdown	= tipc_shutdown,
   3410	.setsockopt	= tipc_setsockopt,
   3411	.getsockopt	= tipc_getsockopt,
   3412	.sendmsg	= tipc_sendstream,
   3413	.recvmsg	= tipc_recvstream,
   3414	.mmap		= sock_no_mmap,
   3415	.sendpage	= sock_no_sendpage
   3416};
   3417
   3418static const struct net_proto_family tipc_family_ops = {
   3419	.owner		= THIS_MODULE,
   3420	.family		= AF_TIPC,
   3421	.create		= tipc_sk_create
   3422};
   3423
   3424static struct proto tipc_proto = {
   3425	.name		= "TIPC",
   3426	.owner		= THIS_MODULE,
   3427	.obj_size	= sizeof(struct tipc_sock),
   3428	.sysctl_rmem	= sysctl_tipc_rmem
   3429};
   3430
   3431/**
   3432 * tipc_socket_init - initialize TIPC socket interface
   3433 *
   3434 * Return: 0 on success, errno otherwise
   3435 */
   3436int tipc_socket_init(void)
   3437{
   3438	int res;
   3439
   3440	res = proto_register(&tipc_proto, 1);
   3441	if (res) {
   3442		pr_err("Failed to register TIPC protocol type\n");
   3443		goto out;
   3444	}
   3445
   3446	res = sock_register(&tipc_family_ops);
   3447	if (res) {
   3448		pr_err("Failed to register TIPC socket type\n");
   3449		proto_unregister(&tipc_proto);
   3450		goto out;
   3451	}
   3452 out:
   3453	return res;
   3454}
   3455
   3456/**
   3457 * tipc_socket_stop - stop TIPC socket interface
   3458 */
   3459void tipc_socket_stop(void)
   3460{
   3461	sock_unregister(tipc_family_ops.family);
   3462	proto_unregister(&tipc_proto);
   3463}
   3464
   3465/* Caller should hold socket lock for the passed tipc socket. */
   3466static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
   3467{
   3468	u32 peer_node, peer_port;
   3469	u32 conn_type, conn_instance;
   3470	struct nlattr *nest;
   3471
   3472	peer_node = tsk_peer_node(tsk);
   3473	peer_port = tsk_peer_port(tsk);
   3474	conn_type = msg_nametype(&tsk->phdr);
   3475	conn_instance = msg_nameinst(&tsk->phdr);
   3476	nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON);
   3477	if (!nest)
   3478		return -EMSGSIZE;
   3479
   3480	if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
   3481		goto msg_full;
   3482	if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
   3483		goto msg_full;
   3484
   3485	if (tsk->conn_addrtype != 0) {
   3486		if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
   3487			goto msg_full;
   3488		if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, conn_type))
   3489			goto msg_full;
   3490		if (nla_put_u32(skb, TIPC_NLA_CON_INST, conn_instance))
   3491			goto msg_full;
   3492	}
   3493	nla_nest_end(skb, nest);
   3494
   3495	return 0;
   3496
   3497msg_full:
   3498	nla_nest_cancel(skb, nest);
   3499
   3500	return -EMSGSIZE;
   3501}
   3502
   3503static int __tipc_nl_add_sk_info(struct sk_buff *skb, struct tipc_sock
   3504			  *tsk)
   3505{
   3506	struct net *net = sock_net(skb->sk);
   3507	struct sock *sk = &tsk->sk;
   3508
   3509	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid) ||
   3510	    nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr(net)))
   3511		return -EMSGSIZE;
   3512
   3513	if (tipc_sk_connected(sk)) {
   3514		if (__tipc_nl_add_sk_con(skb, tsk))
   3515			return -EMSGSIZE;
   3516	} else if (!list_empty(&tsk->publications)) {
   3517		if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
   3518			return -EMSGSIZE;
   3519	}
   3520	return 0;
   3521}
   3522
   3523/* Caller should hold socket lock for the passed tipc socket. */
   3524static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
   3525			    struct tipc_sock *tsk)
   3526{
   3527	struct nlattr *attrs;
   3528	void *hdr;
   3529
   3530	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
   3531			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
   3532	if (!hdr)
   3533		goto msg_cancel;
   3534
   3535	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
   3536	if (!attrs)
   3537		goto genlmsg_cancel;
   3538
   3539	if (__tipc_nl_add_sk_info(skb, tsk))
   3540		goto attr_msg_cancel;
   3541
   3542	nla_nest_end(skb, attrs);
   3543	genlmsg_end(skb, hdr);
   3544
   3545	return 0;
   3546
   3547attr_msg_cancel:
   3548	nla_nest_cancel(skb, attrs);
   3549genlmsg_cancel:
   3550	genlmsg_cancel(skb, hdr);
   3551msg_cancel:
   3552	return -EMSGSIZE;
   3553}
   3554
   3555int tipc_nl_sk_walk(struct sk_buff *skb, struct netlink_callback *cb,
   3556		    int (*skb_handler)(struct sk_buff *skb,
   3557				       struct netlink_callback *cb,
   3558				       struct tipc_sock *tsk))
   3559{
   3560	struct rhashtable_iter *iter = (void *)cb->args[4];
   3561	struct tipc_sock *tsk;
   3562	int err;
   3563
   3564	rhashtable_walk_start(iter);
   3565	while ((tsk = rhashtable_walk_next(iter)) != NULL) {
   3566		if (IS_ERR(tsk)) {
   3567			err = PTR_ERR(tsk);
   3568			if (err == -EAGAIN) {
   3569				err = 0;
   3570				continue;
   3571			}
   3572			break;
   3573		}
   3574
   3575		sock_hold(&tsk->sk);
   3576		rhashtable_walk_stop(iter);
   3577		lock_sock(&tsk->sk);
   3578		err = skb_handler(skb, cb, tsk);
   3579		if (err) {
   3580			release_sock(&tsk->sk);
   3581			sock_put(&tsk->sk);
   3582			goto out;
   3583		}
   3584		release_sock(&tsk->sk);
   3585		rhashtable_walk_start(iter);
   3586		sock_put(&tsk->sk);
   3587	}
   3588	rhashtable_walk_stop(iter);
   3589out:
   3590	return skb->len;
   3591}
   3592EXPORT_SYMBOL(tipc_nl_sk_walk);
   3593
   3594int tipc_dump_start(struct netlink_callback *cb)
   3595{
   3596	return __tipc_dump_start(cb, sock_net(cb->skb->sk));
   3597}
   3598EXPORT_SYMBOL(tipc_dump_start);
   3599
   3600int __tipc_dump_start(struct netlink_callback *cb, struct net *net)
   3601{
   3602	/* tipc_nl_name_table_dump() uses cb->args[0...3]. */
   3603	struct rhashtable_iter *iter = (void *)cb->args[4];
   3604	struct tipc_net *tn = tipc_net(net);
   3605
   3606	if (!iter) {
   3607		iter = kmalloc(sizeof(*iter), GFP_KERNEL);
   3608		if (!iter)
   3609			return -ENOMEM;
   3610
   3611		cb->args[4] = (long)iter;
   3612	}
   3613
   3614	rhashtable_walk_enter(&tn->sk_rht, iter);
   3615	return 0;
   3616}
   3617
   3618int tipc_dump_done(struct netlink_callback *cb)
   3619{
   3620	struct rhashtable_iter *hti = (void *)cb->args[4];
   3621
   3622	rhashtable_walk_exit(hti);
   3623	kfree(hti);
   3624	return 0;
   3625}
   3626EXPORT_SYMBOL(tipc_dump_done);
   3627
   3628int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb,
   3629			   struct tipc_sock *tsk, u32 sk_filter_state,
   3630			   u64 (*tipc_diag_gen_cookie)(struct sock *sk))
   3631{
   3632	struct sock *sk = &tsk->sk;
   3633	struct nlattr *attrs;
   3634	struct nlattr *stat;
   3635
   3636	/*filter response w.r.t sk_state*/
   3637	if (!(sk_filter_state & (1 << sk->sk_state)))
   3638		return 0;
   3639
   3640	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
   3641	if (!attrs)
   3642		goto msg_cancel;
   3643
   3644	if (__tipc_nl_add_sk_info(skb, tsk))
   3645		goto attr_msg_cancel;
   3646
   3647	if (nla_put_u32(skb, TIPC_NLA_SOCK_TYPE, (u32)sk->sk_type) ||
   3648	    nla_put_u32(skb, TIPC_NLA_SOCK_TIPC_STATE, (u32)sk->sk_state) ||
   3649	    nla_put_u32(skb, TIPC_NLA_SOCK_INO, sock_i_ino(sk)) ||
   3650	    nla_put_u32(skb, TIPC_NLA_SOCK_UID,
   3651			from_kuid_munged(sk_user_ns(NETLINK_CB(cb->skb).sk),
   3652					 sock_i_uid(sk))) ||
   3653	    nla_put_u64_64bit(skb, TIPC_NLA_SOCK_COOKIE,
   3654			      tipc_diag_gen_cookie(sk),
   3655			      TIPC_NLA_SOCK_PAD))
   3656		goto attr_msg_cancel;
   3657
   3658	stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT);
   3659	if (!stat)
   3660		goto attr_msg_cancel;
   3661
   3662	if (nla_put_u32(skb, TIPC_NLA_SOCK_STAT_RCVQ,
   3663			skb_queue_len(&sk->sk_receive_queue)) ||
   3664	    nla_put_u32(skb, TIPC_NLA_SOCK_STAT_SENDQ,
   3665			skb_queue_len(&sk->sk_write_queue)) ||
   3666	    nla_put_u32(skb, TIPC_NLA_SOCK_STAT_DROP,
   3667			atomic_read(&sk->sk_drops)))
   3668		goto stat_msg_cancel;
   3669
   3670	if (tsk->cong_link_cnt &&
   3671	    nla_put_flag(skb, TIPC_NLA_SOCK_STAT_LINK_CONG))
   3672		goto stat_msg_cancel;
   3673
   3674	if (tsk_conn_cong(tsk) &&
   3675	    nla_put_flag(skb, TIPC_NLA_SOCK_STAT_CONN_CONG))
   3676		goto stat_msg_cancel;
   3677
   3678	nla_nest_end(skb, stat);
   3679
   3680	if (tsk->group)
   3681		if (tipc_group_fill_sock_diag(tsk->group, skb))
   3682			goto stat_msg_cancel;
   3683
   3684	nla_nest_end(skb, attrs);
   3685
   3686	return 0;
   3687
   3688stat_msg_cancel:
   3689	nla_nest_cancel(skb, stat);
   3690attr_msg_cancel:
   3691	nla_nest_cancel(skb, attrs);
   3692msg_cancel:
   3693	return -EMSGSIZE;
   3694}
   3695EXPORT_SYMBOL(tipc_sk_fill_sock_diag);
   3696
   3697int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
   3698{
   3699	return tipc_nl_sk_walk(skb, cb, __tipc_nl_add_sk);
   3700}
   3701
   3702/* Caller should hold socket lock for the passed tipc socket. */
   3703static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
   3704				 struct netlink_callback *cb,
   3705				 struct publication *publ)
   3706{
   3707	void *hdr;
   3708	struct nlattr *attrs;
   3709
   3710	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
   3711			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
   3712	if (!hdr)
   3713		goto msg_cancel;
   3714
   3715	attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL);
   3716	if (!attrs)
   3717		goto genlmsg_cancel;
   3718
   3719	if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
   3720		goto attr_msg_cancel;
   3721	if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->sr.type))
   3722		goto attr_msg_cancel;
   3723	if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->sr.lower))
   3724		goto attr_msg_cancel;
   3725	if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->sr.upper))
   3726		goto attr_msg_cancel;
   3727
   3728	nla_nest_end(skb, attrs);
   3729	genlmsg_end(skb, hdr);
   3730
   3731	return 0;
   3732
   3733attr_msg_cancel:
   3734	nla_nest_cancel(skb, attrs);
   3735genlmsg_cancel:
   3736	genlmsg_cancel(skb, hdr);
   3737msg_cancel:
   3738	return -EMSGSIZE;
   3739}
   3740
   3741/* Caller should hold socket lock for the passed tipc socket. */
   3742static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
   3743				  struct netlink_callback *cb,
   3744				  struct tipc_sock *tsk, u32 *last_publ)
   3745{
   3746	int err;
   3747	struct publication *p;
   3748
   3749	if (*last_publ) {
   3750		list_for_each_entry(p, &tsk->publications, binding_sock) {
   3751			if (p->key == *last_publ)
   3752				break;
   3753		}
   3754		if (list_entry_is_head(p, &tsk->publications, binding_sock)) {
   3755			/* We never set seq or call nl_dump_check_consistent()
   3756			 * this means that setting prev_seq here will cause the
   3757			 * consistence check to fail in the netlink callback
   3758			 * handler. Resulting in the last NLMSG_DONE message
   3759			 * having the NLM_F_DUMP_INTR flag set.
   3760			 */
   3761			cb->prev_seq = 1;
   3762			*last_publ = 0;
   3763			return -EPIPE;
   3764		}
   3765	} else {
   3766		p = list_first_entry(&tsk->publications, struct publication,
   3767				     binding_sock);
   3768	}
   3769
   3770	list_for_each_entry_from(p, &tsk->publications, binding_sock) {
   3771		err = __tipc_nl_add_sk_publ(skb, cb, p);
   3772		if (err) {
   3773			*last_publ = p->key;
   3774			return err;
   3775		}
   3776	}
   3777	*last_publ = 0;
   3778
   3779	return 0;
   3780}
   3781
   3782int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
   3783{
   3784	int err;
   3785	u32 tsk_portid = cb->args[0];
   3786	u32 last_publ = cb->args[1];
   3787	u32 done = cb->args[2];
   3788	struct net *net = sock_net(skb->sk);
   3789	struct tipc_sock *tsk;
   3790
   3791	if (!tsk_portid) {
   3792		struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
   3793		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
   3794
   3795		if (!attrs[TIPC_NLA_SOCK])
   3796			return -EINVAL;
   3797
   3798		err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX,
   3799						  attrs[TIPC_NLA_SOCK],
   3800						  tipc_nl_sock_policy, NULL);
   3801		if (err)
   3802			return err;
   3803
   3804		if (!sock[TIPC_NLA_SOCK_REF])
   3805			return -EINVAL;
   3806
   3807		tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
   3808	}
   3809
   3810	if (done)
   3811		return 0;
   3812
   3813	tsk = tipc_sk_lookup(net, tsk_portid);
   3814	if (!tsk)
   3815		return -EINVAL;
   3816
   3817	lock_sock(&tsk->sk);
   3818	err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
   3819	if (!err)
   3820		done = 1;
   3821	release_sock(&tsk->sk);
   3822	sock_put(&tsk->sk);
   3823
   3824	cb->args[0] = tsk_portid;
   3825	cb->args[1] = last_publ;
   3826	cb->args[2] = done;
   3827
   3828	return skb->len;
   3829}
   3830
   3831/**
   3832 * tipc_sk_filtering - check if a socket should be traced
   3833 * @sk: the socket to be examined
   3834 *
   3835 * @sysctl_tipc_sk_filter is used as the socket tuple for filtering:
   3836 * (portid, sock type, name type, name lower, name upper)
   3837 *
   3838 * Return: true if the socket meets the socket tuple data
   3839 * (value 0 = 'any') or when there is no tuple set (all = 0),
   3840 * otherwise false
   3841 */
   3842bool tipc_sk_filtering(struct sock *sk)
   3843{
   3844	struct tipc_sock *tsk;
   3845	struct publication *p;
   3846	u32 _port, _sktype, _type, _lower, _upper;
   3847	u32 type = 0, lower = 0, upper = 0;
   3848
   3849	if (!sk)
   3850		return true;
   3851
   3852	tsk = tipc_sk(sk);
   3853
   3854	_port = sysctl_tipc_sk_filter[0];
   3855	_sktype = sysctl_tipc_sk_filter[1];
   3856	_type = sysctl_tipc_sk_filter[2];
   3857	_lower = sysctl_tipc_sk_filter[3];
   3858	_upper = sysctl_tipc_sk_filter[4];
   3859
   3860	if (!_port && !_sktype && !_type && !_lower && !_upper)
   3861		return true;
   3862
   3863	if (_port)
   3864		return (_port == tsk->portid);
   3865
   3866	if (_sktype && _sktype != sk->sk_type)
   3867		return false;
   3868
   3869	if (tsk->published) {
   3870		p = list_first_entry_or_null(&tsk->publications,
   3871					     struct publication, binding_sock);
   3872		if (p) {
   3873			type = p->sr.type;
   3874			lower = p->sr.lower;
   3875			upper = p->sr.upper;
   3876		}
   3877	}
   3878
   3879	if (!tipc_sk_type_connectionless(sk)) {
   3880		type = msg_nametype(&tsk->phdr);
   3881		lower = msg_nameinst(&tsk->phdr);
   3882		upper = lower;
   3883	}
   3884
   3885	if ((_type && _type != type) || (_lower && _lower != lower) ||
   3886	    (_upper && _upper != upper))
   3887		return false;
   3888
   3889	return true;
   3890}
   3891
   3892u32 tipc_sock_get_portid(struct sock *sk)
   3893{
   3894	return (sk) ? (tipc_sk(sk))->portid : 0;
   3895}
   3896
   3897/**
   3898 * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
   3899 *			both the rcv and backlog queues are considered
   3900 * @sk: tipc sk to be checked
   3901 * @skb: tipc msg to be checked
   3902 *
   3903 * Return: true if the socket rx queue allocation is > 90%, otherwise false
   3904 */
   3905
   3906bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
   3907{
   3908	atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
   3909	unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
   3910	unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
   3911
   3912	return (qsize > lim * 90 / 100);
   3913}
   3914
   3915/**
   3916 * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
   3917 *			only the rcv queue is considered
   3918 * @sk: tipc sk to be checked
   3919 * @skb: tipc msg to be checked
   3920 *
   3921 * Return: true if the socket rx queue allocation is > 90%, otherwise false
   3922 */
   3923
   3924bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
   3925{
   3926	unsigned int lim = rcvbuf_limit(sk, skb);
   3927	unsigned int qsize = sk_rmem_alloc_get(sk);
   3928
   3929	return (qsize > lim * 90 / 100);
   3930}
   3931
   3932/**
   3933 * tipc_sk_dump - dump TIPC socket
   3934 * @sk: tipc sk to be dumped
   3935 * @dqueues: bitmask to decide if any socket queue to be dumped?
   3936 *           - TIPC_DUMP_NONE: don't dump socket queues
   3937 *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
   3938 *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
   3939 *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
   3940 *           - TIPC_DUMP_ALL: dump all the socket queues above
   3941 * @buf: returned buffer of dump data in format
   3942 */
   3943int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
   3944{
   3945	int i = 0;
   3946	size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
   3947	u32 conn_type, conn_instance;
   3948	struct tipc_sock *tsk;
   3949	struct publication *p;
   3950	bool tsk_connected;
   3951
   3952	if (!sk) {
   3953		i += scnprintf(buf, sz, "sk data: (null)\n");
   3954		return i;
   3955	}
   3956
   3957	tsk = tipc_sk(sk);
   3958	tsk_connected = !tipc_sk_type_connectionless(sk);
   3959
   3960	i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
   3961	i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
   3962	i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
   3963	i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
   3964	i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
   3965	if (tsk_connected) {
   3966		i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
   3967		i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
   3968		conn_type = msg_nametype(&tsk->phdr);
   3969		conn_instance = msg_nameinst(&tsk->phdr);
   3970		i += scnprintf(buf + i, sz - i, " %u", conn_type);
   3971		i += scnprintf(buf + i, sz - i, " %u", conn_instance);
   3972	}
   3973	i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
   3974	if (tsk->published) {
   3975		p = list_first_entry_or_null(&tsk->publications,
   3976					     struct publication, binding_sock);
   3977		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.type : 0);
   3978		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.lower : 0);
   3979		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.upper : 0);
   3980	}
   3981	i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
   3982	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
   3983	i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
   3984	i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
   3985	i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
   3986	i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
   3987	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
   3988	i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
   3989	i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
   3990	i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
   3991	i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
   3992	i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
   3993	i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
   3994	i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len));
   3995
   3996	if (dqueues & TIPC_DUMP_SK_SNDQ) {
   3997		i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
   3998		i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
   3999	}
   4000
   4001	if (dqueues & TIPC_DUMP_SK_RCVQ) {
   4002		i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
   4003		i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
   4004	}
   4005
   4006	if (dqueues & TIPC_DUMP_SK_BKLGQ) {
   4007		i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
   4008		i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
   4009		if (sk->sk_backlog.tail != sk->sk_backlog.head) {
   4010			i += scnprintf(buf + i, sz - i, "  tail ");
   4011			i += tipc_skb_dump(sk->sk_backlog.tail, false,
   4012					   buf + i);
   4013		}
   4014	}
   4015
   4016	return i;
   4017}