cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

bcast.c (22577B)


/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2017, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <linux/tipc_config.h>
#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "link.h"
#include "name_table.h"

#define BCLINK_WIN_DEFAULT  50	/* bcast link window size (default) */
#define BCLINK_WIN_MIN      32	/* bcast minimum link window size */

const char tipc_bclink_name[] = "broadcast-link";
unsigned long sysctl_tipc_bc_retruni __read_mostly;

/**
 * struct tipc_bc_base - base structure for keeping broadcast send state
 * @link: broadcast send link structure
 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
 * @dests: array keeping number of reachable destinations per bearer
 * @primary_bearer: a bearer having links to all broadcast destinations, if any
 * @bcast_support: indicates if primary bearer, if any, supports broadcast
 * @force_bcast: forces broadcast for multicast traffic
 * @rcast_support: indicates if all peer nodes support replicast
 * @force_rcast: forces replicast for multicast traffic
 * @rc_ratio: dest count as percentage of cluster size where send method changes
 * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
 */
struct tipc_bc_base {
	struct tipc_link *link;
	struct sk_buff_head inputq;
	int dests[MAX_BEARERS];
	int primary_bearer;
	bool bcast_support;
	bool force_bcast;
	bool rcast_support;
	bool force_rcast;
	int rc_ratio;
	int bc_threshold;
};

static struct tipc_bc_base *tipc_bc_base(struct net *net)
{
	return tipc_net(net)->bcbase;
}

/* tipc_bcast_get_mtu(): get the MTU currently used by the broadcast link
 * Note: the MTU is decremented to give room for a tunnel header, in
 * case the message needs to be sent as replicast
 */
int tipc_bcast_get_mtu(struct net *net)
{
	return tipc_link_mss(tipc_bc_sndlink(net));
}

void tipc_bcast_toggle_rcast(struct net *net, bool supp)
{
	tipc_bc_base(net)->rcast_support = supp;
}

static void tipc_bcbase_calc_bc_threshold(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));

	bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
}
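
/* Illustrative arithmetic: with the default rc_ratio of 10 and a 30-node
 * cluster, bc_threshold = 1 + (30 * 10 / 100) = 4, so a multicast to four
 * or fewer destinations is replicast, and to five or more is broadcast
 * (see tipc_bcast_select_xmit_method() below).
 */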

/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
 *                               if any, and make it primary bearer
 */
static void tipc_bcbase_select_primary(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	int all_dests =  tipc_link_bc_peers(bb->link);
	int max_win = tipc_link_max_win(bb->link);
	int min_win = tipc_link_min_win(bb->link);
	int i, mtu, prim;

	bb->primary_bearer = INVALID_BEARER_ID;
	bb->bcast_support = true;

	if (!all_dests)
		return;

	for (i = 0; i < MAX_BEARERS; i++) {
		if (!bb->dests[i])
			continue;

		mtu = tipc_bearer_mtu(net, i);
		if (mtu < tipc_link_mtu(bb->link)) {
			tipc_link_set_mtu(bb->link, mtu);
			tipc_link_set_queue_limits(bb->link,
						   min_win,
						   max_win);
		}
		bb->bcast_support &= tipc_bearer_bcast_support(net, i);
		if (bb->dests[i] < all_dests)
			continue;

		bb->primary_bearer = i;

		/* Reduce risk that all nodes select same primary */
		if ((i ^ tipc_own_addr(net)) & 1)
			break;
	}
	prim = bb->primary_bearer;
	if (prim != INVALID_BEARER_ID)
		bb->bcast_support = tipc_bearer_bcast_support(net, prim);
}
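
/* Note on the tie-break above: XOR-ing the bearer id with the node's own
 * address and testing the low bit makes nodes of different address parity
 * stop at different, equally capable bearers, so the cluster does not
 * funnel all broadcast traffic through the same primary bearer.
 */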

void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	tipc_bcast_lock(net);
	bb->dests[bearer_id]++;
	tipc_bcbase_select_primary(net);
	tipc_bcast_unlock(net);
}

void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	tipc_bcast_lock(net);
	bb->dests[bearer_id]--;
	tipc_bcbase_select_primary(net);
	tipc_bcast_unlock(net);
}

/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
 *
 * Note that the number of reachable destinations, as indicated in the dests[]
 * array, may transitionally differ from the number of destinations indicated
 * in each sent buffer. We can sustain this. Excess destination nodes will
 * drop and never acknowledge the unexpected packets, and missing destinations
 * will either require retransmission (if they are just about to be added to
 * the bearer), or be removed from the buffer's 'ackers' counter (if they
 * just went down)
 */
static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
{
	int bearer_id;
	struct tipc_bc_base *bb = tipc_bc_base(net);
	struct sk_buff *skb, *_skb;
	struct sk_buff_head _xmitq;

	if (skb_queue_empty(xmitq))
		return;

	/* The typical case: at least one bearer has links to all nodes */
	bearer_id = bb->primary_bearer;
	if (bearer_id >= 0) {
		tipc_bearer_bc_xmit(net, bearer_id, xmitq);
		return;
	}

	/* We have to transmit across all bearers */
	__skb_queue_head_init(&_xmitq);
	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
		if (!bb->dests[bearer_id])
			continue;

		skb_queue_walk(xmitq, skb) {
			_skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
			if (!_skb)
				break;
			__skb_queue_tail(&_xmitq, _skb);
		}
		tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
	}
	__skb_queue_purge(xmitq);
	__skb_queue_purge(&_xmitq);
}
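
/* Sketch of the fallback path above: when no single bearer reaches every
 * node, each buffer is duplicated per bearer with pskb_copy_for_clone(),
 * so every bearer transmits its own copy; the original queue is purged at
 * the end, whereas on the primary-bearer path tipc_bearer_bc_xmit()
 * consumes the queue directly.
 */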

static void tipc_bcast_select_xmit_method(struct net *net, int dests,
					  struct tipc_mc_method *method)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	unsigned long exp = method->expires;

	/* Broadcast supported by used bearer/bearers? */
	if (!bb->bcast_support) {
		method->rcast = true;
		return;
	}
	/* Any destinations which don't support replicast ? */
	if (!bb->rcast_support) {
		method->rcast = false;
		return;
	}
	/* Can current method be changed ? */
	method->expires = jiffies + TIPC_METHOD_EXPIRE;
	if (method->mandatory)
		return;

	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
	    time_before(jiffies, exp))
		return;

	/* Configured to force the 'broadcast' method */
	if (bb->force_bcast) {
		method->rcast = false;
		return;
	}
	/* Configured to force the 'replicast' method */
	if (bb->force_rcast) {
		method->rcast = true;
		return;
	}
	/* Configured as 'autoselect' or default method */
	/* Determine method to use now */
	method->rcast = dests <= bb->bc_threshold;
}
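
/* Decision order above, summarized: no broadcast support forces replicast;
 * a peer lacking replicast support forces broadcast; a mandatory method or
 * an unexpired previous choice is kept (re-evaluated immediately when the
 * cluster has the TIPC_MCAST_RBCTL capability); forced modes win; otherwise
 * the destination count is compared against bc_threshold.
 */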
    251
    252/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
    253 * @net: the applicable net namespace
    254 * @pkts: chain of buffers containing message
    255 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
    256 * Consumes the buffer chain.
    257 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
    258 */
    259int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
    260		    u16 *cong_link_cnt)
    261{
    262	struct tipc_link *l = tipc_bc_sndlink(net);
    263	struct sk_buff_head xmitq;
    264	int rc = 0;
    265
    266	__skb_queue_head_init(&xmitq);
    267	tipc_bcast_lock(net);
    268	if (tipc_link_bc_peers(l))
    269		rc = tipc_link_xmit(l, pkts, &xmitq);
    270	tipc_bcast_unlock(net);
    271	tipc_bcbase_xmit(net, &xmitq);
    272	__skb_queue_purge(pkts);
    273	if (rc == -ELINKCONG) {
    274		*cong_link_cnt = 1;
    275		rc = 0;
    276	}
    277	return rc;
    278}
    279
    280/* tipc_rcast_xmit - replicate and send a message to given destination nodes
    281 * @net: the applicable net namespace
    282 * @pkts: chain of buffers containing message
    283 * @dests: list of destination nodes
    284 * @cong_link_cnt: returns number of congested links
    285 * @cong_links: returns identities of congested links
    286 * Returns 0 if success, otherwise errno
    287 */
    288static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
    289			   struct tipc_nlist *dests, u16 *cong_link_cnt)
    290{
    291	struct tipc_dest *dst, *tmp;
    292	struct sk_buff_head _pkts;
    293	u32 dnode, selector;
    294
    295	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
    296	__skb_queue_head_init(&_pkts);
    297
    298	list_for_each_entry_safe(dst, tmp, &dests->list, list) {
    299		dnode = dst->node;
    300		if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts))
    301			return -ENOMEM;
    302
    303		/* Any other return value than -ELINKCONG is ignored */
    304		if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG)
    305			(*cong_link_cnt)++;
    306	}
    307	return 0;
    308}

/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
 * @net: the applicable net namespace
 * @skb: socket buffer to copy
 * @method: send method to be used
 * @dests: destination nodes for message.
 * Returns 0 on success, otherwise errno
 */
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
				struct tipc_mc_method *method,
				struct tipc_nlist *dests)
{
	struct tipc_msg *hdr, *_hdr;
	struct sk_buff_head tmpq;
	struct sk_buff *_skb;
	u16 cong_link_cnt;
	int rc = 0;

	/* Does the cluster support the new capabilities? */
	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
		return 0;

	hdr = buf_msg(skb);
	if (msg_user(hdr) == MSG_FRAGMENTER)
		hdr = msg_inner_hdr(hdr);
	if (msg_type(hdr) != TIPC_MCAST_MSG)
		return 0;

	/* Allocate dummy message */
	_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
	if (!_skb)
		return -ENOMEM;

	/* Prepare the 'synching' header */
	msg_set_syn(hdr, 1);

	/* Copy skb's header into the dummy header */
	skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
	skb_orphan(_skb);

	/* Reverse method for dummy message */
	_hdr = buf_msg(_skb);
	msg_set_size(_hdr, MCAST_H_SIZE);
	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
	msg_set_errcode(_hdr, TIPC_ERR_NO_PORT);

	__skb_queue_head_init(&tmpq);
	__skb_queue_tail(&tmpq, _skb);
	if (method->rcast)
		rc = tipc_bcast_xmit(net, &tmpq, &cong_link_cnt);
	else
		rc = tipc_rcast_xmit(net, &tmpq, dests, &cong_link_cnt);

	/* This queue should normally be empty by now */
	__skb_queue_purge(&tmpq);

	return rc;
}
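
/* The dummy message built above mirrors the real message's header but
 * carries the inverted is_rcast flag and is sent via the method being
 * abandoned; receivers pair it with its twin in tipc_mcast_filter_msg()
 * to keep delivery ordered across the method switch.
 */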

/* tipc_mcast_xmit - deliver message to indicated destination nodes
 *                   and to identified node local sockets
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @method: send method to be used
 * @dests: destination nodes for message.
 * @cong_link_cnt: returns number of encountered congested destination links
 * Consumes buffer chain.
 * Returns 0 on success, otherwise errno
 */
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
		    struct tipc_mc_method *method, struct tipc_nlist *dests,
		    u16 *cong_link_cnt)
{
	struct sk_buff_head inputq, localq;
	bool rcast = method->rcast;
	struct tipc_msg *hdr;
	struct sk_buff *skb;
	int rc = 0;

	skb_queue_head_init(&inputq);
	__skb_queue_head_init(&localq);

	/* Clone packets before they are consumed by next call */
	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
		rc = -ENOMEM;
		goto exit;
	}
	/* Send according to determined transmit method */
	if (dests->remote) {
		tipc_bcast_select_xmit_method(net, dests->remote, method);

		skb = skb_peek(pkts);
		hdr = buf_msg(skb);
		if (msg_user(hdr) == MSG_FRAGMENTER)
			hdr = msg_inner_hdr(hdr);
		msg_set_is_rcast(hdr, method->rcast);

		/* Switch method ? */
		if (rcast != method->rcast) {
			rc = tipc_mcast_send_sync(net, skb, method, dests);
			if (unlikely(rc)) {
				pr_err("Unable to send SYN: method %d, rc %d\n",
				       rcast, rc);
				goto exit;
			}
		}

		if (method->rcast)
			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
		else
			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
	}

	if (dests->local) {
		tipc_loopback_trace(net, &localq);
		tipc_sk_mcast_rcv(net, &localq, &inputq);
	}
exit:
	/* This queue should normally be empty by now */
	__skb_queue_purge(pkts);
	return rc;
}

/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
 *
 * RCU is locked, no other locks set
 */
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
{
	struct tipc_msg *hdr = buf_msg(skb);
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;
	int rc;

	__skb_queue_head_init(&xmitq);

	if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
		kfree_skb(skb);
		return 0;
	}

	tipc_bcast_lock(net);
	if (msg_user(hdr) == BCAST_PROTOCOL)
		rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
	else
		rc = tipc_link_rcv(l, skb, NULL);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);

	return rc;
}

/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
 *
 * RCU is locked, no other locks set
 */
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr)
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	u16 acked = msg_bcast_ack(hdr);
	struct sk_buff_head xmitq;

	/* Ignore bc acks sent by peer before bcast synch point was received */
	if (msg_bc_ack_invalid(hdr))
		return;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

/* tipc_bcast_sync_rcv - check and update rcv link with peer's send state
 *
 * RCU is locked, no other locks set
 */
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr,
			struct sk_buff_head *retrq)
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct tipc_gap_ack_blks *ga;
	struct sk_buff_head xmitq;
	int rc = 0;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	if (msg_type(hdr) != STATE_MSG) {
		tipc_link_bc_init_rcv(l, hdr);
	} else if (!msg_bc_ack_invalid(hdr)) {
		tipc_get_gap_ack_blks(&ga, l, hdr, false);
		if (!sysctl_tipc_bc_retruni)
			retrq = &xmitq;
		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
					  msg_bc_gap(hdr), ga, &xmitq,
					  retrq);
		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
	}
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
	return rc;
}

/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
			 struct sk_buff_head *xmitq)
{
	struct tipc_link *snd_l = tipc_bc_sndlink(net);

	tipc_bcast_lock(net);
	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
	tipc_bcbase_select_primary(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);
}

/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
{
	struct tipc_link *snd_l = tipc_bc_sndlink(net);
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
	tipc_bcbase_select_primary(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
{
	if (!l)
		return -ENOPROTOOPT;

	tipc_bcast_lock(net);
	tipc_link_reset_stats(l);
	tipc_bcast_unlock(net);
	return 0;
}

static int tipc_bc_link_set_queue_limits(struct net *net, u32 max_win)
{
	struct tipc_link *l = tipc_bc_sndlink(net);

	if (!l)
		return -ENOPROTOOPT;
	if (max_win < BCLINK_WIN_MIN)
		max_win = BCLINK_WIN_MIN;
	if (max_win > TIPC_MAX_LINK_WIN)
		return -EINVAL;
	tipc_bcast_lock(net);
	tipc_link_set_queue_limits(l, tipc_link_min_win(l), max_win);
	tipc_bcast_unlock(net);
	return 0;
}

static int tipc_bc_link_set_broadcast_mode(struct net *net, u32 bc_mode)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	switch (bc_mode) {
	case BCLINK_MODE_BCAST:
		if (!bb->bcast_support)
			return -ENOPROTOOPT;

		bb->force_bcast = true;
		bb->force_rcast = false;
		break;
	case BCLINK_MODE_RCAST:
		if (!bb->rcast_support)
			return -ENOPROTOOPT;

		bb->force_bcast = false;
		bb->force_rcast = true;
		break;
	case BCLINK_MODE_SEL:
		if (!bb->bcast_support || !bb->rcast_support)
			return -ENOPROTOOPT;

		bb->force_bcast = false;
		bb->force_rcast = false;
		break;
	default:
		return -EINVAL;
	}

	return 0;
}

static int tipc_bc_link_set_broadcast_ratio(struct net *net, u32 bc_ratio)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	if (!bb->bcast_support || !bb->rcast_support)
		return -ENOPROTOOPT;

	if (bc_ratio > 100 || bc_ratio <= 0)
		return -EINVAL;

	bb->rc_ratio = bc_ratio;
	tipc_bcast_lock(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);

	return 0;
}
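
/* Example: setting the ratio to 20 in a 40-node cluster gives
 * bc_threshold = 1 + (40 * 20 / 100) = 9, so autoselect moves from
 * replicast to broadcast once a multicast exceeds nine destinations.
 */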

int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	int err;
	u32 win;
	u32 bc_mode;
	u32 bc_ratio;
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

	if (!attrs[TIPC_NLA_LINK_PROP])
		return -EINVAL;

	err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
	if (err)
		return err;

	if (!props[TIPC_NLA_PROP_WIN] &&
	    !props[TIPC_NLA_PROP_BROADCAST] &&
	    !props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
		return -EOPNOTSUPP;
	}

	if (props[TIPC_NLA_PROP_BROADCAST]) {
		bc_mode = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST]);
		err = tipc_bc_link_set_broadcast_mode(net, bc_mode);
	}

	if (!err && props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
		bc_ratio = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST_RATIO]);
		err = tipc_bc_link_set_broadcast_ratio(net, bc_ratio);
	}

	if (!err && props[TIPC_NLA_PROP_WIN]) {
		win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
		err = tipc_bc_link_set_queue_limits(net, win);
	}

	return err;
}

int tipc_bcast_init(struct net *net)
{
	struct tipc_net *tn = tipc_net(net);
	struct tipc_bc_base *bb = NULL;
	struct tipc_link *l = NULL;

	bb = kzalloc(sizeof(*bb), GFP_KERNEL);
	if (!bb)
		goto enomem;
	tn->bcbase = bb;
	spin_lock_init(&tipc_net(net)->bclock);

	if (!tipc_link_bc_create(net, 0, 0, NULL,
				 one_page_mtu,
				 BCLINK_WIN_DEFAULT,
				 BCLINK_WIN_DEFAULT,
				 0,
				 &bb->inputq,
				 NULL,
				 NULL,
				 &l))
		goto enomem;
	bb->link = l;
	tn->bcl = l;
	bb->rc_ratio = 10;
	bb->rcast_support = true;
	return 0;
enomem:
	kfree(bb);
	kfree(l);
	return -ENOMEM;
}

void tipc_bcast_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	synchronize_net();
	kfree(tn->bcbase);
	kfree(tn->bcl);
}

void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
{
	memset(nl, 0, sizeof(*nl));
	INIT_LIST_HEAD(&nl->list);
	nl->self = self;
}

void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
{
	if (node == nl->self)
		nl->local = true;
	else if (tipc_dest_push(&nl->list, node, 0))
		nl->remote++;
}

void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
{
	if (node == nl->self)
		nl->local = false;
	else if (tipc_dest_del(&nl->list, node, 0))
		nl->remote--;
}

void tipc_nlist_purge(struct tipc_nlist *nl)
{
	tipc_dest_list_purge(&nl->list);
	nl->remote = 0;
	nl->local = false;
}
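
/* Minimal usage sketch for the nlist helpers above (node addresses are
 * hypothetical):
 *
 *	struct tipc_nlist dests;
 *
 *	tipc_nlist_init(&dests, tipc_own_addr(net));
 *	tipc_nlist_add(&dests, 0x1001002);	// remote node: dests.remote++
 *	tipc_nlist_add(&dests, dests.self);	// own address: dests.local = true
 *	...
 *	tipc_nlist_purge(&dests);
 */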

u32 tipc_bcast_get_mode(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	if (bb->force_bcast)
		return BCLINK_MODE_BCAST;

	if (bb->force_rcast)
		return BCLINK_MODE_RCAST;

	if (bb->bcast_support && bb->rcast_support)
		return BCLINK_MODE_SEL;

	return 0;
}

u32 tipc_bcast_get_broadcast_ratio(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	return bb->rc_ratio;
}

void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
			   struct sk_buff_head *inputq)
{
	struct sk_buff *skb, *_skb, *tmp;
	struct tipc_msg *hdr, *_hdr;
	bool match = false;
	u32 node, port;

	skb = skb_peek(inputq);
	if (!skb)
		return;

	hdr = buf_msg(skb);

	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
		return;

	node = msg_orignode(hdr);
	if (node == tipc_own_addr(net))
		return;

	port = msg_origport(hdr);

	/* Has the twin SYN message already arrived ? */
	skb_queue_walk(defq, _skb) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		match = true;
		break;
	}

	if (!match) {
		if (!msg_is_syn(hdr))
			return;
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Deliver non-SYN message from other link, otherwise queue it */
	if (!msg_is_syn(hdr)) {
		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
			return;
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Queue non-SYN/SYN message from same link */
	if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Matching SYN messages => return the one with data, if any */
	__skb_unlink(_skb, defq);
	if (msg_data_sz(hdr)) {
		kfree_skb(_skb);
	} else {
		__skb_dequeue(inputq);
		kfree_skb(skb);
		__skb_queue_tail(inputq, _skb);
	}

	/* Deliver subsequent non-SYN messages from same peer */
	skb_queue_walk_safe(defq, _skb, tmp) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		if (msg_is_syn(_hdr))
			break;
		__skb_unlink(_skb, defq);
		__skb_queue_tail(inputq, _skb);
	}
}
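
/* Filtering summary for tipc_mcast_filter_msg(): a SYN from a peer is
 * held in defq until its twin (the dummy from tipc_mcast_send_sync())
 * arrives via the other method; when the twins meet, the one carrying
 * data, if any, is delivered, and the deferred non-SYN messages from
 * that peer are then released in order up to the next SYN.
 */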