cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

xpc_uv.c (47769B)


/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2008-2009 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) uv-based functions.
 *
 *     Architecture specific implementation of common functions.
 *
 */

#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/delay.h>
#include <linux/device.h>
#include <linux/cpu.h>
#include <linux/module.h>
#include <linux/err.h>
#include <linux/slab.h>
#include <linux/numa.h>
#include <asm/uv/uv_hub.h>
#if defined CONFIG_X86_64
#include <asm/uv/bios.h>
#include <asm/uv/uv_irq.h>
#elif defined CONFIG_IA64_SGI_UV
#include <asm/sn/intr.h>
#include <asm/sn/sn_sal.h>
#endif
#include "../sgi-gru/gru.h"
#include "../sgi-gru/grukservices.h"
#include "xpc.h"

#if defined CONFIG_IA64_SGI_UV
struct uv_IO_APIC_route_entry {
	__u64	vector		:  8,
		delivery_mode	:  3,
		dest_mode	:  1,
		delivery_status	:  1,
		polarity	:  1,
		__reserved_1	:  1,
		trigger		:  1,
		mask		:  1,
		__reserved_2	: 15,
		dest		: 32;
};

#define sn_partition_id 0
#endif

static struct xpc_heartbeat_uv *xpc_heartbeat_uv;

#define XPC_ACTIVATE_MSG_SIZE_UV	(1 * GRU_CACHE_LINE_BYTES)
#define XPC_ACTIVATE_MQ_SIZE_UV		(4 * XP_MAX_NPARTITIONS_UV * \
					 XPC_ACTIVATE_MSG_SIZE_UV)
#define XPC_ACTIVATE_IRQ_NAME		"xpc_activate"

#define XPC_NOTIFY_MSG_SIZE_UV		(2 * GRU_CACHE_LINE_BYTES)
#define XPC_NOTIFY_MQ_SIZE_UV		(4 * XP_MAX_NPARTITIONS_UV * \
					 XPC_NOTIFY_MSG_SIZE_UV)
#define XPC_NOTIFY_IRQ_NAME		"xpc_notify"
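
/*
 * Sizing sketch (constant values assumed from the sgi-gru and xp headers,
 * where GRU_CACHE_LINE_BYTES is 64 and XP_MAX_NPARTITIONS_UV is 256): each
 * partition gets four message slots per queue, so the activate MQ needs
 * 4 * 256 * 64 bytes = 64 KiB and the notify MQ 4 * 256 * 128 bytes =
 * 128 KiB, before xpc_create_gru_mq_uv() rounds the size up to a
 * power-of-two number of pages via get_order().
 */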

static int xpc_mq_node = NUMA_NO_NODE;

static struct xpc_gru_mq_uv *xpc_activate_mq_uv;
static struct xpc_gru_mq_uv *xpc_notify_mq_uv;

static int
xpc_setup_partitions_uv(void)
{
	short partid;
	struct xpc_partition_uv *part_uv;

	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
		part_uv = &xpc_partitions[partid].sn.uv;

		mutex_init(&part_uv->cached_activate_gru_mq_desc_mutex);
		spin_lock_init(&part_uv->flags_lock);
		part_uv->remote_act_state = XPC_P_AS_INACTIVE;
	}
	return 0;
}

static void
xpc_teardown_partitions_uv(void)
{
	short partid;
	struct xpc_partition_uv *part_uv;
	unsigned long irq_flags;

	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
		part_uv = &xpc_partitions[partid].sn.uv;

		if (part_uv->cached_activate_gru_mq_desc != NULL) {
			mutex_lock(&part_uv->cached_activate_gru_mq_desc_mutex);
			spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
			part_uv->flags &= ~XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
			spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
			kfree(part_uv->cached_activate_gru_mq_desc);
			part_uv->cached_activate_gru_mq_desc = NULL;
			mutex_unlock(&part_uv->
				     cached_activate_gru_mq_desc_mutex);
		}
	}
}

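/*
 * Bind a GRU message queue to an interrupt. On x86_64 the UV BIOS
 * allocates a vector targeted at the chosen cpu and the resulting MMR
 * value is read back so it can later be decoded for
 * gru_create_message_queue(); on ia64 the vector is fixed per queue and
 * the MMR value is written directly.
 */
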
static int
xpc_get_gru_mq_irq_uv(struct xpc_gru_mq_uv *mq, int cpu, char *irq_name)
{
	int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);

#if defined CONFIG_X86_64
	mq->irq = uv_setup_irq(irq_name, cpu, mq->mmr_blade, mq->mmr_offset,
			UV_AFFINITY_CPU);
	if (mq->irq < 0)
		return mq->irq;

	mq->mmr_value = uv_read_global_mmr64(mmr_pnode, mq->mmr_offset);

#elif defined CONFIG_IA64_SGI_UV
	if (strcmp(irq_name, XPC_ACTIVATE_IRQ_NAME) == 0)
		mq->irq = SGI_XPC_ACTIVATE;
	else if (strcmp(irq_name, XPC_NOTIFY_IRQ_NAME) == 0)
		mq->irq = SGI_XPC_NOTIFY;
	else
		return -EINVAL;

	mq->mmr_value = (unsigned long)cpu_physical_id(cpu) << 32 | mq->irq;
	uv_write_global_mmr64(mmr_pnode, mq->mmr_offset, mq->mmr_value);
#else
	#error not a supported configuration
#endif

	return 0;
}

static void
xpc_release_gru_mq_irq_uv(struct xpc_gru_mq_uv *mq)
{
#if defined CONFIG_X86_64
	uv_teardown_irq(mq->irq);

#elif defined CONFIG_IA64_SGI_UV
	int mmr_pnode;
	unsigned long mmr_value;

	mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
	mmr_value = 1UL << 16;

	uv_write_global_mmr64(mmr_pnode, mq->mmr_offset, mmr_value);
#else
	#error not a supported configuration
#endif
}

static int
xpc_gru_mq_watchlist_alloc_uv(struct xpc_gru_mq_uv *mq)
{
	int ret;

#if defined CONFIG_IA64_SGI_UV
	int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);

	ret = sn_mq_watchlist_alloc(mmr_pnode, (void *)uv_gpa(mq->address),
				    mq->order, &mq->mmr_offset);
	if (ret < 0) {
		dev_err(xpc_part, "sn_mq_watchlist_alloc() failed, ret=%d\n",
			ret);
		return -EBUSY;
	}
#elif defined CONFIG_X86_64
	ret = uv_bios_mq_watchlist_alloc(uv_gpa(mq->address),
					 mq->order, &mq->mmr_offset);
	if (ret < 0) {
		dev_err(xpc_part, "uv_bios_mq_watchlist_alloc() failed, "
			"ret=%d\n", ret);
		return ret;
	}
#else
	#error not a supported configuration
#endif

	mq->watchlist_num = ret;
	return 0;
}

static void
xpc_gru_mq_watchlist_free_uv(struct xpc_gru_mq_uv *mq)
{
	int ret;
	int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);

#if defined CONFIG_X86_64
	ret = uv_bios_mq_watchlist_free(mmr_pnode, mq->watchlist_num);
	BUG_ON(ret != BIOS_STATUS_SUCCESS);
#elif defined CONFIG_IA64_SGI_UV
	ret = sn_mq_watchlist_free(mmr_pnode, mq->watchlist_num);
	BUG_ON(ret != SALRET_OK);
#else
	#error not a supported configuration
#endif
}

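/*
 * Create one GRU message queue: allocate the descriptor and backing
 * pages on the node owning the chosen cpu, put the queue on the GRU
 * watchlist so writes to it raise an interrupt, wire up the irq
 * handler, and finally open the memory protections so other partitions
 * may write to it. The out_* ladder below unwinds these steps in
 * reverse on failure.
 */
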
static struct xpc_gru_mq_uv *
xpc_create_gru_mq_uv(unsigned int mq_size, int cpu, char *irq_name,
		     irq_handler_t irq_handler)
{
	enum xp_retval xp_ret;
	int ret;
	int nid;
	int nasid;
	int pg_order;
	struct page *page;
	struct xpc_gru_mq_uv *mq;
	struct uv_IO_APIC_route_entry *mmr_value;

	mq = kmalloc(sizeof(struct xpc_gru_mq_uv), GFP_KERNEL);
	if (mq == NULL) {
		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to kmalloc() "
			"a xpc_gru_mq_uv structure\n");
		ret = -ENOMEM;
		goto out_0;
	}

	mq->gru_mq_desc = kzalloc(sizeof(struct gru_message_queue_desc),
				  GFP_KERNEL);
	if (mq->gru_mq_desc == NULL) {
		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to kmalloc() "
			"a gru_message_queue_desc structure\n");
		ret = -ENOMEM;
		goto out_1;
	}

	pg_order = get_order(mq_size);
	mq->order = pg_order + PAGE_SHIFT;
	mq_size = 1UL << mq->order;

	mq->mmr_blade = uv_cpu_to_blade_id(cpu);

	nid = cpu_to_node(cpu);
	page = __alloc_pages_node(nid,
				      GFP_KERNEL | __GFP_ZERO | __GFP_THISNODE,
				      pg_order);
	if (page == NULL) {
		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to alloc %d "
			"bytes of memory on nid=%d for GRU mq\n", mq_size, nid);
		ret = -ENOMEM;
		goto out_2;
	}
	mq->address = page_address(page);

	/* enable generation of irq when GRU mq operation occurs to this mq */
	ret = xpc_gru_mq_watchlist_alloc_uv(mq);
	if (ret != 0)
		goto out_3;

	ret = xpc_get_gru_mq_irq_uv(mq, cpu, irq_name);
	if (ret != 0)
		goto out_4;

	ret = request_irq(mq->irq, irq_handler, 0, irq_name, NULL);
	if (ret != 0) {
		dev_err(xpc_part, "request_irq(irq=%d) returned error=%d\n",
			mq->irq, -ret);
		goto out_5;
	}

	nasid = UV_PNODE_TO_NASID(uv_cpu_to_pnode(cpu));

	mmr_value = (struct uv_IO_APIC_route_entry *)&mq->mmr_value;
	ret = gru_create_message_queue(mq->gru_mq_desc, mq->address, mq_size,
				     nasid, mmr_value->vector, mmr_value->dest);
	if (ret != 0) {
		dev_err(xpc_part, "gru_create_message_queue() returned "
			"error=%d\n", ret);
		ret = -EINVAL;
		goto out_6;
	}

	/* allow other partitions to access this GRU mq */
	xp_ret = xp_expand_memprotect(xp_pa(mq->address), mq_size);
	if (xp_ret != xpSuccess) {
		ret = -EACCES;
		goto out_6;
	}

	return mq;

	/* something went wrong */
out_6:
	free_irq(mq->irq, NULL);
out_5:
	xpc_release_gru_mq_irq_uv(mq);
out_4:
	xpc_gru_mq_watchlist_free_uv(mq);
out_3:
	free_pages((unsigned long)mq->address, pg_order);
out_2:
	kfree(mq->gru_mq_desc);
out_1:
	kfree(mq);
out_0:
	return ERR_PTR(ret);
}

static void
xpc_destroy_gru_mq_uv(struct xpc_gru_mq_uv *mq)
{
	unsigned int mq_size;
	int pg_order;
	int ret;

	/* disallow other partitions to access GRU mq */
	mq_size = 1UL << mq->order;
	ret = xp_restrict_memprotect(xp_pa(mq->address), mq_size);
	BUG_ON(ret != xpSuccess);

	/* unregister irq handler and release mq irq/vector mapping */
	free_irq(mq->irq, NULL);
	xpc_release_gru_mq_irq_uv(mq);

	/* disable generation of irq when GRU mq op occurs to this mq */
	xpc_gru_mq_watchlist_free_uv(mq);

	pg_order = mq->order - PAGE_SHIFT;
	free_pages((unsigned long)mq->address, pg_order);

	kfree(mq);
}

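/*
 * Send one message into a GRU message queue. Transient failures are
 * retried indefinitely: MQE_QUEUE_FULL backs off for ~10 ms between
 * attempts while MQE_CONGESTION retries immediately; any other error
 * is reported to the caller as xpGruSendMqError.
 */
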
static enum xp_retval
xpc_send_gru_msg(struct gru_message_queue_desc *gru_mq_desc, void *msg,
		 size_t msg_size)
{
	enum xp_retval xp_ret;
	int ret;

	while (1) {
		ret = gru_send_message_gpa(gru_mq_desc, msg, msg_size);
		if (ret == MQE_OK) {
			xp_ret = xpSuccess;
			break;
		}

		if (ret == MQE_QUEUE_FULL) {
			dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
				"error=MQE_QUEUE_FULL\n");
			/* !!! handle QLimit reached; delay & try again */
			/* ??? Do we add a limit to the number of retries? */
			(void)msleep_interruptible(10);
		} else if (ret == MQE_CONGESTION) {
			dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
				"error=MQE_CONGESTION\n");
			/* !!! handle LB Overflow; simply try again */
			/* ??? Do we add a limit to the number of retries? */
		} else {
			/* !!! Currently this is MQE_UNEXPECTED_CB_ERR */
			dev_err(xpc_chan, "gru_send_message_gpa() returned "
				"error=%d\n", ret);
			xp_ret = xpGruSendMqError;
			break;
		}
	}
	return xp_ret;
}

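/*
 * Run from XPC's heartbeat-checker thread (see xpc_main.c) to act on
 * the activate requests that the IRQ handler queued in
 * part->sn.uv.act_state_req. Each request is consumed and the counter
 * decremented under xpc_activate_IRQ_rcvd_lock, which is then dropped
 * while the (de)activation itself runs.
 */
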
static void
xpc_process_activate_IRQ_rcvd_uv(void)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	u8 act_state_req;

	DBUG_ON(xpc_activate_IRQ_rcvd == 0);

	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
		part = &xpc_partitions[partid];

		if (part->sn.uv.act_state_req == 0)
			continue;

		xpc_activate_IRQ_rcvd--;
		BUG_ON(xpc_activate_IRQ_rcvd < 0);

		act_state_req = part->sn.uv.act_state_req;
		part->sn.uv.act_state_req = 0;
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		if (act_state_req == XPC_P_ASR_ACTIVATE_UV) {
			if (part->act_state == XPC_P_AS_INACTIVE)
				xpc_activate_partition(part);
			else if (part->act_state == XPC_P_AS_DEACTIVATING)
				XPC_DEACTIVATE_PARTITION(part, xpReactivating);

		} else if (act_state_req == XPC_P_ASR_REACTIVATE_UV) {
			if (part->act_state == XPC_P_AS_INACTIVE)
				xpc_activate_partition(part);
			else
				XPC_DEACTIVATE_PARTITION(part, xpReactivating);

		} else if (act_state_req == XPC_P_ASR_DEACTIVATE_UV) {
			XPC_DEACTIVATE_PARTITION(part, part->sn.uv.reason);

		} else {
			BUG();
		}

		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		if (xpc_activate_IRQ_rcvd == 0)
			break;
	}
	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
}

static void
xpc_handle_activate_mq_msg_uv(struct xpc_partition *part,
			      struct xpc_activate_mq_msghdr_uv *msg_hdr,
			      int part_setup,
			      int *wakeup_hb_checker)
{
	unsigned long irq_flags;
	struct xpc_partition_uv *part_uv = &part->sn.uv;
	struct xpc_openclose_args *args;

	part_uv->remote_act_state = msg_hdr->act_state;

	switch (msg_hdr->type) {
	case XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV:
		/* syncing of remote_act_state was just done above */
		break;

	case XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV: {
		struct xpc_activate_mq_msg_activate_req_uv *msg;

		/*
		 * ??? Do we deal here with ts_jiffies being different
		 * ??? if act_state != XPC_P_AS_INACTIVE instead of
		 * ??? below?
		 */
		msg = container_of(msg_hdr, struct
				   xpc_activate_mq_msg_activate_req_uv, hdr);

		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		if (part_uv->act_state_req == 0)
			xpc_activate_IRQ_rcvd++;
		part_uv->act_state_req = XPC_P_ASR_ACTIVATE_UV;
		part->remote_rp_pa = msg->rp_gpa; /* !!! _pa is _gpa */
		part->remote_rp_ts_jiffies = msg_hdr->rp_ts_jiffies;
		part_uv->heartbeat_gpa = msg->heartbeat_gpa;

		if (msg->activate_gru_mq_desc_gpa !=
		    part_uv->activate_gru_mq_desc_gpa) {
			spin_lock(&part_uv->flags_lock);
			part_uv->flags &= ~XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
			spin_unlock(&part_uv->flags_lock);
			part_uv->activate_gru_mq_desc_gpa =
			    msg->activate_gru_mq_desc_gpa;
		}
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		(*wakeup_hb_checker)++;
		break;
	}
	case XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV: {
		struct xpc_activate_mq_msg_deactivate_req_uv *msg;

		msg = container_of(msg_hdr, struct
				   xpc_activate_mq_msg_deactivate_req_uv, hdr);

		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		if (part_uv->act_state_req == 0)
			xpc_activate_IRQ_rcvd++;
		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
		part_uv->reason = msg->reason;
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		(*wakeup_hb_checker)++;
		return;
	}
	case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV: {
		struct xpc_activate_mq_msg_chctl_closerequest_uv *msg;

		if (!part_setup)
			break;

		msg = container_of(msg_hdr, struct
				   xpc_activate_mq_msg_chctl_closerequest_uv,
				   hdr);
		args = &part->remote_openclose_args[msg->ch_number];
		args->reason = msg->reason;

		spin_lock_irqsave(&part->chctl_lock, irq_flags);
		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREQUEST;
		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

		xpc_wakeup_channel_mgr(part);
		break;
	}
	case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV: {
		struct xpc_activate_mq_msg_chctl_closereply_uv *msg;

		if (!part_setup)
			break;

		msg = container_of(msg_hdr, struct
				   xpc_activate_mq_msg_chctl_closereply_uv,
				   hdr);

		spin_lock_irqsave(&part->chctl_lock, irq_flags);
		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREPLY;
		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

		xpc_wakeup_channel_mgr(part);
		break;
	}
	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV: {
		struct xpc_activate_mq_msg_chctl_openrequest_uv *msg;

		if (!part_setup)
			break;

		msg = container_of(msg_hdr, struct
				   xpc_activate_mq_msg_chctl_openrequest_uv,
				   hdr);
		args = &part->remote_openclose_args[msg->ch_number];
		args->entry_size = msg->entry_size;
		args->local_nentries = msg->local_nentries;

		spin_lock_irqsave(&part->chctl_lock, irq_flags);
		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREQUEST;
		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

		xpc_wakeup_channel_mgr(part);
		break;
	}
	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV: {
		struct xpc_activate_mq_msg_chctl_openreply_uv *msg;

		if (!part_setup)
			break;

		msg = container_of(msg_hdr, struct
				   xpc_activate_mq_msg_chctl_openreply_uv, hdr);
		args = &part->remote_openclose_args[msg->ch_number];
		args->remote_nentries = msg->remote_nentries;
		args->local_nentries = msg->local_nentries;
		args->local_msgqueue_pa = msg->notify_gru_mq_desc_gpa;

		spin_lock_irqsave(&part->chctl_lock, irq_flags);
		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREPLY;
		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

		xpc_wakeup_channel_mgr(part);
		break;
	}
	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENCOMPLETE_UV: {
		struct xpc_activate_mq_msg_chctl_opencomplete_uv *msg;

		if (!part_setup)
			break;

		msg = container_of(msg_hdr, struct
				xpc_activate_mq_msg_chctl_opencomplete_uv, hdr);
		spin_lock_irqsave(&part->chctl_lock, irq_flags);
		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENCOMPLETE;
		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

		xpc_wakeup_channel_mgr(part);
	}
		fallthrough;
	case XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV:
		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
		part_uv->flags |= XPC_P_ENGAGED_UV;
		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
		break;

	case XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV:
		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
		part_uv->flags &= ~XPC_P_ENGAGED_UV;
		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
		break;

	default:
		dev_err(xpc_part, "received unknown activate_mq msg type=%d "
			"from partition=%d\n", msg_hdr->type, XPC_PARTID(part));

		/* get hb checker to deactivate from the remote partition */
		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		if (part_uv->act_state_req == 0)
			xpc_activate_IRQ_rcvd++;
		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
		part_uv->reason = xpBadMsgType;
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		(*wakeup_hb_checker)++;
		return;
	}

	if (msg_hdr->rp_ts_jiffies != part->remote_rp_ts_jiffies &&
	    part->remote_rp_ts_jiffies != 0) {
		/*
		 * ??? Does what we do here need to be sensitive to
		 * ??? act_state or remote_act_state?
		 */
		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		if (part_uv->act_state_req == 0)
			xpc_activate_IRQ_rcvd++;
		part_uv->act_state_req = XPC_P_ASR_REACTIVATE_UV;
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		(*wakeup_hb_checker)++;
	}
}

static irqreturn_t
xpc_handle_activate_IRQ_uv(int irq, void *dev_id)
{
	struct xpc_activate_mq_msghdr_uv *msg_hdr;
	short partid;
	struct xpc_partition *part;
	int wakeup_hb_checker = 0;
	int part_referenced;

	while (1) {
		msg_hdr = gru_get_next_message(xpc_activate_mq_uv->gru_mq_desc);
		if (msg_hdr == NULL)
			break;

		partid = msg_hdr->partid;
		if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
			dev_err(xpc_part, "xpc_handle_activate_IRQ_uv() "
				"received invalid partid=0x%x in message\n",
				partid);
		} else {
			part = &xpc_partitions[partid];

			part_referenced = xpc_part_ref(part);
			xpc_handle_activate_mq_msg_uv(part, msg_hdr,
						      part_referenced,
						      &wakeup_hb_checker);
			if (part_referenced)
				xpc_part_deref(part);
		}

		gru_free_message(xpc_activate_mq_uv->gru_mq_desc, msg_hdr);
	}

	if (wakeup_hb_checker)
		wake_up_interruptible(&xpc_activate_IRQ_wq);

	return IRQ_HANDLED;
}

static enum xp_retval
xpc_cache_remote_gru_mq_desc_uv(struct gru_message_queue_desc *gru_mq_desc,
				unsigned long gru_mq_desc_gpa)
{
	enum xp_retval ret;

	ret = xp_remote_memcpy(uv_gpa(gru_mq_desc), gru_mq_desc_gpa,
			       sizeof(struct gru_message_queue_desc));
	if (ret == xpSuccess)
		gru_mq_desc->mq = NULL;

	return ret;
}

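/*
 * Send a message over the activate MQ to another partition. The remote
 * partition's gru_message_queue_desc is cached locally on first use and
 * re-fetched whenever XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV has been
 * cleared (e.g. because the remote side advertised a new
 * activate_gru_mq_desc_gpa), so a failed send retries once through the
 * "again" path with a fresh copy of the descriptor.
 */
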
static enum xp_retval
xpc_send_activate_IRQ_uv(struct xpc_partition *part, void *msg, size_t msg_size,
			 int msg_type)
{
	struct xpc_activate_mq_msghdr_uv *msg_hdr = msg;
	struct xpc_partition_uv *part_uv = &part->sn.uv;
	struct gru_message_queue_desc *gru_mq_desc;
	unsigned long irq_flags;
	enum xp_retval ret;

	DBUG_ON(msg_size > XPC_ACTIVATE_MSG_SIZE_UV);

	msg_hdr->type = msg_type;
	msg_hdr->partid = xp_partition_id;
	msg_hdr->act_state = part->act_state;
	msg_hdr->rp_ts_jiffies = xpc_rsvd_page->ts_jiffies;

	mutex_lock(&part_uv->cached_activate_gru_mq_desc_mutex);
again:
	if (!(part_uv->flags & XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV)) {
		gru_mq_desc = part_uv->cached_activate_gru_mq_desc;
		if (gru_mq_desc == NULL) {
			gru_mq_desc = kmalloc(sizeof(struct
					      gru_message_queue_desc),
					      GFP_ATOMIC);
			if (gru_mq_desc == NULL) {
				ret = xpNoMemory;
				goto done;
			}
			part_uv->cached_activate_gru_mq_desc = gru_mq_desc;
		}

		ret = xpc_cache_remote_gru_mq_desc_uv(gru_mq_desc,
						      part_uv->
						      activate_gru_mq_desc_gpa);
		if (ret != xpSuccess)
			goto done;

		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
		part_uv->flags |= XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
	}

	/* ??? Is holding a spin_lock (ch->lock) during this call a bad idea? */
	ret = xpc_send_gru_msg(part_uv->cached_activate_gru_mq_desc, msg,
			       msg_size);
	if (ret != xpSuccess) {
		smp_rmb();	/* ensure a fresh copy of part_uv->flags */
		if (!(part_uv->flags & XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV))
			goto again;
	}
done:
	mutex_unlock(&part_uv->cached_activate_gru_mq_desc_mutex);
	return ret;
}

static void
xpc_send_activate_IRQ_part_uv(struct xpc_partition *part, void *msg,
			      size_t msg_size, int msg_type)
{
	enum xp_retval ret;

	ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
	if (unlikely(ret != xpSuccess))
		XPC_DEACTIVATE_PARTITION(part, ret);
}

static void
xpc_send_activate_IRQ_ch_uv(struct xpc_channel *ch, unsigned long *irq_flags,
			 void *msg, size_t msg_size, int msg_type)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	enum xp_retval ret;

	ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
	if (unlikely(ret != xpSuccess)) {
		if (irq_flags != NULL)
			spin_unlock_irqrestore(&ch->lock, *irq_flags);

		XPC_DEACTIVATE_PARTITION(part, ret);

		if (irq_flags != NULL)
			spin_lock_irqsave(&ch->lock, *irq_flags);
	}
}

static void
xpc_send_local_activate_IRQ_uv(struct xpc_partition *part, int act_state_req)
{
	unsigned long irq_flags;
	struct xpc_partition_uv *part_uv = &part->sn.uv;

	/*
	 * !!! Make our side think that the remote partition sent an activate
	 * !!! mq message our way by doing what the activate IRQ handler would
	 * !!! do had one really been sent.
	 */

	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
	if (part_uv->act_state_req == 0)
		xpc_activate_IRQ_rcvd++;
	part_uv->act_state_req = act_state_req;
	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

	wake_up_interruptible(&xpc_activate_IRQ_wq);
}

static enum xp_retval
xpc_get_partition_rsvd_page_pa_uv(void *buf, u64 *cookie, unsigned long *rp_pa,
				  size_t *len)
{
	s64 status;
	enum xp_retval ret;

#if defined CONFIG_X86_64
	status = uv_bios_reserved_page_pa((u64)buf, cookie, (u64 *)rp_pa,
					  (u64 *)len);
	if (status == BIOS_STATUS_SUCCESS)
		ret = xpSuccess;
	else if (status == BIOS_STATUS_MORE_PASSES)
		ret = xpNeedMoreInfo;
	else
		ret = xpBiosError;

#elif defined CONFIG_IA64_SGI_UV
	status = sn_partition_reserved_page_pa((u64)buf, cookie, rp_pa, len);
	if (status == SALRET_OK)
		ret = xpSuccess;
	else if (status == SALRET_MORE_PASSES)
		ret = xpNeedMoreInfo;
	else
		ret = xpSalError;

#else
	#error not a supported configuration
#endif

	return ret;
}

static int
xpc_setup_rsvd_page_uv(struct xpc_rsvd_page *rp)
{
	xpc_heartbeat_uv =
	    &xpc_partitions[sn_partition_id].sn.uv.cached_heartbeat;
	rp->sn.uv.heartbeat_gpa = uv_gpa(xpc_heartbeat_uv);
	rp->sn.uv.activate_gru_mq_desc_gpa =
	    uv_gpa(xpc_activate_mq_uv->gru_mq_desc);
	return 0;
}

static void
xpc_allow_hb_uv(short partid)
{
}

static void
xpc_disallow_hb_uv(short partid)
{
}

static void
xpc_disallow_all_hbs_uv(void)
{
}

static void
xpc_increment_heartbeat_uv(void)
{
	xpc_heartbeat_uv->value++;
}

static void
xpc_offline_heartbeat_uv(void)
{
	xpc_increment_heartbeat_uv();
	xpc_heartbeat_uv->offline = 1;
}

static void
xpc_online_heartbeat_uv(void)
{
	xpc_increment_heartbeat_uv();
	xpc_heartbeat_uv->offline = 0;
}

static void
xpc_heartbeat_init_uv(void)
{
	xpc_heartbeat_uv->value = 1;
	xpc_heartbeat_uv->offline = 0;
}

static void
xpc_heartbeat_exit_uv(void)
{
	xpc_offline_heartbeat_uv();
}

static enum xp_retval
xpc_get_remote_heartbeat_uv(struct xpc_partition *part)
{
	struct xpc_partition_uv *part_uv = &part->sn.uv;
	enum xp_retval ret;

	ret = xp_remote_memcpy(uv_gpa(&part_uv->cached_heartbeat),
			       part_uv->heartbeat_gpa,
			       sizeof(struct xpc_heartbeat_uv));
	if (ret != xpSuccess)
		return ret;

	if (part_uv->cached_heartbeat.value == part->last_heartbeat &&
	    !part_uv->cached_heartbeat.offline) {

		ret = xpNoHeartbeat;
	} else {
		part->last_heartbeat = part_uv->cached_heartbeat.value;
	}
	return ret;
}

static void
xpc_request_partition_activation_uv(struct xpc_rsvd_page *remote_rp,
				    unsigned long remote_rp_gpa, int nasid)
{
	short partid = remote_rp->SAL_partid;
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_activate_mq_msg_activate_req_uv msg;

	part->remote_rp_pa = remote_rp_gpa; /* !!! _pa here is really _gpa */
	part->remote_rp_ts_jiffies = remote_rp->ts_jiffies;
	part->sn.uv.heartbeat_gpa = remote_rp->sn.uv.heartbeat_gpa;
	part->sn.uv.activate_gru_mq_desc_gpa =
	    remote_rp->sn.uv.activate_gru_mq_desc_gpa;

	/*
	 * ??? Is it a good idea to make this conditional on what is
	 * ??? potentially stale state information?
	 */
	if (part->sn.uv.remote_act_state == XPC_P_AS_INACTIVE) {
		msg.rp_gpa = uv_gpa(xpc_rsvd_page);
		msg.heartbeat_gpa = xpc_rsvd_page->sn.uv.heartbeat_gpa;
		msg.activate_gru_mq_desc_gpa =
		    xpc_rsvd_page->sn.uv.activate_gru_mq_desc_gpa;
		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
					   XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV);
	}

	if (part->act_state == XPC_P_AS_INACTIVE)
		xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
}

static void
xpc_request_partition_reactivation_uv(struct xpc_partition *part)
{
	xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
}

static void
xpc_request_partition_deactivation_uv(struct xpc_partition *part)
{
	struct xpc_activate_mq_msg_deactivate_req_uv msg;

	/*
	 * ??? Is it a good idea to make this conditional on what is
	 * ??? potentially stale state information?
	 */
	if (part->sn.uv.remote_act_state != XPC_P_AS_DEACTIVATING &&
	    part->sn.uv.remote_act_state != XPC_P_AS_INACTIVE) {

		msg.reason = part->reason;
		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
					 XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV);
	}
}

static void
xpc_cancel_partition_deactivation_request_uv(struct xpc_partition *part)
{
	/* nothing needs to be done */
	return;
}

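/*
 * The xpc_fifo_head_uv routines below implement a simple singly-linked
 * FIFO protected by an irq-safe spinlock. Entries (struct
 * xpc_fifo_entry_uv) are embedded in the objects they queue, so callers
 * recover the containing object with container_of().
 */
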
static void
xpc_init_fifo_uv(struct xpc_fifo_head_uv *head)
{
	head->first = NULL;
	head->last = NULL;
	spin_lock_init(&head->lock);
	head->n_entries = 0;
}

static void *
xpc_get_fifo_entry_uv(struct xpc_fifo_head_uv *head)
{
	unsigned long irq_flags;
	struct xpc_fifo_entry_uv *first;

	spin_lock_irqsave(&head->lock, irq_flags);
	first = head->first;
	if (head->first != NULL) {
		head->first = first->next;
		if (head->first == NULL)
			head->last = NULL;

		head->n_entries--;
		BUG_ON(head->n_entries < 0);

		first->next = NULL;
	}
	spin_unlock_irqrestore(&head->lock, irq_flags);
	return first;
}

static void
xpc_put_fifo_entry_uv(struct xpc_fifo_head_uv *head,
		      struct xpc_fifo_entry_uv *last)
{
	unsigned long irq_flags;

	last->next = NULL;
	spin_lock_irqsave(&head->lock, irq_flags);
	if (head->last != NULL)
		head->last->next = last;
	else
		head->first = last;
	head->last = last;
	head->n_entries++;
	spin_unlock_irqrestore(&head->lock, irq_flags);
}

static int
xpc_n_of_fifo_entries_uv(struct xpc_fifo_head_uv *head)
{
	return head->n_entries;
}

/*
 * Set up the channel structures that are uv specific.
 */
static enum xp_retval
xpc_setup_ch_structures_uv(struct xpc_partition *part)
{
	struct xpc_channel_uv *ch_uv;
	int ch_number;

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch_uv = &part->channels[ch_number].sn.uv;

		xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
		xpc_init_fifo_uv(&ch_uv->recv_msg_list);
	}

	return xpSuccess;
}

/*
 * Tear down the channel structures that are uv specific.
 */
static void
xpc_teardown_ch_structures_uv(struct xpc_partition *part)
{
	/* nothing needs to be done */
	return;
}

static enum xp_retval
xpc_make_first_contact_uv(struct xpc_partition *part)
{
	struct xpc_activate_mq_msg_uv msg;

	/*
	 * We send a sync msg to get the remote partition's remote_act_state
	 * updated to our current act_state which at this point should
	 * be XPC_P_AS_ACTIVATING.
	 */
	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
				      XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV);

	while (!((part->sn.uv.remote_act_state == XPC_P_AS_ACTIVATING) ||
		 (part->sn.uv.remote_act_state == XPC_P_AS_ACTIVE))) {

		dev_dbg(xpc_part, "waiting to make first contact with "
			"partition %d\n", XPC_PARTID(part));

		/* wait a 1/4 of a second or so */
		(void)msleep_interruptible(250);

		if (part->act_state == XPC_P_AS_DEACTIVATING)
			return part->reason;
	}

	return xpSuccess;
}

static u64
xpc_get_chctl_all_flags_uv(struct xpc_partition *part)
{
	unsigned long irq_flags;
	union xpc_channel_ctl_flags chctl;

	spin_lock_irqsave(&part->chctl_lock, irq_flags);
	chctl = part->chctl;
	if (chctl.all_flags != 0)
		part->chctl.all_flags = 0;

	spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
	return chctl.all_flags;
}

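/*
 * Allocate the send-side msg slot array for a channel. If a
 * ch->local_nentries-sized allocation fails, progressively smaller
 * counts are tried, and ch->local_nentries is shrunk (under ch->lock)
 * to whatever finally fit; the recv-side allocator below uses the same
 * strategy with ch->remote_nentries.
 */
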
static enum xp_retval
xpc_allocate_send_msg_slot_uv(struct xpc_channel *ch)
{
	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
	struct xpc_send_msg_slot_uv *msg_slot;
	unsigned long irq_flags;
	int nentries;
	int entry;
	size_t nbytes;

	for (nentries = ch->local_nentries; nentries > 0; nentries--) {
		nbytes = nentries * sizeof(struct xpc_send_msg_slot_uv);
		ch_uv->send_msg_slots = kzalloc(nbytes, GFP_KERNEL);
		if (ch_uv->send_msg_slots == NULL)
			continue;

		for (entry = 0; entry < nentries; entry++) {
			msg_slot = &ch_uv->send_msg_slots[entry];

			msg_slot->msg_slot_number = entry;
			xpc_put_fifo_entry_uv(&ch_uv->msg_slot_free_list,
					      &msg_slot->next);
		}

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (nentries < ch->local_nentries)
			ch->local_nentries = nentries;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return xpSuccess;
	}

	return xpNoMemory;
}

static enum xp_retval
xpc_allocate_recv_msg_slot_uv(struct xpc_channel *ch)
{
	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
	struct xpc_notify_mq_msg_uv *msg_slot;
	unsigned long irq_flags;
	int nentries;
	int entry;
	size_t nbytes;

	for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
		nbytes = nentries * ch->entry_size;
		ch_uv->recv_msg_slots = kzalloc(nbytes, GFP_KERNEL);
		if (ch_uv->recv_msg_slots == NULL)
			continue;

		for (entry = 0; entry < nentries; entry++) {
			msg_slot = ch_uv->recv_msg_slots +
			    entry * ch->entry_size;

			msg_slot->hdr.msg_slot_number = entry;
		}

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (nentries < ch->remote_nentries)
			ch->remote_nentries = nentries;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return xpSuccess;
	}

	return xpNoMemory;
}

/*
 * Allocate msg_slots associated with the channel.
 */
static enum xp_retval
xpc_setup_msg_structures_uv(struct xpc_channel *ch)
{
	enum xp_retval ret;
	struct xpc_channel_uv *ch_uv = &ch->sn.uv;

	DBUG_ON(ch->flags & XPC_C_SETUP);

	ch_uv->cached_notify_gru_mq_desc = kmalloc(sizeof(struct
						   gru_message_queue_desc),
						   GFP_KERNEL);
	if (ch_uv->cached_notify_gru_mq_desc == NULL)
		return xpNoMemory;

	ret = xpc_allocate_send_msg_slot_uv(ch);
	if (ret == xpSuccess) {

		ret = xpc_allocate_recv_msg_slot_uv(ch);
		if (ret != xpSuccess) {
			kfree(ch_uv->send_msg_slots);
			xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
		}
	}
	return ret;
}

/*
 * Free up msg_slots and clear other stuff that was set up for the specified
 * channel.
 */
static void
xpc_teardown_msg_structures_uv(struct xpc_channel *ch)
{
	struct xpc_channel_uv *ch_uv = &ch->sn.uv;

	lockdep_assert_held(&ch->lock);

	kfree(ch_uv->cached_notify_gru_mq_desc);
	ch_uv->cached_notify_gru_mq_desc = NULL;

	if (ch->flags & XPC_C_SETUP) {
		xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
		kfree(ch_uv->send_msg_slots);
		xpc_init_fifo_uv(&ch_uv->recv_msg_list);
		kfree(ch_uv->recv_msg_slots);
	}
}

static void
xpc_send_chctl_closerequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_activate_mq_msg_chctl_closerequest_uv msg;

	msg.ch_number = ch->number;
	msg.reason = ch->reason;
	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
				    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV);
}

static void
xpc_send_chctl_closereply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_activate_mq_msg_chctl_closereply_uv msg;

	msg.ch_number = ch->number;
	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
				    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV);
}

static void
xpc_send_chctl_openrequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_activate_mq_msg_chctl_openrequest_uv msg;

	msg.ch_number = ch->number;
	msg.entry_size = ch->entry_size;
	msg.local_nentries = ch->local_nentries;
	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV);
}

static void
xpc_send_chctl_openreply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_activate_mq_msg_chctl_openreply_uv msg;

	msg.ch_number = ch->number;
	msg.local_nentries = ch->local_nentries;
	msg.remote_nentries = ch->remote_nentries;
	msg.notify_gru_mq_desc_gpa = uv_gpa(xpc_notify_mq_uv->gru_mq_desc);
	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV);
}

static void
xpc_send_chctl_opencomplete_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_activate_mq_msg_chctl_opencomplete_uv msg;

	msg.ch_number = ch->number;
	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENCOMPLETE_UV);
}

static void
xpc_send_chctl_local_msgrequest_uv(struct xpc_partition *part, int ch_number)
{
	unsigned long irq_flags;

	spin_lock_irqsave(&part->chctl_lock, irq_flags);
	part->chctl.flags[ch_number] |= XPC_CHCTL_MSGREQUEST;
	spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

	xpc_wakeup_channel_mgr(part);
}

static enum xp_retval
xpc_save_remote_msgqueue_pa_uv(struct xpc_channel *ch,
			       unsigned long gru_mq_desc_gpa)
{
	struct xpc_channel_uv *ch_uv = &ch->sn.uv;

	DBUG_ON(ch_uv->cached_notify_gru_mq_desc == NULL);
	return xpc_cache_remote_gru_mq_desc_uv(ch_uv->cached_notify_gru_mq_desc,
					       gru_mq_desc_gpa);
}

static void
xpc_indicate_partition_engaged_uv(struct xpc_partition *part)
{
	struct xpc_activate_mq_msg_uv msg;

	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
				      XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV);
}

static void
xpc_indicate_partition_disengaged_uv(struct xpc_partition *part)
{
	struct xpc_activate_mq_msg_uv msg;

	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
				      XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV);
}

static void
xpc_assume_partition_disengaged_uv(short partid)
{
	struct xpc_partition_uv *part_uv = &xpc_partitions[partid].sn.uv;
	unsigned long irq_flags;

	spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
	part_uv->flags &= ~XPC_P_ENGAGED_UV;
	spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
}

static int
xpc_partition_engaged_uv(short partid)
{
	return (xpc_partitions[partid].sn.uv.flags & XPC_P_ENGAGED_UV) != 0;
}

static int
xpc_any_partition_engaged_uv(void)
{
	struct xpc_partition_uv *part_uv;
	short partid;

	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
		part_uv = &xpc_partitions[partid].sn.uv;
		if ((part_uv->flags & XPC_P_ENGAGED_UV) != 0)
			return 1;
	}
	return 0;
}

static enum xp_retval
xpc_allocate_msg_slot_uv(struct xpc_channel *ch, u32 flags,
			 struct xpc_send_msg_slot_uv **address_of_msg_slot)
{
	enum xp_retval ret;
	struct xpc_send_msg_slot_uv *msg_slot;
	struct xpc_fifo_entry_uv *entry;

	while (1) {
		entry = xpc_get_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list);
		if (entry != NULL)
			break;

		if (flags & XPC_NOWAIT)
			return xpNoWait;

		ret = xpc_allocate_msg_wait(ch);
		if (ret != xpInterrupted && ret != xpTimeout)
			return ret;
	}

	msg_slot = container_of(entry, struct xpc_send_msg_slot_uv, next);
	*address_of_msg_slot = msg_slot;
	return xpSuccess;
}

static void
xpc_free_msg_slot_uv(struct xpc_channel *ch,
		     struct xpc_send_msg_slot_uv *msg_slot)
{
	xpc_put_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list, &msg_slot->next);

	/* wakeup anyone waiting for a free msg slot */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);
}

static void
xpc_notify_sender_uv(struct xpc_channel *ch,
		     struct xpc_send_msg_slot_uv *msg_slot,
		     enum xp_retval reason)
{
	xpc_notify_func func = msg_slot->func;

	if (func != NULL && cmpxchg(&msg_slot->func, func, NULL) == func) {

		atomic_dec(&ch->n_to_notify);

		dev_dbg(xpc_chan, "msg_slot->func() called, msg_slot=0x%p "
			"msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
			msg_slot->msg_slot_number, ch->partid, ch->number);

		func(reason, ch->partid, ch->number, msg_slot->key);

		dev_dbg(xpc_chan, "msg_slot->func() returned, msg_slot=0x%p "
			"msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
			msg_slot->msg_slot_number, ch->partid, ch->number);
	}
}

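/*
 * Handle an ACK for a message we previously sent. Send slots carry a
 * monotonically increasing msg_slot_number: on each ACK the slot's
 * number is advanced by ch->local_nentries, so number % local_nentries
 * locates the slot and the BUG_ON below catches an ACK for a stale
 * incarnation of it.
 */
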
static void
xpc_handle_notify_mq_ack_uv(struct xpc_channel *ch,
			    struct xpc_notify_mq_msg_uv *msg)
{
	struct xpc_send_msg_slot_uv *msg_slot;
	int entry = msg->hdr.msg_slot_number % ch->local_nentries;

	msg_slot = &ch->sn.uv.send_msg_slots[entry];

	BUG_ON(msg_slot->msg_slot_number != msg->hdr.msg_slot_number);
	msg_slot->msg_slot_number += ch->local_nentries;

	if (msg_slot->func != NULL)
		xpc_notify_sender_uv(ch, msg_slot, xpMsgDelivered);

	xpc_free_msg_slot_uv(ch, msg_slot);
}

static void
xpc_handle_notify_mq_msg_uv(struct xpc_partition *part,
			    struct xpc_notify_mq_msg_uv *msg)
{
	struct xpc_partition_uv *part_uv = &part->sn.uv;
	struct xpc_channel *ch;
	struct xpc_channel_uv *ch_uv;
	struct xpc_notify_mq_msg_uv *msg_slot;
	unsigned long irq_flags;
	int ch_number = msg->hdr.ch_number;

	if (unlikely(ch_number >= part->nchannels)) {
		dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received invalid "
			"channel number=0x%x in message from partid=%d\n",
			ch_number, XPC_PARTID(part));

		/* get hb checker to deactivate from the remote partition */
		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		if (part_uv->act_state_req == 0)
			xpc_activate_IRQ_rcvd++;
		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
		part_uv->reason = xpBadChannelNumber;
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		wake_up_interruptible(&xpc_activate_IRQ_wq);
		return;
	}

	ch = &part->channels[ch_number];
	xpc_msgqueue_ref(ch);

	if (!(ch->flags & XPC_C_CONNECTED)) {
		xpc_msgqueue_deref(ch);
		return;
	}

	/* see if we're really dealing with an ACK for a previously sent msg */
	if (msg->hdr.size == 0) {
		xpc_handle_notify_mq_ack_uv(ch, msg);
		xpc_msgqueue_deref(ch);
		return;
	}

	/* we're dealing with a normal message sent via the notify_mq */
	ch_uv = &ch->sn.uv;

	msg_slot = ch_uv->recv_msg_slots +
	    (msg->hdr.msg_slot_number % ch->remote_nentries) * ch->entry_size;

	BUG_ON(msg_slot->hdr.size != 0);

	memcpy(msg_slot, msg, msg->hdr.size);

	xpc_put_fifo_entry_uv(&ch_uv->recv_msg_list, &msg_slot->hdr.u.next);

	if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
		/*
		 * If there is an existing idle kthread get it to deliver
		 * the payload, otherwise we'll have to get the channel mgr
		 * for this partition to create a kthread to do the delivery.
		 */
		if (atomic_read(&ch->kthreads_idle) > 0)
			wake_up_nr(&ch->idle_wq, 1);
		else
			xpc_send_chctl_local_msgrequest_uv(part, ch->number);
	}
	xpc_msgqueue_deref(ch);
}

static irqreturn_t
xpc_handle_notify_IRQ_uv(int irq, void *dev_id)
{
	struct xpc_notify_mq_msg_uv *msg;
	short partid;
	struct xpc_partition *part;

	while ((msg = gru_get_next_message(xpc_notify_mq_uv->gru_mq_desc)) !=
	       NULL) {

		partid = msg->hdr.partid;
		if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
			dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received "
				"invalid partid=0x%x in message\n", partid);
		} else {
			part = &xpc_partitions[partid];

			if (xpc_part_ref(part)) {
				xpc_handle_notify_mq_msg_uv(part, msg);
				xpc_part_deref(part);
			}
		}

		gru_free_message(xpc_notify_mq_uv->gru_mq_desc, msg);
	}

	return IRQ_HANDLED;
}

static int
xpc_n_of_deliverable_payloads_uv(struct xpc_channel *ch)
{
	return xpc_n_of_fifo_entries_uv(&ch->sn.uv.recv_msg_list);
}

static void
xpc_process_msg_chctl_flags_uv(struct xpc_partition *part, int ch_number)
{
	struct xpc_channel *ch = &part->channels[ch_number];
	int ndeliverable_payloads;

	xpc_msgqueue_ref(ch);

	ndeliverable_payloads = xpc_n_of_deliverable_payloads_uv(ch);

	if (ndeliverable_payloads > 0 &&
	    (ch->flags & XPC_C_CONNECTED) &&
	    (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)) {

		xpc_activate_kthreads(ch, ndeliverable_payloads);
	}

	xpc_msgqueue_deref(ch);
}

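/*
 * Send a payload over a channel's notify MQ. The message is staged in a
 * stack buffer as a notify_mq header followed by the payload, so the
 * payload may be at most ch->entry_size minus the header size; a header
 * sent with size == 0 is reserved to mean "ACK" (see
 * xpc_received_payload_uv() below).
 */
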
static enum xp_retval
xpc_send_payload_uv(struct xpc_channel *ch, u32 flags, void *payload,
		    u16 payload_size, u8 notify_type, xpc_notify_func func,
		    void *key)
{
	enum xp_retval ret = xpSuccess;
	struct xpc_send_msg_slot_uv *msg_slot = NULL;
	struct xpc_notify_mq_msg_uv *msg;
	u8 msg_buffer[XPC_NOTIFY_MSG_SIZE_UV];
	size_t msg_size;

	DBUG_ON(notify_type != XPC_N_CALL);

	msg_size = sizeof(struct xpc_notify_mq_msghdr_uv) + payload_size;
	if (msg_size > ch->entry_size)
		return xpPayloadTooBig;

	xpc_msgqueue_ref(ch);

	if (ch->flags & XPC_C_DISCONNECTING) {
		ret = ch->reason;
		goto out_1;
	}
	if (!(ch->flags & XPC_C_CONNECTED)) {
		ret = xpNotConnected;
		goto out_1;
	}

	ret = xpc_allocate_msg_slot_uv(ch, flags, &msg_slot);
	if (ret != xpSuccess)
		goto out_1;

	if (func != NULL) {
		atomic_inc(&ch->n_to_notify);

		msg_slot->key = key;
		smp_wmb(); /* a non-NULL func must hit memory after the key */
		msg_slot->func = func;

		if (ch->flags & XPC_C_DISCONNECTING) {
			ret = ch->reason;
			goto out_2;
		}
	}

	msg = (struct xpc_notify_mq_msg_uv *)&msg_buffer;
	msg->hdr.partid = xp_partition_id;
	msg->hdr.ch_number = ch->number;
	msg->hdr.size = msg_size;
	msg->hdr.msg_slot_number = msg_slot->msg_slot_number;
	memcpy(&msg->payload, payload, payload_size);

	ret = xpc_send_gru_msg(ch->sn.uv.cached_notify_gru_mq_desc, msg,
			       msg_size);
	if (ret == xpSuccess)
		goto out_1;

	XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
out_2:
	if (func != NULL) {
		/*
		 * Try to NULL the msg_slot's func field. If we fail, then
		 * xpc_notify_senders_of_disconnect_uv() beat us to it, in which
		 * case we need to pretend that the send succeeded, since the
		 * user will get a callout for the disconnect error from
		 * xpc_notify_senders_of_disconnect_uv(), and getting an error
		 * returned here as well would confuse them. Additionally,
		 * since in this case the channel is being disconnected, we
		 * don't need to put the msg_slot back on the free list.
		 */
		if (cmpxchg(&msg_slot->func, func, NULL) != func) {
			ret = xpSuccess;
			goto out_1;
		}

		msg_slot->key = NULL;
		atomic_dec(&ch->n_to_notify);
	}
	xpc_free_msg_slot_uv(ch, msg_slot);
out_1:
	xpc_msgqueue_deref(ch);
	return ret;
}

/*
 * Tell the callers of xpc_send_notify() that the status of their payloads
 * is unknown because the channel is now disconnecting.
 *
 * We don't worry about putting these msg_slots on the free list since the
 * msg_slots themselves are about to be kfree'd.
 */
static void
xpc_notify_senders_of_disconnect_uv(struct xpc_channel *ch)
{
	struct xpc_send_msg_slot_uv *msg_slot;
	int entry;

	DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));

	for (entry = 0; entry < ch->local_nentries; entry++) {

		if (atomic_read(&ch->n_to_notify) == 0)
			break;

		msg_slot = &ch->sn.uv.send_msg_slots[entry];
		if (msg_slot->func != NULL)
			xpc_notify_sender_uv(ch, msg_slot, ch->reason);
	}
}

/*
 * Get the next deliverable message's payload.
 */
static void *
xpc_get_deliverable_payload_uv(struct xpc_channel *ch)
{
	struct xpc_fifo_entry_uv *entry;
	struct xpc_notify_mq_msg_uv *msg;
	void *payload = NULL;

	if (!(ch->flags & XPC_C_DISCONNECTING)) {
		entry = xpc_get_fifo_entry_uv(&ch->sn.uv.recv_msg_list);
		if (entry != NULL) {
			msg = container_of(entry, struct xpc_notify_mq_msg_uv,
					   hdr.u.next);
			payload = &msg->payload;
		}
	}
	return payload;
}

static void
xpc_received_payload_uv(struct xpc_channel *ch, void *payload)
{
	struct xpc_notify_mq_msg_uv *msg;
	enum xp_retval ret;

	msg = container_of(payload, struct xpc_notify_mq_msg_uv, payload);

	/* return an ACK to the sender of this message */

	msg->hdr.partid = xp_partition_id;
	msg->hdr.size = 0;	/* size of zero indicates this is an ACK */

	ret = xpc_send_gru_msg(ch->sn.uv.cached_notify_gru_mq_desc, msg,
			       sizeof(struct xpc_notify_mq_msghdr_uv));
	if (ret != xpSuccess)
		XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
}

static const struct xpc_arch_operations xpc_arch_ops_uv = {
	.setup_partitions = xpc_setup_partitions_uv,
	.teardown_partitions = xpc_teardown_partitions_uv,
	.process_activate_IRQ_rcvd = xpc_process_activate_IRQ_rcvd_uv,
	.get_partition_rsvd_page_pa = xpc_get_partition_rsvd_page_pa_uv,
	.setup_rsvd_page = xpc_setup_rsvd_page_uv,

	.allow_hb = xpc_allow_hb_uv,
	.disallow_hb = xpc_disallow_hb_uv,
	.disallow_all_hbs = xpc_disallow_all_hbs_uv,
	.increment_heartbeat = xpc_increment_heartbeat_uv,
	.offline_heartbeat = xpc_offline_heartbeat_uv,
	.online_heartbeat = xpc_online_heartbeat_uv,
	.heartbeat_init = xpc_heartbeat_init_uv,
	.heartbeat_exit = xpc_heartbeat_exit_uv,
	.get_remote_heartbeat = xpc_get_remote_heartbeat_uv,

	.request_partition_activation =
		xpc_request_partition_activation_uv,
	.request_partition_reactivation =
		xpc_request_partition_reactivation_uv,
	.request_partition_deactivation =
		xpc_request_partition_deactivation_uv,
	.cancel_partition_deactivation_request =
		xpc_cancel_partition_deactivation_request_uv,

	.setup_ch_structures = xpc_setup_ch_structures_uv,
	.teardown_ch_structures = xpc_teardown_ch_structures_uv,

	.make_first_contact = xpc_make_first_contact_uv,

	.get_chctl_all_flags = xpc_get_chctl_all_flags_uv,
	.send_chctl_closerequest = xpc_send_chctl_closerequest_uv,
	.send_chctl_closereply = xpc_send_chctl_closereply_uv,
	.send_chctl_openrequest = xpc_send_chctl_openrequest_uv,
	.send_chctl_openreply = xpc_send_chctl_openreply_uv,
	.send_chctl_opencomplete = xpc_send_chctl_opencomplete_uv,
	.process_msg_chctl_flags = xpc_process_msg_chctl_flags_uv,

	.save_remote_msgqueue_pa = xpc_save_remote_msgqueue_pa_uv,

	.setup_msg_structures = xpc_setup_msg_structures_uv,
	.teardown_msg_structures = xpc_teardown_msg_structures_uv,

	.indicate_partition_engaged = xpc_indicate_partition_engaged_uv,
	.indicate_partition_disengaged = xpc_indicate_partition_disengaged_uv,
	.assume_partition_disengaged = xpc_assume_partition_disengaged_uv,
	.partition_engaged = xpc_partition_engaged_uv,
	.any_partition_engaged = xpc_any_partition_engaged_uv,

	.n_of_deliverable_payloads = xpc_n_of_deliverable_payloads_uv,
	.send_payload = xpc_send_payload_uv,
	.get_deliverable_payload = xpc_get_deliverable_payload_uv,
	.received_payload = xpc_received_payload_uv,
	.notify_senders_of_disconnect = xpc_notify_senders_of_disconnect_uv,
};

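/*
 * Create both message queues on the given NUMA node, retrying the
 * creation once for each cpu of the node until it succeeds.
 * cpus_read_lock() keeps CPUs from going offline while the queues and
 * their irqs are being set up.
 */
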
static int
xpc_init_mq_node(int nid)
{
	int cpu;

	cpus_read_lock();

	for_each_cpu(cpu, cpumask_of_node(nid)) {
		xpc_activate_mq_uv =
			xpc_create_gru_mq_uv(XPC_ACTIVATE_MQ_SIZE_UV, nid,
					     XPC_ACTIVATE_IRQ_NAME,
					     xpc_handle_activate_IRQ_uv);
		if (!IS_ERR(xpc_activate_mq_uv))
			break;
	}
	if (IS_ERR(xpc_activate_mq_uv)) {
		cpus_read_unlock();
		return PTR_ERR(xpc_activate_mq_uv);
	}

	for_each_cpu(cpu, cpumask_of_node(nid)) {
		xpc_notify_mq_uv =
			xpc_create_gru_mq_uv(XPC_NOTIFY_MQ_SIZE_UV, nid,
					     XPC_NOTIFY_IRQ_NAME,
					     xpc_handle_notify_IRQ_uv);
		if (!IS_ERR(xpc_notify_mq_uv))
			break;
	}
	if (IS_ERR(xpc_notify_mq_uv)) {
		xpc_destroy_gru_mq_uv(xpc_activate_mq_uv);
		cpus_read_unlock();
		return PTR_ERR(xpc_notify_mq_uv);
	}

	cpus_read_unlock();
	return 0;
}

int
xpc_init_uv(void)
{
	int nid;
	int ret = 0;

	xpc_arch_ops = xpc_arch_ops_uv;

	if (sizeof(struct xpc_notify_mq_msghdr_uv) > XPC_MSG_HDR_MAX_SIZE) {
		dev_err(xpc_part, "xpc_notify_mq_msghdr_uv is larger than %d\n",
			XPC_MSG_HDR_MAX_SIZE);
		return -E2BIG;
	}

	if (xpc_mq_node < 0)
		for_each_online_node(nid) {
			ret = xpc_init_mq_node(nid);

			if (!ret)
				break;
		}
	else
		ret = xpc_init_mq_node(xpc_mq_node);

	if (ret < 0)
		dev_err(xpc_part, "xpc_init_mq_node() returned error=%d\n",
			-ret);

	return ret;
}

void
xpc_exit_uv(void)
{
	xpc_destroy_gru_mq_uv(xpc_notify_mq_uv);
	xpc_destroy_gru_mq_uv(xpc_activate_mq_uv);
}

module_param(xpc_mq_node, int, 0);
MODULE_PARM_DESC(xpc_mq_node, "Node number on which to allocate message queues.");
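
/*
 * By default (xpc_mq_node == NUMA_NO_NODE) the queues are placed on the
 * first online node where creation succeeds; a specific node can be
 * requested at load time, e.g. (illustrative usage, assuming the module
 * is loaded as "xpc"): modprobe xpc xpc_mq_node=0
 */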