cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

heartbeat.c (68592B)


      1// SPDX-License-Identifier: GPL-2.0-or-later
      2/*
      3 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
      4 */
      5
      6#include <linux/kernel.h>
      7#include <linux/sched.h>
      8#include <linux/jiffies.h>
      9#include <linux/module.h>
     10#include <linux/fs.h>
     11#include <linux/bio.h>
     12#include <linux/blkdev.h>
     13#include <linux/delay.h>
     14#include <linux/file.h>
     15#include <linux/kthread.h>
     16#include <linux/configfs.h>
     17#include <linux/random.h>
     18#include <linux/crc32.h>
     19#include <linux/time.h>
     20#include <linux/debugfs.h>
     21#include <linux/slab.h>
     22#include <linux/bitmap.h>
     23#include <linux/ktime.h>
     24#include "heartbeat.h"
     25#include "tcp.h"
     26#include "nodemanager.h"
     27#include "quorum.h"
     28
     29#include "masklog.h"
     30
     31
     32/*
     33 * The first heartbeat pass had one global thread that would serialize all hb
     34 * callback calls.  This global serializing sem should only be removed once
     35 * we've made sure that all callees can deal with being called concurrently
     36 * from multiple hb region threads.
     37 */
     38static DECLARE_RWSEM(o2hb_callback_sem);
     39
     40/*
     41 * multiple hb threads are watching multiple regions.  A node is live
     42 * whenever any of the threads sees activity from the node in its region.
     43 */
     44static DEFINE_SPINLOCK(o2hb_live_lock);
     45static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
     46static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
     47static LIST_HEAD(o2hb_node_events);
     48static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
     49
     50/*
     51 * In global heartbeat, we maintain a series of region bitmaps.
     52 * 	- o2hb_region_bitmap allows us to limit the region number to max region.
     53 * 	- o2hb_live_region_bitmap tracks live regions (seen steady iterations).
     54 * 	- o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
     55 * 		heartbeat on it.
     56 * 	- o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
     57 */
     58static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
     59static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
     60static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
     61static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
     62
     63#define O2HB_DB_TYPE_LIVENODES		0
     64#define O2HB_DB_TYPE_LIVEREGIONS	1
     65#define O2HB_DB_TYPE_QUORUMREGIONS	2
     66#define O2HB_DB_TYPE_FAILEDREGIONS	3
     67#define O2HB_DB_TYPE_REGION_LIVENODES	4
     68#define O2HB_DB_TYPE_REGION_NUMBER	5
     69#define O2HB_DB_TYPE_REGION_ELAPSED_TIME	6
     70#define O2HB_DB_TYPE_REGION_PINNED	7
     71struct o2hb_debug_buf {
     72	int db_type;
     73	int db_size;
     74	int db_len;
     75	void *db_data;
     76};
     77
     78static struct o2hb_debug_buf *o2hb_db_livenodes;
     79static struct o2hb_debug_buf *o2hb_db_liveregions;
     80static struct o2hb_debug_buf *o2hb_db_quorumregions;
     81static struct o2hb_debug_buf *o2hb_db_failedregions;
     82
     83#define O2HB_DEBUG_DIR			"o2hb"
     84#define O2HB_DEBUG_LIVENODES		"livenodes"
     85#define O2HB_DEBUG_LIVEREGIONS		"live_regions"
     86#define O2HB_DEBUG_QUORUMREGIONS	"quorum_regions"
     87#define O2HB_DEBUG_FAILEDREGIONS	"failed_regions"
     88#define O2HB_DEBUG_REGION_NUMBER	"num"
     89#define O2HB_DEBUG_REGION_ELAPSED_TIME	"elapsed_time_in_ms"
     90#define O2HB_DEBUG_REGION_PINNED	"pinned"
     91
     92static struct dentry *o2hb_debug_dir;
     93
     94static LIST_HEAD(o2hb_all_regions);
     95
     96static struct o2hb_callback {
     97	struct list_head list;
     98} o2hb_callbacks[O2HB_NUM_CB];
     99
    100static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
    101
    102enum o2hb_heartbeat_modes {
    103	O2HB_HEARTBEAT_LOCAL		= 0,
    104	O2HB_HEARTBEAT_GLOBAL,
    105	O2HB_HEARTBEAT_NUM_MODES,
    106};
    107
    108static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
    109	"local",	/* O2HB_HEARTBEAT_LOCAL */
    110	"global",	/* O2HB_HEARTBEAT_GLOBAL */
    111};
    112
    113unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
    114static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
    115
    116/*
    117 * o2hb_dependent_users tracks the number of registered callbacks that depend
    118 * on heartbeat. o2net and o2dlm are two entities that register this callback.
    119 * However only o2dlm depends on the heartbeat. It does not want the heartbeat
    120 * to stop while a dlm domain is still active.
    121 */
    122static unsigned int o2hb_dependent_users;
    123
    124/*
    125 * In global heartbeat mode, all regions are pinned if there are one or more
    126 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
    127 * regions are unpinned if the region count exceeds the cut off or the number
    128 * of dependent users falls to zero.
    129 */
    130#define O2HB_PIN_CUT_OFF		3
    131
    132/*
    133 * In local heartbeat mode, we assume the dlm domain name to be the same as
    134 * region uuid. This is true for domains created for the file system but not
    135 * necessarily true for userdlm domains. This is a known limitation.
    136 *
    137 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
    138 * works for both file system and userdlm domains.
    139 */
    140static int o2hb_region_pin(const char *region_uuid);
    141static void o2hb_region_unpin(const char *region_uuid);
    142
    143/* Only sets a new threshold if there are no active regions.
    144 *
    145 * No locking or otherwise interesting code is required for reading
    146 * o2hb_dead_threshold as it can't change once regions are active and
    147 * it's not interesting to anyone until then anyway. */
    148static void o2hb_dead_threshold_set(unsigned int threshold)
    149{
    150	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
    151		spin_lock(&o2hb_live_lock);
    152		if (list_empty(&o2hb_all_regions))
    153			o2hb_dead_threshold = threshold;
    154		spin_unlock(&o2hb_live_lock);
    155	}
    156}
    157
    158static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
    159{
    160	int ret = -1;
    161
    162	if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
    163		spin_lock(&o2hb_live_lock);
    164		if (list_empty(&o2hb_all_regions)) {
    165			o2hb_heartbeat_mode = hb_mode;
    166			ret = 0;
    167		}
    168		spin_unlock(&o2hb_live_lock);
    169	}
    170
    171	return ret;
    172}
    173
    174struct o2hb_node_event {
    175	struct list_head        hn_item;
    176	enum o2hb_callback_type hn_event_type;
    177	struct o2nm_node        *hn_node;
    178	int                     hn_node_num;
    179};
    180
    181struct o2hb_disk_slot {
    182	struct o2hb_disk_heartbeat_block *ds_raw_block;
    183	u8			ds_node_num;
    184	u64			ds_last_time;
    185	u64			ds_last_generation;
    186	u16			ds_equal_samples;
    187	u16			ds_changed_samples;
    188	struct list_head	ds_live_item;
    189};
    190
    191/* each thread owns a region.. when we're asked to tear down the region
    192 * we ask the thread to stop, who cleans up the region */
    193struct o2hb_region {
    194	struct config_item	hr_item;
    195
    196	struct list_head	hr_all_item;
    197	unsigned		hr_unclean_stop:1,
    198				hr_aborted_start:1,
    199				hr_item_pinned:1,
    200				hr_item_dropped:1,
    201				hr_node_deleted:1;
    202
    203	/* protected by the hr_callback_sem */
    204	struct task_struct 	*hr_task;
    205
    206	unsigned int		hr_blocks;
    207	unsigned long long	hr_start_block;
    208
    209	unsigned int		hr_block_bits;
    210	unsigned int		hr_block_bytes;
    211
    212	unsigned int		hr_slots_per_page;
    213	unsigned int		hr_num_pages;
    214
    215	struct page             **hr_slot_data;
    216	struct block_device	*hr_bdev;
    217	struct o2hb_disk_slot	*hr_slots;
    218
    219	/* live node map of this region */
    220	unsigned long		hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
    221	unsigned int		hr_region_num;
    222
    223	struct dentry		*hr_debug_dir;
    224	struct o2hb_debug_buf	*hr_db_livenodes;
    225	struct o2hb_debug_buf	*hr_db_regnum;
    226	struct o2hb_debug_buf	*hr_db_elapsed_time;
    227	struct o2hb_debug_buf	*hr_db_pinned;
    228
    229	/* let the person setting up hb wait for it to return until it
    230	 * has reached a 'steady' state.  This will be fixed when we have
    231	 * a more complete api that doesn't lead to this sort of fragility. */
    232	atomic_t		hr_steady_iterations;
    233
    234	/* terminate o2hb thread if it does not reach steady state
    235	 * (hr_steady_iterations == 0) within hr_unsteady_iterations */
    236	atomic_t		hr_unsteady_iterations;
    237
    238	char			hr_dev_name[BDEVNAME_SIZE];
    239
    240	unsigned int		hr_timeout_ms;
    241
    242	/* randomized as the region goes up and down so that a node
    243	 * recognizes a node going up and down in one iteration */
    244	u64			hr_generation;
    245
    246	struct delayed_work	hr_write_timeout_work;
    247	unsigned long		hr_last_timeout_start;
    248
    249	/* negotiate timer, used to negotiate extending hb timeout. */
    250	struct delayed_work	hr_nego_timeout_work;
    251	unsigned long		hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
    252
    253	/* Used during o2hb_check_slot to hold a copy of the block
    254	 * being checked because we temporarily have to zero out the
    255	 * crc field. */
    256	struct o2hb_disk_heartbeat_block *hr_tmp_block;
    257
    258	/* Message key for negotiate timeout message. */
    259	unsigned int		hr_key;
    260	struct list_head	hr_handler_list;
    261
    262	/* last hb status, 0 for success, other value for error. */
    263	int			hr_last_hb_status;
    264};
    265
    266struct o2hb_bio_wait_ctxt {
    267	atomic_t          wc_num_reqs;
    268	struct completion wc_io_complete;
    269	int               wc_error;
    270};
    271
    272#define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
    273
    274enum {
    275	O2HB_NEGO_TIMEOUT_MSG = 1,
    276	O2HB_NEGO_APPROVE_MSG = 2,
    277};
    278
    279struct o2hb_nego_msg {
    280	u8 node_num;
    281};
    282
    283static void o2hb_write_timeout(struct work_struct *work)
    284{
    285	int failed, quorum;
    286	struct o2hb_region *reg =
    287		container_of(work, struct o2hb_region,
    288			     hr_write_timeout_work.work);
    289
    290	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
    291	     "milliseconds\n", reg->hr_dev_name,
    292	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
    293
    294	if (o2hb_global_heartbeat_active()) {
    295		spin_lock(&o2hb_live_lock);
    296		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
    297			set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
    298		failed = bitmap_weight(o2hb_failed_region_bitmap,
    299					O2NM_MAX_REGIONS);
    300		quorum = bitmap_weight(o2hb_quorum_region_bitmap,
    301					O2NM_MAX_REGIONS);
    302		spin_unlock(&o2hb_live_lock);
    303
    304		mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
    305		     quorum, failed);
    306
    307		/*
    308		 * Fence if the number of failed regions >= half the number
    309		 * of  quorum regions
    310		 */
    311		if ((failed << 1) < quorum)
    312			return;
    313	}
    314
    315	o2quo_disk_timeout();
    316}
    317
    318static void o2hb_arm_timeout(struct o2hb_region *reg)
    319{
    320	/* Arm writeout only after thread reaches steady state */
    321	if (atomic_read(&reg->hr_steady_iterations) != 0)
    322		return;
    323
    324	mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
    325	     O2HB_MAX_WRITE_TIMEOUT_MS);
    326
    327	if (o2hb_global_heartbeat_active()) {
    328		spin_lock(&o2hb_live_lock);
    329		clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
    330		spin_unlock(&o2hb_live_lock);
    331	}
    332	cancel_delayed_work(&reg->hr_write_timeout_work);
    333	schedule_delayed_work(&reg->hr_write_timeout_work,
    334			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
    335
    336	cancel_delayed_work(&reg->hr_nego_timeout_work);
    337	/* negotiate timeout must be less than write timeout. */
    338	schedule_delayed_work(&reg->hr_nego_timeout_work,
    339			      msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
    340	memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
    341}
    342
    343static void o2hb_disarm_timeout(struct o2hb_region *reg)
    344{
    345	cancel_delayed_work_sync(&reg->hr_write_timeout_work);
    346	cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
    347}
    348
    349static int o2hb_send_nego_msg(int key, int type, u8 target)
    350{
    351	struct o2hb_nego_msg msg;
    352	int status, ret;
    353
    354	msg.node_num = o2nm_this_node();
    355again:
    356	ret = o2net_send_message(type, key, &msg, sizeof(msg),
    357			target, &status);
    358
    359	if (ret == -EAGAIN || ret == -ENOMEM) {
    360		msleep(100);
    361		goto again;
    362	}
    363
    364	return ret;
    365}
    366
    367static void o2hb_nego_timeout(struct work_struct *work)
    368{
    369	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
    370	int master_node, i, ret;
    371	struct o2hb_region *reg;
    372
    373	reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
    374	/* don't negotiate timeout if last hb failed since it is very
    375	 * possible io failed. Should let write timeout fence self.
    376	 */
    377	if (reg->hr_last_hb_status)
    378		return;
    379
    380	o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
    381	/* lowest node as master node to make negotiate decision. */
    382	master_node = find_first_bit(live_node_bitmap, O2NM_MAX_NODES);
    383
    384	if (master_node == o2nm_this_node()) {
    385		if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
    386			printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
    387				o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
    388				config_item_name(&reg->hr_item), reg->hr_dev_name);
    389			set_bit(master_node, reg->hr_nego_node_bitmap);
    390		}
    391		if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
    392				sizeof(reg->hr_nego_node_bitmap))) {
    393			/* check negotiate bitmap every second to do timeout
    394			 * approve decision.
    395			 */
    396			schedule_delayed_work(&reg->hr_nego_timeout_work,
    397				msecs_to_jiffies(1000));
    398
    399			return;
    400		}
    401
    402		printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
    403			config_item_name(&reg->hr_item), reg->hr_dev_name);
    404		/* approve negotiate timeout request. */
    405		o2hb_arm_timeout(reg);
    406
    407		i = -1;
    408		while ((i = find_next_bit(live_node_bitmap,
    409				O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
    410			if (i == master_node)
    411				continue;
    412
    413			mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
    414			ret = o2hb_send_nego_msg(reg->hr_key,
    415					O2HB_NEGO_APPROVE_MSG, i);
    416			if (ret)
    417				mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
    418					i, ret);
    419		}
    420	} else {
    421		/* negotiate timeout with master node. */
    422		printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
    423			o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
    424			reg->hr_dev_name, master_node);
    425		ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
    426				master_node);
    427		if (ret)
    428			mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
    429				master_node, ret);
    430	}
    431}
    432
    433static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
    434				void **ret_data)
    435{
    436	struct o2hb_region *reg = data;
    437	struct o2hb_nego_msg *nego_msg;
    438
    439	nego_msg = (struct o2hb_nego_msg *)msg->buf;
    440	printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
    441		nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
    442	if (nego_msg->node_num < O2NM_MAX_NODES)
    443		set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
    444	else
    445		mlog(ML_ERROR, "got nego timeout message from bad node.\n");
    446
    447	return 0;
    448}
    449
    450static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
    451				void **ret_data)
    452{
    453	struct o2hb_region *reg = data;
    454
    455	printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
    456		config_item_name(&reg->hr_item), reg->hr_dev_name);
    457	o2hb_arm_timeout(reg);
    458	return 0;
    459}
    460
    461static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
    462{
    463	atomic_set(&wc->wc_num_reqs, 1);
    464	init_completion(&wc->wc_io_complete);
    465	wc->wc_error = 0;
    466}
    467
    468/* Used in error paths too */
    469static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
    470				     unsigned int num)
    471{
    472	/* sadly atomic_sub_and_test() isn't available on all platforms.  The
    473	 * good news is that the fast path only completes one at a time */
    474	while(num--) {
    475		if (atomic_dec_and_test(&wc->wc_num_reqs)) {
    476			BUG_ON(num > 0);
    477			complete(&wc->wc_io_complete);
    478		}
    479	}
    480}
    481
    482static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc)
    483{
    484	o2hb_bio_wait_dec(wc, 1);
    485	wait_for_completion(&wc->wc_io_complete);
    486}
    487
    488static void o2hb_bio_end_io(struct bio *bio)
    489{
    490	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
    491
    492	if (bio->bi_status) {
    493		mlog(ML_ERROR, "IO Error %d\n", bio->bi_status);
    494		wc->wc_error = blk_status_to_errno(bio->bi_status);
    495	}
    496
    497	o2hb_bio_wait_dec(wc, 1);
    498	bio_put(bio);
    499}
    500
    501/* Setup a Bio to cover I/O against num_slots slots starting at
    502 * start_slot. */
    503static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
    504				      struct o2hb_bio_wait_ctxt *wc,
    505				      unsigned int *current_slot,
    506				      unsigned int max_slots, int op,
    507				      int op_flags)
    508{
    509	int len, current_page;
    510	unsigned int vec_len, vec_start;
    511	unsigned int bits = reg->hr_block_bits;
    512	unsigned int spp = reg->hr_slots_per_page;
    513	unsigned int cs = *current_slot;
    514	struct bio *bio;
    515	struct page *page;
    516
    517	/* Testing has shown this allocation to take long enough under
    518	 * GFP_KERNEL that the local node can get fenced. It would be
    519	 * nicest if we could pre-allocate these bios and avoid this
    520	 * all together. */
    521	bio = bio_alloc(reg->hr_bdev, 16, op | op_flags, GFP_ATOMIC);
    522	if (!bio) {
    523		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
    524		bio = ERR_PTR(-ENOMEM);
    525		goto bail;
    526	}
    527
    528	/* Must put everything in 512 byte sectors for the bio... */
    529	bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
    530	bio->bi_private = wc;
    531	bio->bi_end_io = o2hb_bio_end_io;
    532
    533	vec_start = (cs << bits) % PAGE_SIZE;
    534	while(cs < max_slots) {
    535		current_page = cs / spp;
    536		page = reg->hr_slot_data[current_page];
    537
    538		vec_len = min(PAGE_SIZE - vec_start,
    539			      (max_slots-cs) * (PAGE_SIZE/spp) );
    540
    541		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
    542		     current_page, vec_len, vec_start);
    543
    544		len = bio_add_page(bio, page, vec_len, vec_start);
    545		if (len != vec_len) break;
    546
    547		cs += vec_len / (PAGE_SIZE/spp);
    548		vec_start = 0;
    549	}
    550
    551bail:
    552	*current_slot = cs;
    553	return bio;
    554}
    555
    556static int o2hb_read_slots(struct o2hb_region *reg,
    557			   unsigned int begin_slot,
    558			   unsigned int max_slots)
    559{
    560	unsigned int current_slot = begin_slot;
    561	int status;
    562	struct o2hb_bio_wait_ctxt wc;
    563	struct bio *bio;
    564
    565	o2hb_bio_wait_init(&wc);
    566
    567	while(current_slot < max_slots) {
    568		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
    569					 REQ_OP_READ, 0);
    570		if (IS_ERR(bio)) {
    571			status = PTR_ERR(bio);
    572			mlog_errno(status);
    573			goto bail_and_wait;
    574		}
    575
    576		atomic_inc(&wc.wc_num_reqs);
    577		submit_bio(bio);
    578	}
    579
    580	status = 0;
    581
    582bail_and_wait:
    583	o2hb_wait_on_io(&wc);
    584	if (wc.wc_error && !status)
    585		status = wc.wc_error;
    586
    587	return status;
    588}
    589
    590static int o2hb_issue_node_write(struct o2hb_region *reg,
    591				 struct o2hb_bio_wait_ctxt *write_wc)
    592{
    593	int status;
    594	unsigned int slot;
    595	struct bio *bio;
    596
    597	o2hb_bio_wait_init(write_wc);
    598
    599	slot = o2nm_this_node();
    600
    601	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE,
    602				 REQ_SYNC);
    603	if (IS_ERR(bio)) {
    604		status = PTR_ERR(bio);
    605		mlog_errno(status);
    606		goto bail;
    607	}
    608
    609	atomic_inc(&write_wc->wc_num_reqs);
    610	submit_bio(bio);
    611
    612	status = 0;
    613bail:
    614	return status;
    615}
    616
    617static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
    618				     struct o2hb_disk_heartbeat_block *hb_block)
    619{
    620	__le32 old_cksum;
    621	u32 ret;
    622
    623	/* We want to compute the block crc with a 0 value in the
    624	 * hb_cksum field. Save it off here and replace after the
    625	 * crc. */
    626	old_cksum = hb_block->hb_cksum;
    627	hb_block->hb_cksum = 0;
    628
    629	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
    630
    631	hb_block->hb_cksum = old_cksum;
    632
    633	return ret;
    634}
    635
    636static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
    637{
    638	mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
    639	     "cksum = 0x%x, generation 0x%llx\n",
    640	     (long long)le64_to_cpu(hb_block->hb_seq),
    641	     hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
    642	     (long long)le64_to_cpu(hb_block->hb_generation));
    643}
    644
    645static int o2hb_verify_crc(struct o2hb_region *reg,
    646			   struct o2hb_disk_heartbeat_block *hb_block)
    647{
    648	u32 read, computed;
    649
    650	read = le32_to_cpu(hb_block->hb_cksum);
    651	computed = o2hb_compute_block_crc_le(reg, hb_block);
    652
    653	return read == computed;
    654}
    655
    656/*
    657 * Compare the slot data with what we wrote in the last iteration.
    658 * If the match fails, print an appropriate error message. This is to
    659 * detect errors like... another node hearting on the same slot,
    660 * flaky device that is losing writes, etc.
    661 * Returns 1 if check succeeds, 0 otherwise.
    662 */
    663static int o2hb_check_own_slot(struct o2hb_region *reg)
    664{
    665	struct o2hb_disk_slot *slot;
    666	struct o2hb_disk_heartbeat_block *hb_block;
    667	char *errstr;
    668
    669	slot = &reg->hr_slots[o2nm_this_node()];
    670	/* Don't check on our 1st timestamp */
    671	if (!slot->ds_last_time)
    672		return 0;
    673
    674	hb_block = slot->ds_raw_block;
    675	if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
    676	    le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
    677	    hb_block->hb_node == slot->ds_node_num)
    678		return 1;
    679
    680#define ERRSTR1		"Another node is heartbeating on device"
    681#define ERRSTR2		"Heartbeat generation mismatch on device"
    682#define ERRSTR3		"Heartbeat sequence mismatch on device"
    683
    684	if (hb_block->hb_node != slot->ds_node_num)
    685		errstr = ERRSTR1;
    686	else if (le64_to_cpu(hb_block->hb_generation) !=
    687		 slot->ds_last_generation)
    688		errstr = ERRSTR2;
    689	else
    690		errstr = ERRSTR3;
    691
    692	mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
    693	     "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
    694	     slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
    695	     (unsigned long long)slot->ds_last_time, hb_block->hb_node,
    696	     (unsigned long long)le64_to_cpu(hb_block->hb_generation),
    697	     (unsigned long long)le64_to_cpu(hb_block->hb_seq));
    698
    699	return 0;
    700}
    701
    702static inline void o2hb_prepare_block(struct o2hb_region *reg,
    703				      u64 generation)
    704{
    705	int node_num;
    706	u64 cputime;
    707	struct o2hb_disk_slot *slot;
    708	struct o2hb_disk_heartbeat_block *hb_block;
    709
    710	node_num = o2nm_this_node();
    711	slot = &reg->hr_slots[node_num];
    712
    713	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
    714	memset(hb_block, 0, reg->hr_block_bytes);
    715	/* TODO: time stuff */
    716	cputime = ktime_get_real_seconds();
    717	if (!cputime)
    718		cputime = 1;
    719
    720	hb_block->hb_seq = cpu_to_le64(cputime);
    721	hb_block->hb_node = node_num;
    722	hb_block->hb_generation = cpu_to_le64(generation);
    723	hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
    724
    725	/* This step must always happen last! */
    726	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
    727								   hb_block));
    728
    729	mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
    730	     (long long)generation,
    731	     le32_to_cpu(hb_block->hb_cksum));
    732}
    733
    734static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
    735				struct o2nm_node *node,
    736				int idx)
    737{
    738	struct o2hb_callback_func *f;
    739
    740	list_for_each_entry(f, &hbcall->list, hc_item) {
    741		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
    742		(f->hc_func)(node, idx, f->hc_data);
    743	}
    744}
    745
    746/* Will run the list in order until we process the passed event */
    747static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
    748{
    749	struct o2hb_callback *hbcall;
    750	struct o2hb_node_event *event;
    751
    752	/* Holding callback sem assures we don't alter the callback
    753	 * lists when doing this, and serializes ourselves with other
    754	 * processes wanting callbacks. */
    755	down_write(&o2hb_callback_sem);
    756
    757	spin_lock(&o2hb_live_lock);
    758	while (!list_empty(&o2hb_node_events)
    759	       && !list_empty(&queued_event->hn_item)) {
    760		event = list_entry(o2hb_node_events.next,
    761				   struct o2hb_node_event,
    762				   hn_item);
    763		list_del_init(&event->hn_item);
    764		spin_unlock(&o2hb_live_lock);
    765
    766		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
    767		     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
    768		     event->hn_node_num);
    769
    770		hbcall = hbcall_from_type(event->hn_event_type);
    771
    772		/* We should *never* have gotten on to the list with a
    773		 * bad type... This isn't something that we should try
    774		 * to recover from. */
    775		BUG_ON(IS_ERR(hbcall));
    776
    777		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
    778
    779		spin_lock(&o2hb_live_lock);
    780	}
    781	spin_unlock(&o2hb_live_lock);
    782
    783	up_write(&o2hb_callback_sem);
    784}
    785
    786static void o2hb_queue_node_event(struct o2hb_node_event *event,
    787				  enum o2hb_callback_type type,
    788				  struct o2nm_node *node,
    789				  int node_num)
    790{
    791	assert_spin_locked(&o2hb_live_lock);
    792
    793	BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
    794
    795	event->hn_event_type = type;
    796	event->hn_node = node;
    797	event->hn_node_num = node_num;
    798
    799	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
    800	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
    801
    802	list_add_tail(&event->hn_item, &o2hb_node_events);
    803}
    804
    805static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
    806{
    807	struct o2hb_node_event event =
    808		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
    809	struct o2nm_node *node;
    810	int queued = 0;
    811
    812	node = o2nm_get_node_by_num(slot->ds_node_num);
    813	if (!node)
    814		return;
    815
    816	spin_lock(&o2hb_live_lock);
    817	if (!list_empty(&slot->ds_live_item)) {
    818		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
    819		     slot->ds_node_num);
    820
    821		list_del_init(&slot->ds_live_item);
    822
    823		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
    824			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
    825
    826			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
    827					      slot->ds_node_num);
    828			queued = 1;
    829		}
    830	}
    831	spin_unlock(&o2hb_live_lock);
    832
    833	if (queued)
    834		o2hb_run_event_list(&event);
    835
    836	o2nm_node_put(node);
    837}
    838
    839static void o2hb_set_quorum_device(struct o2hb_region *reg)
    840{
    841	if (!o2hb_global_heartbeat_active())
    842		return;
    843
    844	/* Prevent race with o2hb_heartbeat_group_drop_item() */
    845	if (kthread_should_stop())
    846		return;
    847
    848	/* Tag region as quorum only after thread reaches steady state */
    849	if (atomic_read(&reg->hr_steady_iterations) != 0)
    850		return;
    851
    852	spin_lock(&o2hb_live_lock);
    853
    854	if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
    855		goto unlock;
    856
    857	/*
    858	 * A region can be added to the quorum only when it sees all
    859	 * live nodes heartbeat on it. In other words, the region has been
    860	 * added to all nodes.
    861	 */
    862	if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
    863		   sizeof(o2hb_live_node_bitmap)))
    864		goto unlock;
    865
    866	printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
    867	       config_item_name(&reg->hr_item), reg->hr_dev_name);
    868
    869	set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
    870
    871	/*
    872	 * If global heartbeat active, unpin all regions if the
    873	 * region count > CUT_OFF
    874	 */
    875	if (bitmap_weight(o2hb_quorum_region_bitmap,
    876			   O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
    877		o2hb_region_unpin(NULL);
    878unlock:
    879	spin_unlock(&o2hb_live_lock);
    880}
    881
/*
 * Examine the most recently read heartbeat block of @slot and update the
 * liveness state of the corresponding node for region @reg.
 *
 * The raw block is first copied into reg->hr_tmp_block so that all checks
 * work on one snapshot.  Sample counters in @slot are updated, and under
 * o2hb_live_lock the slot is added to or removed from the per-node
 * o2hb_live_slots list.  A node-up event is queued when the slot is the
 * first one on that list, a node-down event when it was the last one.
 * Queued events are run after the lock has been dropped.
 *
 * Returns 1 if a node up/down event was queued, 0 otherwise.
 */
static int o2hb_check_slot(struct o2hb_region *reg,
			   struct o2hb_disk_slot *slot)
{
	int changed = 0, gen_changed = 0;
	struct o2hb_node_event event =
		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
	struct o2nm_node *node;
	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
	u64 cputime;
	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
	unsigned int slot_dead_ms;
	int tmp;
	int queued = 0;

	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

	/*
	 * If a node is no longer configured but is still in the livemap, we
	 * may need to clear that bit from the livemap.
	 */
	node = o2nm_get_node_by_num(slot->ds_node_num);
	if (!node) {
		spin_lock(&o2hb_live_lock);
		tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
		spin_unlock(&o2hb_live_lock);
		/* neither configured nor live: nothing to track */
		if (!tmp)
			return 0;
	}

	if (!o2hb_verify_crc(reg, hb_block)) {
		/* all paths from here will drop o2hb_live_lock for
		 * us. */
		spin_lock(&o2hb_live_lock);

		/* Don't print an error on the console in this case -
		 * a freshly formatted heartbeat area will not have a
		 * crc set on it. */
		if (list_empty(&slot->ds_live_item))
			goto out;

		/* The node is live but pushed out a bad crc. We
		 * consider it a transient miss but don't populate any
		 * other values as they may be junk. */
		mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
		     slot->ds_node_num, reg->hr_dev_name);
		o2hb_dump_slot(hb_block);

		slot->ds_equal_samples++;
		goto fire_callbacks;
	}

	/* we don't care if these wrap.. the state transitions below
	 * clear at the right places */
	cputime = le64_to_cpu(hb_block->hb_seq);
	if (slot->ds_last_time != cputime)
		slot->ds_changed_samples++;
	else
		slot->ds_equal_samples++;
	slot->ds_last_time = cputime;

	/* The node changed heartbeat generations. We assume this to
	 * mean it dropped off but came back before we timed out. We
	 * want to consider it down for the time being but don't want
	 * to lose any changed_samples state we might build up to
	 * considering it live again. */
	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
		gen_changed = 1;
		slot->ds_equal_samples = 0;
		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
		     "to 0x%llx)\n", slot->ds_node_num,
		     (long long)slot->ds_last_generation,
		     (long long)le64_to_cpu(hb_block->hb_generation));
	}

	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

	mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
	     "seq %llu last %llu changed %u equal %u\n",
	     slot->ds_node_num, (long long)slot->ds_last_generation,
	     le32_to_cpu(hb_block->hb_cksum),
	     (unsigned long long)le64_to_cpu(hb_block->hb_seq),
	     (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
	     slot->ds_equal_samples);

	spin_lock(&o2hb_live_lock);

fire_callbacks:
	/* dead nodes only come to life after some number of
	 * changes at any time during their dead time */
	if (list_empty(&slot->ds_live_item) &&
	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
		     slot->ds_node_num, (long long)slot->ds_last_generation);

		set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);

		/* first on the list generates a callback */
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
			     "bitmap\n", slot->ds_node_num);
			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
					      slot->ds_node_num);

			changed = 1;
			queued = 1;
		}

		list_add_tail(&slot->ds_live_item,
			      &o2hb_live_slots[slot->ds_node_num]);

		slot->ds_equal_samples = 0;

		/* We want to be sure that all nodes agree on the
		 * number of milliseconds before a node will be
		 * considered dead. The self-fencing timeout is
		 * computed from this value, and a discrepancy might
		 * result in heartbeat calling a node dead when it
		 * hasn't self-fenced yet. */
		slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
		if (slot_dead_ms && slot_dead_ms != dead_ms) {
			/* TODO: Perhaps we can fail the region here. */
			mlog(ML_ERROR, "Node %d on device %s has a dead count "
			     "of %u ms, but our count is %u ms.\n"
			     "Please double check your configuration values "
			     "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
			     slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
			     dead_ms);
		}
		goto out;
	}

	/* if the list is dead, we're done.. */
	if (list_empty(&slot->ds_live_item))
		goto out;

	/* live nodes only go dead after enough consecutive missed
	 * samples..  reset the missed counter whenever we see
	 * activity */
	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
		mlog(ML_HEARTBEAT, "Node %d left my region\n",
		     slot->ds_node_num);

		clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);

		/* last off the live_slot generates a callback */
		list_del_init(&slot->ds_live_item);
		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
			mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
			     "nodes bitmap\n", slot->ds_node_num);
			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

			/* node can be null */
			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
					      node, slot->ds_node_num);

			changed = 1;
			queued = 1;
		}

		/* We don't clear this because the node is still
		 * actually writing new blocks. */
		if (!gen_changed)
			slot->ds_changed_samples = 0;
		goto out;
	}
	/* still live and we saw activity: restart both sample counters */
	if (slot->ds_changed_samples) {
		slot->ds_changed_samples = 0;
		slot->ds_equal_samples = 0;
	}
out:
	spin_unlock(&o2hb_live_lock);

	/* run queued events only after o2hb_live_lock has been released */
	if (queued)
		o2hb_run_event_list(&event);

	if (node)
		o2nm_node_put(node);
	return changed;
}
   1063
   1064static int o2hb_highest_node(unsigned long *nodes, int numbits)
   1065{
   1066	return find_last_bit(nodes, numbits);
   1067}
   1068
   1069static int o2hb_lowest_node(unsigned long *nodes, int numbits)
   1070{
   1071	return find_first_bit(nodes, numbits);
   1072}
   1073
/*
 * Run one heartbeat iteration for region @reg.
 *
 * The set of slots to look at is the configured node map plus every node
 * currently in the live map.  The slots from the lowest to the highest
 * such node are read, our own slot is checked, our next heartbeat block
 * is prepared and its write issued, each slot is passed to
 * o2hb_check_slot(), and finally the write is waited for.  The
 * steady/unsteady iteration counters are updated on every exit path.
 *
 * Returns 0 on success or a negative errno: -EINVAL when no node is
 * configured or live, -EIO when the unsteady counter runs out, or the
 * error reported by one of the helpers / the write completion.
 */
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
	int i, ret, highest_node, lowest_node;
	int membership_change = 0, own_slot_ok = 0;
	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	struct o2hb_bio_wait_ctxt write_wc;

	ret = o2nm_configured_node_map(configured_nodes,
				       sizeof(configured_nodes));
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * If a node is not configured but is in the livemap, we still need
	 * to read the slot so as to be able to remove it from the livemap.
	 */
	o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
	i = -1;
	while ((i = find_next_bit(live_node_bitmap,
				  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
		set_bit(i, configured_nodes);
	}

	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
	lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES);
	if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) {
		mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
		ret = -EINVAL;
		goto bail;
	}

	/* No sense in reading the slots of nodes that don't exist
	 * yet. Of course, if the node definitions have holes in them
	 * then we're reading an empty slot anyway... Consider this
	 * best-effort. */
	ret = o2hb_read_slots(reg, lowest_node, highest_node + 1);
	if (ret < 0) {
		mlog_errno(ret);
		goto bail;
	}

	/* With an up to date view of the slots, we can check that no
	 * other node has been improperly configured to heartbeat in
	 * our slot. */
	own_slot_ok = o2hb_check_own_slot(reg);

	/* fill in the proper info for our next heartbeat */
	o2hb_prepare_block(reg, reg->hr_generation);

	ret = o2hb_issue_node_write(reg, &write_wc);
	if (ret < 0) {
		mlog_errno(ret);
		goto bail;
	}

	/* evaluate every slot of interest while our own write is in flight */
	i = -1;
	while((i = find_next_bit(configured_nodes,
				 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
		membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
	}

	/*
	 * We have to be sure we've advertised ourselves on disk
	 * before we can go to steady state.  This ensures that
	 * people we find in our steady state have seen us.
	 */
	o2hb_wait_on_io(&write_wc);
	if (write_wc.wc_error) {
		/* Do not re-arm the write timeout on I/O error - we
		 * can't be sure that the new block ever made it to
		 * disk */
		mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
		     write_wc.wc_error, reg->hr_dev_name);
		ret = write_wc.wc_error;
		goto bail;
	}

	/* Skip disarming the timeout if own slot has stale/bad data */
	/* (quorum tagging, arming and the timestamp are all skipped then) */
	if (own_slot_ok) {
		o2hb_set_quorum_device(reg);
		o2hb_arm_timeout(reg);
		reg->hr_last_timeout_start = jiffies;
	}

bail:
	/* let the person who launched us know when things are steady */
	if (atomic_read(&reg->hr_steady_iterations) != 0) {
		/* only a clean, change-free iteration counts towards steady */
		if (!ret && own_slot_ok && !membership_change) {
			if (atomic_dec_and_test(&reg->hr_steady_iterations))
				wake_up(&o2hb_steady_queue);
		}
	}

	/*
	 * Still not steady: use up one unsteady iteration.  When that
	 * counter reaches zero the start is flagged as aborted and the
	 * waiter is woken.
	 */
	if (atomic_read(&reg->hr_steady_iterations) != 0) {
		if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
			printk(KERN_NOTICE "o2hb: Unable to stabilize "
			       "heartbeat on region %s (%s)\n",
			       config_item_name(&reg->hr_item),
			       reg->hr_dev_name);
			atomic_set(&reg->hr_steady_iterations, 0);
			reg->hr_aborted_start = 1;
			wake_up(&o2hb_steady_queue);
			ret = -EIO;
		}
	}

	return ret;
}
   1185
/*
 * Per-region heartbeat thread; @data is the struct o2hb_region to serve.
 *
 * we ride the region ref that the region dir holds.  before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 *
 * The thread calls o2hb_do_disk_heartbeat() repeatedly until it is asked
 * to stop or the region is flagged hr_unclean_stop / hr_aborted_start.
 * Always returns 0.
 */
static int o2hb_thread(void *data)
{
	int i, ret;
	struct o2hb_region *reg = data;
	struct o2hb_bio_wait_ctxt write_wc;
	ktime_t before_hb, after_hb;
	unsigned int elapsed_msec;

	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

	set_user_nice(current, MIN_NICE);

	/* Pin node */
	ret = o2nm_depend_this_node();
	if (ret) {
		mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
		/* tell whoever sleeps on o2hb_steady_queue that we gave up */
		reg->hr_node_deleted = 1;
		wake_up(&o2hb_steady_queue);
		return 0;
	}

	while (!kthread_should_stop() &&
	       !reg->hr_unclean_stop && !reg->hr_aborted_start) {
		/* We track the time spent inside
		 * o2hb_do_disk_heartbeat so that we avoid more than
		 * hr_timeout_ms between disk writes. On busy systems
		 * this should result in a heartbeat which is less
		 * likely to time itself out. */
		before_hb = ktime_get_real();

		ret = o2hb_do_disk_heartbeat(reg);
		reg->hr_last_hb_status = ret;

		after_hb = ktime_get_real();

		elapsed_msec = (unsigned int)
				ktime_ms_delta(after_hb, before_hb);

		mlog(ML_HEARTBEAT,
		     "start = %lld, end = %lld, msec = %u, ret = %d\n",
		     before_hb, after_hb, elapsed_msec, ret);

		/* sleep only for what is left of the hr_timeout_ms period */
		if (!kthread_should_stop() &&
		    elapsed_msec < reg->hr_timeout_ms) {
			/* the kthread api has blocked signals for us so no
			 * need to record the return value. */
			msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
		}
	}

	o2hb_disarm_timeout(reg);

	/* unclean stop is only used in very bad situation */
	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
		o2hb_shutdown_slot(&reg->hr_slots[i]);

	/* Explicit down notification - avoid forcing the other nodes
	 * to timeout on this region when we could just as easily
	 * write a clear generation - thus indicating to them that
	 * this node has left this region.
	 */
	if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
		o2hb_prepare_block(reg, 0);
		ret = o2hb_issue_node_write(reg, &write_wc);
		if (ret == 0)
			o2hb_wait_on_io(&write_wc);
		else
			mlog_errno(ret);
	}

	/* Unpin node */
	o2nm_undepend_this_node();

	mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");

	return 0;
}
   1268
   1269#ifdef CONFIG_DEBUG_FS
   1270static int o2hb_debug_open(struct inode *inode, struct file *file)
   1271{
   1272	struct o2hb_debug_buf *db = inode->i_private;
   1273	struct o2hb_region *reg;
   1274	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
   1275	unsigned long lts;
   1276	char *buf = NULL;
   1277	int i = -1;
   1278	int out = 0;
   1279
   1280	/* max_nodes should be the largest bitmap we pass here */
   1281	BUG_ON(sizeof(map) < db->db_size);
   1282
   1283	buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
   1284	if (!buf)
   1285		goto bail;
   1286
   1287	switch (db->db_type) {
   1288	case O2HB_DB_TYPE_LIVENODES:
   1289	case O2HB_DB_TYPE_LIVEREGIONS:
   1290	case O2HB_DB_TYPE_QUORUMREGIONS:
   1291	case O2HB_DB_TYPE_FAILEDREGIONS:
   1292		spin_lock(&o2hb_live_lock);
   1293		memcpy(map, db->db_data, db->db_size);
   1294		spin_unlock(&o2hb_live_lock);
   1295		break;
   1296
   1297	case O2HB_DB_TYPE_REGION_LIVENODES:
   1298		spin_lock(&o2hb_live_lock);
   1299		reg = (struct o2hb_region *)db->db_data;
   1300		memcpy(map, reg->hr_live_node_bitmap, db->db_size);
   1301		spin_unlock(&o2hb_live_lock);
   1302		break;
   1303
   1304	case O2HB_DB_TYPE_REGION_NUMBER:
   1305		reg = (struct o2hb_region *)db->db_data;
   1306		out += scnprintf(buf + out, PAGE_SIZE - out, "%d\n",
   1307				reg->hr_region_num);
   1308		goto done;
   1309
   1310	case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
   1311		reg = (struct o2hb_region *)db->db_data;
   1312		lts = reg->hr_last_timeout_start;
   1313		/* If 0, it has never been set before */
   1314		if (lts)
   1315			lts = jiffies_to_msecs(jiffies - lts);
   1316		out += scnprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
   1317		goto done;
   1318
   1319	case O2HB_DB_TYPE_REGION_PINNED:
   1320		reg = (struct o2hb_region *)db->db_data;
   1321		out += scnprintf(buf + out, PAGE_SIZE - out, "%u\n",
   1322				!!reg->hr_item_pinned);
   1323		goto done;
   1324
   1325	default:
   1326		goto done;
   1327	}
   1328
   1329	while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
   1330		out += scnprintf(buf + out, PAGE_SIZE - out, "%d ", i);
   1331	out += scnprintf(buf + out, PAGE_SIZE - out, "\n");
   1332
   1333done:
   1334	i_size_write(inode, out);
   1335
   1336	file->private_data = buf;
   1337
   1338	return 0;
   1339bail:
   1340	return -ENOMEM;
   1341}
   1342
   1343static int o2hb_debug_release(struct inode *inode, struct file *file)
   1344{
   1345	kfree(file->private_data);
   1346	return 0;
   1347}
   1348
   1349static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
   1350				 size_t nbytes, loff_t *ppos)
   1351{
   1352	return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
   1353				       i_size_read(file->f_mapping->host));
   1354}
   1355#else
/* Stub used when CONFIG_DEBUG_FS is not set: does nothing, returns 0. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
	return 0;
}
/* Stub used when CONFIG_DEBUG_FS is not set: does nothing, returns 0. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
	return 0;
}
/* Stub used when CONFIG_DEBUG_FS is not set: reports 0 bytes read. */
static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
			       size_t nbytes, loff_t *ppos)
{
	return 0;
}
   1369#endif  /* CONFIG_DEBUG_FS */
   1370
   1371static const struct file_operations o2hb_debug_fops = {
   1372	.open =		o2hb_debug_open,
   1373	.release =	o2hb_debug_release,
   1374	.read =		o2hb_debug_read,
   1375	.llseek =	generic_file_llseek,
   1376};
   1377
   1378void o2hb_exit(void)
   1379{
   1380	debugfs_remove_recursive(o2hb_debug_dir);
   1381	kfree(o2hb_db_livenodes);
   1382	kfree(o2hb_db_liveregions);
   1383	kfree(o2hb_db_quorumregions);
   1384	kfree(o2hb_db_failedregions);
   1385}
   1386
   1387static void o2hb_debug_create(const char *name, struct dentry *dir,
   1388			      struct o2hb_debug_buf **db, int db_len, int type,
   1389			      int size, int len, void *data)
   1390{
   1391	*db = kmalloc(db_len, GFP_KERNEL);
   1392	if (!*db)
   1393		return;
   1394
   1395	(*db)->db_type = type;
   1396	(*db)->db_size = size;
   1397	(*db)->db_len = len;
   1398	(*db)->db_data = data;
   1399
   1400	debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, &o2hb_debug_fops);
   1401}
   1402
   1403static void o2hb_debug_init(void)
   1404{
   1405	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
   1406
   1407	o2hb_debug_create(O2HB_DEBUG_LIVENODES, o2hb_debug_dir,
   1408			  &o2hb_db_livenodes, sizeof(*o2hb_db_livenodes),
   1409			  O2HB_DB_TYPE_LIVENODES, sizeof(o2hb_live_node_bitmap),
   1410			  O2NM_MAX_NODES, o2hb_live_node_bitmap);
   1411
   1412	o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, o2hb_debug_dir,
   1413			  &o2hb_db_liveregions, sizeof(*o2hb_db_liveregions),
   1414			  O2HB_DB_TYPE_LIVEREGIONS,
   1415			  sizeof(o2hb_live_region_bitmap), O2NM_MAX_REGIONS,
   1416			  o2hb_live_region_bitmap);
   1417
   1418	o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, o2hb_debug_dir,
   1419			  &o2hb_db_quorumregions,
   1420			  sizeof(*o2hb_db_quorumregions),
   1421			  O2HB_DB_TYPE_QUORUMREGIONS,
   1422			  sizeof(o2hb_quorum_region_bitmap), O2NM_MAX_REGIONS,
   1423			  o2hb_quorum_region_bitmap);
   1424
   1425	o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, o2hb_debug_dir,
   1426			  &o2hb_db_failedregions,
   1427			  sizeof(*o2hb_db_failedregions),
   1428			  O2HB_DB_TYPE_FAILEDREGIONS,
   1429			  sizeof(o2hb_failed_region_bitmap), O2NM_MAX_REGIONS,
   1430			  o2hb_failed_region_bitmap);
   1431}
   1432
   1433void o2hb_init(void)
   1434{
   1435	int i;
   1436
   1437	for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
   1438		INIT_LIST_HEAD(&o2hb_callbacks[i].list);
   1439
   1440	for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
   1441		INIT_LIST_HEAD(&o2hb_live_slots[i]);
   1442
   1443	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
   1444	memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
   1445	memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
   1446	memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
   1447	memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
   1448
   1449	o2hb_dependent_users = 0;
   1450
   1451	o2hb_debug_init();
   1452}
   1453
   1454/* if we're already in a callback then we're already serialized by the sem */
   1455static void o2hb_fill_node_map_from_callback(unsigned long *map,
   1456					     unsigned bytes)
   1457{
   1458	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
   1459
   1460	memcpy(map, &o2hb_live_node_bitmap, bytes);
   1461}
   1462
/*
 * get a map of all nodes that are heartbeating in any regions
 *
 * @map receives a copy of the global live node bitmap and @bytes is the
 * size of @map in bytes; both are handed unchanged to
 * o2hb_fill_node_map_from_callback().
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
	/* callers want to serialize this map and callbacks so that they
	 * can trust that they don't miss nodes coming to the party */
	down_read(&o2hb_callback_sem);
	/* o2hb_live_lock keeps the bitmap stable while it is copied */
	spin_lock(&o2hb_live_lock);
	o2hb_fill_node_map_from_callback(map, bytes);
	spin_unlock(&o2hb_live_lock);
	up_read(&o2hb_callback_sem);
}
   1476EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
   1477
   1478/*
   1479 * heartbeat configfs bits.  The heartbeat set is a default set under
   1480 * the cluster set in nodemanager.c.
   1481 */
   1482
   1483static struct o2hb_region *to_o2hb_region(struct config_item *item)
   1484{
   1485	return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
   1486}
   1487
   1488/* drop_item only drops its ref after killing the thread, nothing should
   1489 * be using the region anymore.  this has to clean up any state that
   1490 * attributes might have built up. */
   1491static void o2hb_region_release(struct config_item *item)
   1492{
   1493	int i;
   1494	struct page *page;
   1495	struct o2hb_region *reg = to_o2hb_region(item);
   1496
   1497	mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
   1498
   1499	kfree(reg->hr_tmp_block);
   1500
   1501	if (reg->hr_slot_data) {
   1502		for (i = 0; i < reg->hr_num_pages; i++) {
   1503			page = reg->hr_slot_data[i];
   1504			if (page)
   1505				__free_page(page);
   1506		}
   1507		kfree(reg->hr_slot_data);
   1508	}
   1509
   1510	if (reg->hr_bdev)
   1511		blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
   1512
   1513	kfree(reg->hr_slots);
   1514
   1515	debugfs_remove_recursive(reg->hr_debug_dir);
   1516	kfree(reg->hr_db_livenodes);
   1517	kfree(reg->hr_db_regnum);
   1518	kfree(reg->hr_db_elapsed_time);
   1519	kfree(reg->hr_db_pinned);
   1520
   1521	spin_lock(&o2hb_live_lock);
   1522	list_del(&reg->hr_all_item);
   1523	spin_unlock(&o2hb_live_lock);
   1524
   1525	o2net_unregister_handler_list(&reg->hr_handler_list);
   1526	kfree(reg);
   1527}
   1528
   1529static int o2hb_read_block_input(struct o2hb_region *reg,
   1530				 const char *page,
   1531				 unsigned long *ret_bytes,
   1532				 unsigned int *ret_bits)
   1533{
   1534	unsigned long bytes;
   1535	char *p = (char *)page;
   1536
   1537	bytes = simple_strtoul(p, &p, 0);
   1538	if (!p || (*p && (*p != '\n')))
   1539		return -EINVAL;
   1540
   1541	/* Heartbeat and fs min / max block sizes are the same. */
   1542	if (bytes > 4096 || bytes < 512)
   1543		return -ERANGE;
   1544	if (hweight16(bytes) != 1)
   1545		return -EINVAL;
   1546
   1547	if (ret_bytes)
   1548		*ret_bytes = bytes;
   1549	if (ret_bits)
   1550		*ret_bits = ffs(bytes) - 1;
   1551
   1552	return 0;
   1553}
   1554
   1555static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
   1556					    char *page)
   1557{
   1558	return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
   1559}
   1560
   1561static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
   1562					     const char *page,
   1563					     size_t count)
   1564{
   1565	struct o2hb_region *reg = to_o2hb_region(item);
   1566	int status;
   1567	unsigned long block_bytes;
   1568	unsigned int block_bits;
   1569
   1570	if (reg->hr_bdev)
   1571		return -EINVAL;
   1572
   1573	status = o2hb_read_block_input(reg, page, &block_bytes,
   1574				       &block_bits);
   1575	if (status)
   1576		return status;
   1577
   1578	reg->hr_block_bytes = (unsigned int)block_bytes;
   1579	reg->hr_block_bits = block_bits;
   1580
   1581	return count;
   1582}
   1583
   1584static ssize_t o2hb_region_start_block_show(struct config_item *item,
   1585					    char *page)
   1586{
   1587	return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
   1588}
   1589
   1590static ssize_t o2hb_region_start_block_store(struct config_item *item,
   1591					     const char *page,
   1592					     size_t count)
   1593{
   1594	struct o2hb_region *reg = to_o2hb_region(item);
   1595	unsigned long long tmp;
   1596	char *p = (char *)page;
   1597	ssize_t ret;
   1598
   1599	if (reg->hr_bdev)
   1600		return -EINVAL;
   1601
   1602	ret = kstrtoull(p, 0, &tmp);
   1603	if (ret)
   1604		return -EINVAL;
   1605
   1606	reg->hr_start_block = tmp;
   1607
   1608	return count;
   1609}
   1610
   1611static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
   1612{
   1613	return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
   1614}
   1615
   1616static ssize_t o2hb_region_blocks_store(struct config_item *item,
   1617					const char *page,
   1618					size_t count)
   1619{
   1620	struct o2hb_region *reg = to_o2hb_region(item);
   1621	unsigned long tmp;
   1622	char *p = (char *)page;
   1623
   1624	if (reg->hr_bdev)
   1625		return -EINVAL;
   1626
   1627	tmp = simple_strtoul(p, &p, 0);
   1628	if (!p || (*p && (*p != '\n')))
   1629		return -EINVAL;
   1630
   1631	if (tmp > O2NM_MAX_NODES || tmp == 0)
   1632		return -ERANGE;
   1633
   1634	reg->hr_blocks = (unsigned int)tmp;
   1635
   1636	return count;
   1637}
   1638
   1639static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
   1640{
   1641	unsigned int ret = 0;
   1642
   1643	if (to_o2hb_region(item)->hr_bdev)
   1644		ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
   1645
   1646	return ret;
   1647}
   1648
   1649static void o2hb_init_region_params(struct o2hb_region *reg)
   1650{
   1651	reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
   1652	reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
   1653
   1654	mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
   1655	     reg->hr_start_block, reg->hr_blocks);
   1656	mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
   1657	     reg->hr_block_bytes, reg->hr_block_bits);
   1658	mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
   1659	mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
   1660}
   1661
   1662static int o2hb_map_slot_data(struct o2hb_region *reg)
   1663{
   1664	int i, j;
   1665	unsigned int last_slot;
   1666	unsigned int spp = reg->hr_slots_per_page;
   1667	struct page *page;
   1668	char *raw;
   1669	struct o2hb_disk_slot *slot;
   1670
   1671	reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
   1672	if (reg->hr_tmp_block == NULL)
   1673		return -ENOMEM;
   1674
   1675	reg->hr_slots = kcalloc(reg->hr_blocks,
   1676				sizeof(struct o2hb_disk_slot), GFP_KERNEL);
   1677	if (reg->hr_slots == NULL)
   1678		return -ENOMEM;
   1679
   1680	for(i = 0; i < reg->hr_blocks; i++) {
   1681		slot = &reg->hr_slots[i];
   1682		slot->ds_node_num = i;
   1683		INIT_LIST_HEAD(&slot->ds_live_item);
   1684		slot->ds_raw_block = NULL;
   1685	}
   1686
   1687	reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
   1688	mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
   1689			   "at %u blocks per page\n",
   1690	     reg->hr_num_pages, reg->hr_blocks, spp);
   1691
   1692	reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
   1693				    GFP_KERNEL);
   1694	if (!reg->hr_slot_data)
   1695		return -ENOMEM;
   1696
   1697	for(i = 0; i < reg->hr_num_pages; i++) {
   1698		page = alloc_page(GFP_KERNEL);
   1699		if (!page)
   1700			return -ENOMEM;
   1701
   1702		reg->hr_slot_data[i] = page;
   1703
   1704		last_slot = i * spp;
   1705		raw = page_address(page);
   1706		for (j = 0;
   1707		     (j < spp) && ((j + last_slot) < reg->hr_blocks);
   1708		     j++) {
   1709			BUG_ON((j + last_slot) >= reg->hr_blocks);
   1710
   1711			slot = &reg->hr_slots[j + last_slot];
   1712			slot->ds_raw_block =
   1713				(struct o2hb_disk_heartbeat_block *) raw;
   1714
   1715			raw += reg->hr_block_bytes;
   1716		}
   1717	}
   1718
   1719	return 0;
   1720}
   1721
   1722/* Read in all the slots available and populate the tracking
   1723 * structures so that we can start with a baseline idea of what's
   1724 * there. */
   1725static int o2hb_populate_slot_data(struct o2hb_region *reg)
   1726{
   1727	int ret, i;
   1728	struct o2hb_disk_slot *slot;
   1729	struct o2hb_disk_heartbeat_block *hb_block;
   1730
   1731	ret = o2hb_read_slots(reg, 0, reg->hr_blocks);
   1732	if (ret)
   1733		goto out;
   1734
   1735	/* We only want to get an idea of the values initially in each
   1736	 * slot, so we do no verification - o2hb_check_slot will
   1737	 * actually determine if each configured slot is valid and
   1738	 * whether any values have changed. */
   1739	for(i = 0; i < reg->hr_blocks; i++) {
   1740		slot = &reg->hr_slots[i];
   1741		hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
   1742
   1743		/* Only fill the values that o2hb_check_slot uses to
   1744		 * determine changing slots */
   1745		slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
   1746		slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
   1747	}
   1748
   1749out:
   1750	return ret;
   1751}
   1752
   1753/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
   1754static ssize_t o2hb_region_dev_store(struct config_item *item,
   1755				     const char *page,
   1756				     size_t count)
   1757{
   1758	struct o2hb_region *reg = to_o2hb_region(item);
   1759	struct task_struct *hb_task;
   1760	long fd;
   1761	int sectsize;
   1762	char *p = (char *)page;
   1763	struct fd f;
   1764	ssize_t ret = -EINVAL;
   1765	int live_threshold;
   1766
   1767	if (reg->hr_bdev)
   1768		goto out;
   1769
   1770	/* We can't heartbeat without having had our node number
   1771	 * configured yet. */
   1772	if (o2nm_this_node() == O2NM_MAX_NODES)
   1773		goto out;
   1774
   1775	fd = simple_strtol(p, &p, 0);
   1776	if (!p || (*p && (*p != '\n')))
   1777		goto out;
   1778
   1779	if (fd < 0 || fd >= INT_MAX)
   1780		goto out;
   1781
   1782	f = fdget(fd);
   1783	if (f.file == NULL)
   1784		goto out;
   1785
   1786	if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
   1787	    reg->hr_block_bytes == 0)
   1788		goto out2;
   1789
   1790	if (!S_ISBLK(f.file->f_mapping->host->i_mode))
   1791		goto out2;
   1792
   1793	reg->hr_bdev = blkdev_get_by_dev(f.file->f_mapping->host->i_rdev,
   1794					 FMODE_WRITE | FMODE_READ, NULL);
   1795	if (IS_ERR(reg->hr_bdev)) {
   1796		ret = PTR_ERR(reg->hr_bdev);
   1797		reg->hr_bdev = NULL;
   1798		goto out2;
   1799	}
   1800
   1801	bdevname(reg->hr_bdev, reg->hr_dev_name);
   1802
   1803	sectsize = bdev_logical_block_size(reg->hr_bdev);
   1804	if (sectsize != reg->hr_block_bytes) {
   1805		mlog(ML_ERROR,
   1806		     "blocksize %u incorrect for device, expected %d",
   1807		     reg->hr_block_bytes, sectsize);
   1808		ret = -EINVAL;
   1809		goto out3;
   1810	}
   1811
   1812	o2hb_init_region_params(reg);
   1813
   1814	/* Generation of zero is invalid */
   1815	do {
   1816		get_random_bytes(&reg->hr_generation,
   1817				 sizeof(reg->hr_generation));
   1818	} while (reg->hr_generation == 0);
   1819
   1820	ret = o2hb_map_slot_data(reg);
   1821	if (ret) {
   1822		mlog_errno(ret);
   1823		goto out3;
   1824	}
   1825
   1826	ret = o2hb_populate_slot_data(reg);
   1827	if (ret) {
   1828		mlog_errno(ret);
   1829		goto out3;
   1830	}
   1831
   1832	INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
   1833	INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
   1834
   1835	/*
   1836	 * A node is considered live after it has beat LIVE_THRESHOLD
   1837	 * times.  We're not steady until we've given them a chance
   1838	 * _after_ our first read.
   1839	 * The default threshold is bare minimum so as to limit the delay
   1840	 * during mounts. For global heartbeat, the threshold doubled for the
   1841	 * first region.
   1842	 */
   1843	live_threshold = O2HB_LIVE_THRESHOLD;
   1844	if (o2hb_global_heartbeat_active()) {
   1845		spin_lock(&o2hb_live_lock);
   1846		if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
   1847			live_threshold <<= 1;
   1848		spin_unlock(&o2hb_live_lock);
   1849	}
   1850	++live_threshold;
   1851	atomic_set(&reg->hr_steady_iterations, live_threshold);
   1852	/* unsteady_iterations is triple the steady_iterations */
   1853	atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
   1854
   1855	hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
   1856			      reg->hr_item.ci_name);
   1857	if (IS_ERR(hb_task)) {
   1858		ret = PTR_ERR(hb_task);
   1859		mlog_errno(ret);
   1860		goto out3;
   1861	}
   1862
   1863	spin_lock(&o2hb_live_lock);
   1864	reg->hr_task = hb_task;
   1865	spin_unlock(&o2hb_live_lock);
   1866
   1867	ret = wait_event_interruptible(o2hb_steady_queue,
   1868				atomic_read(&reg->hr_steady_iterations) == 0 ||
   1869				reg->hr_node_deleted);
   1870	if (ret) {
   1871		atomic_set(&reg->hr_steady_iterations, 0);
   1872		reg->hr_aborted_start = 1;
   1873	}
   1874
   1875	if (reg->hr_aborted_start) {
   1876		ret = -EIO;
   1877		goto out3;
   1878	}
   1879
   1880	if (reg->hr_node_deleted) {
   1881		ret = -EINVAL;
   1882		goto out3;
   1883	}
   1884
   1885	/* Ok, we were woken.  Make sure it wasn't by drop_item() */
   1886	spin_lock(&o2hb_live_lock);
   1887	hb_task = reg->hr_task;
   1888	if (o2hb_global_heartbeat_active())
   1889		set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
   1890	spin_unlock(&o2hb_live_lock);
   1891
   1892	if (hb_task)
   1893		ret = count;
   1894	else
   1895		ret = -EIO;
   1896
   1897	if (hb_task && o2hb_global_heartbeat_active())
   1898		printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
   1899		       config_item_name(&reg->hr_item), reg->hr_dev_name);
   1900
   1901out3:
   1902	if (ret < 0) {
   1903		blkdev_put(reg->hr_bdev, FMODE_READ | FMODE_WRITE);
   1904		reg->hr_bdev = NULL;
   1905	}
   1906out2:
   1907	fdput(f);
   1908out:
   1909	return ret;
   1910}
   1911
   1912static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
   1913{
   1914	struct o2hb_region *reg = to_o2hb_region(item);
   1915	pid_t pid = 0;
   1916
   1917	spin_lock(&o2hb_live_lock);
   1918	if (reg->hr_task)
   1919		pid = task_pid_nr(reg->hr_task);
   1920	spin_unlock(&o2hb_live_lock);
   1921
   1922	if (!pid)
   1923		return 0;
   1924
   1925	return sprintf(page, "%u\n", pid);
   1926}
   1927
   1928CONFIGFS_ATTR(o2hb_region_, block_bytes);
   1929CONFIGFS_ATTR(o2hb_region_, start_block);
   1930CONFIGFS_ATTR(o2hb_region_, blocks);
   1931CONFIGFS_ATTR(o2hb_region_, dev);
   1932CONFIGFS_ATTR_RO(o2hb_region_, pid);
   1933
   1934static struct configfs_attribute *o2hb_region_attrs[] = {
   1935	&o2hb_region_attr_block_bytes,
   1936	&o2hb_region_attr_start_block,
   1937	&o2hb_region_attr_blocks,
   1938	&o2hb_region_attr_dev,
   1939	&o2hb_region_attr_pid,
   1940	NULL,
   1941};
   1942
   1943static struct configfs_item_operations o2hb_region_item_ops = {
   1944	.release		= o2hb_region_release,
   1945};
   1946
   1947static const struct config_item_type o2hb_region_type = {
   1948	.ct_item_ops	= &o2hb_region_item_ops,
   1949	.ct_attrs	= o2hb_region_attrs,
   1950	.ct_owner	= THIS_MODULE,
   1951};
   1952
   1953/* heartbeat set */
   1954
   1955struct o2hb_heartbeat_group {
   1956	struct config_group hs_group;
   1957	/* some stuff? */
   1958};
   1959
   1960static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
   1961{
   1962	return group ?
   1963		container_of(group, struct o2hb_heartbeat_group, hs_group)
   1964		: NULL;
   1965}
   1966
   1967static void o2hb_debug_region_init(struct o2hb_region *reg,
   1968				   struct dentry *parent)
   1969{
   1970	struct dentry *dir;
   1971
   1972	dir = debugfs_create_dir(config_item_name(&reg->hr_item), parent);
   1973	reg->hr_debug_dir = dir;
   1974
   1975	o2hb_debug_create(O2HB_DEBUG_LIVENODES, dir, &(reg->hr_db_livenodes),
   1976			  sizeof(*(reg->hr_db_livenodes)),
   1977			  O2HB_DB_TYPE_REGION_LIVENODES,
   1978			  sizeof(reg->hr_live_node_bitmap), O2NM_MAX_NODES,
   1979			  reg);
   1980
   1981	o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, dir, &(reg->hr_db_regnum),
   1982			  sizeof(*(reg->hr_db_regnum)),
   1983			  O2HB_DB_TYPE_REGION_NUMBER, 0, O2NM_MAX_NODES, reg);
   1984
   1985	o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, dir,
   1986			  &(reg->hr_db_elapsed_time),
   1987			  sizeof(*(reg->hr_db_elapsed_time)),
   1988			  O2HB_DB_TYPE_REGION_ELAPSED_TIME, 0, 0, reg);
   1989
   1990	o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, dir, &(reg->hr_db_pinned),
   1991			  sizeof(*(reg->hr_db_pinned)),
   1992			  O2HB_DB_TYPE_REGION_PINNED, 0, 0, reg);
   1993
   1994}
   1995
   1996static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
   1997							  const char *name)
   1998{
   1999	struct o2hb_region *reg = NULL;
   2000	int ret;
   2001
   2002	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
   2003	if (reg == NULL)
   2004		return ERR_PTR(-ENOMEM);
   2005
   2006	if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
   2007		ret = -ENAMETOOLONG;
   2008		goto free;
   2009	}
   2010
   2011	spin_lock(&o2hb_live_lock);
   2012	reg->hr_region_num = 0;
   2013	if (o2hb_global_heartbeat_active()) {
   2014		reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
   2015							 O2NM_MAX_REGIONS);
   2016		if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
   2017			spin_unlock(&o2hb_live_lock);
   2018			ret = -EFBIG;
   2019			goto free;
   2020		}
   2021		set_bit(reg->hr_region_num, o2hb_region_bitmap);
   2022	}
   2023	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
   2024	spin_unlock(&o2hb_live_lock);
   2025
   2026	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
   2027
   2028	/* this is the same way to generate msg key as dlm, for local heartbeat,
   2029	 * name is also the same, so make initial crc value different to avoid
   2030	 * message key conflict.
   2031	 */
   2032	reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
   2033		name, strlen(name));
   2034	INIT_LIST_HEAD(&reg->hr_handler_list);
   2035	ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
   2036			sizeof(struct o2hb_nego_msg),
   2037			o2hb_nego_timeout_handler,
   2038			reg, NULL, &reg->hr_handler_list);
   2039	if (ret)
   2040		goto remove_item;
   2041
   2042	ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
   2043			sizeof(struct o2hb_nego_msg),
   2044			o2hb_nego_approve_handler,
   2045			reg, NULL, &reg->hr_handler_list);
   2046	if (ret)
   2047		goto unregister_handler;
   2048
   2049	o2hb_debug_region_init(reg, o2hb_debug_dir);
   2050
   2051	return &reg->hr_item;
   2052
   2053unregister_handler:
   2054	o2net_unregister_handler_list(&reg->hr_handler_list);
   2055remove_item:
   2056	spin_lock(&o2hb_live_lock);
   2057	list_del(&reg->hr_all_item);
   2058	if (o2hb_global_heartbeat_active())
   2059		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
   2060	spin_unlock(&o2hb_live_lock);
   2061free:
   2062	kfree(reg);
   2063	return ERR_PTR(ret);
   2064}
   2065
   2066static void o2hb_heartbeat_group_drop_item(struct config_group *group,
   2067					   struct config_item *item)
   2068{
   2069	struct task_struct *hb_task;
   2070	struct o2hb_region *reg = to_o2hb_region(item);
   2071	int quorum_region = 0;
   2072
   2073	/* stop the thread when the user removes the region dir */
   2074	spin_lock(&o2hb_live_lock);
   2075	hb_task = reg->hr_task;
   2076	reg->hr_task = NULL;
   2077	reg->hr_item_dropped = 1;
   2078	spin_unlock(&o2hb_live_lock);
   2079
   2080	if (hb_task)
   2081		kthread_stop(hb_task);
   2082
   2083	if (o2hb_global_heartbeat_active()) {
   2084		spin_lock(&o2hb_live_lock);
   2085		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
   2086		clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
   2087		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
   2088			quorum_region = 1;
   2089		clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
   2090		spin_unlock(&o2hb_live_lock);
   2091		printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
   2092		       ((atomic_read(&reg->hr_steady_iterations) == 0) ?
   2093			"stopped" : "start aborted"), config_item_name(item),
   2094		       reg->hr_dev_name);
   2095	}
   2096
   2097	/*
   2098	 * If we're racing a dev_write(), we need to wake them.  They will
   2099	 * check reg->hr_task
   2100	 */
   2101	if (atomic_read(&reg->hr_steady_iterations) != 0) {
   2102		reg->hr_aborted_start = 1;
   2103		atomic_set(&reg->hr_steady_iterations, 0);
   2104		wake_up(&o2hb_steady_queue);
   2105	}
   2106
   2107	config_item_put(item);
   2108
   2109	if (!o2hb_global_heartbeat_active() || !quorum_region)
   2110		return;
   2111
   2112	/*
   2113	 * If global heartbeat active and there are dependent users,
   2114	 * pin all regions if quorum region count <= CUT_OFF
   2115	 */
   2116	spin_lock(&o2hb_live_lock);
   2117
   2118	if (!o2hb_dependent_users)
   2119		goto unlock;
   2120
   2121	if (bitmap_weight(o2hb_quorum_region_bitmap,
   2122			   O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
   2123		o2hb_region_pin(NULL);
   2124
   2125unlock:
   2126	spin_unlock(&o2hb_live_lock);
   2127}
   2128
   2129static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item,
   2130		char *page)
   2131{
   2132	return sprintf(page, "%u\n", o2hb_dead_threshold);
   2133}
   2134
   2135static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item,
   2136		const char *page, size_t count)
   2137{
   2138	unsigned long tmp;
   2139	char *p = (char *)page;
   2140
   2141	tmp = simple_strtoul(p, &p, 10);
   2142	if (!p || (*p && (*p != '\n')))
   2143                return -EINVAL;
   2144
   2145	/* this will validate ranges for us. */
   2146	o2hb_dead_threshold_set((unsigned int) tmp);
   2147
   2148	return count;
   2149}
   2150
   2151static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
   2152		char *page)
   2153{
   2154	return sprintf(page, "%s\n",
   2155		       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
   2156}
   2157
   2158static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
   2159		const char *page, size_t count)
   2160{
   2161	unsigned int i;
   2162	int ret;
   2163	size_t len;
   2164
   2165	len = (page[count - 1] == '\n') ? count - 1 : count;
   2166	if (!len)
   2167		return -EINVAL;
   2168
   2169	for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
   2170		if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
   2171			continue;
   2172
   2173		ret = o2hb_global_heartbeat_mode_set(i);
   2174		if (!ret)
   2175			printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
   2176			       o2hb_heartbeat_mode_desc[i]);
   2177		return count;
   2178	}
   2179
   2180	return -EINVAL;
   2181
   2182}
   2183
   2184CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold);
   2185CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
   2186
   2187static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
   2188	&o2hb_heartbeat_group_attr_dead_threshold,
   2189	&o2hb_heartbeat_group_attr_mode,
   2190	NULL,
   2191};
   2192
   2193static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
   2194	.make_item	= o2hb_heartbeat_group_make_item,
   2195	.drop_item	= o2hb_heartbeat_group_drop_item,
   2196};
   2197
   2198static const struct config_item_type o2hb_heartbeat_group_type = {
   2199	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
   2200	.ct_attrs	= o2hb_heartbeat_group_attrs,
   2201	.ct_owner	= THIS_MODULE,
   2202};
   2203
   2204/* this is just here to avoid touching group in heartbeat.h which the
   2205 * entire damn world #includes */
   2206struct config_group *o2hb_alloc_hb_set(void)
   2207{
   2208	struct o2hb_heartbeat_group *hs = NULL;
   2209	struct config_group *ret = NULL;
   2210
   2211	hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
   2212	if (hs == NULL)
   2213		goto out;
   2214
   2215	config_group_init_type_name(&hs->hs_group, "heartbeat",
   2216				    &o2hb_heartbeat_group_type);
   2217
   2218	ret = &hs->hs_group;
   2219out:
   2220	if (ret == NULL)
   2221		kfree(hs);
   2222	return ret;
   2223}
   2224
   2225void o2hb_free_hb_set(struct config_group *group)
   2226{
   2227	struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
   2228	kfree(hs);
   2229}
   2230
   2231/* hb callback registration and issuing */
   2232
   2233static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
   2234{
   2235	if (type == O2HB_NUM_CB)
   2236		return ERR_PTR(-EINVAL);
   2237
   2238	return &o2hb_callbacks[type];
   2239}
   2240
   2241void o2hb_setup_callback(struct o2hb_callback_func *hc,
   2242			 enum o2hb_callback_type type,
   2243			 o2hb_cb_func *func,
   2244			 void *data,
   2245			 int priority)
   2246{
   2247	INIT_LIST_HEAD(&hc->hc_item);
   2248	hc->hc_func = func;
   2249	hc->hc_data = data;
   2250	hc->hc_priority = priority;
   2251	hc->hc_type = type;
   2252	hc->hc_magic = O2HB_CB_MAGIC;
   2253}
   2254EXPORT_SYMBOL_GPL(o2hb_setup_callback);
   2255
   2256/*
   2257 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
   2258 * In global heartbeat mode, region_uuid passed is NULL.
   2259 *
   2260 * In local, we only pin the matching region. In global we pin all the active
   2261 * regions.
   2262 */
   2263static int o2hb_region_pin(const char *region_uuid)
   2264{
   2265	int ret = 0, found = 0;
   2266	struct o2hb_region *reg;
   2267	char *uuid;
   2268
   2269	assert_spin_locked(&o2hb_live_lock);
   2270
   2271	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
   2272		if (reg->hr_item_dropped)
   2273			continue;
   2274
   2275		uuid = config_item_name(&reg->hr_item);
   2276
   2277		/* local heartbeat */
   2278		if (region_uuid) {
   2279			if (strcmp(region_uuid, uuid))
   2280				continue;
   2281			found = 1;
   2282		}
   2283
   2284		if (reg->hr_item_pinned || reg->hr_item_dropped)
   2285			goto skip_pin;
   2286
   2287		/* Ignore ENOENT only for local hb (userdlm domain) */
   2288		ret = o2nm_depend_item(&reg->hr_item);
   2289		if (!ret) {
   2290			mlog(ML_CLUSTER, "Pin region %s\n", uuid);
   2291			reg->hr_item_pinned = 1;
   2292		} else {
   2293			if (ret == -ENOENT && found)
   2294				ret = 0;
   2295			else {
   2296				mlog(ML_ERROR, "Pin region %s fails with %d\n",
   2297				     uuid, ret);
   2298				break;
   2299			}
   2300		}
   2301skip_pin:
   2302		if (found)
   2303			break;
   2304	}
   2305
   2306	return ret;
   2307}
   2308
   2309/*
   2310 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
   2311 * In global heartbeat mode, region_uuid passed is NULL.
   2312 *
   2313 * In local, we only unpin the matching region. In global we unpin all the
   2314 * active regions.
   2315 */
   2316static void o2hb_region_unpin(const char *region_uuid)
   2317{
   2318	struct o2hb_region *reg;
   2319	char *uuid;
   2320	int found = 0;
   2321
   2322	assert_spin_locked(&o2hb_live_lock);
   2323
   2324	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
   2325		if (reg->hr_item_dropped)
   2326			continue;
   2327
   2328		uuid = config_item_name(&reg->hr_item);
   2329		if (region_uuid) {
   2330			if (strcmp(region_uuid, uuid))
   2331				continue;
   2332			found = 1;
   2333		}
   2334
   2335		if (reg->hr_item_pinned) {
   2336			mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
   2337			o2nm_undepend_item(&reg->hr_item);
   2338			reg->hr_item_pinned = 0;
   2339		}
   2340		if (found)
   2341			break;
   2342	}
   2343}
   2344
   2345static int o2hb_region_inc_user(const char *region_uuid)
   2346{
   2347	int ret = 0;
   2348
   2349	spin_lock(&o2hb_live_lock);
   2350
   2351	/* local heartbeat */
   2352	if (!o2hb_global_heartbeat_active()) {
   2353	    ret = o2hb_region_pin(region_uuid);
   2354	    goto unlock;
   2355	}
   2356
   2357	/*
   2358	 * if global heartbeat active and this is the first dependent user,
   2359	 * pin all regions if quorum region count <= CUT_OFF
   2360	 */
   2361	o2hb_dependent_users++;
   2362	if (o2hb_dependent_users > 1)
   2363		goto unlock;
   2364
   2365	if (bitmap_weight(o2hb_quorum_region_bitmap,
   2366			   O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
   2367		ret = o2hb_region_pin(NULL);
   2368
   2369unlock:
   2370	spin_unlock(&o2hb_live_lock);
   2371	return ret;
   2372}
   2373
   2374static void o2hb_region_dec_user(const char *region_uuid)
   2375{
   2376	spin_lock(&o2hb_live_lock);
   2377
   2378	/* local heartbeat */
   2379	if (!o2hb_global_heartbeat_active()) {
   2380	    o2hb_region_unpin(region_uuid);
   2381	    goto unlock;
   2382	}
   2383
   2384	/*
   2385	 * if global heartbeat active and there are no dependent users,
   2386	 * unpin all quorum regions
   2387	 */
   2388	o2hb_dependent_users--;
   2389	if (!o2hb_dependent_users)
   2390		o2hb_region_unpin(NULL);
   2391
   2392unlock:
   2393	spin_unlock(&o2hb_live_lock);
   2394}
   2395
   2396int o2hb_register_callback(const char *region_uuid,
   2397			   struct o2hb_callback_func *hc)
   2398{
   2399	struct o2hb_callback_func *f;
   2400	struct o2hb_callback *hbcall;
   2401	int ret;
   2402
   2403	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
   2404	BUG_ON(!list_empty(&hc->hc_item));
   2405
   2406	hbcall = hbcall_from_type(hc->hc_type);
   2407	if (IS_ERR(hbcall)) {
   2408		ret = PTR_ERR(hbcall);
   2409		goto out;
   2410	}
   2411
   2412	if (region_uuid) {
   2413		ret = o2hb_region_inc_user(region_uuid);
   2414		if (ret) {
   2415			mlog_errno(ret);
   2416			goto out;
   2417		}
   2418	}
   2419
   2420	down_write(&o2hb_callback_sem);
   2421
   2422	list_for_each_entry(f, &hbcall->list, hc_item) {
   2423		if (hc->hc_priority < f->hc_priority) {
   2424			list_add_tail(&hc->hc_item, &f->hc_item);
   2425			break;
   2426		}
   2427	}
   2428	if (list_empty(&hc->hc_item))
   2429		list_add_tail(&hc->hc_item, &hbcall->list);
   2430
   2431	up_write(&o2hb_callback_sem);
   2432	ret = 0;
   2433out:
   2434	mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
   2435	     ret, __builtin_return_address(0), hc);
   2436	return ret;
   2437}
   2438EXPORT_SYMBOL_GPL(o2hb_register_callback);
   2439
   2440void o2hb_unregister_callback(const char *region_uuid,
   2441			      struct o2hb_callback_func *hc)
   2442{
   2443	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
   2444
   2445	mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
   2446	     __builtin_return_address(0), hc);
   2447
   2448	/* XXX Can this happen _with_ a region reference? */
   2449	if (list_empty(&hc->hc_item))
   2450		return;
   2451
   2452	if (region_uuid)
   2453		o2hb_region_dec_user(region_uuid);
   2454
   2455	down_write(&o2hb_callback_sem);
   2456
   2457	list_del_init(&hc->hc_item);
   2458
   2459	up_write(&o2hb_callback_sem);
   2460}
   2461EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
   2462
   2463int o2hb_check_node_heartbeating_no_sem(u8 node_num)
   2464{
   2465	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
   2466
   2467	spin_lock(&o2hb_live_lock);
   2468	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
   2469	spin_unlock(&o2hb_live_lock);
   2470	if (!test_bit(node_num, testing_map)) {
   2471		mlog(ML_HEARTBEAT,
   2472		     "node (%u) does not have heartbeating enabled.\n",
   2473		     node_num);
   2474		return 0;
   2475	}
   2476
   2477	return 1;
   2478}
   2479EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
   2480
   2481int o2hb_check_node_heartbeating_from_callback(u8 node_num)
   2482{
   2483	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
   2484
   2485	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
   2486	if (!test_bit(node_num, testing_map)) {
   2487		mlog(ML_HEARTBEAT,
   2488		     "node (%u) does not have heartbeating enabled.\n",
   2489		     node_num);
   2490		return 0;
   2491	}
   2492
   2493	return 1;
   2494}
   2495EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
   2496
   2497/*
   2498 * this is just a hack until we get the plumbing which flips file systems
   2499 * read only and drops the hb ref instead of killing the node dead.
   2500 */
   2501void o2hb_stop_all_regions(void)
   2502{
   2503	struct o2hb_region *reg;
   2504
   2505	mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
   2506
   2507	spin_lock(&o2hb_live_lock);
   2508
   2509	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
   2510		reg->hr_unclean_stop = 1;
   2511
   2512	spin_unlock(&o2hb_live_lock);
   2513}
   2514EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
   2515
   2516int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
   2517{
   2518	struct o2hb_region *reg;
   2519	int numregs = 0;
   2520	char *p;
   2521
   2522	spin_lock(&o2hb_live_lock);
   2523
   2524	p = region_uuids;
   2525	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
   2526		if (reg->hr_item_dropped)
   2527			continue;
   2528
   2529		mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
   2530		if (numregs < max_regions) {
   2531			memcpy(p, config_item_name(&reg->hr_item),
   2532			       O2HB_MAX_REGION_NAME_LEN);
   2533			p += O2HB_MAX_REGION_NAME_LEN;
   2534		}
   2535		numregs++;
   2536	}
   2537
   2538	spin_unlock(&o2hb_live_lock);
   2539
   2540	return numregs;
   2541}
   2542EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
   2543
   2544int o2hb_global_heartbeat_active(void)
   2545{
   2546	return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
   2547}
   2548EXPORT_SYMBOL(o2hb_global_heartbeat_active);