lockspace.c (21527B)
1// SPDX-License-Identifier: GPL-2.0-only 2/****************************************************************************** 3******************************************************************************* 4** 5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 7** 8** 9******************************************************************************* 10******************************************************************************/ 11 12#include <linux/module.h> 13 14#include "dlm_internal.h" 15#include "lockspace.h" 16#include "member.h" 17#include "recoverd.h" 18#include "dir.h" 19#include "midcomms.h" 20#include "lowcomms.h" 21#include "config.h" 22#include "memory.h" 23#include "lock.h" 24#include "recover.h" 25#include "requestqueue.h" 26#include "user.h" 27#include "ast.h" 28 29static int ls_count; 30static struct mutex ls_lock; 31static struct list_head lslist; 32static spinlock_t lslist_lock; 33static struct task_struct * scand_task; 34 35 36static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 37{ 38 ssize_t ret = len; 39 int n; 40 int rc = kstrtoint(buf, 0, &n); 41 42 if (rc) 43 return rc; 44 ls = dlm_find_lockspace_local(ls->ls_local_handle); 45 if (!ls) 46 return -EINVAL; 47 48 switch (n) { 49 case 0: 50 dlm_ls_stop(ls); 51 break; 52 case 1: 53 dlm_ls_start(ls); 54 break; 55 default: 56 ret = -EINVAL; 57 } 58 dlm_put_lockspace(ls); 59 return ret; 60} 61 62static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 63{ 64 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); 65 66 if (rc) 67 return rc; 68 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 69 wake_up(&ls->ls_uevent_wait); 70 return len; 71} 72 73static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 74{ 75 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 76} 77 78static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 79{ 80 int rc = kstrtouint(buf, 0, &ls->ls_global_id); 81 82 if (rc) 83 return rc; 84 return len; 85} 86 87static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 88{ 89 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 90} 91 92static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 93{ 94 int val; 95 int rc = kstrtoint(buf, 0, &val); 96 97 if (rc) 98 return rc; 99 if (val == 1) 100 set_bit(LSFL_NODIR, &ls->ls_flags); 101 return len; 102} 103 104static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 105{ 106 uint32_t status = dlm_recover_status(ls); 107 return snprintf(buf, PAGE_SIZE, "%x\n", status); 108} 109 110static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 111{ 112 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 113} 114 115struct dlm_attr { 116 struct attribute attr; 117 ssize_t (*show)(struct dlm_ls *, char *); 118 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 119}; 120 121static struct dlm_attr dlm_attr_control = { 122 .attr = {.name = "control", .mode = S_IWUSR}, 123 .store = dlm_control_store 124}; 125 126static struct dlm_attr dlm_attr_event = { 127 .attr = {.name = "event_done", .mode = S_IWUSR}, 128 .store = dlm_event_store 129}; 130 131static struct dlm_attr dlm_attr_id = { 132 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 133 .show = dlm_id_show, 134 .store = dlm_id_store 135}; 136 137static struct dlm_attr dlm_attr_nodir = { 138 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 139 .show = dlm_nodir_show, 140 .store = dlm_nodir_store 141}; 142 143static struct dlm_attr dlm_attr_recover_status = { 144 .attr = {.name = "recover_status", .mode = S_IRUGO}, 145 .show = dlm_recover_status_show 146}; 147 148static struct dlm_attr dlm_attr_recover_nodeid = { 149 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 150 .show = dlm_recover_nodeid_show 151}; 152 153static struct attribute *dlm_attrs[] = { 154 &dlm_attr_control.attr, 155 &dlm_attr_event.attr, 156 &dlm_attr_id.attr, 157 &dlm_attr_nodir.attr, 158 &dlm_attr_recover_status.attr, 159 &dlm_attr_recover_nodeid.attr, 160 NULL, 161}; 162ATTRIBUTE_GROUPS(dlm); 163 164static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 165 char *buf) 166{ 167 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 168 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 169 return a->show ? a->show(ls, buf) : 0; 170} 171 172static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 173 const char *buf, size_t len) 174{ 175 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 176 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 177 return a->store ? a->store(ls, buf, len) : len; 178} 179 180static void lockspace_kobj_release(struct kobject *k) 181{ 182 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); 183 kfree(ls); 184} 185 186static const struct sysfs_ops dlm_attr_ops = { 187 .show = dlm_attr_show, 188 .store = dlm_attr_store, 189}; 190 191static struct kobj_type dlm_ktype = { 192 .default_groups = dlm_groups, 193 .sysfs_ops = &dlm_attr_ops, 194 .release = lockspace_kobj_release, 195}; 196 197static struct kset *dlm_kset; 198 199static int do_uevent(struct dlm_ls *ls, int in) 200{ 201 if (in) 202 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 203 else 204 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 205 206 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 207 208 /* dlm_controld will see the uevent, do the necessary group management 209 and then write to sysfs to wake us */ 210 211 wait_event(ls->ls_uevent_wait, 212 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 213 214 log_rinfo(ls, "group event done %d", ls->ls_uevent_result); 215 216 return ls->ls_uevent_result; 217} 218 219static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env) 220{ 221 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 222 223 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 224 return 0; 225} 226 227static const struct kset_uevent_ops dlm_uevent_ops = { 228 .uevent = dlm_uevent, 229}; 230 231int __init dlm_lockspace_init(void) 232{ 233 ls_count = 0; 234 mutex_init(&ls_lock); 235 INIT_LIST_HEAD(&lslist); 236 spin_lock_init(&lslist_lock); 237 238 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 239 if (!dlm_kset) { 240 printk(KERN_WARNING "%s: can not create kset\n", __func__); 241 return -ENOMEM; 242 } 243 return 0; 244} 245 246void dlm_lockspace_exit(void) 247{ 248 kset_unregister(dlm_kset); 249} 250 251static struct dlm_ls *find_ls_to_scan(void) 252{ 253 struct dlm_ls *ls; 254 255 spin_lock(&lslist_lock); 256 list_for_each_entry(ls, &lslist, ls_list) { 257 if (time_after_eq(jiffies, ls->ls_scan_time + 258 dlm_config.ci_scan_secs * HZ)) { 259 spin_unlock(&lslist_lock); 260 return ls; 261 } 262 } 263 spin_unlock(&lslist_lock); 264 return NULL; 265} 266 267static int dlm_scand(void *data) 268{ 269 struct dlm_ls *ls; 270 271 while (!kthread_should_stop()) { 272 ls = find_ls_to_scan(); 273 if (ls) { 274 if (dlm_lock_recovery_try(ls)) { 275 ls->ls_scan_time = jiffies; 276 dlm_scan_rsbs(ls); 277 dlm_scan_timeout(ls); 278 dlm_scan_waiters(ls); 279 dlm_unlock_recovery(ls); 280 } else { 281 ls->ls_scan_time += HZ; 282 } 283 continue; 284 } 285 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); 286 } 287 return 0; 288} 289 290static int dlm_scand_start(void) 291{ 292 struct task_struct *p; 293 int error = 0; 294 295 p = kthread_run(dlm_scand, NULL, "dlm_scand"); 296 if (IS_ERR(p)) 297 error = PTR_ERR(p); 298 else 299 scand_task = p; 300 return error; 301} 302 303static void dlm_scand_stop(void) 304{ 305 kthread_stop(scand_task); 306} 307 308struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 309{ 310 struct dlm_ls *ls; 311 312 spin_lock(&lslist_lock); 313 314 list_for_each_entry(ls, &lslist, ls_list) { 315 if (ls->ls_global_id == id) { 316 atomic_inc(&ls->ls_count); 317 goto out; 318 } 319 } 320 ls = NULL; 321 out: 322 spin_unlock(&lslist_lock); 323 return ls; 324} 325 326struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 327{ 328 struct dlm_ls *ls; 329 330 spin_lock(&lslist_lock); 331 list_for_each_entry(ls, &lslist, ls_list) { 332 if (ls->ls_local_handle == lockspace) { 333 atomic_inc(&ls->ls_count); 334 goto out; 335 } 336 } 337 ls = NULL; 338 out: 339 spin_unlock(&lslist_lock); 340 return ls; 341} 342 343struct dlm_ls *dlm_find_lockspace_device(int minor) 344{ 345 struct dlm_ls *ls; 346 347 spin_lock(&lslist_lock); 348 list_for_each_entry(ls, &lslist, ls_list) { 349 if (ls->ls_device.minor == minor) { 350 atomic_inc(&ls->ls_count); 351 goto out; 352 } 353 } 354 ls = NULL; 355 out: 356 spin_unlock(&lslist_lock); 357 return ls; 358} 359 360void dlm_put_lockspace(struct dlm_ls *ls) 361{ 362 if (atomic_dec_and_test(&ls->ls_count)) 363 wake_up(&ls->ls_count_wait); 364} 365 366static void remove_lockspace(struct dlm_ls *ls) 367{ 368retry: 369 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); 370 371 spin_lock(&lslist_lock); 372 if (atomic_read(&ls->ls_count) != 0) { 373 spin_unlock(&lslist_lock); 374 goto retry; 375 } 376 377 WARN_ON(ls->ls_create_count != 0); 378 list_del(&ls->ls_list); 379 spin_unlock(&lslist_lock); 380} 381 382static int threads_start(void) 383{ 384 int error; 385 386 error = dlm_scand_start(); 387 if (error) { 388 log_print("cannot start dlm_scand thread %d", error); 389 goto fail; 390 } 391 392 /* Thread for sending/receiving messages for all lockspace's */ 393 error = dlm_midcomms_start(); 394 if (error) { 395 log_print("cannot start dlm lowcomms %d", error); 396 goto scand_fail; 397 } 398 399 return 0; 400 401 scand_fail: 402 dlm_scand_stop(); 403 fail: 404 return error; 405} 406 407static int new_lockspace(const char *name, const char *cluster, 408 uint32_t flags, int lvblen, 409 const struct dlm_lockspace_ops *ops, void *ops_arg, 410 int *ops_result, dlm_lockspace_t **lockspace) 411{ 412 struct dlm_ls *ls; 413 int i, size, error; 414 int do_unreg = 0; 415 int namelen = strlen(name); 416 417 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) 418 return -EINVAL; 419 420 if (!lvblen || (lvblen % 8)) 421 return -EINVAL; 422 423 if (!try_module_get(THIS_MODULE)) 424 return -EINVAL; 425 426 if (!dlm_user_daemon_available()) { 427 log_print("dlm user daemon not available"); 428 error = -EUNATCH; 429 goto out; 430 } 431 432 if (ops && ops_result) { 433 if (!dlm_config.ci_recover_callbacks) 434 *ops_result = -EOPNOTSUPP; 435 else 436 *ops_result = 0; 437 } 438 439 if (!cluster) 440 log_print("dlm cluster name '%s' is being used without an application provided cluster name", 441 dlm_config.ci_cluster_name); 442 443 if (dlm_config.ci_recover_callbacks && cluster && 444 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 445 log_print("dlm cluster name '%s' does not match " 446 "the application cluster name '%s'", 447 dlm_config.ci_cluster_name, cluster); 448 error = -EBADR; 449 goto out; 450 } 451 452 error = 0; 453 454 spin_lock(&lslist_lock); 455 list_for_each_entry(ls, &lslist, ls_list) { 456 WARN_ON(ls->ls_create_count <= 0); 457 if (ls->ls_namelen != namelen) 458 continue; 459 if (memcmp(ls->ls_name, name, namelen)) 460 continue; 461 if (flags & DLM_LSFL_NEWEXCL) { 462 error = -EEXIST; 463 break; 464 } 465 ls->ls_create_count++; 466 *lockspace = ls; 467 error = 1; 468 break; 469 } 470 spin_unlock(&lslist_lock); 471 472 if (error) 473 goto out; 474 475 error = -ENOMEM; 476 477 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS); 478 if (!ls) 479 goto out; 480 memcpy(ls->ls_name, name, namelen); 481 ls->ls_namelen = namelen; 482 ls->ls_lvblen = lvblen; 483 atomic_set(&ls->ls_count, 0); 484 init_waitqueue_head(&ls->ls_count_wait); 485 ls->ls_flags = 0; 486 ls->ls_scan_time = jiffies; 487 488 if (ops && dlm_config.ci_recover_callbacks) { 489 ls->ls_ops = ops; 490 ls->ls_ops_arg = ops_arg; 491 } 492 493 if (flags & DLM_LSFL_TIMEWARN) 494 set_bit(LSFL_TIMEWARN, &ls->ls_flags); 495 496 /* ls_exflags are forced to match among nodes, and we don't 497 need to require all nodes to have some flags set */ 498 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS | 499 DLM_LSFL_NEWEXCL)); 500 501 size = READ_ONCE(dlm_config.ci_rsbtbl_size); 502 ls->ls_rsbtbl_size = size; 503 504 ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable))); 505 if (!ls->ls_rsbtbl) 506 goto out_lsfree; 507 for (i = 0; i < size; i++) { 508 ls->ls_rsbtbl[i].keep.rb_node = NULL; 509 ls->ls_rsbtbl[i].toss.rb_node = NULL; 510 spin_lock_init(&ls->ls_rsbtbl[i].lock); 511 } 512 513 spin_lock_init(&ls->ls_remove_spin); 514 init_waitqueue_head(&ls->ls_remove_wait); 515 516 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { 517 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, 518 GFP_KERNEL); 519 if (!ls->ls_remove_names[i]) 520 goto out_rsbtbl; 521 } 522 523 idr_init(&ls->ls_lkbidr); 524 spin_lock_init(&ls->ls_lkbidr_spin); 525 526 INIT_LIST_HEAD(&ls->ls_waiters); 527 mutex_init(&ls->ls_waiters_mutex); 528 INIT_LIST_HEAD(&ls->ls_orphans); 529 mutex_init(&ls->ls_orphans_mutex); 530 INIT_LIST_HEAD(&ls->ls_timeout); 531 mutex_init(&ls->ls_timeout_mutex); 532 533 INIT_LIST_HEAD(&ls->ls_new_rsb); 534 spin_lock_init(&ls->ls_new_rsb_spin); 535 536 INIT_LIST_HEAD(&ls->ls_nodes); 537 INIT_LIST_HEAD(&ls->ls_nodes_gone); 538 ls->ls_num_nodes = 0; 539 ls->ls_low_nodeid = 0; 540 ls->ls_total_weight = 0; 541 ls->ls_node_array = NULL; 542 543 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb)); 544 ls->ls_stub_rsb.res_ls = ls; 545 546 ls->ls_debug_rsb_dentry = NULL; 547 ls->ls_debug_waiters_dentry = NULL; 548 549 init_waitqueue_head(&ls->ls_uevent_wait); 550 ls->ls_uevent_result = 0; 551 init_completion(&ls->ls_members_done); 552 ls->ls_members_result = -1; 553 554 mutex_init(&ls->ls_cb_mutex); 555 INIT_LIST_HEAD(&ls->ls_cb_delay); 556 557 ls->ls_recoverd_task = NULL; 558 mutex_init(&ls->ls_recoverd_active); 559 spin_lock_init(&ls->ls_recover_lock); 560 spin_lock_init(&ls->ls_rcom_spin); 561 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 562 ls->ls_recover_status = 0; 563 ls->ls_recover_seq = 0; 564 ls->ls_recover_args = NULL; 565 init_rwsem(&ls->ls_in_recovery); 566 init_rwsem(&ls->ls_recv_active); 567 INIT_LIST_HEAD(&ls->ls_requestqueue); 568 atomic_set(&ls->ls_requestqueue_cnt, 0); 569 init_waitqueue_head(&ls->ls_requestqueue_wait); 570 mutex_init(&ls->ls_requestqueue_mutex); 571 mutex_init(&ls->ls_clear_proc_locks); 572 573 /* Due backwards compatibility with 3.1 we need to use maximum 574 * possible dlm message size to be sure the message will fit and 575 * not having out of bounds issues. However on sending side 3.2 576 * might send less. 577 */ 578 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); 579 if (!ls->ls_recover_buf) 580 goto out_lkbidr; 581 582 ls->ls_slot = 0; 583 ls->ls_num_slots = 0; 584 ls->ls_slots_size = 0; 585 ls->ls_slots = NULL; 586 587 INIT_LIST_HEAD(&ls->ls_recover_list); 588 spin_lock_init(&ls->ls_recover_list_lock); 589 idr_init(&ls->ls_recover_idr); 590 spin_lock_init(&ls->ls_recover_idr_lock); 591 ls->ls_recover_list_count = 0; 592 ls->ls_local_handle = ls; 593 init_waitqueue_head(&ls->ls_wait_general); 594 INIT_LIST_HEAD(&ls->ls_root_list); 595 init_rwsem(&ls->ls_root_sem); 596 597 spin_lock(&lslist_lock); 598 ls->ls_create_count = 1; 599 list_add(&ls->ls_list, &lslist); 600 spin_unlock(&lslist_lock); 601 602 if (flags & DLM_LSFL_FS) { 603 error = dlm_callback_start(ls); 604 if (error) { 605 log_error(ls, "can't start dlm_callback %d", error); 606 goto out_delist; 607 } 608 } 609 610 init_waitqueue_head(&ls->ls_recover_lock_wait); 611 612 /* 613 * Once started, dlm_recoverd first looks for ls in lslist, then 614 * initializes ls_in_recovery as locked in "down" mode. We need 615 * to wait for the wakeup from dlm_recoverd because in_recovery 616 * has to start out in down mode. 617 */ 618 619 error = dlm_recoverd_start(ls); 620 if (error) { 621 log_error(ls, "can't start dlm_recoverd %d", error); 622 goto out_callback; 623 } 624 625 wait_event(ls->ls_recover_lock_wait, 626 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 627 628 /* let kobject handle freeing of ls if there's an error */ 629 do_unreg = 1; 630 631 ls->ls_kobj.kset = dlm_kset; 632 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 633 "%s", ls->ls_name); 634 if (error) 635 goto out_recoverd; 636 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 637 638 /* This uevent triggers dlm_controld in userspace to add us to the 639 group of nodes that are members of this lockspace (managed by the 640 cluster infrastructure.) Once it's done that, it tells us who the 641 current lockspace members are (via configfs) and then tells the 642 lockspace to start running (via sysfs) in dlm_ls_start(). */ 643 644 error = do_uevent(ls, 1); 645 if (error) 646 goto out_recoverd; 647 648 wait_for_completion(&ls->ls_members_done); 649 error = ls->ls_members_result; 650 if (error) 651 goto out_members; 652 653 dlm_create_debug_file(ls); 654 655 log_rinfo(ls, "join complete"); 656 *lockspace = ls; 657 return 0; 658 659 out_members: 660 do_uevent(ls, 0); 661 dlm_clear_members(ls); 662 kfree(ls->ls_node_array); 663 out_recoverd: 664 dlm_recoverd_stop(ls); 665 out_callback: 666 dlm_callback_stop(ls); 667 out_delist: 668 spin_lock(&lslist_lock); 669 list_del(&ls->ls_list); 670 spin_unlock(&lslist_lock); 671 idr_destroy(&ls->ls_recover_idr); 672 kfree(ls->ls_recover_buf); 673 out_lkbidr: 674 idr_destroy(&ls->ls_lkbidr); 675 out_rsbtbl: 676 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) 677 kfree(ls->ls_remove_names[i]); 678 vfree(ls->ls_rsbtbl); 679 out_lsfree: 680 if (do_unreg) 681 kobject_put(&ls->ls_kobj); 682 else 683 kfree(ls); 684 out: 685 module_put(THIS_MODULE); 686 return error; 687} 688 689int dlm_new_lockspace(const char *name, const char *cluster, 690 uint32_t flags, int lvblen, 691 const struct dlm_lockspace_ops *ops, void *ops_arg, 692 int *ops_result, dlm_lockspace_t **lockspace) 693{ 694 int error = 0; 695 696 mutex_lock(&ls_lock); 697 if (!ls_count) 698 error = threads_start(); 699 if (error) 700 goto out; 701 702 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 703 ops_result, lockspace); 704 if (!error) 705 ls_count++; 706 if (error > 0) 707 error = 0; 708 if (!ls_count) { 709 dlm_scand_stop(); 710 dlm_midcomms_shutdown(); 711 dlm_lowcomms_stop(); 712 } 713 out: 714 mutex_unlock(&ls_lock); 715 return error; 716} 717 718static int lkb_idr_is_local(int id, void *p, void *data) 719{ 720 struct dlm_lkb *lkb = p; 721 722 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; 723} 724 725static int lkb_idr_is_any(int id, void *p, void *data) 726{ 727 return 1; 728} 729 730static int lkb_idr_free(int id, void *p, void *data) 731{ 732 struct dlm_lkb *lkb = p; 733 734 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) 735 dlm_free_lvb(lkb->lkb_lvbptr); 736 737 dlm_free_lkb(lkb); 738 return 0; 739} 740 741/* NOTE: We check the lkbidr here rather than the resource table. 742 This is because there may be LKBs queued as ASTs that have been unlinked 743 from their RSBs and are pending deletion once the AST has been delivered */ 744 745static int lockspace_busy(struct dlm_ls *ls, int force) 746{ 747 int rv; 748 749 spin_lock(&ls->ls_lkbidr_spin); 750 if (force == 0) { 751 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); 752 } else if (force == 1) { 753 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); 754 } else { 755 rv = 0; 756 } 757 spin_unlock(&ls->ls_lkbidr_spin); 758 return rv; 759} 760 761static int release_lockspace(struct dlm_ls *ls, int force) 762{ 763 struct dlm_rsb *rsb; 764 struct rb_node *n; 765 int i, busy, rv; 766 767 busy = lockspace_busy(ls, force); 768 769 spin_lock(&lslist_lock); 770 if (ls->ls_create_count == 1) { 771 if (busy) { 772 rv = -EBUSY; 773 } else { 774 /* remove_lockspace takes ls off lslist */ 775 ls->ls_create_count = 0; 776 rv = 0; 777 } 778 } else if (ls->ls_create_count > 1) { 779 rv = --ls->ls_create_count; 780 } else { 781 rv = -EINVAL; 782 } 783 spin_unlock(&lslist_lock); 784 785 if (rv) { 786 log_debug(ls, "release_lockspace no remove %d", rv); 787 return rv; 788 } 789 790 dlm_device_deregister(ls); 791 792 if (force < 3 && dlm_user_daemon_available()) 793 do_uevent(ls, 0); 794 795 dlm_recoverd_stop(ls); 796 797 if (ls_count == 1) { 798 dlm_scand_stop(); 799 dlm_clear_members(ls); 800 dlm_midcomms_shutdown(); 801 } 802 803 dlm_callback_stop(ls); 804 805 remove_lockspace(ls); 806 807 dlm_delete_debug_file(ls); 808 809 idr_destroy(&ls->ls_recover_idr); 810 kfree(ls->ls_recover_buf); 811 812 /* 813 * Free all lkb's in idr 814 */ 815 816 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); 817 idr_destroy(&ls->ls_lkbidr); 818 819 /* 820 * Free all rsb's on rsbtbl[] lists 821 */ 822 823 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 824 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) { 825 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 826 rb_erase(n, &ls->ls_rsbtbl[i].keep); 827 dlm_free_rsb(rsb); 828 } 829 830 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) { 831 rsb = rb_entry(n, struct dlm_rsb, res_hashnode); 832 rb_erase(n, &ls->ls_rsbtbl[i].toss); 833 dlm_free_rsb(rsb); 834 } 835 } 836 837 vfree(ls->ls_rsbtbl); 838 839 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) 840 kfree(ls->ls_remove_names[i]); 841 842 while (!list_empty(&ls->ls_new_rsb)) { 843 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 844 res_hashchain); 845 list_del(&rsb->res_hashchain); 846 dlm_free_rsb(rsb); 847 } 848 849 /* 850 * Free structures on any other lists 851 */ 852 853 dlm_purge_requestqueue(ls); 854 kfree(ls->ls_recover_args); 855 dlm_clear_members(ls); 856 dlm_clear_members_gone(ls); 857 kfree(ls->ls_node_array); 858 log_rinfo(ls, "release_lockspace final free"); 859 kobject_put(&ls->ls_kobj); 860 /* The ls structure will be freed when the kobject is done with */ 861 862 module_put(THIS_MODULE); 863 return 0; 864} 865 866/* 867 * Called when a system has released all its locks and is not going to use the 868 * lockspace any longer. We free everything we're managing for this lockspace. 869 * Remaining nodes will go through the recovery process as if we'd died. The 870 * lockspace must continue to function as usual, participating in recoveries, 871 * until this returns. 872 * 873 * Force has 4 possible values: 874 * 0 - don't destroy lockspace if it has any LKBs 875 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 876 * 2 - destroy lockspace regardless of LKBs 877 * 3 - destroy lockspace as part of a forced shutdown 878 */ 879 880int dlm_release_lockspace(void *lockspace, int force) 881{ 882 struct dlm_ls *ls; 883 int error; 884 885 ls = dlm_find_lockspace_local(lockspace); 886 if (!ls) 887 return -EINVAL; 888 dlm_put_lockspace(ls); 889 890 mutex_lock(&ls_lock); 891 error = release_lockspace(ls, force); 892 if (!error) 893 ls_count--; 894 if (!ls_count) 895 dlm_lowcomms_stop(); 896 mutex_unlock(&ls_lock); 897 898 return error; 899} 900 901void dlm_stop_lockspaces(void) 902{ 903 struct dlm_ls *ls; 904 int count; 905 906 restart: 907 count = 0; 908 spin_lock(&lslist_lock); 909 list_for_each_entry(ls, &lslist, ls_list) { 910 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { 911 count++; 912 continue; 913 } 914 spin_unlock(&lslist_lock); 915 log_error(ls, "no userland control daemon, stopping lockspace"); 916 dlm_ls_stop(ls); 917 goto restart; 918 } 919 spin_unlock(&lslist_lock); 920 921 if (count) 922 log_print("dlm user daemon left %d lockspaces", count); 923} 924 925void dlm_stop_lockspaces_check(void) 926{ 927 struct dlm_ls *ls; 928 929 spin_lock(&lslist_lock); 930 list_for_each_entry(ls, &lslist, ls_list) { 931 if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) || 932 !dlm_locking_stopped(ls))) 933 break; 934 } 935 spin_unlock(&lslist_lock); 936}