cachepc-linux

Fork of AMDESE/linux with modifications for the CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux

lockspace.c (21527B)


// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}
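
/*
 * Illustrative sketch (an assumption, not part of the original source):
 * the userspace half of the handshake above.  The "dlm" kset is created
 * under kernel_kobj in dlm_lockspace_init(), so each lockspace shows up
 * as /sys/kernel/dlm/<name>/.  After handling the ONLINE/OFFLINE uevent,
 * a daemon such as dlm_controld reports the result by writing an integer
 * to event_done, which reaches dlm_event_store() and wakes do_uevent():
 *
 *	#include <stdio.h>
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *
 *	static int signal_event_done(const char *ls_name, int result)
 *	{
 *		char path[256], val[16];
 *		int fd, n;
 *
 *		snprintf(path, sizeof(path),
 *			 "/sys/kernel/dlm/%s/event_done", ls_name);
 *		fd = open(path, O_WRONLY);
 *		if (fd < 0)
 *			return -1;
 *		n = snprintf(val, sizeof(val), "%d", result);
 *		if (write(fd, val, n) != n) {
 *			close(fd);
 *			return -1;
 *		}
 *		return close(fd);
 *	}
 *
 * Writing "0" or "1" to the "control" attribute works the same way,
 * invoking dlm_ls_stop() or dlm_ls_start() via dlm_control_store().
 */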

static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: cannot create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_scan_waiters(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}
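
/*
 * Illustrative note (an assumption, not part of the original source):
 * every successful dlm_find_lockspace_*() lookup above takes a reference
 * on ls_count that must be dropped with dlm_put_lockspace(), e.g.:
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *	if (ls) {
 *		... use ls ...
 *		dlm_put_lockspace(ls);
 *	}
 *
 * remove_lockspace() below waits on ls_count_wait for the count to reach
 * zero before unlinking the lockspace from lslist.
 */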

static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}

static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application-provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);
	init_waitqueue_head(&ls->ls_remove_wait);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	atomic_set(&ls->ls_requestqueue_cnt, 0);
	init_waitqueue_head(&ls->ls_requestqueue_wait);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	/* For backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and
	 * to avoid out-of-bounds issues. However, the sending side on 3.2
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_scand_stop();
		dlm_midcomms_shutdown();
		dlm_lowcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}
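
/*
 * Illustrative sketch (an assumption, not part of the original source):
 * how a kernel-side user might create and later release a lockspace with
 * this API.  dlm_new_lockspace() needs the dlm user daemon running and an
 * lvblen that is a nonzero multiple of 8 (see the checks in
 * new_lockspace() above); dlm_release_lockspace() with force 0 fails with
 * -EBUSY while any LKBs remain.  The names and lvblen value here are
 * hypothetical:
 *
 *	#include <linux/dlm.h>
 *
 *	static dlm_lockspace_t *my_ls;
 *
 *	static int my_ls_create(void)
 *	{
 *		return dlm_new_lockspace("mylockspace", "mycluster",
 *					 DLM_LSFL_NEWEXCL, 32,
 *					 NULL, NULL, NULL, &my_ls);
 *	}
 *
 *	static void my_ls_destroy(void)
 *	{
 *		dlm_release_lockspace(my_ls, 0);
 *	}
 */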

static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	if (ls_count == 1) {
		dlm_scand_stop();
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkbs in the idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsbs on the rsbtbl[] lists
	 */
	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_lowcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}

void dlm_stop_lockspaces_check(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
			    !dlm_locking_stopped(ls)))
			break;
	}
	spin_unlock(&lslist_lock);
}