lock.c (166099B)
1// SPDX-License-Identifier: GPL-2.0-only 2/****************************************************************************** 3******************************************************************************* 4** 5** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. 6** 7** 8******************************************************************************* 9******************************************************************************/ 10 11/* Central locking logic has four stages: 12 13 dlm_lock() 14 dlm_unlock() 15 16 request_lock(ls, lkb) 17 convert_lock(ls, lkb) 18 unlock_lock(ls, lkb) 19 cancel_lock(ls, lkb) 20 21 _request_lock(r, lkb) 22 _convert_lock(r, lkb) 23 _unlock_lock(r, lkb) 24 _cancel_lock(r, lkb) 25 26 do_request(r, lkb) 27 do_convert(r, lkb) 28 do_unlock(r, lkb) 29 do_cancel(r, lkb) 30 31 Stage 1 (lock, unlock) is mainly about checking input args and 32 splitting into one of the four main operations: 33 34 dlm_lock = request_lock 35 dlm_lock+CONVERT = convert_lock 36 dlm_unlock = unlock_lock 37 dlm_unlock+CANCEL = cancel_lock 38 39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is 40 provided to the next stage. 41 42 Stage 3, _xxxx_lock(), determines if the operation is local or remote. 43 When remote, it calls send_xxxx(), when local it calls do_xxxx(). 44 45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the 46 given rsb and lkb and queues callbacks. 47 48 For remote operations, send_xxxx() results in the corresponding do_xxxx() 49 function being executed on the remote node. The connecting send/receive 50 calls on local (L) and remote (R) nodes: 51 52 L: send_xxxx() -> R: receive_xxxx() 53 R: do_xxxx() 54 L: receive_xxxx_reply() <- R: send_xxxx_reply() 55*/ 56#include <trace/events/dlm.h> 57 58#include <linux/types.h> 59#include <linux/rbtree.h> 60#include <linux/slab.h> 61#include "dlm_internal.h" 62#include <linux/dlm_device.h> 63#include "memory.h" 64#include "midcomms.h" 65#include "requestqueue.h" 66#include "util.h" 67#include "dir.h" 68#include "member.h" 69#include "lockspace.h" 70#include "ast.h" 71#include "lock.h" 72#include "rcom.h" 73#include "recover.h" 74#include "lvb_table.h" 75#include "user.h" 76#include "config.h" 77 78static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb); 79static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb); 80static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb); 81static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb); 82static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb); 83static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); 84static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 85static int send_remove(struct dlm_rsb *r); 86static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 87static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 88static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 89 struct dlm_message *ms); 90static int receive_extralen(struct dlm_message *ms); 91static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 92static void del_timeout(struct dlm_lkb *lkb); 93static void toss_rsb(struct kref *kref); 94 95/* 96 * Lock compatibilty matrix - thanks Steve 97 * UN = Unlocked state. Not really a state, used as a flag 98 * PD = Padding. Used to make the matrix a nice power of two in size 99 * Other states are the same as the VMS DLM. 100 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same) 101 */ 102 103static const int __dlm_compat_matrix[8][8] = { 104 /* UN NL CR CW PR PW EX PD */ 105 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */ 106 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */ 107 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */ 108 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */ 109 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */ 110 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */ 111 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */ 112 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 113}; 114 115/* 116 * This defines the direction of transfer of LVB data. 117 * Granted mode is the row; requested mode is the column. 118 * Usage: matrix[grmode+1][rqmode+1] 119 * 1 = LVB is returned to the caller 120 * 0 = LVB is written to the resource 121 * -1 = nothing happens to the LVB 122 */ 123 124const int dlm_lvb_operations[8][8] = { 125 /* UN NL CR CW PR PW EX PD*/ 126 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */ 127 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */ 128 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */ 129 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */ 130 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */ 131 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */ 132 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */ 133 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */ 134}; 135 136#define modes_compat(gr, rq) \ 137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1] 138 139int dlm_modes_compat(int mode1, int mode2) 140{ 141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1]; 142} 143 144/* 145 * Compatibility matrix for conversions with QUECVT set. 146 * Granted mode is the row; requested mode is the column. 147 * Usage: matrix[grmode+1][rqmode+1] 148 */ 149 150static const int __quecvt_compat_matrix[8][8] = { 151 /* UN NL CR CW PR PW EX PD */ 152 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */ 153 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */ 154 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */ 155 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */ 156 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 157 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 158 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 159 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 160}; 161 162void dlm_print_lkb(struct dlm_lkb *lkb) 163{ 164 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", 166 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 167 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 168 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, 169 (unsigned long long)lkb->lkb_recover_seq); 170} 171 172static void dlm_print_rsb(struct dlm_rsb *r) 173{ 174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " 175 "rlc %d name %s\n", 176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count, 178 r->res_name); 179} 180 181void dlm_dump_rsb(struct dlm_rsb *r) 182{ 183 struct dlm_lkb *lkb; 184 185 dlm_print_rsb(r); 186 187 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n", 188 list_empty(&r->res_root_list), list_empty(&r->res_recover_list)); 189 printk(KERN_ERR "rsb lookup list\n"); 190 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup) 191 dlm_print_lkb(lkb); 192 printk(KERN_ERR "rsb grant queue:\n"); 193 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) 194 dlm_print_lkb(lkb); 195 printk(KERN_ERR "rsb convert queue:\n"); 196 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) 197 dlm_print_lkb(lkb); 198 printk(KERN_ERR "rsb wait queue:\n"); 199 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) 200 dlm_print_lkb(lkb); 201} 202 203/* Threads cannot use the lockspace while it's being recovered */ 204 205static inline void dlm_lock_recovery(struct dlm_ls *ls) 206{ 207 down_read(&ls->ls_in_recovery); 208} 209 210void dlm_unlock_recovery(struct dlm_ls *ls) 211{ 212 up_read(&ls->ls_in_recovery); 213} 214 215int dlm_lock_recovery_try(struct dlm_ls *ls) 216{ 217 return down_read_trylock(&ls->ls_in_recovery); 218} 219 220static inline int can_be_queued(struct dlm_lkb *lkb) 221{ 222 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE); 223} 224 225static inline int force_blocking_asts(struct dlm_lkb *lkb) 226{ 227 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST); 228} 229 230static inline int is_demoted(struct dlm_lkb *lkb) 231{ 232 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); 233} 234 235static inline int is_altmode(struct dlm_lkb *lkb) 236{ 237 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); 238} 239 240static inline int is_granted(struct dlm_lkb *lkb) 241{ 242 return (lkb->lkb_status == DLM_LKSTS_GRANTED); 243} 244 245static inline int is_remote(struct dlm_rsb *r) 246{ 247 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 248 return !!r->res_nodeid; 249} 250 251static inline int is_process_copy(struct dlm_lkb *lkb) 252{ 253 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); 254} 255 256static inline int is_master_copy(struct dlm_lkb *lkb) 257{ 258 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 259} 260 261static inline int middle_conversion(struct dlm_lkb *lkb) 262{ 263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 265 return 1; 266 return 0; 267} 268 269static inline int down_conversion(struct dlm_lkb *lkb) 270{ 271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 272} 273 274static inline int is_overlap_unlock(struct dlm_lkb *lkb) 275{ 276 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; 277} 278 279static inline int is_overlap_cancel(struct dlm_lkb *lkb) 280{ 281 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; 282} 283 284static inline int is_overlap(struct dlm_lkb *lkb) 285{ 286 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | 287 DLM_IFL_OVERLAP_CANCEL)); 288} 289 290static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 291{ 292 if (is_master_copy(lkb)) 293 return; 294 295 del_timeout(lkb); 296 297 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 298 299 /* if the operation was a cancel, then return -DLM_ECANCEL, if a 300 timeout caused the cancel then return -ETIMEDOUT */ 301 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { 302 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; 303 rv = -ETIMEDOUT; 304 } 305 306 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { 307 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; 308 rv = -EDEADLK; 309 } 310 311 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); 312} 313 314static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 315{ 316 queue_cast(r, lkb, 317 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); 318} 319 320static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 321{ 322 if (is_master_copy(lkb)) { 323 send_bast(r, lkb, rqmode); 324 } else { 325 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); 326 } 327} 328 329/* 330 * Basic operations on rsb's and lkb's 331 */ 332 333/* This is only called to add a reference when the code already holds 334 a valid reference to the rsb, so there's no need for locking. */ 335 336static inline void hold_rsb(struct dlm_rsb *r) 337{ 338 kref_get(&r->res_ref); 339} 340 341void dlm_hold_rsb(struct dlm_rsb *r) 342{ 343 hold_rsb(r); 344} 345 346/* When all references to the rsb are gone it's transferred to 347 the tossed list for later disposal. */ 348 349static void put_rsb(struct dlm_rsb *r) 350{ 351 struct dlm_ls *ls = r->res_ls; 352 uint32_t bucket = r->res_bucket; 353 int rv; 354 355 rv = kref_put_lock(&r->res_ref, toss_rsb, 356 &ls->ls_rsbtbl[bucket].lock); 357 if (rv) 358 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 359} 360 361void dlm_put_rsb(struct dlm_rsb *r) 362{ 363 put_rsb(r); 364} 365 366static int pre_rsb_struct(struct dlm_ls *ls) 367{ 368 struct dlm_rsb *r1, *r2; 369 int count = 0; 370 371 spin_lock(&ls->ls_new_rsb_spin); 372 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { 373 spin_unlock(&ls->ls_new_rsb_spin); 374 return 0; 375 } 376 spin_unlock(&ls->ls_new_rsb_spin); 377 378 r1 = dlm_allocate_rsb(ls); 379 r2 = dlm_allocate_rsb(ls); 380 381 spin_lock(&ls->ls_new_rsb_spin); 382 if (r1) { 383 list_add(&r1->res_hashchain, &ls->ls_new_rsb); 384 ls->ls_new_rsb_count++; 385 } 386 if (r2) { 387 list_add(&r2->res_hashchain, &ls->ls_new_rsb); 388 ls->ls_new_rsb_count++; 389 } 390 count = ls->ls_new_rsb_count; 391 spin_unlock(&ls->ls_new_rsb_spin); 392 393 if (!count) 394 return -ENOMEM; 395 return 0; 396} 397 398/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can 399 unlock any spinlocks, go back and call pre_rsb_struct again. 400 Otherwise, take an rsb off the list and return it. */ 401 402static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, 403 struct dlm_rsb **r_ret) 404{ 405 struct dlm_rsb *r; 406 int count; 407 408 spin_lock(&ls->ls_new_rsb_spin); 409 if (list_empty(&ls->ls_new_rsb)) { 410 count = ls->ls_new_rsb_count; 411 spin_unlock(&ls->ls_new_rsb_spin); 412 log_debug(ls, "find_rsb retry %d %d %s", 413 count, dlm_config.ci_new_rsb_count, name); 414 return -EAGAIN; 415 } 416 417 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 418 list_del(&r->res_hashchain); 419 /* Convert the empty list_head to a NULL rb_node for tree usage: */ 420 memset(&r->res_hashnode, 0, sizeof(struct rb_node)); 421 ls->ls_new_rsb_count--; 422 spin_unlock(&ls->ls_new_rsb_spin); 423 424 r->res_ls = ls; 425 r->res_length = len; 426 memcpy(r->res_name, name, len); 427 mutex_init(&r->res_mutex); 428 429 INIT_LIST_HEAD(&r->res_lookup); 430 INIT_LIST_HEAD(&r->res_grantqueue); 431 INIT_LIST_HEAD(&r->res_convertqueue); 432 INIT_LIST_HEAD(&r->res_waitqueue); 433 INIT_LIST_HEAD(&r->res_root_list); 434 INIT_LIST_HEAD(&r->res_recover_list); 435 436 *r_ret = r; 437 return 0; 438} 439 440static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) 441{ 442 char maxname[DLM_RESNAME_MAXLEN]; 443 444 memset(maxname, 0, DLM_RESNAME_MAXLEN); 445 memcpy(maxname, name, nlen); 446 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); 447} 448 449int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 450 struct dlm_rsb **r_ret) 451{ 452 struct rb_node *node = tree->rb_node; 453 struct dlm_rsb *r; 454 int rc; 455 456 while (node) { 457 r = rb_entry(node, struct dlm_rsb, res_hashnode); 458 rc = rsb_cmp(r, name, len); 459 if (rc < 0) 460 node = node->rb_left; 461 else if (rc > 0) 462 node = node->rb_right; 463 else 464 goto found; 465 } 466 *r_ret = NULL; 467 return -EBADR; 468 469 found: 470 *r_ret = r; 471 return 0; 472} 473 474static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) 475{ 476 struct rb_node **newn = &tree->rb_node; 477 struct rb_node *parent = NULL; 478 int rc; 479 480 while (*newn) { 481 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, 482 res_hashnode); 483 484 parent = *newn; 485 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); 486 if (rc < 0) 487 newn = &parent->rb_left; 488 else if (rc > 0) 489 newn = &parent->rb_right; 490 else { 491 log_print("rsb_insert match"); 492 dlm_dump_rsb(rsb); 493 dlm_dump_rsb(cur); 494 return -EEXIST; 495 } 496 } 497 498 rb_link_node(&rsb->res_hashnode, parent, newn); 499 rb_insert_color(&rsb->res_hashnode, tree); 500 return 0; 501} 502 503/* 504 * Find rsb in rsbtbl and potentially create/add one 505 * 506 * Delaying the release of rsb's has a similar benefit to applications keeping 507 * NL locks on an rsb, but without the guarantee that the cached master value 508 * will still be valid when the rsb is reused. Apps aren't always smart enough 509 * to keep NL locks on an rsb that they may lock again shortly; this can lead 510 * to excessive master lookups and removals if we don't delay the release. 511 * 512 * Searching for an rsb means looking through both the normal list and toss 513 * list. When found on the toss list the rsb is moved to the normal list with 514 * ref count of 1; when found on normal list the ref count is incremented. 515 * 516 * rsb's on the keep list are being used locally and refcounted. 517 * rsb's on the toss list are not being used locally, and are not refcounted. 518 * 519 * The toss list rsb's were either 520 * - previously used locally but not any more (were on keep list, then 521 * moved to toss list when last refcount dropped) 522 * - created and put on toss list as a directory record for a lookup 523 * (we are the dir node for the res, but are not using the res right now, 524 * but some other node is) 525 * 526 * The purpose of find_rsb() is to return a refcounted rsb for local use. 527 * So, if the given rsb is on the toss list, it is moved to the keep list 528 * before being returned. 529 * 530 * toss_rsb() happens when all local usage of the rsb is done, i.e. no 531 * more refcounts exist, so the rsb is moved from the keep list to the 532 * toss list. 533 * 534 * rsb's on both keep and toss lists are used for doing a name to master 535 * lookups. rsb's that are in use locally (and being refcounted) are on 536 * the keep list, rsb's that are not in use locally (not refcounted) and 537 * only exist for name/master lookups are on the toss list. 538 * 539 * rsb's on the toss list who's dir_nodeid is not local can have stale 540 * name/master mappings. So, remote requests on such rsb's can potentially 541 * return with an error, which means the mapping is stale and needs to 542 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and 543 * first_lkid is to keep only a single outstanding request on an rsb 544 * while that rsb has a potentially stale master.) 545 */ 546 547static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, 548 uint32_t hash, uint32_t b, 549 int dir_nodeid, int from_nodeid, 550 unsigned int flags, struct dlm_rsb **r_ret) 551{ 552 struct dlm_rsb *r = NULL; 553 int our_nodeid = dlm_our_nodeid(); 554 int from_local = 0; 555 int from_other = 0; 556 int from_dir = 0; 557 int create = 0; 558 int error; 559 560 if (flags & R_RECEIVE_REQUEST) { 561 if (from_nodeid == dir_nodeid) 562 from_dir = 1; 563 else 564 from_other = 1; 565 } else if (flags & R_REQUEST) { 566 from_local = 1; 567 } 568 569 /* 570 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so 571 * from_nodeid has sent us a lock in dlm_recover_locks, believing 572 * we're the new master. Our local recovery may not have set 573 * res_master_nodeid to our_nodeid yet, so allow either. Don't 574 * create the rsb; dlm_recover_process_copy() will handle EBADR 575 * by resending. 576 * 577 * If someone sends us a request, we are the dir node, and we do 578 * not find the rsb anywhere, then recreate it. This happens if 579 * someone sends us a request after we have removed/freed an rsb 580 * from our toss list. (They sent a request instead of lookup 581 * because they are using an rsb from their toss list.) 582 */ 583 584 if (from_local || from_dir || 585 (from_other && (dir_nodeid == our_nodeid))) { 586 create = 1; 587 } 588 589 retry: 590 if (create) { 591 error = pre_rsb_struct(ls); 592 if (error < 0) 593 goto out; 594 } 595 596 spin_lock(&ls->ls_rsbtbl[b].lock); 597 598 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 599 if (error) 600 goto do_toss; 601 602 /* 603 * rsb is active, so we can't check master_nodeid without lock_rsb. 604 */ 605 606 kref_get(&r->res_ref); 607 goto out_unlock; 608 609 610 do_toss: 611 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 612 if (error) 613 goto do_new; 614 615 /* 616 * rsb found inactive (master_nodeid may be out of date unless 617 * we are the dir_nodeid or were the master) No other thread 618 * is using this rsb because it's on the toss list, so we can 619 * look at or update res_master_nodeid without lock_rsb. 620 */ 621 622 if ((r->res_master_nodeid != our_nodeid) && from_other) { 623 /* our rsb was not master, and another node (not the dir node) 624 has sent us a request */ 625 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", 626 from_nodeid, r->res_master_nodeid, dir_nodeid, 627 r->res_name); 628 error = -ENOTBLK; 629 goto out_unlock; 630 } 631 632 if ((r->res_master_nodeid != our_nodeid) && from_dir) { 633 /* don't think this should ever happen */ 634 log_error(ls, "find_rsb toss from_dir %d master %d", 635 from_nodeid, r->res_master_nodeid); 636 dlm_print_rsb(r); 637 /* fix it and go on */ 638 r->res_master_nodeid = our_nodeid; 639 r->res_nodeid = 0; 640 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 641 r->res_first_lkid = 0; 642 } 643 644 if (from_local && (r->res_master_nodeid != our_nodeid)) { 645 /* Because we have held no locks on this rsb, 646 res_master_nodeid could have become stale. */ 647 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 648 r->res_first_lkid = 0; 649 } 650 651 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 652 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 653 goto out_unlock; 654 655 656 do_new: 657 /* 658 * rsb not found 659 */ 660 661 if (error == -EBADR && !create) 662 goto out_unlock; 663 664 error = get_rsb_struct(ls, name, len, &r); 665 if (error == -EAGAIN) { 666 spin_unlock(&ls->ls_rsbtbl[b].lock); 667 goto retry; 668 } 669 if (error) 670 goto out_unlock; 671 672 r->res_hash = hash; 673 r->res_bucket = b; 674 r->res_dir_nodeid = dir_nodeid; 675 kref_init(&r->res_ref); 676 677 if (from_dir) { 678 /* want to see how often this happens */ 679 log_debug(ls, "find_rsb new from_dir %d recreate %s", 680 from_nodeid, r->res_name); 681 r->res_master_nodeid = our_nodeid; 682 r->res_nodeid = 0; 683 goto out_add; 684 } 685 686 if (from_other && (dir_nodeid != our_nodeid)) { 687 /* should never happen */ 688 log_error(ls, "find_rsb new from_other %d dir %d our %d %s", 689 from_nodeid, dir_nodeid, our_nodeid, r->res_name); 690 dlm_free_rsb(r); 691 r = NULL; 692 error = -ENOTBLK; 693 goto out_unlock; 694 } 695 696 if (from_other) { 697 log_debug(ls, "find_rsb new from_other %d dir %d %s", 698 from_nodeid, dir_nodeid, r->res_name); 699 } 700 701 if (dir_nodeid == our_nodeid) { 702 /* When we are the dir nodeid, we can set the master 703 node immediately */ 704 r->res_master_nodeid = our_nodeid; 705 r->res_nodeid = 0; 706 } else { 707 /* set_master will send_lookup to dir_nodeid */ 708 r->res_master_nodeid = 0; 709 r->res_nodeid = -1; 710 } 711 712 out_add: 713 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 714 out_unlock: 715 spin_unlock(&ls->ls_rsbtbl[b].lock); 716 out: 717 *r_ret = r; 718 return error; 719} 720 721/* During recovery, other nodes can send us new MSTCPY locks (from 722 dlm_recover_locks) before we've made ourself master (in 723 dlm_recover_masters). */ 724 725static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, 726 uint32_t hash, uint32_t b, 727 int dir_nodeid, int from_nodeid, 728 unsigned int flags, struct dlm_rsb **r_ret) 729{ 730 struct dlm_rsb *r = NULL; 731 int our_nodeid = dlm_our_nodeid(); 732 int recover = (flags & R_RECEIVE_RECOVER); 733 int error; 734 735 retry: 736 error = pre_rsb_struct(ls); 737 if (error < 0) 738 goto out; 739 740 spin_lock(&ls->ls_rsbtbl[b].lock); 741 742 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 743 if (error) 744 goto do_toss; 745 746 /* 747 * rsb is active, so we can't check master_nodeid without lock_rsb. 748 */ 749 750 kref_get(&r->res_ref); 751 goto out_unlock; 752 753 754 do_toss: 755 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 756 if (error) 757 goto do_new; 758 759 /* 760 * rsb found inactive. No other thread is using this rsb because 761 * it's on the toss list, so we can look at or update 762 * res_master_nodeid without lock_rsb. 763 */ 764 765 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { 766 /* our rsb is not master, and another node has sent us a 767 request; this should never happen */ 768 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", 769 from_nodeid, r->res_master_nodeid, dir_nodeid); 770 dlm_print_rsb(r); 771 error = -ENOTBLK; 772 goto out_unlock; 773 } 774 775 if (!recover && (r->res_master_nodeid != our_nodeid) && 776 (dir_nodeid == our_nodeid)) { 777 /* our rsb is not master, and we are dir; may as well fix it; 778 this should never happen */ 779 log_error(ls, "find_rsb toss our %d master %d dir %d", 780 our_nodeid, r->res_master_nodeid, dir_nodeid); 781 dlm_print_rsb(r); 782 r->res_master_nodeid = our_nodeid; 783 r->res_nodeid = 0; 784 } 785 786 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 787 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 788 goto out_unlock; 789 790 791 do_new: 792 /* 793 * rsb not found 794 */ 795 796 error = get_rsb_struct(ls, name, len, &r); 797 if (error == -EAGAIN) { 798 spin_unlock(&ls->ls_rsbtbl[b].lock); 799 goto retry; 800 } 801 if (error) 802 goto out_unlock; 803 804 r->res_hash = hash; 805 r->res_bucket = b; 806 r->res_dir_nodeid = dir_nodeid; 807 r->res_master_nodeid = dir_nodeid; 808 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; 809 kref_init(&r->res_ref); 810 811 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 812 out_unlock: 813 spin_unlock(&ls->ls_rsbtbl[b].lock); 814 out: 815 *r_ret = r; 816 return error; 817} 818 819static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid, 820 unsigned int flags, struct dlm_rsb **r_ret) 821{ 822 uint32_t hash, b; 823 int dir_nodeid; 824 825 if (len > DLM_RESNAME_MAXLEN) 826 return -EINVAL; 827 828 hash = jhash(name, len, 0); 829 b = hash & (ls->ls_rsbtbl_size - 1); 830 831 dir_nodeid = dlm_hash2nodeid(ls, hash); 832 833 if (dlm_no_directory(ls)) 834 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, 835 from_nodeid, flags, r_ret); 836 else 837 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, 838 from_nodeid, flags, r_ret); 839} 840 841/* we have received a request and found that res_master_nodeid != our_nodeid, 842 so we need to return an error or make ourself the master */ 843 844static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, 845 int from_nodeid) 846{ 847 if (dlm_no_directory(ls)) { 848 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", 849 from_nodeid, r->res_master_nodeid, 850 r->res_dir_nodeid); 851 dlm_print_rsb(r); 852 return -ENOTBLK; 853 } 854 855 if (from_nodeid != r->res_dir_nodeid) { 856 /* our rsb is not master, and another node (not the dir node) 857 has sent us a request. this is much more common when our 858 master_nodeid is zero, so limit debug to non-zero. */ 859 860 if (r->res_master_nodeid) { 861 log_debug(ls, "validate master from_other %d master %d " 862 "dir %d first %x %s", from_nodeid, 863 r->res_master_nodeid, r->res_dir_nodeid, 864 r->res_first_lkid, r->res_name); 865 } 866 return -ENOTBLK; 867 } else { 868 /* our rsb is not master, but the dir nodeid has sent us a 869 request; this could happen with master 0 / res_nodeid -1 */ 870 871 if (r->res_master_nodeid) { 872 log_error(ls, "validate master from_dir %d master %d " 873 "first %x %s", 874 from_nodeid, r->res_master_nodeid, 875 r->res_first_lkid, r->res_name); 876 } 877 878 r->res_master_nodeid = dlm_our_nodeid(); 879 r->res_nodeid = 0; 880 return 0; 881 } 882} 883 884static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, 885 int from_nodeid, bool toss_list, unsigned int flags, 886 int *r_nodeid, int *result) 887{ 888 int fix_master = (flags & DLM_LU_RECOVER_MASTER); 889 int from_master = (flags & DLM_LU_RECOVER_DIR); 890 891 if (r->res_dir_nodeid != our_nodeid) { 892 /* should not happen, but may as well fix it and carry on */ 893 log_error(ls, "%s res_dir %d our %d %s", __func__, 894 r->res_dir_nodeid, our_nodeid, r->res_name); 895 r->res_dir_nodeid = our_nodeid; 896 } 897 898 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { 899 /* Recovery uses this function to set a new master when 900 * the previous master failed. Setting NEW_MASTER will 901 * force dlm_recover_masters to call recover_master on this 902 * rsb even though the res_nodeid is no longer removed. 903 */ 904 905 r->res_master_nodeid = from_nodeid; 906 r->res_nodeid = from_nodeid; 907 rsb_set_flag(r, RSB_NEW_MASTER); 908 909 if (toss_list) { 910 /* I don't think we should ever find it on toss list. */ 911 log_error(ls, "%s fix_master on toss", __func__); 912 dlm_dump_rsb(r); 913 } 914 } 915 916 if (from_master && (r->res_master_nodeid != from_nodeid)) { 917 /* this will happen if from_nodeid became master during 918 * a previous recovery cycle, and we aborted the previous 919 * cycle before recovering this master value 920 */ 921 922 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s", 923 __func__, from_nodeid, r->res_master_nodeid, 924 r->res_nodeid, r->res_first_lkid, r->res_name); 925 926 if (r->res_master_nodeid == our_nodeid) { 927 log_error(ls, "from_master %d our_master", from_nodeid); 928 dlm_dump_rsb(r); 929 goto ret_assign; 930 } 931 932 r->res_master_nodeid = from_nodeid; 933 r->res_nodeid = from_nodeid; 934 rsb_set_flag(r, RSB_NEW_MASTER); 935 } 936 937 if (!r->res_master_nodeid) { 938 /* this will happen if recovery happens while we're looking 939 * up the master for this rsb 940 */ 941 942 log_debug(ls, "%s master 0 to %d first %x %s", __func__, 943 from_nodeid, r->res_first_lkid, r->res_name); 944 r->res_master_nodeid = from_nodeid; 945 r->res_nodeid = from_nodeid; 946 } 947 948 if (!from_master && !fix_master && 949 (r->res_master_nodeid == from_nodeid)) { 950 /* this can happen when the master sends remove, the dir node 951 * finds the rsb on the keep list and ignores the remove, 952 * and the former master sends a lookup 953 */ 954 955 log_limit(ls, "%s from master %d flags %x first %x %s", 956 __func__, from_nodeid, flags, r->res_first_lkid, 957 r->res_name); 958 } 959 960 ret_assign: 961 *r_nodeid = r->res_master_nodeid; 962 if (result) 963 *result = DLM_LU_MATCH; 964} 965 966/* 967 * We're the dir node for this res and another node wants to know the 968 * master nodeid. During normal operation (non recovery) this is only 969 * called from receive_lookup(); master lookups when the local node is 970 * the dir node are done by find_rsb(). 971 * 972 * normal operation, we are the dir node for a resource 973 * . _request_lock 974 * . set_master 975 * . send_lookup 976 * . receive_lookup 977 * . dlm_master_lookup flags 0 978 * 979 * recover directory, we are rebuilding dir for all resources 980 * . dlm_recover_directory 981 * . dlm_rcom_names 982 * remote node sends back the rsb names it is master of and we are dir of 983 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) 984 * we either create new rsb setting remote node as master, or find existing 985 * rsb and set master to be the remote node. 986 * 987 * recover masters, we are finding the new master for resources 988 * . dlm_recover_masters 989 * . recover_master 990 * . dlm_send_rcom_lookup 991 * . receive_rcom_lookup 992 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) 993 */ 994 995int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, 996 unsigned int flags, int *r_nodeid, int *result) 997{ 998 struct dlm_rsb *r = NULL; 999 uint32_t hash, b; 1000 int our_nodeid = dlm_our_nodeid(); 1001 int dir_nodeid, error; 1002 1003 if (len > DLM_RESNAME_MAXLEN) 1004 return -EINVAL; 1005 1006 if (from_nodeid == our_nodeid) { 1007 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", 1008 our_nodeid, flags); 1009 return -EINVAL; 1010 } 1011 1012 hash = jhash(name, len, 0); 1013 b = hash & (ls->ls_rsbtbl_size - 1); 1014 1015 dir_nodeid = dlm_hash2nodeid(ls, hash); 1016 if (dir_nodeid != our_nodeid) { 1017 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", 1018 from_nodeid, dir_nodeid, our_nodeid, hash, 1019 ls->ls_num_nodes); 1020 *r_nodeid = -1; 1021 return -EINVAL; 1022 } 1023 1024 retry: 1025 error = pre_rsb_struct(ls); 1026 if (error < 0) 1027 return error; 1028 1029 spin_lock(&ls->ls_rsbtbl[b].lock); 1030 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1031 if (!error) { 1032 /* because the rsb is active, we need to lock_rsb before 1033 * checking/changing re_master_nodeid 1034 */ 1035 1036 hold_rsb(r); 1037 spin_unlock(&ls->ls_rsbtbl[b].lock); 1038 lock_rsb(r); 1039 1040 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, 1041 flags, r_nodeid, result); 1042 1043 /* the rsb was active */ 1044 unlock_rsb(r); 1045 put_rsb(r); 1046 1047 return 0; 1048 } 1049 1050 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1051 if (error) 1052 goto not_found; 1053 1054 /* because the rsb is inactive (on toss list), it's not refcounted 1055 * and lock_rsb is not used, but is protected by the rsbtbl lock 1056 */ 1057 1058 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, 1059 r_nodeid, result); 1060 1061 r->res_toss_time = jiffies; 1062 /* the rsb was inactive (on toss list) */ 1063 spin_unlock(&ls->ls_rsbtbl[b].lock); 1064 1065 return 0; 1066 1067 not_found: 1068 error = get_rsb_struct(ls, name, len, &r); 1069 if (error == -EAGAIN) { 1070 spin_unlock(&ls->ls_rsbtbl[b].lock); 1071 goto retry; 1072 } 1073 if (error) 1074 goto out_unlock; 1075 1076 r->res_hash = hash; 1077 r->res_bucket = b; 1078 r->res_dir_nodeid = our_nodeid; 1079 r->res_master_nodeid = from_nodeid; 1080 r->res_nodeid = from_nodeid; 1081 kref_init(&r->res_ref); 1082 r->res_toss_time = jiffies; 1083 1084 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); 1085 if (error) { 1086 /* should never happen */ 1087 dlm_free_rsb(r); 1088 spin_unlock(&ls->ls_rsbtbl[b].lock); 1089 goto retry; 1090 } 1091 1092 if (result) 1093 *result = DLM_LU_ADD; 1094 *r_nodeid = from_nodeid; 1095 out_unlock: 1096 spin_unlock(&ls->ls_rsbtbl[b].lock); 1097 return error; 1098} 1099 1100static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 1101{ 1102 struct rb_node *n; 1103 struct dlm_rsb *r; 1104 int i; 1105 1106 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1107 spin_lock(&ls->ls_rsbtbl[i].lock); 1108 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 1109 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1110 if (r->res_hash == hash) 1111 dlm_dump_rsb(r); 1112 } 1113 spin_unlock(&ls->ls_rsbtbl[i].lock); 1114 } 1115} 1116 1117void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) 1118{ 1119 struct dlm_rsb *r = NULL; 1120 uint32_t hash, b; 1121 int error; 1122 1123 hash = jhash(name, len, 0); 1124 b = hash & (ls->ls_rsbtbl_size - 1); 1125 1126 spin_lock(&ls->ls_rsbtbl[b].lock); 1127 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1128 if (!error) 1129 goto out_dump; 1130 1131 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1132 if (error) 1133 goto out; 1134 out_dump: 1135 dlm_dump_rsb(r); 1136 out: 1137 spin_unlock(&ls->ls_rsbtbl[b].lock); 1138} 1139 1140static void toss_rsb(struct kref *kref) 1141{ 1142 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1143 struct dlm_ls *ls = r->res_ls; 1144 1145 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 1146 kref_init(&r->res_ref); 1147 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); 1148 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); 1149 r->res_toss_time = jiffies; 1150 ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; 1151 if (r->res_lvbptr) { 1152 dlm_free_lvb(r->res_lvbptr); 1153 r->res_lvbptr = NULL; 1154 } 1155} 1156 1157/* See comment for unhold_lkb */ 1158 1159static void unhold_rsb(struct dlm_rsb *r) 1160{ 1161 int rv; 1162 rv = kref_put(&r->res_ref, toss_rsb); 1163 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 1164} 1165 1166static void kill_rsb(struct kref *kref) 1167{ 1168 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1169 1170 /* All work is done after the return from kref_put() so we 1171 can release the write_lock before the remove and free. */ 1172 1173 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 1174 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 1175 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); 1176 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1177 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1178 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1179} 1180 1181/* Attaching/detaching lkb's from rsb's is for rsb reference counting. 1182 The rsb must exist as long as any lkb's for it do. */ 1183 1184static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1185{ 1186 hold_rsb(r); 1187 lkb->lkb_resource = r; 1188} 1189 1190static void detach_lkb(struct dlm_lkb *lkb) 1191{ 1192 if (lkb->lkb_resource) { 1193 put_rsb(lkb->lkb_resource); 1194 lkb->lkb_resource = NULL; 1195 } 1196} 1197 1198static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, 1199 int start, int end) 1200{ 1201 struct dlm_lkb *lkb; 1202 int rv; 1203 1204 lkb = dlm_allocate_lkb(ls); 1205 if (!lkb) 1206 return -ENOMEM; 1207 1208 lkb->lkb_nodeid = -1; 1209 lkb->lkb_grmode = DLM_LOCK_IV; 1210 kref_init(&lkb->lkb_ref); 1211 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 1212 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 1213 INIT_LIST_HEAD(&lkb->lkb_time_list); 1214 INIT_LIST_HEAD(&lkb->lkb_cb_list); 1215 mutex_init(&lkb->lkb_cb_mutex); 1216 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); 1217 1218 idr_preload(GFP_NOFS); 1219 spin_lock(&ls->ls_lkbidr_spin); 1220 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); 1221 if (rv >= 0) 1222 lkb->lkb_id = rv; 1223 spin_unlock(&ls->ls_lkbidr_spin); 1224 idr_preload_end(); 1225 1226 if (rv < 0) { 1227 log_error(ls, "create_lkb idr error %d", rv); 1228 dlm_free_lkb(lkb); 1229 return rv; 1230 } 1231 1232 *lkb_ret = lkb; 1233 return 0; 1234} 1235 1236static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 1237{ 1238 return _create_lkb(ls, lkb_ret, 1, 0); 1239} 1240 1241static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 1242{ 1243 struct dlm_lkb *lkb; 1244 1245 spin_lock(&ls->ls_lkbidr_spin); 1246 lkb = idr_find(&ls->ls_lkbidr, lkid); 1247 if (lkb) 1248 kref_get(&lkb->lkb_ref); 1249 spin_unlock(&ls->ls_lkbidr_spin); 1250 1251 *lkb_ret = lkb; 1252 return lkb ? 0 : -ENOENT; 1253} 1254 1255static void kill_lkb(struct kref *kref) 1256{ 1257 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1258 1259 /* All work is done after the return from kref_put() so we 1260 can release the write_lock before the detach_lkb */ 1261 1262 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1263} 1264 1265/* __put_lkb() is used when an lkb may not have an rsb attached to 1266 it so we need to provide the lockspace explicitly */ 1267 1268static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 1269{ 1270 uint32_t lkid = lkb->lkb_id; 1271 int rv; 1272 1273 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb, 1274 &ls->ls_lkbidr_spin); 1275 if (rv) { 1276 idr_remove(&ls->ls_lkbidr, lkid); 1277 spin_unlock(&ls->ls_lkbidr_spin); 1278 1279 detach_lkb(lkb); 1280 1281 /* for local/process lkbs, lvbptr points to caller's lksb */ 1282 if (lkb->lkb_lvbptr && is_master_copy(lkb)) 1283 dlm_free_lvb(lkb->lkb_lvbptr); 1284 dlm_free_lkb(lkb); 1285 } 1286 1287 return rv; 1288} 1289 1290int dlm_put_lkb(struct dlm_lkb *lkb) 1291{ 1292 struct dlm_ls *ls; 1293 1294 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb);); 1295 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb);); 1296 1297 ls = lkb->lkb_resource->res_ls; 1298 return __put_lkb(ls, lkb); 1299} 1300 1301/* This is only called to add a reference when the code already holds 1302 a valid reference to the lkb, so there's no need for locking. */ 1303 1304static inline void hold_lkb(struct dlm_lkb *lkb) 1305{ 1306 kref_get(&lkb->lkb_ref); 1307} 1308 1309/* This is called when we need to remove a reference and are certain 1310 it's not the last ref. e.g. del_lkb is always called between a 1311 find_lkb/put_lkb and is always the inverse of a previous add_lkb. 1312 put_lkb would work fine, but would involve unnecessary locking */ 1313 1314static inline void unhold_lkb(struct dlm_lkb *lkb) 1315{ 1316 int rv; 1317 rv = kref_put(&lkb->lkb_ref, kill_lkb); 1318 DLM_ASSERT(!rv, dlm_print_lkb(lkb);); 1319} 1320 1321static void lkb_add_ordered(struct list_head *new, struct list_head *head, 1322 int mode) 1323{ 1324 struct dlm_lkb *lkb = NULL, *iter; 1325 1326 list_for_each_entry(iter, head, lkb_statequeue) 1327 if (iter->lkb_rqmode < mode) { 1328 lkb = iter; 1329 list_add_tail(new, &iter->lkb_statequeue); 1330 break; 1331 } 1332 1333 if (!lkb) 1334 list_add_tail(new, head); 1335} 1336 1337/* add/remove lkb to rsb's grant/convert/wait queue */ 1338 1339static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) 1340{ 1341 kref_get(&lkb->lkb_ref); 1342 1343 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1344 1345 lkb->lkb_timestamp = ktime_get(); 1346 1347 lkb->lkb_status = status; 1348 1349 switch (status) { 1350 case DLM_LKSTS_WAITING: 1351 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1352 list_add(&lkb->lkb_statequeue, &r->res_waitqueue); 1353 else 1354 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue); 1355 break; 1356 case DLM_LKSTS_GRANTED: 1357 /* convention says granted locks kept in order of grmode */ 1358 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue, 1359 lkb->lkb_grmode); 1360 break; 1361 case DLM_LKSTS_CONVERT: 1362 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1363 list_add(&lkb->lkb_statequeue, &r->res_convertqueue); 1364 else 1365 list_add_tail(&lkb->lkb_statequeue, 1366 &r->res_convertqueue); 1367 break; 1368 default: 1369 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status);); 1370 } 1371} 1372 1373static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1374{ 1375 lkb->lkb_status = 0; 1376 list_del(&lkb->lkb_statequeue); 1377 unhold_lkb(lkb); 1378} 1379 1380static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) 1381{ 1382 hold_lkb(lkb); 1383 del_lkb(r, lkb); 1384 add_lkb(r, lkb, sts); 1385 unhold_lkb(lkb); 1386} 1387 1388static int msg_reply_type(int mstype) 1389{ 1390 switch (mstype) { 1391 case DLM_MSG_REQUEST: 1392 return DLM_MSG_REQUEST_REPLY; 1393 case DLM_MSG_CONVERT: 1394 return DLM_MSG_CONVERT_REPLY; 1395 case DLM_MSG_UNLOCK: 1396 return DLM_MSG_UNLOCK_REPLY; 1397 case DLM_MSG_CANCEL: 1398 return DLM_MSG_CANCEL_REPLY; 1399 case DLM_MSG_LOOKUP: 1400 return DLM_MSG_LOOKUP_REPLY; 1401 } 1402 return -1; 1403} 1404 1405static int nodeid_warned(int nodeid, int num_nodes, int *warned) 1406{ 1407 int i; 1408 1409 for (i = 0; i < num_nodes; i++) { 1410 if (!warned[i]) { 1411 warned[i] = nodeid; 1412 return 0; 1413 } 1414 if (warned[i] == nodeid) 1415 return 1; 1416 } 1417 return 0; 1418} 1419 1420void dlm_scan_waiters(struct dlm_ls *ls) 1421{ 1422 struct dlm_lkb *lkb; 1423 s64 us; 1424 s64 debug_maxus = 0; 1425 u32 debug_scanned = 0; 1426 u32 debug_expired = 0; 1427 int num_nodes = 0; 1428 int *warned = NULL; 1429 1430 if (!dlm_config.ci_waitwarn_us) 1431 return; 1432 1433 mutex_lock(&ls->ls_waiters_mutex); 1434 1435 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 1436 if (!lkb->lkb_wait_time) 1437 continue; 1438 1439 debug_scanned++; 1440 1441 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time)); 1442 1443 if (us < dlm_config.ci_waitwarn_us) 1444 continue; 1445 1446 lkb->lkb_wait_time = 0; 1447 1448 debug_expired++; 1449 if (us > debug_maxus) 1450 debug_maxus = us; 1451 1452 if (!num_nodes) { 1453 num_nodes = ls->ls_num_nodes; 1454 warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL); 1455 } 1456 if (!warned) 1457 continue; 1458 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) 1459 continue; 1460 1461 log_error(ls, "waitwarn %x %lld %d us check connection to " 1462 "node %d", lkb->lkb_id, (long long)us, 1463 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid); 1464 } 1465 mutex_unlock(&ls->ls_waiters_mutex); 1466 kfree(warned); 1467 1468 if (debug_expired) 1469 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us", 1470 debug_scanned, debug_expired, 1471 dlm_config.ci_waitwarn_us, (long long)debug_maxus); 1472} 1473 1474/* add/remove lkb from global waiters list of lkb's waiting for 1475 a reply from a remote node */ 1476 1477static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) 1478{ 1479 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1480 int error = 0; 1481 1482 mutex_lock(&ls->ls_waiters_mutex); 1483 1484 if (is_overlap_unlock(lkb) || 1485 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { 1486 error = -EINVAL; 1487 goto out; 1488 } 1489 1490 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { 1491 switch (mstype) { 1492 case DLM_MSG_UNLOCK: 1493 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 1494 break; 1495 case DLM_MSG_CANCEL: 1496 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 1497 break; 1498 default: 1499 error = -EBUSY; 1500 goto out; 1501 } 1502 lkb->lkb_wait_count++; 1503 hold_lkb(lkb); 1504 1505 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", 1506 lkb->lkb_id, lkb->lkb_wait_type, mstype, 1507 lkb->lkb_wait_count, lkb->lkb_flags); 1508 goto out; 1509 } 1510 1511 DLM_ASSERT(!lkb->lkb_wait_count, 1512 dlm_print_lkb(lkb); 1513 printk("wait_count %d\n", lkb->lkb_wait_count);); 1514 1515 lkb->lkb_wait_count++; 1516 lkb->lkb_wait_type = mstype; 1517 lkb->lkb_wait_time = ktime_get(); 1518 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ 1519 hold_lkb(lkb); 1520 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 1521 out: 1522 if (error) 1523 log_error(ls, "addwait error %x %d flags %x %d %d %s", 1524 lkb->lkb_id, error, lkb->lkb_flags, mstype, 1525 lkb->lkb_wait_type, lkb->lkb_resource->res_name); 1526 mutex_unlock(&ls->ls_waiters_mutex); 1527 return error; 1528} 1529 1530/* We clear the RESEND flag because we might be taking an lkb off the waiters 1531 list as part of process_requestqueue (e.g. a lookup that has an optimized 1532 request reply on the requestqueue) between dlm_recover_waiters_pre() which 1533 set RESEND and dlm_recover_waiters_post() */ 1534 1535static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, 1536 struct dlm_message *ms) 1537{ 1538 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1539 int overlap_done = 0; 1540 1541 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { 1542 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); 1543 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 1544 overlap_done = 1; 1545 goto out_del; 1546 } 1547 1548 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { 1549 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); 1550 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 1551 overlap_done = 1; 1552 goto out_del; 1553 } 1554 1555 /* Cancel state was preemptively cleared by a successful convert, 1556 see next comment, nothing to do. */ 1557 1558 if ((mstype == DLM_MSG_CANCEL_REPLY) && 1559 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) { 1560 log_debug(ls, "remwait %x cancel_reply wait_type %d", 1561 lkb->lkb_id, lkb->lkb_wait_type); 1562 return -1; 1563 } 1564 1565 /* Remove for the convert reply, and premptively remove for the 1566 cancel reply. A convert has been granted while there's still 1567 an outstanding cancel on it (the cancel is moot and the result 1568 in the cancel reply should be 0). We preempt the cancel reply 1569 because the app gets the convert result and then can follow up 1570 with another op, like convert. This subsequent op would see the 1571 lingering state of the cancel and fail with -EBUSY. */ 1572 1573 if ((mstype == DLM_MSG_CONVERT_REPLY) && 1574 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && 1575 is_overlap_cancel(lkb) && ms && !ms->m_result) { 1576 log_debug(ls, "remwait %x convert_reply zap overlap_cancel", 1577 lkb->lkb_id); 1578 lkb->lkb_wait_type = 0; 1579 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 1580 lkb->lkb_wait_count--; 1581 unhold_lkb(lkb); 1582 goto out_del; 1583 } 1584 1585 /* N.B. type of reply may not always correspond to type of original 1586 msg due to lookup->request optimization, verify others? */ 1587 1588 if (lkb->lkb_wait_type) { 1589 lkb->lkb_wait_type = 0; 1590 goto out_del; 1591 } 1592 1593 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", 1594 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0, 1595 lkb->lkb_remid, mstype, lkb->lkb_flags); 1596 return -1; 1597 1598 out_del: 1599 /* the force-unlock/cancel has completed and we haven't recvd a reply 1600 to the op that was in progress prior to the unlock/cancel; we 1601 give up on any reply to the earlier op. FIXME: not sure when/how 1602 this would happen */ 1603 1604 if (overlap_done && lkb->lkb_wait_type) { 1605 log_error(ls, "remwait error %x reply %d wait_type %d overlap", 1606 lkb->lkb_id, mstype, lkb->lkb_wait_type); 1607 lkb->lkb_wait_count--; 1608 unhold_lkb(lkb); 1609 lkb->lkb_wait_type = 0; 1610 } 1611 1612 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); 1613 1614 lkb->lkb_flags &= ~DLM_IFL_RESEND; 1615 lkb->lkb_wait_count--; 1616 if (!lkb->lkb_wait_count) 1617 list_del_init(&lkb->lkb_wait_reply); 1618 unhold_lkb(lkb); 1619 return 0; 1620} 1621 1622static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) 1623{ 1624 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1625 int error; 1626 1627 mutex_lock(&ls->ls_waiters_mutex); 1628 error = _remove_from_waiters(lkb, mstype, NULL); 1629 mutex_unlock(&ls->ls_waiters_mutex); 1630 return error; 1631} 1632 1633/* Handles situations where we might be processing a "fake" or "stub" reply in 1634 which we can't try to take waiters_mutex again. */ 1635 1636static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) 1637{ 1638 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1639 int error; 1640 1641 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) 1642 mutex_lock(&ls->ls_waiters_mutex); 1643 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); 1644 if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) 1645 mutex_unlock(&ls->ls_waiters_mutex); 1646 return error; 1647} 1648 1649/* If there's an rsb for the same resource being removed, ensure 1650 * that the remove message is sent before the new lookup message. 1651 */ 1652 1653#define DLM_WAIT_PENDING_COND(ls, r) \ 1654 (ls->ls_remove_len && \ 1655 !rsb_cmp(r, ls->ls_remove_name, \ 1656 ls->ls_remove_len)) 1657 1658static void wait_pending_remove(struct dlm_rsb *r) 1659{ 1660 struct dlm_ls *ls = r->res_ls; 1661 restart: 1662 spin_lock(&ls->ls_remove_spin); 1663 if (DLM_WAIT_PENDING_COND(ls, r)) { 1664 log_debug(ls, "delay lookup for remove dir %d %s", 1665 r->res_dir_nodeid, r->res_name); 1666 spin_unlock(&ls->ls_remove_spin); 1667 wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r)); 1668 goto restart; 1669 } 1670 spin_unlock(&ls->ls_remove_spin); 1671} 1672 1673/* 1674 * ls_remove_spin protects ls_remove_name and ls_remove_len which are 1675 * read by other threads in wait_pending_remove. ls_remove_names 1676 * and ls_remove_lens are only used by the scan thread, so they do 1677 * not need protection. 1678 */ 1679 1680static void shrink_bucket(struct dlm_ls *ls, int b) 1681{ 1682 struct rb_node *n, *next; 1683 struct dlm_rsb *r; 1684 char *name; 1685 int our_nodeid = dlm_our_nodeid(); 1686 int remote_count = 0; 1687 int need_shrink = 0; 1688 int i, len, rv; 1689 1690 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); 1691 1692 spin_lock(&ls->ls_rsbtbl[b].lock); 1693 1694 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { 1695 spin_unlock(&ls->ls_rsbtbl[b].lock); 1696 return; 1697 } 1698 1699 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { 1700 next = rb_next(n); 1701 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1702 1703 /* If we're the directory record for this rsb, and 1704 we're not the master of it, then we need to wait 1705 for the master node to send us a dir remove for 1706 before removing the dir record. */ 1707 1708 if (!dlm_no_directory(ls) && 1709 (r->res_master_nodeid != our_nodeid) && 1710 (dlm_dir_nodeid(r) == our_nodeid)) { 1711 continue; 1712 } 1713 1714 need_shrink = 1; 1715 1716 if (!time_after_eq(jiffies, r->res_toss_time + 1717 dlm_config.ci_toss_secs * HZ)) { 1718 continue; 1719 } 1720 1721 if (!dlm_no_directory(ls) && 1722 (r->res_master_nodeid == our_nodeid) && 1723 (dlm_dir_nodeid(r) != our_nodeid)) { 1724 1725 /* We're the master of this rsb but we're not 1726 the directory record, so we need to tell the 1727 dir node to remove the dir record. */ 1728 1729 ls->ls_remove_lens[remote_count] = r->res_length; 1730 memcpy(ls->ls_remove_names[remote_count], r->res_name, 1731 DLM_RESNAME_MAXLEN); 1732 remote_count++; 1733 1734 if (remote_count >= DLM_REMOVE_NAMES_MAX) 1735 break; 1736 continue; 1737 } 1738 1739 if (!kref_put(&r->res_ref, kill_rsb)) { 1740 log_error(ls, "tossed rsb in use %s", r->res_name); 1741 continue; 1742 } 1743 1744 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1745 dlm_free_rsb(r); 1746 } 1747 1748 if (need_shrink) 1749 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; 1750 else 1751 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; 1752 spin_unlock(&ls->ls_rsbtbl[b].lock); 1753 1754 /* 1755 * While searching for rsb's to free, we found some that require 1756 * remote removal. We leave them in place and find them again here 1757 * so there is a very small gap between removing them from the toss 1758 * list and sending the removal. Keeping this gap small is 1759 * important to keep us (the master node) from being out of sync 1760 * with the remote dir node for very long. 1761 * 1762 * From the time the rsb is removed from toss until just after 1763 * send_remove, the rsb name is saved in ls_remove_name. A new 1764 * lookup checks this to ensure that a new lookup message for the 1765 * same resource name is not sent just before the remove message. 1766 */ 1767 1768 for (i = 0; i < remote_count; i++) { 1769 name = ls->ls_remove_names[i]; 1770 len = ls->ls_remove_lens[i]; 1771 1772 spin_lock(&ls->ls_rsbtbl[b].lock); 1773 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1774 if (rv) { 1775 spin_unlock(&ls->ls_rsbtbl[b].lock); 1776 log_debug(ls, "remove_name not toss %s", name); 1777 continue; 1778 } 1779 1780 if (r->res_master_nodeid != our_nodeid) { 1781 spin_unlock(&ls->ls_rsbtbl[b].lock); 1782 log_debug(ls, "remove_name master %d dir %d our %d %s", 1783 r->res_master_nodeid, r->res_dir_nodeid, 1784 our_nodeid, name); 1785 continue; 1786 } 1787 1788 if (r->res_dir_nodeid == our_nodeid) { 1789 /* should never happen */ 1790 spin_unlock(&ls->ls_rsbtbl[b].lock); 1791 log_error(ls, "remove_name dir %d master %d our %d %s", 1792 r->res_dir_nodeid, r->res_master_nodeid, 1793 our_nodeid, name); 1794 continue; 1795 } 1796 1797 if (!time_after_eq(jiffies, r->res_toss_time + 1798 dlm_config.ci_toss_secs * HZ)) { 1799 spin_unlock(&ls->ls_rsbtbl[b].lock); 1800 log_debug(ls, "remove_name toss_time %lu now %lu %s", 1801 r->res_toss_time, jiffies, name); 1802 continue; 1803 } 1804 1805 if (!kref_put(&r->res_ref, kill_rsb)) { 1806 spin_unlock(&ls->ls_rsbtbl[b].lock); 1807 log_error(ls, "remove_name in use %s", name); 1808 continue; 1809 } 1810 1811 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1812 1813 /* block lookup of same name until we've sent remove */ 1814 spin_lock(&ls->ls_remove_spin); 1815 ls->ls_remove_len = len; 1816 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); 1817 spin_unlock(&ls->ls_remove_spin); 1818 spin_unlock(&ls->ls_rsbtbl[b].lock); 1819 1820 send_remove(r); 1821 1822 /* allow lookup of name again */ 1823 spin_lock(&ls->ls_remove_spin); 1824 ls->ls_remove_len = 0; 1825 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); 1826 spin_unlock(&ls->ls_remove_spin); 1827 wake_up(&ls->ls_remove_wait); 1828 1829 dlm_free_rsb(r); 1830 } 1831} 1832 1833void dlm_scan_rsbs(struct dlm_ls *ls) 1834{ 1835 int i; 1836 1837 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1838 shrink_bucket(ls, i); 1839 if (dlm_locking_stopped(ls)) 1840 break; 1841 cond_resched(); 1842 } 1843} 1844 1845static void add_timeout(struct dlm_lkb *lkb) 1846{ 1847 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1848 1849 if (is_master_copy(lkb)) 1850 return; 1851 1852 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && 1853 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 1854 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; 1855 goto add_it; 1856 } 1857 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) 1858 goto add_it; 1859 return; 1860 1861 add_it: 1862 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb);); 1863 mutex_lock(&ls->ls_timeout_mutex); 1864 hold_lkb(lkb); 1865 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout); 1866 mutex_unlock(&ls->ls_timeout_mutex); 1867} 1868 1869static void del_timeout(struct dlm_lkb *lkb) 1870{ 1871 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1872 1873 mutex_lock(&ls->ls_timeout_mutex); 1874 if (!list_empty(&lkb->lkb_time_list)) { 1875 list_del_init(&lkb->lkb_time_list); 1876 unhold_lkb(lkb); 1877 } 1878 mutex_unlock(&ls->ls_timeout_mutex); 1879} 1880 1881/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and 1882 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex 1883 and then lock rsb because of lock ordering in add_timeout. We may need 1884 to specify some special timeout-related bits in the lkb that are just to 1885 be accessed under the timeout_mutex. */ 1886 1887void dlm_scan_timeout(struct dlm_ls *ls) 1888{ 1889 struct dlm_rsb *r; 1890 struct dlm_lkb *lkb = NULL, *iter; 1891 int do_cancel, do_warn; 1892 s64 wait_us; 1893 1894 for (;;) { 1895 if (dlm_locking_stopped(ls)) 1896 break; 1897 1898 do_cancel = 0; 1899 do_warn = 0; 1900 mutex_lock(&ls->ls_timeout_mutex); 1901 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) { 1902 1903 wait_us = ktime_to_us(ktime_sub(ktime_get(), 1904 iter->lkb_timestamp)); 1905 1906 if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) && 1907 wait_us >= (iter->lkb_timeout_cs * 10000)) 1908 do_cancel = 1; 1909 1910 if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && 1911 wait_us >= dlm_config.ci_timewarn_cs * 10000) 1912 do_warn = 1; 1913 1914 if (!do_cancel && !do_warn) 1915 continue; 1916 hold_lkb(iter); 1917 lkb = iter; 1918 break; 1919 } 1920 mutex_unlock(&ls->ls_timeout_mutex); 1921 1922 if (!lkb) 1923 break; 1924 1925 r = lkb->lkb_resource; 1926 hold_rsb(r); 1927 lock_rsb(r); 1928 1929 if (do_warn) { 1930 /* clear flag so we only warn once */ 1931 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; 1932 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT)) 1933 del_timeout(lkb); 1934 dlm_timeout_warn(lkb); 1935 } 1936 1937 if (do_cancel) { 1938 log_debug(ls, "timeout cancel %x node %d %s", 1939 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 1940 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; 1941 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; 1942 del_timeout(lkb); 1943 _cancel_lock(r, lkb); 1944 } 1945 1946 unlock_rsb(r); 1947 unhold_rsb(r); 1948 dlm_put_lkb(lkb); 1949 } 1950} 1951 1952/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping 1953 dlm_recoverd before checking/setting ls_recover_begin. */ 1954 1955void dlm_adjust_timeouts(struct dlm_ls *ls) 1956{ 1957 struct dlm_lkb *lkb; 1958 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin); 1959 1960 ls->ls_recover_begin = 0; 1961 mutex_lock(&ls->ls_timeout_mutex); 1962 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) 1963 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); 1964 mutex_unlock(&ls->ls_timeout_mutex); 1965 1966 if (!dlm_config.ci_waitwarn_us) 1967 return; 1968 1969 mutex_lock(&ls->ls_waiters_mutex); 1970 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 1971 if (ktime_to_us(lkb->lkb_wait_time)) 1972 lkb->lkb_wait_time = ktime_get(); 1973 } 1974 mutex_unlock(&ls->ls_waiters_mutex); 1975} 1976 1977/* lkb is master or local copy */ 1978 1979static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1980{ 1981 int b, len = r->res_ls->ls_lvblen; 1982 1983 /* b=1 lvb returned to caller 1984 b=0 lvb written to rsb or invalidated 1985 b=-1 do nothing */ 1986 1987 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1988 1989 if (b == 1) { 1990 if (!lkb->lkb_lvbptr) 1991 return; 1992 1993 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1994 return; 1995 1996 if (!r->res_lvbptr) 1997 return; 1998 1999 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 2000 lkb->lkb_lvbseq = r->res_lvbseq; 2001 2002 } else if (b == 0) { 2003 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 2004 rsb_set_flag(r, RSB_VALNOTVALID); 2005 return; 2006 } 2007 2008 if (!lkb->lkb_lvbptr) 2009 return; 2010 2011 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2012 return; 2013 2014 if (!r->res_lvbptr) 2015 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 2016 2017 if (!r->res_lvbptr) 2018 return; 2019 2020 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 2021 r->res_lvbseq++; 2022 lkb->lkb_lvbseq = r->res_lvbseq; 2023 rsb_clear_flag(r, RSB_VALNOTVALID); 2024 } 2025 2026 if (rsb_flag(r, RSB_VALNOTVALID)) 2027 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID; 2028} 2029 2030static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2031{ 2032 if (lkb->lkb_grmode < DLM_LOCK_PW) 2033 return; 2034 2035 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 2036 rsb_set_flag(r, RSB_VALNOTVALID); 2037 return; 2038 } 2039 2040 if (!lkb->lkb_lvbptr) 2041 return; 2042 2043 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2044 return; 2045 2046 if (!r->res_lvbptr) 2047 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 2048 2049 if (!r->res_lvbptr) 2050 return; 2051 2052 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 2053 r->res_lvbseq++; 2054 rsb_clear_flag(r, RSB_VALNOTVALID); 2055} 2056 2057/* lkb is process copy (pc) */ 2058 2059static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2060 struct dlm_message *ms) 2061{ 2062 int b; 2063 2064 if (!lkb->lkb_lvbptr) 2065 return; 2066 2067 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2068 return; 2069 2070 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 2071 if (b == 1) { 2072 int len = receive_extralen(ms); 2073 if (len > r->res_ls->ls_lvblen) 2074 len = r->res_ls->ls_lvblen; 2075 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 2076 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 2077 } 2078} 2079 2080/* Manipulate lkb's on rsb's convert/granted/waiting queues 2081 remove_lock -- used for unlock, removes lkb from granted 2082 revert_lock -- used for cancel, moves lkb from convert to granted 2083 grant_lock -- used for request and convert, adds lkb to granted or 2084 moves lkb from convert or waiting to granted 2085 2086 Each of these is used for master or local copy lkb's. There is 2087 also a _pc() variation used to make the corresponding change on 2088 a process copy (pc) lkb. */ 2089 2090static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2091{ 2092 del_lkb(r, lkb); 2093 lkb->lkb_grmode = DLM_LOCK_IV; 2094 /* this unhold undoes the original ref from create_lkb() 2095 so this leads to the lkb being freed */ 2096 unhold_lkb(lkb); 2097} 2098 2099static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2100{ 2101 set_lvb_unlock(r, lkb); 2102 _remove_lock(r, lkb); 2103} 2104 2105static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2106{ 2107 _remove_lock(r, lkb); 2108} 2109 2110/* returns: 0 did nothing 2111 1 moved lock to granted 2112 -1 removed lock */ 2113 2114static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2115{ 2116 int rv = 0; 2117 2118 lkb->lkb_rqmode = DLM_LOCK_IV; 2119 2120 switch (lkb->lkb_status) { 2121 case DLM_LKSTS_GRANTED: 2122 break; 2123 case DLM_LKSTS_CONVERT: 2124 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2125 rv = 1; 2126 break; 2127 case DLM_LKSTS_WAITING: 2128 del_lkb(r, lkb); 2129 lkb->lkb_grmode = DLM_LOCK_IV; 2130 /* this unhold undoes the original ref from create_lkb() 2131 so this leads to the lkb being freed */ 2132 unhold_lkb(lkb); 2133 rv = -1; 2134 break; 2135 default: 2136 log_print("invalid status for revert %d", lkb->lkb_status); 2137 } 2138 return rv; 2139} 2140 2141static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2142{ 2143 return revert_lock(r, lkb); 2144} 2145 2146static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2147{ 2148 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 2149 lkb->lkb_grmode = lkb->lkb_rqmode; 2150 if (lkb->lkb_status) 2151 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2152 else 2153 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 2154 } 2155 2156 lkb->lkb_rqmode = DLM_LOCK_IV; 2157 lkb->lkb_highbast = 0; 2158} 2159 2160static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2161{ 2162 set_lvb_lock(r, lkb); 2163 _grant_lock(r, lkb); 2164} 2165 2166static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2167 struct dlm_message *ms) 2168{ 2169 set_lvb_lock_pc(r, lkb, ms); 2170 _grant_lock(r, lkb); 2171} 2172 2173/* called by grant_pending_locks() which means an async grant message must 2174 be sent to the requesting node in addition to granting the lock if the 2175 lkb belongs to a remote node. */ 2176 2177static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 2178{ 2179 grant_lock(r, lkb); 2180 if (is_master_copy(lkb)) 2181 send_grant(r, lkb); 2182 else 2183 queue_cast(r, lkb, 0); 2184} 2185 2186/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 2187 change the granted/requested modes. We're munging things accordingly in 2188 the process copy. 2189 CONVDEADLK: our grmode may have been forced down to NL to resolve a 2190 conversion deadlock 2191 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 2192 compatible with other granted locks */ 2193 2194static void munge_demoted(struct dlm_lkb *lkb) 2195{ 2196 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 2197 log_print("munge_demoted %x invalid modes gr %d rq %d", 2198 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 2199 return; 2200 } 2201 2202 lkb->lkb_grmode = DLM_LOCK_NL; 2203} 2204 2205static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) 2206{ 2207 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && 2208 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { 2209 log_print("munge_altmode %x invalid reply type %d", 2210 lkb->lkb_id, le32_to_cpu(ms->m_type)); 2211 return; 2212 } 2213 2214 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 2215 lkb->lkb_rqmode = DLM_LOCK_PR; 2216 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 2217 lkb->lkb_rqmode = DLM_LOCK_CW; 2218 else { 2219 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 2220 dlm_print_lkb(lkb); 2221 } 2222} 2223 2224static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 2225{ 2226 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 2227 lkb_statequeue); 2228 if (lkb->lkb_id == first->lkb_id) 2229 return 1; 2230 2231 return 0; 2232} 2233 2234/* Check if the given lkb conflicts with another lkb on the queue. */ 2235 2236static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 2237{ 2238 struct dlm_lkb *this; 2239 2240 list_for_each_entry(this, head, lkb_statequeue) { 2241 if (this == lkb) 2242 continue; 2243 if (!modes_compat(this, lkb)) 2244 return 1; 2245 } 2246 return 0; 2247} 2248 2249/* 2250 * "A conversion deadlock arises with a pair of lock requests in the converting 2251 * queue for one resource. The granted mode of each lock blocks the requested 2252 * mode of the other lock." 2253 * 2254 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the 2255 * convert queue from being granted, then deadlk/demote lkb. 2256 * 2257 * Example: 2258 * Granted Queue: empty 2259 * Convert Queue: NL->EX (first lock) 2260 * PR->EX (second lock) 2261 * 2262 * The first lock can't be granted because of the granted mode of the second 2263 * lock and the second lock can't be granted because it's not first in the 2264 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we 2265 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK 2266 * flag set and return DEMOTED in the lksb flags. 2267 * 2268 * Originally, this function detected conv-deadlk in a more limited scope: 2269 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or 2270 * - if lkb1 was the first entry in the queue (not just earlier), and was 2271 * blocked by the granted mode of lkb2, and there was nothing on the 2272 * granted queue preventing lkb1 from being granted immediately, i.e. 2273 * lkb2 was the only thing preventing lkb1 from being granted. 2274 * 2275 * That second condition meant we'd only say there was conv-deadlk if 2276 * resolving it (by demotion) would lead to the first lock on the convert 2277 * queue being granted right away. It allowed conversion deadlocks to exist 2278 * between locks on the convert queue while they couldn't be granted anyway. 2279 * 2280 * Now, we detect and take action on conversion deadlocks immediately when 2281 * they're created, even if they may not be immediately consequential. If 2282 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted 2283 * mode that would prevent lkb1's conversion from being granted, we do a 2284 * deadlk/demote on lkb2 right away and don't let it onto the convert queue. 2285 * I think this means that the lkb_is_ahead condition below should always 2286 * be zero, i.e. there will never be conv-deadlk between two locks that are 2287 * both already on the convert queue. 2288 */ 2289 2290static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) 2291{ 2292 struct dlm_lkb *lkb1; 2293 int lkb_is_ahead = 0; 2294 2295 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { 2296 if (lkb1 == lkb2) { 2297 lkb_is_ahead = 1; 2298 continue; 2299 } 2300 2301 if (!lkb_is_ahead) { 2302 if (!modes_compat(lkb2, lkb1)) 2303 return 1; 2304 } else { 2305 if (!modes_compat(lkb2, lkb1) && 2306 !modes_compat(lkb1, lkb2)) 2307 return 1; 2308 } 2309 } 2310 return 0; 2311} 2312 2313/* 2314 * Return 1 if the lock can be granted, 0 otherwise. 2315 * Also detect and resolve conversion deadlocks. 2316 * 2317 * lkb is the lock to be granted 2318 * 2319 * now is 1 if the function is being called in the context of the 2320 * immediate request, it is 0 if called later, after the lock has been 2321 * queued. 2322 * 2323 * recover is 1 if dlm_recover_grant() is trying to grant conversions 2324 * after recovery. 2325 * 2326 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2327 */ 2328 2329static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2330 int recover) 2331{ 2332 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2333 2334 /* 2335 * 6-10: Version 5.4 introduced an option to address the phenomenon of 2336 * a new request for a NL mode lock being blocked. 2337 * 2338 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 2339 * request, then it would be granted. In essence, the use of this flag 2340 * tells the Lock Manager to expedite theis request by not considering 2341 * what may be in the CONVERTING or WAITING queues... As of this 2342 * writing, the EXPEDITE flag can be used only with new requests for NL 2343 * mode locks. This flag is not valid for conversion requests. 2344 * 2345 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 2346 * conversion or used with a non-NL requested mode. We also know an 2347 * EXPEDITE request is always granted immediately, so now must always 2348 * be 1. The full condition to grant an expedite request: (now && 2349 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 2350 * therefore be shortened to just checking the flag. 2351 */ 2352 2353 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 2354 return 1; 2355 2356 /* 2357 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 2358 * added to the remaining conditions. 2359 */ 2360 2361 if (queue_conflict(&r->res_grantqueue, lkb)) 2362 return 0; 2363 2364 /* 2365 * 6-3: By default, a conversion request is immediately granted if the 2366 * requested mode is compatible with the modes of all other granted 2367 * locks 2368 */ 2369 2370 if (queue_conflict(&r->res_convertqueue, lkb)) 2371 return 0; 2372 2373 /* 2374 * The RECOVER_GRANT flag means dlm_recover_grant() is granting 2375 * locks for a recovered rsb, on which lkb's have been rebuilt. 2376 * The lkb's may have been rebuilt on the queues in a different 2377 * order than they were in on the previous master. So, granting 2378 * queued conversions in order after recovery doesn't make sense 2379 * since the order hasn't been preserved anyway. The new order 2380 * could also have created a new "in place" conversion deadlock. 2381 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. 2382 * After recovery, there would be no granted locks, and possibly 2383 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after 2384 * recovery, grant conversions without considering order. 2385 */ 2386 2387 if (conv && recover) 2388 return 1; 2389 2390 /* 2391 * 6-5: But the default algorithm for deciding whether to grant or 2392 * queue conversion requests does not by itself guarantee that such 2393 * requests are serviced on a "first come first serve" basis. This, in 2394 * turn, can lead to a phenomenon known as "indefinate postponement". 2395 * 2396 * 6-7: This issue is dealt with by using the optional QUECVT flag with 2397 * the system service employed to request a lock conversion. This flag 2398 * forces certain conversion requests to be queued, even if they are 2399 * compatible with the granted modes of other locks on the same 2400 * resource. Thus, the use of this flag results in conversion requests 2401 * being ordered on a "first come first servce" basis. 2402 * 2403 * DCT: This condition is all about new conversions being able to occur 2404 * "in place" while the lock remains on the granted queue (assuming 2405 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 2406 * doesn't _have_ to go onto the convert queue where it's processed in 2407 * order. The "now" variable is necessary to distinguish converts 2408 * being received and processed for the first time now, because once a 2409 * convert is moved to the conversion queue the condition below applies 2410 * requiring fifo granting. 2411 */ 2412 2413 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 2414 return 1; 2415 2416 /* 2417 * Even if the convert is compat with all granted locks, 2418 * QUECVT forces it behind other locks on the convert queue. 2419 */ 2420 2421 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { 2422 if (list_empty(&r->res_convertqueue)) 2423 return 1; 2424 else 2425 return 0; 2426 } 2427 2428 /* 2429 * The NOORDER flag is set to avoid the standard vms rules on grant 2430 * order. 2431 */ 2432 2433 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 2434 return 1; 2435 2436 /* 2437 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 2438 * granted until all other conversion requests ahead of it are granted 2439 * and/or canceled. 2440 */ 2441 2442 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 2443 return 1; 2444 2445 /* 2446 * 6-4: By default, a new request is immediately granted only if all 2447 * three of the following conditions are satisfied when the request is 2448 * issued: 2449 * - The queue of ungranted conversion requests for the resource is 2450 * empty. 2451 * - The queue of ungranted new requests for the resource is empty. 2452 * - The mode of the new request is compatible with the most 2453 * restrictive mode of all granted locks on the resource. 2454 */ 2455 2456 if (now && !conv && list_empty(&r->res_convertqueue) && 2457 list_empty(&r->res_waitqueue)) 2458 return 1; 2459 2460 /* 2461 * 6-4: Once a lock request is in the queue of ungranted new requests, 2462 * it cannot be granted until the queue of ungranted conversion 2463 * requests is empty, all ungranted new requests ahead of it are 2464 * granted and/or canceled, and it is compatible with the granted mode 2465 * of the most restrictive lock granted on the resource. 2466 */ 2467 2468 if (!now && !conv && list_empty(&r->res_convertqueue) && 2469 first_in_list(lkb, &r->res_waitqueue)) 2470 return 1; 2471 2472 return 0; 2473} 2474 2475static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2476 int recover, int *err) 2477{ 2478 int rv; 2479 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2480 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); 2481 2482 if (err) 2483 *err = 0; 2484 2485 rv = _can_be_granted(r, lkb, now, recover); 2486 if (rv) 2487 goto out; 2488 2489 /* 2490 * The CONVDEADLK flag is non-standard and tells the dlm to resolve 2491 * conversion deadlocks by demoting grmode to NL, otherwise the dlm 2492 * cancels one of the locks. 2493 */ 2494 2495 if (is_convert && can_be_queued(lkb) && 2496 conversion_deadlock_detect(r, lkb)) { 2497 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { 2498 lkb->lkb_grmode = DLM_LOCK_NL; 2499 lkb->lkb_sbflags |= DLM_SBF_DEMOTED; 2500 } else if (err) { 2501 *err = -EDEADLK; 2502 } else { 2503 log_print("can_be_granted deadlock %x now %d", 2504 lkb->lkb_id, now); 2505 dlm_dump_rsb(r); 2506 } 2507 goto out; 2508 } 2509 2510 /* 2511 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try 2512 * to grant a request in a mode other than the normal rqmode. It's a 2513 * simple way to provide a big optimization to applications that can 2514 * use them. 2515 */ 2516 2517 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) 2518 alt = DLM_LOCK_PR; 2519 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) 2520 alt = DLM_LOCK_CW; 2521 2522 if (alt) { 2523 lkb->lkb_rqmode = alt; 2524 rv = _can_be_granted(r, lkb, now, 0); 2525 if (rv) 2526 lkb->lkb_sbflags |= DLM_SBF_ALTMODE; 2527 else 2528 lkb->lkb_rqmode = rqmode; 2529 } 2530 out: 2531 return rv; 2532} 2533 2534/* Returns the highest requested mode of all blocked conversions; sets 2535 cw if there's a blocked conversion to DLM_LOCK_CW. */ 2536 2537static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 2538 unsigned int *count) 2539{ 2540 struct dlm_lkb *lkb, *s; 2541 int recover = rsb_flag(r, RSB_RECOVER_GRANT); 2542 int hi, demoted, quit, grant_restart, demote_restart; 2543 int deadlk; 2544 2545 quit = 0; 2546 restart: 2547 grant_restart = 0; 2548 demote_restart = 0; 2549 hi = DLM_LOCK_IV; 2550 2551 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 2552 demoted = is_demoted(lkb); 2553 deadlk = 0; 2554 2555 if (can_be_granted(r, lkb, 0, recover, &deadlk)) { 2556 grant_lock_pending(r, lkb); 2557 grant_restart = 1; 2558 if (count) 2559 (*count)++; 2560 continue; 2561 } 2562 2563 if (!demoted && is_demoted(lkb)) { 2564 log_print("WARN: pending demoted %x node %d %s", 2565 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2566 demote_restart = 1; 2567 continue; 2568 } 2569 2570 if (deadlk) { 2571 /* 2572 * If DLM_LKB_NODLKWT flag is set and conversion 2573 * deadlock is detected, we request blocking AST and 2574 * down (or cancel) conversion. 2575 */ 2576 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) { 2577 if (lkb->lkb_highbast < lkb->lkb_rqmode) { 2578 queue_bast(r, lkb, lkb->lkb_rqmode); 2579 lkb->lkb_highbast = lkb->lkb_rqmode; 2580 } 2581 } else { 2582 log_print("WARN: pending deadlock %x node %d %s", 2583 lkb->lkb_id, lkb->lkb_nodeid, 2584 r->res_name); 2585 dlm_dump_rsb(r); 2586 } 2587 continue; 2588 } 2589 2590 hi = max_t(int, lkb->lkb_rqmode, hi); 2591 2592 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) 2593 *cw = 1; 2594 } 2595 2596 if (grant_restart) 2597 goto restart; 2598 if (demote_restart && !quit) { 2599 quit = 1; 2600 goto restart; 2601 } 2602 2603 return max_t(int, high, hi); 2604} 2605 2606static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 2607 unsigned int *count) 2608{ 2609 struct dlm_lkb *lkb, *s; 2610 2611 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2612 if (can_be_granted(r, lkb, 0, 0, NULL)) { 2613 grant_lock_pending(r, lkb); 2614 if (count) 2615 (*count)++; 2616 } else { 2617 high = max_t(int, lkb->lkb_rqmode, high); 2618 if (lkb->lkb_rqmode == DLM_LOCK_CW) 2619 *cw = 1; 2620 } 2621 } 2622 2623 return high; 2624} 2625 2626/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked 2627 on either the convert or waiting queue. 2628 high is the largest rqmode of all locks blocked on the convert or 2629 waiting queue. */ 2630 2631static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) 2632{ 2633 if (gr->lkb_grmode == DLM_LOCK_PR && cw) { 2634 if (gr->lkb_highbast < DLM_LOCK_EX) 2635 return 1; 2636 return 0; 2637 } 2638 2639 if (gr->lkb_highbast < high && 2640 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 2641 return 1; 2642 return 0; 2643} 2644 2645static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) 2646{ 2647 struct dlm_lkb *lkb, *s; 2648 int high = DLM_LOCK_IV; 2649 int cw = 0; 2650 2651 if (!is_master(r)) { 2652 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 2653 dlm_dump_rsb(r); 2654 return; 2655 } 2656 2657 high = grant_pending_convert(r, high, &cw, count); 2658 high = grant_pending_wait(r, high, &cw, count); 2659 2660 if (high == DLM_LOCK_IV) 2661 return; 2662 2663 /* 2664 * If there are locks left on the wait/convert queue then send blocking 2665 * ASTs to granted locks based on the largest requested mode (high) 2666 * found above. 2667 */ 2668 2669 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 2670 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { 2671 if (cw && high == DLM_LOCK_PR && 2672 lkb->lkb_grmode == DLM_LOCK_PR) 2673 queue_bast(r, lkb, DLM_LOCK_CW); 2674 else 2675 queue_bast(r, lkb, high); 2676 lkb->lkb_highbast = high; 2677 } 2678 } 2679} 2680 2681static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) 2682{ 2683 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || 2684 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { 2685 if (gr->lkb_highbast < DLM_LOCK_EX) 2686 return 1; 2687 return 0; 2688 } 2689 2690 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) 2691 return 1; 2692 return 0; 2693} 2694 2695static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 2696 struct dlm_lkb *lkb) 2697{ 2698 struct dlm_lkb *gr; 2699 2700 list_for_each_entry(gr, head, lkb_statequeue) { 2701 /* skip self when sending basts to convertqueue */ 2702 if (gr == lkb) 2703 continue; 2704 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 2705 queue_bast(r, gr, lkb->lkb_rqmode); 2706 gr->lkb_highbast = lkb->lkb_rqmode; 2707 } 2708 } 2709} 2710 2711static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 2712{ 2713 send_bast_queue(r, &r->res_grantqueue, lkb); 2714} 2715 2716static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 2717{ 2718 send_bast_queue(r, &r->res_grantqueue, lkb); 2719 send_bast_queue(r, &r->res_convertqueue, lkb); 2720} 2721 2722/* set_master(r, lkb) -- set the master nodeid of a resource 2723 2724 The purpose of this function is to set the nodeid field in the given 2725 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 2726 known, it can just be copied to the lkb and the function will return 2727 0. If the rsb's nodeid is _not_ known, it needs to be looked up 2728 before it can be copied to the lkb. 2729 2730 When the rsb nodeid is being looked up remotely, the initial lkb 2731 causing the lookup is kept on the ls_waiters list waiting for the 2732 lookup reply. Other lkb's waiting for the same rsb lookup are kept 2733 on the rsb's res_lookup list until the master is verified. 2734 2735 Return values: 2736 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 2737 1: the rsb master is not available and the lkb has been placed on 2738 a wait queue 2739*/ 2740 2741static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2742{ 2743 int our_nodeid = dlm_our_nodeid(); 2744 2745 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2746 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2747 r->res_first_lkid = lkb->lkb_id; 2748 lkb->lkb_nodeid = r->res_nodeid; 2749 return 0; 2750 } 2751 2752 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 2753 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 2754 return 1; 2755 } 2756 2757 if (r->res_master_nodeid == our_nodeid) { 2758 lkb->lkb_nodeid = 0; 2759 return 0; 2760 } 2761 2762 if (r->res_master_nodeid) { 2763 lkb->lkb_nodeid = r->res_master_nodeid; 2764 return 0; 2765 } 2766 2767 if (dlm_dir_nodeid(r) == our_nodeid) { 2768 /* This is a somewhat unusual case; find_rsb will usually 2769 have set res_master_nodeid when dir nodeid is local, but 2770 there are cases where we become the dir node after we've 2771 past find_rsb and go through _request_lock again. 2772 confirm_master() or process_lookup_list() needs to be 2773 called after this. */ 2774 log_debug(r->res_ls, "set_master %x self master %d dir %d %s", 2775 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, 2776 r->res_name); 2777 r->res_master_nodeid = our_nodeid; 2778 r->res_nodeid = 0; 2779 lkb->lkb_nodeid = 0; 2780 return 0; 2781 } 2782 2783 wait_pending_remove(r); 2784 2785 r->res_first_lkid = lkb->lkb_id; 2786 send_lookup(r, lkb); 2787 return 1; 2788} 2789 2790static void process_lookup_list(struct dlm_rsb *r) 2791{ 2792 struct dlm_lkb *lkb, *safe; 2793 2794 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 2795 list_del_init(&lkb->lkb_rsb_lookup); 2796 _request_lock(r, lkb); 2797 schedule(); 2798 } 2799} 2800 2801/* confirm_master -- confirm (or deny) an rsb's master nodeid */ 2802 2803static void confirm_master(struct dlm_rsb *r, int error) 2804{ 2805 struct dlm_lkb *lkb; 2806 2807 if (!r->res_first_lkid) 2808 return; 2809 2810 switch (error) { 2811 case 0: 2812 case -EINPROGRESS: 2813 r->res_first_lkid = 0; 2814 process_lookup_list(r); 2815 break; 2816 2817 case -EAGAIN: 2818 case -EBADR: 2819 case -ENOTBLK: 2820 /* the remote request failed and won't be retried (it was 2821 a NOQUEUE, or has been canceled/unlocked); make a waiting 2822 lkb the first_lkid */ 2823 2824 r->res_first_lkid = 0; 2825 2826 if (!list_empty(&r->res_lookup)) { 2827 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 2828 lkb_rsb_lookup); 2829 list_del_init(&lkb->lkb_rsb_lookup); 2830 r->res_first_lkid = lkb->lkb_id; 2831 _request_lock(r, lkb); 2832 } 2833 break; 2834 2835 default: 2836 log_error(r->res_ls, "confirm_master unknown error %d", error); 2837 } 2838} 2839 2840static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2841 int namelen, unsigned long timeout_cs, 2842 void (*ast) (void *astparam), 2843 void *astparam, 2844 void (*bast) (void *astparam, int mode), 2845 struct dlm_args *args) 2846{ 2847 int rv = -EINVAL; 2848 2849 /* check for invalid arg usage */ 2850 2851 if (mode < 0 || mode > DLM_LOCK_EX) 2852 goto out; 2853 2854 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 2855 goto out; 2856 2857 if (flags & DLM_LKF_CANCEL) 2858 goto out; 2859 2860 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 2861 goto out; 2862 2863 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 2864 goto out; 2865 2866 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 2867 goto out; 2868 2869 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 2870 goto out; 2871 2872 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 2873 goto out; 2874 2875 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 2876 goto out; 2877 2878 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 2879 goto out; 2880 2881 if (!ast || !lksb) 2882 goto out; 2883 2884 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 2885 goto out; 2886 2887 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 2888 goto out; 2889 2890 /* these args will be copied to the lkb in validate_lock_args, 2891 it cannot be done now because when converting locks, fields in 2892 an active lkb cannot be modified before locking the rsb */ 2893 2894 args->flags = flags; 2895 args->astfn = ast; 2896 args->astparam = astparam; 2897 args->bastfn = bast; 2898 args->timeout = timeout_cs; 2899 args->mode = mode; 2900 args->lksb = lksb; 2901 rv = 0; 2902 out: 2903 return rv; 2904} 2905 2906static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 2907{ 2908 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK | 2909 DLM_LKF_FORCEUNLOCK)) 2910 return -EINVAL; 2911 2912 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) 2913 return -EINVAL; 2914 2915 args->flags = flags; 2916 args->astparam = astarg; 2917 return 0; 2918} 2919 2920static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2921 struct dlm_args *args) 2922{ 2923 int rv = -EINVAL; 2924 2925 if (args->flags & DLM_LKF_CONVERT) { 2926 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 2927 goto out; 2928 2929 if (args->flags & DLM_LKF_QUECVT && 2930 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 2931 goto out; 2932 2933 rv = -EBUSY; 2934 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2935 goto out; 2936 2937 /* lock not allowed if there's any op in progress */ 2938 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 2939 goto out; 2940 2941 if (is_overlap(lkb)) 2942 goto out; 2943 } 2944 2945 lkb->lkb_exflags = args->flags; 2946 lkb->lkb_sbflags = 0; 2947 lkb->lkb_astfn = args->astfn; 2948 lkb->lkb_astparam = args->astparam; 2949 lkb->lkb_bastfn = args->bastfn; 2950 lkb->lkb_rqmode = args->mode; 2951 lkb->lkb_lksb = args->lksb; 2952 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2953 lkb->lkb_ownpid = (int) current->pid; 2954 lkb->lkb_timeout_cs = args->timeout; 2955 rv = 0; 2956 out: 2957 if (rv) 2958 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s", 2959 rv, lkb->lkb_id, lkb->lkb_flags, args->flags, 2960 lkb->lkb_status, lkb->lkb_wait_type, 2961 lkb->lkb_resource->res_name); 2962 return rv; 2963} 2964 2965/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 2966 for success */ 2967 2968/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 2969 because there may be a lookup in progress and it's valid to do 2970 cancel/unlockf on it */ 2971 2972static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 2973{ 2974 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 2975 int rv = -EINVAL; 2976 2977 if (lkb->lkb_flags & DLM_IFL_MSTCPY) { 2978 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 2979 dlm_print_lkb(lkb); 2980 goto out; 2981 } 2982 2983 /* an lkb may still exist even though the lock is EOL'ed due to a 2984 cancel, unlock or failed noqueue request; an app can't use these 2985 locks; return same error as if the lkid had not been found at all */ 2986 2987 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { 2988 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 2989 rv = -ENOENT; 2990 goto out; 2991 } 2992 2993 /* an lkb may be waiting for an rsb lookup to complete where the 2994 lookup was initiated by another lock */ 2995 2996 if (!list_empty(&lkb->lkb_rsb_lookup)) { 2997 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 2998 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 2999 list_del_init(&lkb->lkb_rsb_lookup); 3000 queue_cast(lkb->lkb_resource, lkb, 3001 args->flags & DLM_LKF_CANCEL ? 3002 -DLM_ECANCEL : -DLM_EUNLOCK); 3003 unhold_lkb(lkb); /* undoes create_lkb() */ 3004 } 3005 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ 3006 rv = -EBUSY; 3007 goto out; 3008 } 3009 3010 /* cancel not allowed with another cancel/unlock in progress */ 3011 3012 if (args->flags & DLM_LKF_CANCEL) { 3013 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 3014 goto out; 3015 3016 if (is_overlap(lkb)) 3017 goto out; 3018 3019 /* don't let scand try to do a cancel */ 3020 del_timeout(lkb); 3021 3022 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3023 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 3024 rv = -EBUSY; 3025 goto out; 3026 } 3027 3028 /* there's nothing to cancel */ 3029 if (lkb->lkb_status == DLM_LKSTS_GRANTED && 3030 !lkb->lkb_wait_type) { 3031 rv = -EBUSY; 3032 goto out; 3033 } 3034 3035 switch (lkb->lkb_wait_type) { 3036 case DLM_MSG_LOOKUP: 3037 case DLM_MSG_REQUEST: 3038 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 3039 rv = -EBUSY; 3040 goto out; 3041 case DLM_MSG_UNLOCK: 3042 case DLM_MSG_CANCEL: 3043 goto out; 3044 } 3045 /* add_to_waiters() will set OVERLAP_CANCEL */ 3046 goto out_ok; 3047 } 3048 3049 /* do we need to allow a force-unlock if there's a normal unlock 3050 already in progress? in what conditions could the normal unlock 3051 fail such that we'd want to send a force-unlock to be sure? */ 3052 3053 if (args->flags & DLM_LKF_FORCEUNLOCK) { 3054 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 3055 goto out; 3056 3057 if (is_overlap_unlock(lkb)) 3058 goto out; 3059 3060 /* don't let scand try to do a cancel */ 3061 del_timeout(lkb); 3062 3063 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3064 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3065 rv = -EBUSY; 3066 goto out; 3067 } 3068 3069 switch (lkb->lkb_wait_type) { 3070 case DLM_MSG_LOOKUP: 3071 case DLM_MSG_REQUEST: 3072 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3073 rv = -EBUSY; 3074 goto out; 3075 case DLM_MSG_UNLOCK: 3076 goto out; 3077 } 3078 /* add_to_waiters() will set OVERLAP_UNLOCK */ 3079 goto out_ok; 3080 } 3081 3082 /* normal unlock not allowed if there's any op in progress */ 3083 rv = -EBUSY; 3084 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 3085 goto out; 3086 3087 out_ok: 3088 /* an overlapping op shouldn't blow away exflags from other op */ 3089 lkb->lkb_exflags |= args->flags; 3090 lkb->lkb_sbflags = 0; 3091 lkb->lkb_astparam = args->astparam; 3092 rv = 0; 3093 out: 3094 if (rv) 3095 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, 3096 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, 3097 args->flags, lkb->lkb_wait_type, 3098 lkb->lkb_resource->res_name); 3099 return rv; 3100} 3101 3102/* 3103 * Four stage 4 varieties: 3104 * do_request(), do_convert(), do_unlock(), do_cancel() 3105 * These are called on the master node for the given lock and 3106 * from the central locking logic. 3107 */ 3108 3109static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3110{ 3111 int error = 0; 3112 3113 if (can_be_granted(r, lkb, 1, 0, NULL)) { 3114 grant_lock(r, lkb); 3115 queue_cast(r, lkb, 0); 3116 goto out; 3117 } 3118 3119 if (can_be_queued(lkb)) { 3120 error = -EINPROGRESS; 3121 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3122 add_timeout(lkb); 3123 goto out; 3124 } 3125 3126 error = -EAGAIN; 3127 queue_cast(r, lkb, -EAGAIN); 3128 out: 3129 return error; 3130} 3131 3132static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3133 int error) 3134{ 3135 switch (error) { 3136 case -EAGAIN: 3137 if (force_blocking_asts(lkb)) 3138 send_blocking_asts_all(r, lkb); 3139 break; 3140 case -EINPROGRESS: 3141 send_blocking_asts(r, lkb); 3142 break; 3143 } 3144} 3145 3146static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3147{ 3148 int error = 0; 3149 int deadlk = 0; 3150 3151 /* changing an existing lock may allow others to be granted */ 3152 3153 if (can_be_granted(r, lkb, 1, 0, &deadlk)) { 3154 grant_lock(r, lkb); 3155 queue_cast(r, lkb, 0); 3156 goto out; 3157 } 3158 3159 /* can_be_granted() detected that this lock would block in a conversion 3160 deadlock, so we leave it on the granted queue and return EDEADLK in 3161 the ast for the convert. */ 3162 3163 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 3164 /* it's left on the granted queue */ 3165 revert_lock(r, lkb); 3166 queue_cast(r, lkb, -EDEADLK); 3167 error = -EDEADLK; 3168 goto out; 3169 } 3170 3171 /* is_demoted() means the can_be_granted() above set the grmode 3172 to NL, and left us on the granted queue. This auto-demotion 3173 (due to CONVDEADLK) might mean other locks, and/or this lock, are 3174 now grantable. We have to try to grant other converting locks 3175 before we try again to grant this one. */ 3176 3177 if (is_demoted(lkb)) { 3178 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 3179 if (_can_be_granted(r, lkb, 1, 0)) { 3180 grant_lock(r, lkb); 3181 queue_cast(r, lkb, 0); 3182 goto out; 3183 } 3184 /* else fall through and move to convert queue */ 3185 } 3186 3187 if (can_be_queued(lkb)) { 3188 error = -EINPROGRESS; 3189 del_lkb(r, lkb); 3190 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3191 add_timeout(lkb); 3192 goto out; 3193 } 3194 3195 error = -EAGAIN; 3196 queue_cast(r, lkb, -EAGAIN); 3197 out: 3198 return error; 3199} 3200 3201static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3202 int error) 3203{ 3204 switch (error) { 3205 case 0: 3206 grant_pending_locks(r, NULL); 3207 /* grant_pending_locks also sends basts */ 3208 break; 3209 case -EAGAIN: 3210 if (force_blocking_asts(lkb)) 3211 send_blocking_asts_all(r, lkb); 3212 break; 3213 case -EINPROGRESS: 3214 send_blocking_asts(r, lkb); 3215 break; 3216 } 3217} 3218 3219static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3220{ 3221 remove_lock(r, lkb); 3222 queue_cast(r, lkb, -DLM_EUNLOCK); 3223 return -DLM_EUNLOCK; 3224} 3225 3226static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3227 int error) 3228{ 3229 grant_pending_locks(r, NULL); 3230} 3231 3232/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3233 3234static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3235{ 3236 int error; 3237 3238 error = revert_lock(r, lkb); 3239 if (error) { 3240 queue_cast(r, lkb, -DLM_ECANCEL); 3241 return -DLM_ECANCEL; 3242 } 3243 return 0; 3244} 3245 3246static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3247 int error) 3248{ 3249 if (error) 3250 grant_pending_locks(r, NULL); 3251} 3252 3253/* 3254 * Four stage 3 varieties: 3255 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 3256 */ 3257 3258/* add a new lkb to a possibly new rsb, called by requesting process */ 3259 3260static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3261{ 3262 int error; 3263 3264 /* set_master: sets lkb nodeid from r */ 3265 3266 error = set_master(r, lkb); 3267 if (error < 0) 3268 goto out; 3269 if (error) { 3270 error = 0; 3271 goto out; 3272 } 3273 3274 if (is_remote(r)) { 3275 /* receive_request() calls do_request() on remote node */ 3276 error = send_request(r, lkb); 3277 } else { 3278 error = do_request(r, lkb); 3279 /* for remote locks the request_reply is sent 3280 between do_request and do_request_effects */ 3281 do_request_effects(r, lkb, error); 3282 } 3283 out: 3284 return error; 3285} 3286 3287/* change some property of an existing lkb, e.g. mode */ 3288 3289static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3290{ 3291 int error; 3292 3293 if (is_remote(r)) { 3294 /* receive_convert() calls do_convert() on remote node */ 3295 error = send_convert(r, lkb); 3296 } else { 3297 error = do_convert(r, lkb); 3298 /* for remote locks the convert_reply is sent 3299 between do_convert and do_convert_effects */ 3300 do_convert_effects(r, lkb, error); 3301 } 3302 3303 return error; 3304} 3305 3306/* remove an existing lkb from the granted queue */ 3307 3308static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3309{ 3310 int error; 3311 3312 if (is_remote(r)) { 3313 /* receive_unlock() calls do_unlock() on remote node */ 3314 error = send_unlock(r, lkb); 3315 } else { 3316 error = do_unlock(r, lkb); 3317 /* for remote locks the unlock_reply is sent 3318 between do_unlock and do_unlock_effects */ 3319 do_unlock_effects(r, lkb, error); 3320 } 3321 3322 return error; 3323} 3324 3325/* remove an existing lkb from the convert or wait queue */ 3326 3327static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3328{ 3329 int error; 3330 3331 if (is_remote(r)) { 3332 /* receive_cancel() calls do_cancel() on remote node */ 3333 error = send_cancel(r, lkb); 3334 } else { 3335 error = do_cancel(r, lkb); 3336 /* for remote locks the cancel_reply is sent 3337 between do_cancel and do_cancel_effects */ 3338 do_cancel_effects(r, lkb, error); 3339 } 3340 3341 return error; 3342} 3343 3344/* 3345 * Four stage 2 varieties: 3346 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 3347 */ 3348 3349static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, 3350 int len, struct dlm_args *args) 3351{ 3352 struct dlm_rsb *r; 3353 int error; 3354 3355 error = validate_lock_args(ls, lkb, args); 3356 if (error) 3357 return error; 3358 3359 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 3360 if (error) 3361 return error; 3362 3363 lock_rsb(r); 3364 3365 attach_lkb(r, lkb); 3366 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 3367 3368 error = _request_lock(r, lkb); 3369 3370 unlock_rsb(r); 3371 put_rsb(r); 3372 return error; 3373} 3374 3375static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3376 struct dlm_args *args) 3377{ 3378 struct dlm_rsb *r; 3379 int error; 3380 3381 r = lkb->lkb_resource; 3382 3383 hold_rsb(r); 3384 lock_rsb(r); 3385 3386 error = validate_lock_args(ls, lkb, args); 3387 if (error) 3388 goto out; 3389 3390 error = _convert_lock(r, lkb); 3391 out: 3392 unlock_rsb(r); 3393 put_rsb(r); 3394 return error; 3395} 3396 3397static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3398 struct dlm_args *args) 3399{ 3400 struct dlm_rsb *r; 3401 int error; 3402 3403 r = lkb->lkb_resource; 3404 3405 hold_rsb(r); 3406 lock_rsb(r); 3407 3408 error = validate_unlock_args(lkb, args); 3409 if (error) 3410 goto out; 3411 3412 error = _unlock_lock(r, lkb); 3413 out: 3414 unlock_rsb(r); 3415 put_rsb(r); 3416 return error; 3417} 3418 3419static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3420 struct dlm_args *args) 3421{ 3422 struct dlm_rsb *r; 3423 int error; 3424 3425 r = lkb->lkb_resource; 3426 3427 hold_rsb(r); 3428 lock_rsb(r); 3429 3430 error = validate_unlock_args(lkb, args); 3431 if (error) 3432 goto out; 3433 3434 error = _cancel_lock(r, lkb); 3435 out: 3436 unlock_rsb(r); 3437 put_rsb(r); 3438 return error; 3439} 3440 3441/* 3442 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 3443 */ 3444 3445int dlm_lock(dlm_lockspace_t *lockspace, 3446 int mode, 3447 struct dlm_lksb *lksb, 3448 uint32_t flags, 3449 void *name, 3450 unsigned int namelen, 3451 uint32_t parent_lkid, 3452 void (*ast) (void *astarg), 3453 void *astarg, 3454 void (*bast) (void *astarg, int mode)) 3455{ 3456 struct dlm_ls *ls; 3457 struct dlm_lkb *lkb; 3458 struct dlm_args args; 3459 int error, convert = flags & DLM_LKF_CONVERT; 3460 3461 ls = dlm_find_lockspace_local(lockspace); 3462 if (!ls) 3463 return -EINVAL; 3464 3465 dlm_lock_recovery(ls); 3466 3467 if (convert) 3468 error = find_lkb(ls, lksb->sb_lkid, &lkb); 3469 else 3470 error = create_lkb(ls, &lkb); 3471 3472 if (error) 3473 goto out; 3474 3475 trace_dlm_lock_start(ls, lkb, mode, flags); 3476 3477 error = set_lock_args(mode, lksb, flags, namelen, 0, ast, 3478 astarg, bast, &args); 3479 if (error) 3480 goto out_put; 3481 3482 if (convert) 3483 error = convert_lock(ls, lkb, &args); 3484 else 3485 error = request_lock(ls, lkb, name, namelen, &args); 3486 3487 if (error == -EINPROGRESS) 3488 error = 0; 3489 out_put: 3490 trace_dlm_lock_end(ls, lkb, mode, flags, error); 3491 3492 if (convert || error) 3493 __put_lkb(ls, lkb); 3494 if (error == -EAGAIN || error == -EDEADLK) 3495 error = 0; 3496 out: 3497 dlm_unlock_recovery(ls); 3498 dlm_put_lockspace(ls); 3499 return error; 3500} 3501 3502int dlm_unlock(dlm_lockspace_t *lockspace, 3503 uint32_t lkid, 3504 uint32_t flags, 3505 struct dlm_lksb *lksb, 3506 void *astarg) 3507{ 3508 struct dlm_ls *ls; 3509 struct dlm_lkb *lkb; 3510 struct dlm_args args; 3511 int error; 3512 3513 ls = dlm_find_lockspace_local(lockspace); 3514 if (!ls) 3515 return -EINVAL; 3516 3517 dlm_lock_recovery(ls); 3518 3519 error = find_lkb(ls, lkid, &lkb); 3520 if (error) 3521 goto out; 3522 3523 trace_dlm_unlock_start(ls, lkb, flags); 3524 3525 error = set_unlock_args(flags, astarg, &args); 3526 if (error) 3527 goto out_put; 3528 3529 if (flags & DLM_LKF_CANCEL) 3530 error = cancel_lock(ls, lkb, &args); 3531 else 3532 error = unlock_lock(ls, lkb, &args); 3533 3534 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 3535 error = 0; 3536 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 3537 error = 0; 3538 out_put: 3539 trace_dlm_unlock_end(ls, lkb, flags, error); 3540 3541 dlm_put_lkb(lkb); 3542 out: 3543 dlm_unlock_recovery(ls); 3544 dlm_put_lockspace(ls); 3545 return error; 3546} 3547 3548/* 3549 * send/receive routines for remote operations and replies 3550 * 3551 * send_args 3552 * send_common 3553 * send_request receive_request 3554 * send_convert receive_convert 3555 * send_unlock receive_unlock 3556 * send_cancel receive_cancel 3557 * send_grant receive_grant 3558 * send_bast receive_bast 3559 * send_lookup receive_lookup 3560 * send_remove receive_remove 3561 * 3562 * send_common_reply 3563 * receive_request_reply send_request_reply 3564 * receive_convert_reply send_convert_reply 3565 * receive_unlock_reply send_unlock_reply 3566 * receive_cancel_reply send_cancel_reply 3567 * receive_lookup_reply send_lookup_reply 3568 */ 3569 3570static int _create_message(struct dlm_ls *ls, int mb_len, 3571 int to_nodeid, int mstype, 3572 struct dlm_message **ms_ret, 3573 struct dlm_mhandle **mh_ret) 3574{ 3575 struct dlm_message *ms; 3576 struct dlm_mhandle *mh; 3577 char *mb; 3578 3579 /* get_buffer gives us a message handle (mh) that we need to 3580 pass into midcomms_commit and a message buffer (mb) that we 3581 write our data into */ 3582 3583 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb); 3584 if (!mh) 3585 return -ENOBUFS; 3586 3587 ms = (struct dlm_message *) mb; 3588 3589 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 3590 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); 3591 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 3592 ms->m_header.h_length = cpu_to_le16(mb_len); 3593 ms->m_header.h_cmd = DLM_MSG; 3594 3595 ms->m_type = cpu_to_le32(mstype); 3596 3597 *mh_ret = mh; 3598 *ms_ret = ms; 3599 return 0; 3600} 3601 3602static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 3603 int to_nodeid, int mstype, 3604 struct dlm_message **ms_ret, 3605 struct dlm_mhandle **mh_ret) 3606{ 3607 int mb_len = sizeof(struct dlm_message); 3608 3609 switch (mstype) { 3610 case DLM_MSG_REQUEST: 3611 case DLM_MSG_LOOKUP: 3612 case DLM_MSG_REMOVE: 3613 mb_len += r->res_length; 3614 break; 3615 case DLM_MSG_CONVERT: 3616 case DLM_MSG_UNLOCK: 3617 case DLM_MSG_REQUEST_REPLY: 3618 case DLM_MSG_CONVERT_REPLY: 3619 case DLM_MSG_GRANT: 3620 if (lkb && lkb->lkb_lvbptr) 3621 mb_len += r->res_ls->ls_lvblen; 3622 break; 3623 } 3624 3625 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 3626 ms_ret, mh_ret); 3627} 3628 3629/* further lowcomms enhancements or alternate implementations may make 3630 the return value from this function useful at some point */ 3631 3632static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) 3633{ 3634 dlm_midcomms_commit_mhandle(mh); 3635 return 0; 3636} 3637 3638static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 3639 struct dlm_message *ms) 3640{ 3641 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid); 3642 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid); 3643 ms->m_lkid = cpu_to_le32(lkb->lkb_id); 3644 ms->m_remid = cpu_to_le32(lkb->lkb_remid); 3645 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags); 3646 ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags); 3647 ms->m_flags = cpu_to_le32(lkb->lkb_flags); 3648 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 3649 ms->m_status = cpu_to_le32(lkb->lkb_status); 3650 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode); 3651 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode); 3652 ms->m_hash = cpu_to_le32(r->res_hash); 3653 3654 /* m_result and m_bastmode are set from function args, 3655 not from lkb fields */ 3656 3657 if (lkb->lkb_bastfn) 3658 ms->m_asts |= cpu_to_le32(DLM_CB_BAST); 3659 if (lkb->lkb_astfn) 3660 ms->m_asts |= cpu_to_le32(DLM_CB_CAST); 3661 3662 /* compare with switch in create_message; send_remove() doesn't 3663 use send_args() */ 3664 3665 switch (ms->m_type) { 3666 case cpu_to_le32(DLM_MSG_REQUEST): 3667 case cpu_to_le32(DLM_MSG_LOOKUP): 3668 memcpy(ms->m_extra, r->res_name, r->res_length); 3669 break; 3670 case cpu_to_le32(DLM_MSG_CONVERT): 3671 case cpu_to_le32(DLM_MSG_UNLOCK): 3672 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 3673 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 3674 case cpu_to_le32(DLM_MSG_GRANT): 3675 if (!lkb->lkb_lvbptr) 3676 break; 3677 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 3678 break; 3679 } 3680} 3681 3682static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 3683{ 3684 struct dlm_message *ms; 3685 struct dlm_mhandle *mh; 3686 int to_nodeid, error; 3687 3688 to_nodeid = r->res_nodeid; 3689 3690 error = add_to_waiters(lkb, mstype, to_nodeid); 3691 if (error) 3692 return error; 3693 3694 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3695 if (error) 3696 goto fail; 3697 3698 send_args(r, lkb, ms); 3699 3700 error = send_message(mh, ms); 3701 if (error) 3702 goto fail; 3703 return 0; 3704 3705 fail: 3706 remove_from_waiters(lkb, msg_reply_type(mstype)); 3707 return error; 3708} 3709 3710static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3711{ 3712 return send_common(r, lkb, DLM_MSG_REQUEST); 3713} 3714 3715static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3716{ 3717 int error; 3718 3719 error = send_common(r, lkb, DLM_MSG_CONVERT); 3720 3721 /* down conversions go without a reply from the master */ 3722 if (!error && down_conversion(lkb)) { 3723 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 3724 r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 3725 r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 3726 r->res_ls->ls_stub_ms.m_result = 0; 3727 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 3728 } 3729 3730 return error; 3731} 3732 3733/* FIXME: if this lkb is the only lock we hold on the rsb, then set 3734 MASTER_UNCERTAIN to force the next request on the rsb to confirm 3735 that the master is still correct. */ 3736 3737static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3738{ 3739 return send_common(r, lkb, DLM_MSG_UNLOCK); 3740} 3741 3742static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3743{ 3744 return send_common(r, lkb, DLM_MSG_CANCEL); 3745} 3746 3747static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 3748{ 3749 struct dlm_message *ms; 3750 struct dlm_mhandle *mh; 3751 int to_nodeid, error; 3752 3753 to_nodeid = lkb->lkb_nodeid; 3754 3755 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 3756 if (error) 3757 goto out; 3758 3759 send_args(r, lkb, ms); 3760 3761 ms->m_result = 0; 3762 3763 error = send_message(mh, ms); 3764 out: 3765 return error; 3766} 3767 3768static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 3769{ 3770 struct dlm_message *ms; 3771 struct dlm_mhandle *mh; 3772 int to_nodeid, error; 3773 3774 to_nodeid = lkb->lkb_nodeid; 3775 3776 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 3777 if (error) 3778 goto out; 3779 3780 send_args(r, lkb, ms); 3781 3782 ms->m_bastmode = cpu_to_le32(mode); 3783 3784 error = send_message(mh, ms); 3785 out: 3786 return error; 3787} 3788 3789static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 3790{ 3791 struct dlm_message *ms; 3792 struct dlm_mhandle *mh; 3793 int to_nodeid, error; 3794 3795 to_nodeid = dlm_dir_nodeid(r); 3796 3797 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); 3798 if (error) 3799 return error; 3800 3801 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 3802 if (error) 3803 goto fail; 3804 3805 send_args(r, lkb, ms); 3806 3807 error = send_message(mh, ms); 3808 if (error) 3809 goto fail; 3810 return 0; 3811 3812 fail: 3813 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3814 return error; 3815} 3816 3817static int send_remove(struct dlm_rsb *r) 3818{ 3819 struct dlm_message *ms; 3820 struct dlm_mhandle *mh; 3821 int to_nodeid, error; 3822 3823 to_nodeid = dlm_dir_nodeid(r); 3824 3825 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 3826 if (error) 3827 goto out; 3828 3829 memcpy(ms->m_extra, r->res_name, r->res_length); 3830 ms->m_hash = cpu_to_le32(r->res_hash); 3831 3832 error = send_message(mh, ms); 3833 out: 3834 return error; 3835} 3836 3837static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3838 int mstype, int rv) 3839{ 3840 struct dlm_message *ms; 3841 struct dlm_mhandle *mh; 3842 int to_nodeid, error; 3843 3844 to_nodeid = lkb->lkb_nodeid; 3845 3846 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3847 if (error) 3848 goto out; 3849 3850 send_args(r, lkb, ms); 3851 3852 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3853 3854 error = send_message(mh, ms); 3855 out: 3856 return error; 3857} 3858 3859static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3860{ 3861 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 3862} 3863 3864static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3865{ 3866 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 3867} 3868 3869static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3870{ 3871 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 3872} 3873 3874static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3875{ 3876 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 3877} 3878 3879static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 3880 int ret_nodeid, int rv) 3881{ 3882 struct dlm_rsb *r = &ls->ls_stub_rsb; 3883 struct dlm_message *ms; 3884 struct dlm_mhandle *mh; 3885 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); 3886 3887 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 3888 if (error) 3889 goto out; 3890 3891 ms->m_lkid = ms_in->m_lkid; 3892 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3893 ms->m_nodeid = cpu_to_le32(ret_nodeid); 3894 3895 error = send_message(mh, ms); 3896 out: 3897 return error; 3898} 3899 3900/* which args we save from a received message depends heavily on the type 3901 of message, unlike the send side where we can safely send everything about 3902 the lkb for any type of message */ 3903 3904static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 3905{ 3906 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); 3907 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); 3908 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3909 (le32_to_cpu(ms->m_flags) & 0x0000FFFF); 3910} 3911 3912static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3913{ 3914 if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS)) 3915 return; 3916 3917 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); 3918 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3919 (le32_to_cpu(ms->m_flags) & 0x0000FFFF); 3920} 3921 3922static int receive_extralen(struct dlm_message *ms) 3923{ 3924 return (le16_to_cpu(ms->m_header.h_length) - 3925 sizeof(struct dlm_message)); 3926} 3927 3928static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 3929 struct dlm_message *ms) 3930{ 3931 int len; 3932 3933 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3934 if (!lkb->lkb_lvbptr) 3935 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3936 if (!lkb->lkb_lvbptr) 3937 return -ENOMEM; 3938 len = receive_extralen(ms); 3939 if (len > ls->ls_lvblen) 3940 len = ls->ls_lvblen; 3941 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 3942 } 3943 return 0; 3944} 3945 3946static void fake_bastfn(void *astparam, int mode) 3947{ 3948 log_print("fake_bastfn should not be called"); 3949} 3950 3951static void fake_astfn(void *astparam) 3952{ 3953 log_print("fake_astfn should not be called"); 3954} 3955 3956static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3957 struct dlm_message *ms) 3958{ 3959 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3960 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); 3961 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3962 lkb->lkb_grmode = DLM_LOCK_IV; 3963 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3964 3965 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL; 3966 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL; 3967 3968 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3969 /* lkb was just created so there won't be an lvb yet */ 3970 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3971 if (!lkb->lkb_lvbptr) 3972 return -ENOMEM; 3973 } 3974 3975 return 0; 3976} 3977 3978static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3979 struct dlm_message *ms) 3980{ 3981 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3982 return -EBUSY; 3983 3984 if (receive_lvb(ls, lkb, ms)) 3985 return -ENOMEM; 3986 3987 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3988 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 3989 3990 return 0; 3991} 3992 3993static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3994 struct dlm_message *ms) 3995{ 3996 if (receive_lvb(ls, lkb, ms)) 3997 return -ENOMEM; 3998 return 0; 3999} 4000 4001/* We fill in the stub-lkb fields with the info that send_xxxx_reply() 4002 uses to send a reply and that the remote end uses to process the reply. */ 4003 4004static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) 4005{ 4006 struct dlm_lkb *lkb = &ls->ls_stub_lkb; 4007 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4008 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 4009} 4010 4011/* This is called after the rsb is locked so that we can safely inspect 4012 fields in the lkb. */ 4013 4014static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) 4015{ 4016 int from = le32_to_cpu(ms->m_header.h_nodeid); 4017 int error = 0; 4018 4019 /* currently mixing of user/kernel locks are not supported */ 4020 if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) && 4021 ~lkb->lkb_flags & DLM_IFL_USER) { 4022 log_error(lkb->lkb_resource->res_ls, 4023 "got user dlm message for a kernel lock"); 4024 error = -EINVAL; 4025 goto out; 4026 } 4027 4028 switch (ms->m_type) { 4029 case cpu_to_le32(DLM_MSG_CONVERT): 4030 case cpu_to_le32(DLM_MSG_UNLOCK): 4031 case cpu_to_le32(DLM_MSG_CANCEL): 4032 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) 4033 error = -EINVAL; 4034 break; 4035 4036 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 4037 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 4038 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 4039 case cpu_to_le32(DLM_MSG_GRANT): 4040 case cpu_to_le32(DLM_MSG_BAST): 4041 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) 4042 error = -EINVAL; 4043 break; 4044 4045 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 4046 if (!is_process_copy(lkb)) 4047 error = -EINVAL; 4048 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) 4049 error = -EINVAL; 4050 break; 4051 4052 default: 4053 error = -EINVAL; 4054 } 4055 4056out: 4057 if (error) 4058 log_error(lkb->lkb_resource->res_ls, 4059 "ignore invalid message %d from %d %x %x %x %d", 4060 le32_to_cpu(ms->m_type), from, lkb->lkb_id, 4061 lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid); 4062 return error; 4063} 4064 4065static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) 4066{ 4067 char name[DLM_RESNAME_MAXLEN + 1]; 4068 struct dlm_message *ms; 4069 struct dlm_mhandle *mh; 4070 struct dlm_rsb *r; 4071 uint32_t hash, b; 4072 int rv, dir_nodeid; 4073 4074 memset(name, 0, sizeof(name)); 4075 memcpy(name, ms_name, len); 4076 4077 hash = jhash(name, len, 0); 4078 b = hash & (ls->ls_rsbtbl_size - 1); 4079 4080 dir_nodeid = dlm_hash2nodeid(ls, hash); 4081 4082 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name); 4083 4084 spin_lock(&ls->ls_rsbtbl[b].lock); 4085 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4086 if (!rv) { 4087 spin_unlock(&ls->ls_rsbtbl[b].lock); 4088 log_error(ls, "repeat_remove on keep %s", name); 4089 return; 4090 } 4091 4092 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4093 if (!rv) { 4094 spin_unlock(&ls->ls_rsbtbl[b].lock); 4095 log_error(ls, "repeat_remove on toss %s", name); 4096 return; 4097 } 4098 4099 /* use ls->remove_name2 to avoid conflict with shrink? */ 4100 4101 spin_lock(&ls->ls_remove_spin); 4102 ls->ls_remove_len = len; 4103 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); 4104 spin_unlock(&ls->ls_remove_spin); 4105 spin_unlock(&ls->ls_rsbtbl[b].lock); 4106 4107 rv = _create_message(ls, sizeof(struct dlm_message) + len, 4108 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); 4109 if (rv) 4110 goto out; 4111 4112 memcpy(ms->m_extra, name, len); 4113 ms->m_hash = cpu_to_le32(hash); 4114 4115 send_message(mh, ms); 4116 4117out: 4118 spin_lock(&ls->ls_remove_spin); 4119 ls->ls_remove_len = 0; 4120 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); 4121 spin_unlock(&ls->ls_remove_spin); 4122 wake_up(&ls->ls_remove_wait); 4123} 4124 4125static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) 4126{ 4127 struct dlm_lkb *lkb; 4128 struct dlm_rsb *r; 4129 int from_nodeid; 4130 int error, namelen = 0; 4131 4132 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4133 4134 error = create_lkb(ls, &lkb); 4135 if (error) 4136 goto fail; 4137 4138 receive_flags(lkb, ms); 4139 lkb->lkb_flags |= DLM_IFL_MSTCPY; 4140 error = receive_request_args(ls, lkb, ms); 4141 if (error) { 4142 __put_lkb(ls, lkb); 4143 goto fail; 4144 } 4145 4146 /* The dir node is the authority on whether we are the master 4147 for this rsb or not, so if the master sends us a request, we should 4148 recreate the rsb if we've destroyed it. This race happens when we 4149 send a remove message to the dir node at the same time that the dir 4150 node sends us a request for the rsb. */ 4151 4152 namelen = receive_extralen(ms); 4153 4154 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, 4155 R_RECEIVE_REQUEST, &r); 4156 if (error) { 4157 __put_lkb(ls, lkb); 4158 goto fail; 4159 } 4160 4161 lock_rsb(r); 4162 4163 if (r->res_master_nodeid != dlm_our_nodeid()) { 4164 error = validate_master_nodeid(ls, r, from_nodeid); 4165 if (error) { 4166 unlock_rsb(r); 4167 put_rsb(r); 4168 __put_lkb(ls, lkb); 4169 goto fail; 4170 } 4171 } 4172 4173 attach_lkb(r, lkb); 4174 error = do_request(r, lkb); 4175 send_request_reply(r, lkb, error); 4176 do_request_effects(r, lkb, error); 4177 4178 unlock_rsb(r); 4179 put_rsb(r); 4180 4181 if (error == -EINPROGRESS) 4182 error = 0; 4183 if (error) 4184 dlm_put_lkb(lkb); 4185 return 0; 4186 4187 fail: 4188 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup 4189 and do this receive_request again from process_lookup_list once 4190 we get the lookup reply. This would avoid a many repeated 4191 ENOTBLK request failures when the lookup reply designating us 4192 as master is delayed. */ 4193 4194 /* We could repeatedly return -EBADR here if our send_remove() is 4195 delayed in being sent/arriving/being processed on the dir node. 4196 Another node would repeatedly lookup up the master, and the dir 4197 node would continue returning our nodeid until our send_remove 4198 took effect. 4199 4200 We send another remove message in case our previous send_remove 4201 was lost/ignored/missed somehow. */ 4202 4203 if (error != -ENOTBLK) { 4204 log_limit(ls, "receive_request %x from %d %d", 4205 le32_to_cpu(ms->m_lkid), from_nodeid, error); 4206 } 4207 4208 if (namelen && error == -EBADR) { 4209 send_repeat_remove(ls, ms->m_extra, namelen); 4210 msleep(1000); 4211 } 4212 4213 setup_stub_lkb(ls, ms); 4214 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4215 return error; 4216} 4217 4218static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 4219{ 4220 struct dlm_lkb *lkb; 4221 struct dlm_rsb *r; 4222 int error, reply = 1; 4223 4224 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4225 if (error) 4226 goto fail; 4227 4228 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 4229 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 4230 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 4231 (unsigned long long)lkb->lkb_recover_seq, 4232 le32_to_cpu(ms->m_header.h_nodeid), 4233 le32_to_cpu(ms->m_lkid)); 4234 error = -ENOENT; 4235 dlm_put_lkb(lkb); 4236 goto fail; 4237 } 4238 4239 r = lkb->lkb_resource; 4240 4241 hold_rsb(r); 4242 lock_rsb(r); 4243 4244 error = validate_message(lkb, ms); 4245 if (error) 4246 goto out; 4247 4248 receive_flags(lkb, ms); 4249 4250 error = receive_convert_args(ls, lkb, ms); 4251 if (error) { 4252 send_convert_reply(r, lkb, error); 4253 goto out; 4254 } 4255 4256 reply = !down_conversion(lkb); 4257 4258 error = do_convert(r, lkb); 4259 if (reply) 4260 send_convert_reply(r, lkb, error); 4261 do_convert_effects(r, lkb, error); 4262 out: 4263 unlock_rsb(r); 4264 put_rsb(r); 4265 dlm_put_lkb(lkb); 4266 return 0; 4267 4268 fail: 4269 setup_stub_lkb(ls, ms); 4270 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4271 return error; 4272} 4273 4274static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 4275{ 4276 struct dlm_lkb *lkb; 4277 struct dlm_rsb *r; 4278 int error; 4279 4280 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4281 if (error) 4282 goto fail; 4283 4284 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 4285 log_error(ls, "receive_unlock %x remid %x remote %d %x", 4286 lkb->lkb_id, lkb->lkb_remid, 4287 le32_to_cpu(ms->m_header.h_nodeid), 4288 le32_to_cpu(ms->m_lkid)); 4289 error = -ENOENT; 4290 dlm_put_lkb(lkb); 4291 goto fail; 4292 } 4293 4294 r = lkb->lkb_resource; 4295 4296 hold_rsb(r); 4297 lock_rsb(r); 4298 4299 error = validate_message(lkb, ms); 4300 if (error) 4301 goto out; 4302 4303 receive_flags(lkb, ms); 4304 4305 error = receive_unlock_args(ls, lkb, ms); 4306 if (error) { 4307 send_unlock_reply(r, lkb, error); 4308 goto out; 4309 } 4310 4311 error = do_unlock(r, lkb); 4312 send_unlock_reply(r, lkb, error); 4313 do_unlock_effects(r, lkb, error); 4314 out: 4315 unlock_rsb(r); 4316 put_rsb(r); 4317 dlm_put_lkb(lkb); 4318 return 0; 4319 4320 fail: 4321 setup_stub_lkb(ls, ms); 4322 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4323 return error; 4324} 4325 4326static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 4327{ 4328 struct dlm_lkb *lkb; 4329 struct dlm_rsb *r; 4330 int error; 4331 4332 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4333 if (error) 4334 goto fail; 4335 4336 receive_flags(lkb, ms); 4337 4338 r = lkb->lkb_resource; 4339 4340 hold_rsb(r); 4341 lock_rsb(r); 4342 4343 error = validate_message(lkb, ms); 4344 if (error) 4345 goto out; 4346 4347 error = do_cancel(r, lkb); 4348 send_cancel_reply(r, lkb, error); 4349 do_cancel_effects(r, lkb, error); 4350 out: 4351 unlock_rsb(r); 4352 put_rsb(r); 4353 dlm_put_lkb(lkb); 4354 return 0; 4355 4356 fail: 4357 setup_stub_lkb(ls, ms); 4358 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4359 return error; 4360} 4361 4362static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 4363{ 4364 struct dlm_lkb *lkb; 4365 struct dlm_rsb *r; 4366 int error; 4367 4368 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4369 if (error) 4370 return error; 4371 4372 r = lkb->lkb_resource; 4373 4374 hold_rsb(r); 4375 lock_rsb(r); 4376 4377 error = validate_message(lkb, ms); 4378 if (error) 4379 goto out; 4380 4381 receive_flags_reply(lkb, ms); 4382 if (is_altmode(lkb)) 4383 munge_altmode(lkb, ms); 4384 grant_lock_pc(r, lkb, ms); 4385 queue_cast(r, lkb, 0); 4386 out: 4387 unlock_rsb(r); 4388 put_rsb(r); 4389 dlm_put_lkb(lkb); 4390 return 0; 4391} 4392 4393static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 4394{ 4395 struct dlm_lkb *lkb; 4396 struct dlm_rsb *r; 4397 int error; 4398 4399 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4400 if (error) 4401 return error; 4402 4403 r = lkb->lkb_resource; 4404 4405 hold_rsb(r); 4406 lock_rsb(r); 4407 4408 error = validate_message(lkb, ms); 4409 if (error) 4410 goto out; 4411 4412 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode)); 4413 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode); 4414 out: 4415 unlock_rsb(r); 4416 put_rsb(r); 4417 dlm_put_lkb(lkb); 4418 return 0; 4419} 4420 4421static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4422{ 4423 int len, error, ret_nodeid, from_nodeid, our_nodeid; 4424 4425 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4426 our_nodeid = dlm_our_nodeid(); 4427 4428 len = receive_extralen(ms); 4429 4430 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, 4431 &ret_nodeid, NULL); 4432 4433 /* Optimization: we're master so treat lookup as a request */ 4434 if (!error && ret_nodeid == our_nodeid) { 4435 receive_request(ls, ms); 4436 return; 4437 } 4438 send_lookup_reply(ls, ms, ret_nodeid, error); 4439} 4440 4441static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4442{ 4443 char name[DLM_RESNAME_MAXLEN+1]; 4444 struct dlm_rsb *r; 4445 uint32_t hash, b; 4446 int rv, len, dir_nodeid, from_nodeid; 4447 4448 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4449 4450 len = receive_extralen(ms); 4451 4452 if (len > DLM_RESNAME_MAXLEN) { 4453 log_error(ls, "receive_remove from %d bad len %d", 4454 from_nodeid, len); 4455 return; 4456 } 4457 4458 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash)); 4459 if (dir_nodeid != dlm_our_nodeid()) { 4460 log_error(ls, "receive_remove from %d bad nodeid %d", 4461 from_nodeid, dir_nodeid); 4462 return; 4463 } 4464 4465 /* Look for name on rsbtbl.toss, if it's there, kill it. 4466 If it's on rsbtbl.keep, it's being used, and we should ignore this 4467 message. This is an expected race between the dir node sending a 4468 request to the master node at the same time as the master node sends 4469 a remove to the dir node. The resolution to that race is for the 4470 dir node to ignore the remove message, and the master node to 4471 recreate the master rsb when it gets a request from the dir node for 4472 an rsb it doesn't have. */ 4473 4474 memset(name, 0, sizeof(name)); 4475 memcpy(name, ms->m_extra, len); 4476 4477 hash = jhash(name, len, 0); 4478 b = hash & (ls->ls_rsbtbl_size - 1); 4479 4480 spin_lock(&ls->ls_rsbtbl[b].lock); 4481 4482 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4483 if (rv) { 4484 /* verify the rsb is on keep list per comment above */ 4485 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4486 if (rv) { 4487 /* should not happen */ 4488 log_error(ls, "receive_remove from %d not found %s", 4489 from_nodeid, name); 4490 spin_unlock(&ls->ls_rsbtbl[b].lock); 4491 return; 4492 } 4493 if (r->res_master_nodeid != from_nodeid) { 4494 /* should not happen */ 4495 log_error(ls, "receive_remove keep from %d master %d", 4496 from_nodeid, r->res_master_nodeid); 4497 dlm_print_rsb(r); 4498 spin_unlock(&ls->ls_rsbtbl[b].lock); 4499 return; 4500 } 4501 4502 log_debug(ls, "receive_remove from %d master %d first %x %s", 4503 from_nodeid, r->res_master_nodeid, r->res_first_lkid, 4504 name); 4505 spin_unlock(&ls->ls_rsbtbl[b].lock); 4506 return; 4507 } 4508 4509 if (r->res_master_nodeid != from_nodeid) { 4510 log_error(ls, "receive_remove toss from %d master %d", 4511 from_nodeid, r->res_master_nodeid); 4512 dlm_print_rsb(r); 4513 spin_unlock(&ls->ls_rsbtbl[b].lock); 4514 return; 4515 } 4516 4517 if (kref_put(&r->res_ref, kill_rsb)) { 4518 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 4519 spin_unlock(&ls->ls_rsbtbl[b].lock); 4520 dlm_free_rsb(r); 4521 } else { 4522 log_error(ls, "receive_remove from %d rsb ref error", 4523 from_nodeid); 4524 dlm_print_rsb(r); 4525 spin_unlock(&ls->ls_rsbtbl[b].lock); 4526 } 4527} 4528 4529static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4530{ 4531 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); 4532} 4533 4534static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 4535{ 4536 struct dlm_lkb *lkb; 4537 struct dlm_rsb *r; 4538 int error, mstype, result; 4539 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4540 4541 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4542 if (error) 4543 return error; 4544 4545 r = lkb->lkb_resource; 4546 hold_rsb(r); 4547 lock_rsb(r); 4548 4549 error = validate_message(lkb, ms); 4550 if (error) 4551 goto out; 4552 4553 mstype = lkb->lkb_wait_type; 4554 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4555 if (error) { 4556 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4557 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid), 4558 from_dlm_errno(le32_to_cpu(ms->m_result))); 4559 dlm_dump_rsb(r); 4560 goto out; 4561 } 4562 4563 /* Optimization: the dir node was also the master, so it took our 4564 lookup as a request and sent request reply instead of lookup reply */ 4565 if (mstype == DLM_MSG_LOOKUP) { 4566 r->res_master_nodeid = from_nodeid; 4567 r->res_nodeid = from_nodeid; 4568 lkb->lkb_nodeid = from_nodeid; 4569 } 4570 4571 /* this is the value returned from do_request() on the master */ 4572 result = from_dlm_errno(le32_to_cpu(ms->m_result)); 4573 4574 switch (result) { 4575 case -EAGAIN: 4576 /* request would block (be queued) on remote master */ 4577 queue_cast(r, lkb, -EAGAIN); 4578 confirm_master(r, -EAGAIN); 4579 unhold_lkb(lkb); /* undoes create_lkb() */ 4580 break; 4581 4582 case -EINPROGRESS: 4583 case 0: 4584 /* request was queued or granted on remote master */ 4585 receive_flags_reply(lkb, ms); 4586 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 4587 if (is_altmode(lkb)) 4588 munge_altmode(lkb, ms); 4589 if (result) { 4590 add_lkb(r, lkb, DLM_LKSTS_WAITING); 4591 add_timeout(lkb); 4592 } else { 4593 grant_lock_pc(r, lkb, ms); 4594 queue_cast(r, lkb, 0); 4595 } 4596 confirm_master(r, result); 4597 break; 4598 4599 case -EBADR: 4600 case -ENOTBLK: 4601 /* find_rsb failed to find rsb or rsb wasn't master */ 4602 log_limit(ls, "receive_request_reply %x from %d %d " 4603 "master %d dir %d first %x %s", lkb->lkb_id, 4604 from_nodeid, result, r->res_master_nodeid, 4605 r->res_dir_nodeid, r->res_first_lkid, r->res_name); 4606 4607 if (r->res_dir_nodeid != dlm_our_nodeid() && 4608 r->res_master_nodeid != dlm_our_nodeid()) { 4609 /* cause _request_lock->set_master->send_lookup */ 4610 r->res_master_nodeid = 0; 4611 r->res_nodeid = -1; 4612 lkb->lkb_nodeid = -1; 4613 } 4614 4615 if (is_overlap(lkb)) { 4616 /* we'll ignore error in cancel/unlock reply */ 4617 queue_cast_overlap(r, lkb); 4618 confirm_master(r, result); 4619 unhold_lkb(lkb); /* undoes create_lkb() */ 4620 } else { 4621 _request_lock(r, lkb); 4622 4623 if (r->res_master_nodeid == dlm_our_nodeid()) 4624 confirm_master(r, 0); 4625 } 4626 break; 4627 4628 default: 4629 log_error(ls, "receive_request_reply %x error %d", 4630 lkb->lkb_id, result); 4631 } 4632 4633 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { 4634 log_debug(ls, "receive_request_reply %x result %d unlock", 4635 lkb->lkb_id, result); 4636 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4637 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4638 send_unlock(r, lkb); 4639 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { 4640 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 4641 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4642 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4643 send_cancel(r, lkb); 4644 } else { 4645 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4646 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4647 } 4648 out: 4649 unlock_rsb(r); 4650 put_rsb(r); 4651 dlm_put_lkb(lkb); 4652 return 0; 4653} 4654 4655static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 4656 struct dlm_message *ms) 4657{ 4658 /* this is the value returned from do_convert() on the master */ 4659 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4660 case -EAGAIN: 4661 /* convert would block (be queued) on remote master */ 4662 queue_cast(r, lkb, -EAGAIN); 4663 break; 4664 4665 case -EDEADLK: 4666 receive_flags_reply(lkb, ms); 4667 revert_lock_pc(r, lkb); 4668 queue_cast(r, lkb, -EDEADLK); 4669 break; 4670 4671 case -EINPROGRESS: 4672 /* convert was queued on remote master */ 4673 receive_flags_reply(lkb, ms); 4674 if (is_demoted(lkb)) 4675 munge_demoted(lkb); 4676 del_lkb(r, lkb); 4677 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 4678 add_timeout(lkb); 4679 break; 4680 4681 case 0: 4682 /* convert was granted on remote master */ 4683 receive_flags_reply(lkb, ms); 4684 if (is_demoted(lkb)) 4685 munge_demoted(lkb); 4686 grant_lock_pc(r, lkb, ms); 4687 queue_cast(r, lkb, 0); 4688 break; 4689 4690 default: 4691 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 4692 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4693 le32_to_cpu(ms->m_lkid), 4694 from_dlm_errno(le32_to_cpu(ms->m_result))); 4695 dlm_print_rsb(r); 4696 dlm_print_lkb(lkb); 4697 } 4698} 4699 4700static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4701{ 4702 struct dlm_rsb *r = lkb->lkb_resource; 4703 int error; 4704 4705 hold_rsb(r); 4706 lock_rsb(r); 4707 4708 error = validate_message(lkb, ms); 4709 if (error) 4710 goto out; 4711 4712 /* stub reply can happen with waiters_mutex held */ 4713 error = remove_from_waiters_ms(lkb, ms); 4714 if (error) 4715 goto out; 4716 4717 __receive_convert_reply(r, lkb, ms); 4718 out: 4719 unlock_rsb(r); 4720 put_rsb(r); 4721} 4722 4723static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 4724{ 4725 struct dlm_lkb *lkb; 4726 int error; 4727 4728 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4729 if (error) 4730 return error; 4731 4732 _receive_convert_reply(lkb, ms); 4733 dlm_put_lkb(lkb); 4734 return 0; 4735} 4736 4737static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4738{ 4739 struct dlm_rsb *r = lkb->lkb_resource; 4740 int error; 4741 4742 hold_rsb(r); 4743 lock_rsb(r); 4744 4745 error = validate_message(lkb, ms); 4746 if (error) 4747 goto out; 4748 4749 /* stub reply can happen with waiters_mutex held */ 4750 error = remove_from_waiters_ms(lkb, ms); 4751 if (error) 4752 goto out; 4753 4754 /* this is the value returned from do_unlock() on the master */ 4755 4756 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4757 case -DLM_EUNLOCK: 4758 receive_flags_reply(lkb, ms); 4759 remove_lock_pc(r, lkb); 4760 queue_cast(r, lkb, -DLM_EUNLOCK); 4761 break; 4762 case -ENOENT: 4763 break; 4764 default: 4765 log_error(r->res_ls, "receive_unlock_reply %x error %d", 4766 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result))); 4767 } 4768 out: 4769 unlock_rsb(r); 4770 put_rsb(r); 4771} 4772 4773static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 4774{ 4775 struct dlm_lkb *lkb; 4776 int error; 4777 4778 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4779 if (error) 4780 return error; 4781 4782 _receive_unlock_reply(lkb, ms); 4783 dlm_put_lkb(lkb); 4784 return 0; 4785} 4786 4787static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4788{ 4789 struct dlm_rsb *r = lkb->lkb_resource; 4790 int error; 4791 4792 hold_rsb(r); 4793 lock_rsb(r); 4794 4795 error = validate_message(lkb, ms); 4796 if (error) 4797 goto out; 4798 4799 /* stub reply can happen with waiters_mutex held */ 4800 error = remove_from_waiters_ms(lkb, ms); 4801 if (error) 4802 goto out; 4803 4804 /* this is the value returned from do_cancel() on the master */ 4805 4806 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4807 case -DLM_ECANCEL: 4808 receive_flags_reply(lkb, ms); 4809 revert_lock_pc(r, lkb); 4810 queue_cast(r, lkb, -DLM_ECANCEL); 4811 break; 4812 case 0: 4813 break; 4814 default: 4815 log_error(r->res_ls, "receive_cancel_reply %x error %d", 4816 lkb->lkb_id, 4817 from_dlm_errno(le32_to_cpu(ms->m_result))); 4818 } 4819 out: 4820 unlock_rsb(r); 4821 put_rsb(r); 4822} 4823 4824static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 4825{ 4826 struct dlm_lkb *lkb; 4827 int error; 4828 4829 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4830 if (error) 4831 return error; 4832 4833 _receive_cancel_reply(lkb, ms); 4834 dlm_put_lkb(lkb); 4835 return 0; 4836} 4837 4838static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 4839{ 4840 struct dlm_lkb *lkb; 4841 struct dlm_rsb *r; 4842 int error, ret_nodeid; 4843 int do_lookup_list = 0; 4844 4845 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb); 4846 if (error) { 4847 log_error(ls, "%s no lkid %x", __func__, 4848 le32_to_cpu(ms->m_lkid)); 4849 return; 4850 } 4851 4852 /* ms->m_result is the value returned by dlm_master_lookup on dir node 4853 FIXME: will a non-zero error ever be returned? */ 4854 4855 r = lkb->lkb_resource; 4856 hold_rsb(r); 4857 lock_rsb(r); 4858 4859 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 4860 if (error) 4861 goto out; 4862 4863 ret_nodeid = le32_to_cpu(ms->m_nodeid); 4864 4865 /* We sometimes receive a request from the dir node for this 4866 rsb before we've received the dir node's loookup_reply for it. 4867 The request from the dir node implies we're the master, so we set 4868 ourself as master in receive_request_reply, and verify here that 4869 we are indeed the master. */ 4870 4871 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { 4872 /* This should never happen */ 4873 log_error(ls, "receive_lookup_reply %x from %d ret %d " 4874 "master %d dir %d our %d first %x %s", 4875 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4876 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 4877 dlm_our_nodeid(), r->res_first_lkid, r->res_name); 4878 } 4879 4880 if (ret_nodeid == dlm_our_nodeid()) { 4881 r->res_master_nodeid = ret_nodeid; 4882 r->res_nodeid = 0; 4883 do_lookup_list = 1; 4884 r->res_first_lkid = 0; 4885 } else if (ret_nodeid == -1) { 4886 /* the remote node doesn't believe it's the dir node */ 4887 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", 4888 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid)); 4889 r->res_master_nodeid = 0; 4890 r->res_nodeid = -1; 4891 lkb->lkb_nodeid = -1; 4892 } else { 4893 /* set_master() will set lkb_nodeid from r */ 4894 r->res_master_nodeid = ret_nodeid; 4895 r->res_nodeid = ret_nodeid; 4896 } 4897 4898 if (is_overlap(lkb)) { 4899 log_debug(ls, "receive_lookup_reply %x unlock %x", 4900 lkb->lkb_id, lkb->lkb_flags); 4901 queue_cast_overlap(r, lkb); 4902 unhold_lkb(lkb); /* undoes create_lkb() */ 4903 goto out_list; 4904 } 4905 4906 _request_lock(r, lkb); 4907 4908 out_list: 4909 if (do_lookup_list) 4910 process_lookup_list(r); 4911 out: 4912 unlock_rsb(r); 4913 put_rsb(r); 4914 dlm_put_lkb(lkb); 4915} 4916 4917static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4918 uint32_t saved_seq) 4919{ 4920 int error = 0, noent = 0; 4921 4922 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) { 4923 log_limit(ls, "receive %d from non-member %d %x %x %d", 4924 le32_to_cpu(ms->m_type), 4925 le32_to_cpu(ms->m_header.h_nodeid), 4926 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4927 from_dlm_errno(le32_to_cpu(ms->m_result))); 4928 return; 4929 } 4930 4931 switch (ms->m_type) { 4932 4933 /* messages sent to a master node */ 4934 4935 case cpu_to_le32(DLM_MSG_REQUEST): 4936 error = receive_request(ls, ms); 4937 break; 4938 4939 case cpu_to_le32(DLM_MSG_CONVERT): 4940 error = receive_convert(ls, ms); 4941 break; 4942 4943 case cpu_to_le32(DLM_MSG_UNLOCK): 4944 error = receive_unlock(ls, ms); 4945 break; 4946 4947 case cpu_to_le32(DLM_MSG_CANCEL): 4948 noent = 1; 4949 error = receive_cancel(ls, ms); 4950 break; 4951 4952 /* messages sent from a master node (replies to above) */ 4953 4954 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 4955 error = receive_request_reply(ls, ms); 4956 break; 4957 4958 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 4959 error = receive_convert_reply(ls, ms); 4960 break; 4961 4962 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 4963 error = receive_unlock_reply(ls, ms); 4964 break; 4965 4966 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 4967 error = receive_cancel_reply(ls, ms); 4968 break; 4969 4970 /* messages sent from a master node (only two types of async msg) */ 4971 4972 case cpu_to_le32(DLM_MSG_GRANT): 4973 noent = 1; 4974 error = receive_grant(ls, ms); 4975 break; 4976 4977 case cpu_to_le32(DLM_MSG_BAST): 4978 noent = 1; 4979 error = receive_bast(ls, ms); 4980 break; 4981 4982 /* messages sent to a dir node */ 4983 4984 case cpu_to_le32(DLM_MSG_LOOKUP): 4985 receive_lookup(ls, ms); 4986 break; 4987 4988 case cpu_to_le32(DLM_MSG_REMOVE): 4989 receive_remove(ls, ms); 4990 break; 4991 4992 /* messages sent from a dir node (remove has no reply) */ 4993 4994 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY): 4995 receive_lookup_reply(ls, ms); 4996 break; 4997 4998 /* other messages */ 4999 5000 case cpu_to_le32(DLM_MSG_PURGE): 5001 receive_purge(ls, ms); 5002 break; 5003 5004 default: 5005 log_error(ls, "unknown message type %d", 5006 le32_to_cpu(ms->m_type)); 5007 } 5008 5009 /* 5010 * When checking for ENOENT, we're checking the result of 5011 * find_lkb(m_remid): 5012 * 5013 * The lock id referenced in the message wasn't found. This may 5014 * happen in normal usage for the async messages and cancel, so 5015 * only use log_debug for them. 5016 * 5017 * Some errors are expected and normal. 5018 */ 5019 5020 if (error == -ENOENT && noent) { 5021 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 5022 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 5023 le32_to_cpu(ms->m_header.h_nodeid), 5024 le32_to_cpu(ms->m_lkid), saved_seq); 5025 } else if (error == -ENOENT) { 5026 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 5027 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 5028 le32_to_cpu(ms->m_header.h_nodeid), 5029 le32_to_cpu(ms->m_lkid), saved_seq); 5030 5031 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT)) 5032 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash)); 5033 } 5034 5035 if (error == -EINVAL) { 5036 log_error(ls, "receive %d inval from %d lkid %x remid %x " 5037 "saved_seq %u", 5038 le32_to_cpu(ms->m_type), 5039 le32_to_cpu(ms->m_header.h_nodeid), 5040 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 5041 saved_seq); 5042 } 5043} 5044 5045/* If the lockspace is in recovery mode (locking stopped), then normal 5046 messages are saved on the requestqueue for processing after recovery is 5047 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 5048 messages off the requestqueue before we process new ones. This occurs right 5049 after recovery completes when we transition from saving all messages on 5050 requestqueue, to processing all the saved messages, to processing new 5051 messages as they arrive. */ 5052 5053static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 5054 int nodeid) 5055{ 5056 if (dlm_locking_stopped(ls)) { 5057 /* If we were a member of this lockspace, left, and rejoined, 5058 other nodes may still be sending us messages from the 5059 lockspace generation before we left. */ 5060 if (!ls->ls_generation) { 5061 log_limit(ls, "receive %d from %d ignore old gen", 5062 le32_to_cpu(ms->m_type), nodeid); 5063 return; 5064 } 5065 5066 dlm_add_requestqueue(ls, nodeid, ms); 5067 } else { 5068 dlm_wait_requestqueue(ls); 5069 _receive_message(ls, ms, 0); 5070 } 5071} 5072 5073/* This is called by dlm_recoverd to process messages that were saved on 5074 the requestqueue. */ 5075 5076void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 5077 uint32_t saved_seq) 5078{ 5079 _receive_message(ls, ms, saved_seq); 5080} 5081 5082/* This is called by the midcomms layer when something is received for 5083 the lockspace. It could be either a MSG (normal message sent as part of 5084 standard locking activity) or an RCOM (recovery message sent as part of 5085 lockspace recovery). */ 5086 5087void dlm_receive_buffer(union dlm_packet *p, int nodeid) 5088{ 5089 struct dlm_header *hd = &p->header; 5090 struct dlm_ls *ls; 5091 int type = 0; 5092 5093 switch (hd->h_cmd) { 5094 case DLM_MSG: 5095 type = le32_to_cpu(p->message.m_type); 5096 break; 5097 case DLM_RCOM: 5098 type = le32_to_cpu(p->rcom.rc_type); 5099 break; 5100 default: 5101 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); 5102 return; 5103 } 5104 5105 if (le32_to_cpu(hd->h_nodeid) != nodeid) { 5106 log_print("invalid h_nodeid %d from %d lockspace %x", 5107 le32_to_cpu(hd->h_nodeid), nodeid, 5108 le32_to_cpu(hd->u.h_lockspace)); 5109 return; 5110 } 5111 5112 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace)); 5113 if (!ls) { 5114 if (dlm_config.ci_log_debug) { 5115 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 5116 "%u from %d cmd %d type %d\n", 5117 le32_to_cpu(hd->u.h_lockspace), nodeid, 5118 hd->h_cmd, type); 5119 } 5120 5121 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 5122 dlm_send_ls_not_ready(nodeid, &p->rcom); 5123 return; 5124 } 5125 5126 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 5127 be inactive (in this ls) before transitioning to recovery mode */ 5128 5129 down_read(&ls->ls_recv_active); 5130 if (hd->h_cmd == DLM_MSG) 5131 dlm_receive_message(ls, &p->message, nodeid); 5132 else 5133 dlm_receive_rcom(ls, &p->rcom, nodeid); 5134 up_read(&ls->ls_recv_active); 5135 5136 dlm_put_lockspace(ls); 5137} 5138 5139static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, 5140 struct dlm_message *ms_stub) 5141{ 5142 if (middle_conversion(lkb)) { 5143 hold_lkb(lkb); 5144 memset(ms_stub, 0, sizeof(struct dlm_message)); 5145 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 5146 ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 5147 ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); 5148 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5149 _receive_convert_reply(lkb, ms_stub); 5150 5151 /* Same special case as in receive_rcom_lock_args() */ 5152 lkb->lkb_grmode = DLM_LOCK_IV; 5153 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 5154 unhold_lkb(lkb); 5155 5156 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 5157 lkb->lkb_flags |= DLM_IFL_RESEND; 5158 } 5159 5160 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 5161 conversions are async; there's no reply from the remote master */ 5162} 5163 5164/* A waiting lkb needs recovery if the master node has failed, or 5165 the master node is changing (only when no directory is used) */ 5166 5167static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 5168 int dir_nodeid) 5169{ 5170 if (dlm_no_directory(ls)) 5171 return 1; 5172 5173 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 5174 return 1; 5175 5176 return 0; 5177} 5178 5179/* Recovery for locks that are waiting for replies from nodes that are now 5180 gone. We can just complete unlocks and cancels by faking a reply from the 5181 dead node. Requests and up-conversions we flag to be resent after 5182 recovery. Down-conversions can just be completed with a fake reply like 5183 unlocks. Conversions between PR and CW need special attention. */ 5184 5185void dlm_recover_waiters_pre(struct dlm_ls *ls) 5186{ 5187 struct dlm_lkb *lkb, *safe; 5188 struct dlm_message *ms_stub; 5189 int wait_type, stub_unlock_result, stub_cancel_result; 5190 int dir_nodeid; 5191 5192 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL); 5193 if (!ms_stub) 5194 return; 5195 5196 mutex_lock(&ls->ls_waiters_mutex); 5197 5198 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 5199 5200 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 5201 5202 /* exclude debug messages about unlocks because there can be so 5203 many and they aren't very interesting */ 5204 5205 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 5206 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5207 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 5208 lkb->lkb_id, 5209 lkb->lkb_remid, 5210 lkb->lkb_wait_type, 5211 lkb->lkb_resource->res_nodeid, 5212 lkb->lkb_nodeid, 5213 lkb->lkb_wait_nodeid, 5214 dir_nodeid); 5215 } 5216 5217 /* all outstanding lookups, regardless of destination will be 5218 resent after recovery is done */ 5219 5220 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 5221 lkb->lkb_flags |= DLM_IFL_RESEND; 5222 continue; 5223 } 5224 5225 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) 5226 continue; 5227 5228 wait_type = lkb->lkb_wait_type; 5229 stub_unlock_result = -DLM_EUNLOCK; 5230 stub_cancel_result = -DLM_ECANCEL; 5231 5232 /* Main reply may have been received leaving a zero wait_type, 5233 but a reply for the overlapping op may not have been 5234 received. In that case we need to fake the appropriate 5235 reply for the overlap op. */ 5236 5237 if (!wait_type) { 5238 if (is_overlap_cancel(lkb)) { 5239 wait_type = DLM_MSG_CANCEL; 5240 if (lkb->lkb_grmode == DLM_LOCK_IV) 5241 stub_cancel_result = 0; 5242 } 5243 if (is_overlap_unlock(lkb)) { 5244 wait_type = DLM_MSG_UNLOCK; 5245 if (lkb->lkb_grmode == DLM_LOCK_IV) 5246 stub_unlock_result = -ENOENT; 5247 } 5248 5249 log_debug(ls, "rwpre overlap %x %x %d %d %d", 5250 lkb->lkb_id, lkb->lkb_flags, wait_type, 5251 stub_cancel_result, stub_unlock_result); 5252 } 5253 5254 switch (wait_type) { 5255 5256 case DLM_MSG_REQUEST: 5257 lkb->lkb_flags |= DLM_IFL_RESEND; 5258 break; 5259 5260 case DLM_MSG_CONVERT: 5261 recover_convert_waiter(ls, lkb, ms_stub); 5262 break; 5263 5264 case DLM_MSG_UNLOCK: 5265 hold_lkb(lkb); 5266 memset(ms_stub, 0, sizeof(struct dlm_message)); 5267 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 5268 ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); 5269 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result)); 5270 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5271 _receive_unlock_reply(lkb, ms_stub); 5272 dlm_put_lkb(lkb); 5273 break; 5274 5275 case DLM_MSG_CANCEL: 5276 hold_lkb(lkb); 5277 memset(ms_stub, 0, sizeof(struct dlm_message)); 5278 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); 5279 ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); 5280 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result)); 5281 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5282 _receive_cancel_reply(lkb, ms_stub); 5283 dlm_put_lkb(lkb); 5284 break; 5285 5286 default: 5287 log_error(ls, "invalid lkb wait_type %d %d", 5288 lkb->lkb_wait_type, wait_type); 5289 } 5290 schedule(); 5291 } 5292 mutex_unlock(&ls->ls_waiters_mutex); 5293 kfree(ms_stub); 5294} 5295 5296static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 5297{ 5298 struct dlm_lkb *lkb = NULL, *iter; 5299 5300 mutex_lock(&ls->ls_waiters_mutex); 5301 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { 5302 if (iter->lkb_flags & DLM_IFL_RESEND) { 5303 hold_lkb(iter); 5304 lkb = iter; 5305 break; 5306 } 5307 } 5308 mutex_unlock(&ls->ls_waiters_mutex); 5309 5310 return lkb; 5311} 5312 5313/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 5314 master or dir-node for r. Processing the lkb may result in it being placed 5315 back on waiters. */ 5316 5317/* We do this after normal locking has been enabled and any saved messages 5318 (in requestqueue) have been processed. We should be confident that at 5319 this point we won't get or process a reply to any of these waiting 5320 operations. But, new ops may be coming in on the rsbs/locks here from 5321 userspace or remotely. */ 5322 5323/* there may have been an overlap unlock/cancel prior to recovery or after 5324 recovery. if before, the lkb may still have a pos wait_count; if after, the 5325 overlap flag would just have been set and nothing new sent. we can be 5326 confident here than any replies to either the initial op or overlap ops 5327 prior to recovery have been received. */ 5328 5329int dlm_recover_waiters_post(struct dlm_ls *ls) 5330{ 5331 struct dlm_lkb *lkb; 5332 struct dlm_rsb *r; 5333 int error = 0, mstype, err, oc, ou; 5334 5335 while (1) { 5336 if (dlm_locking_stopped(ls)) { 5337 log_debug(ls, "recover_waiters_post aborted"); 5338 error = -EINTR; 5339 break; 5340 } 5341 5342 lkb = find_resend_waiter(ls); 5343 if (!lkb) 5344 break; 5345 5346 r = lkb->lkb_resource; 5347 hold_rsb(r); 5348 lock_rsb(r); 5349 5350 mstype = lkb->lkb_wait_type; 5351 oc = is_overlap_cancel(lkb); 5352 ou = is_overlap_unlock(lkb); 5353 err = 0; 5354 5355 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5356 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 5357 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 5358 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 5359 dlm_dir_nodeid(r), oc, ou); 5360 5361 /* At this point we assume that we won't get a reply to any 5362 previous op or overlap op on this lock. First, do a big 5363 remove_from_waiters() for all previous ops. */ 5364 5365 lkb->lkb_flags &= ~DLM_IFL_RESEND; 5366 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 5367 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 5368 lkb->lkb_wait_type = 0; 5369 /* drop all wait_count references we still 5370 * hold a reference for this iteration. 5371 */ 5372 while (lkb->lkb_wait_count) { 5373 lkb->lkb_wait_count--; 5374 unhold_lkb(lkb); 5375 } 5376 mutex_lock(&ls->ls_waiters_mutex); 5377 list_del_init(&lkb->lkb_wait_reply); 5378 mutex_unlock(&ls->ls_waiters_mutex); 5379 5380 if (oc || ou) { 5381 /* do an unlock or cancel instead of resending */ 5382 switch (mstype) { 5383 case DLM_MSG_LOOKUP: 5384 case DLM_MSG_REQUEST: 5385 queue_cast(r, lkb, ou ? -DLM_EUNLOCK : 5386 -DLM_ECANCEL); 5387 unhold_lkb(lkb); /* undoes create_lkb() */ 5388 break; 5389 case DLM_MSG_CONVERT: 5390 if (oc) { 5391 queue_cast(r, lkb, -DLM_ECANCEL); 5392 } else { 5393 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 5394 _unlock_lock(r, lkb); 5395 } 5396 break; 5397 default: 5398 err = 1; 5399 } 5400 } else { 5401 switch (mstype) { 5402 case DLM_MSG_LOOKUP: 5403 case DLM_MSG_REQUEST: 5404 _request_lock(r, lkb); 5405 if (is_master(r)) 5406 confirm_master(r, 0); 5407 break; 5408 case DLM_MSG_CONVERT: 5409 _convert_lock(r, lkb); 5410 break; 5411 default: 5412 err = 1; 5413 } 5414 } 5415 5416 if (err) { 5417 log_error(ls, "waiter %x msg %d r_nodeid %d " 5418 "dir_nodeid %d overlap %d %d", 5419 lkb->lkb_id, mstype, r->res_nodeid, 5420 dlm_dir_nodeid(r), oc, ou); 5421 } 5422 unlock_rsb(r); 5423 put_rsb(r); 5424 dlm_put_lkb(lkb); 5425 } 5426 5427 return error; 5428} 5429 5430static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 5431 struct list_head *list) 5432{ 5433 struct dlm_lkb *lkb, *safe; 5434 5435 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5436 if (!is_master_copy(lkb)) 5437 continue; 5438 5439 /* don't purge lkbs we've added in recover_master_copy for 5440 the current recovery seq */ 5441 5442 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 5443 continue; 5444 5445 del_lkb(r, lkb); 5446 5447 /* this put should free the lkb */ 5448 if (!dlm_put_lkb(lkb)) 5449 log_error(ls, "purged mstcpy lkb not released"); 5450 } 5451} 5452 5453void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 5454{ 5455 struct dlm_ls *ls = r->res_ls; 5456 5457 purge_mstcpy_list(ls, r, &r->res_grantqueue); 5458 purge_mstcpy_list(ls, r, &r->res_convertqueue); 5459 purge_mstcpy_list(ls, r, &r->res_waitqueue); 5460} 5461 5462static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 5463 struct list_head *list, 5464 int nodeid_gone, unsigned int *count) 5465{ 5466 struct dlm_lkb *lkb, *safe; 5467 5468 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5469 if (!is_master_copy(lkb)) 5470 continue; 5471 5472 if ((lkb->lkb_nodeid == nodeid_gone) || 5473 dlm_is_removed(ls, lkb->lkb_nodeid)) { 5474 5475 /* tell recover_lvb to invalidate the lvb 5476 because a node holding EX/PW failed */ 5477 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && 5478 (lkb->lkb_grmode >= DLM_LOCK_PW)) { 5479 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); 5480 } 5481 5482 del_lkb(r, lkb); 5483 5484 /* this put should free the lkb */ 5485 if (!dlm_put_lkb(lkb)) 5486 log_error(ls, "purged dead lkb not released"); 5487 5488 rsb_set_flag(r, RSB_RECOVER_GRANT); 5489 5490 (*count)++; 5491 } 5492 } 5493} 5494 5495/* Get rid of locks held by nodes that are gone. */ 5496 5497void dlm_recover_purge(struct dlm_ls *ls) 5498{ 5499 struct dlm_rsb *r; 5500 struct dlm_member *memb; 5501 int nodes_count = 0; 5502 int nodeid_gone = 0; 5503 unsigned int lkb_count = 0; 5504 5505 /* cache one removed nodeid to optimize the common 5506 case of a single node removed */ 5507 5508 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 5509 nodes_count++; 5510 nodeid_gone = memb->nodeid; 5511 } 5512 5513 if (!nodes_count) 5514 return; 5515 5516 down_write(&ls->ls_root_sem); 5517 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 5518 hold_rsb(r); 5519 lock_rsb(r); 5520 if (is_master(r)) { 5521 purge_dead_list(ls, r, &r->res_grantqueue, 5522 nodeid_gone, &lkb_count); 5523 purge_dead_list(ls, r, &r->res_convertqueue, 5524 nodeid_gone, &lkb_count); 5525 purge_dead_list(ls, r, &r->res_waitqueue, 5526 nodeid_gone, &lkb_count); 5527 } 5528 unlock_rsb(r); 5529 unhold_rsb(r); 5530 cond_resched(); 5531 } 5532 up_write(&ls->ls_root_sem); 5533 5534 if (lkb_count) 5535 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", 5536 lkb_count, nodes_count); 5537} 5538 5539static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) 5540{ 5541 struct rb_node *n; 5542 struct dlm_rsb *r; 5543 5544 spin_lock(&ls->ls_rsbtbl[bucket].lock); 5545 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 5546 r = rb_entry(n, struct dlm_rsb, res_hashnode); 5547 5548 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5549 continue; 5550 if (!is_master(r)) { 5551 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5552 continue; 5553 } 5554 hold_rsb(r); 5555 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5556 return r; 5557 } 5558 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5559 return NULL; 5560} 5561 5562/* 5563 * Attempt to grant locks on resources that we are the master of. 5564 * Locks may have become grantable during recovery because locks 5565 * from departed nodes have been purged (or not rebuilt), allowing 5566 * previously blocked locks to now be granted. The subset of rsb's 5567 * we are interested in are those with lkb's on either the convert or 5568 * waiting queues. 5569 * 5570 * Simplest would be to go through each master rsb and check for non-empty 5571 * convert or waiting queues, and attempt to grant on those rsbs. 5572 * Checking the queues requires lock_rsb, though, for which we'd need 5573 * to release the rsbtbl lock. This would make iterating through all 5574 * rsb's very inefficient. So, we rely on earlier recovery routines 5575 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 5576 * locks for. 5577 */ 5578 5579void dlm_recover_grant(struct dlm_ls *ls) 5580{ 5581 struct dlm_rsb *r; 5582 int bucket = 0; 5583 unsigned int count = 0; 5584 unsigned int rsb_count = 0; 5585 unsigned int lkb_count = 0; 5586 5587 while (1) { 5588 r = find_grant_rsb(ls, bucket); 5589 if (!r) { 5590 if (bucket == ls->ls_rsbtbl_size - 1) 5591 break; 5592 bucket++; 5593 continue; 5594 } 5595 rsb_count++; 5596 count = 0; 5597 lock_rsb(r); 5598 /* the RECOVER_GRANT flag is checked in the grant path */ 5599 grant_pending_locks(r, &count); 5600 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5601 lkb_count += count; 5602 confirm_master(r, 0); 5603 unlock_rsb(r); 5604 put_rsb(r); 5605 cond_resched(); 5606 } 5607 5608 if (lkb_count) 5609 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", 5610 lkb_count, rsb_count); 5611} 5612 5613static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 5614 uint32_t remid) 5615{ 5616 struct dlm_lkb *lkb; 5617 5618 list_for_each_entry(lkb, head, lkb_statequeue) { 5619 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 5620 return lkb; 5621 } 5622 return NULL; 5623} 5624 5625static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 5626 uint32_t remid) 5627{ 5628 struct dlm_lkb *lkb; 5629 5630 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 5631 if (lkb) 5632 return lkb; 5633 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 5634 if (lkb) 5635 return lkb; 5636 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 5637 if (lkb) 5638 return lkb; 5639 return NULL; 5640} 5641 5642/* needs at least dlm_rcom + rcom_lock */ 5643static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 5644 struct dlm_rsb *r, struct dlm_rcom *rc) 5645{ 5646 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5647 5648 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5649 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); 5650 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); 5651 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); 5652 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; 5653 lkb->lkb_flags |= DLM_IFL_MSTCPY; 5654 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); 5655 lkb->lkb_rqmode = rl->rl_rqmode; 5656 lkb->lkb_grmode = rl->rl_grmode; 5657 /* don't set lkb_status because add_lkb wants to itself */ 5658 5659 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 5660 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 5661 5662 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 5663 int lvblen = le16_to_cpu(rc->rc_header.h_length) - 5664 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock); 5665 if (lvblen > ls->ls_lvblen) 5666 return -EINVAL; 5667 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 5668 if (!lkb->lkb_lvbptr) 5669 return -ENOMEM; 5670 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 5671 } 5672 5673 /* Conversions between PR and CW (middle modes) need special handling. 5674 The real granted mode of these converting locks cannot be determined 5675 until all locks have been rebuilt on the rsb (recover_conversion) */ 5676 5677 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && 5678 middle_conversion(lkb)) { 5679 rl->rl_status = DLM_LKSTS_CONVERT; 5680 lkb->lkb_grmode = DLM_LOCK_IV; 5681 rsb_set_flag(r, RSB_RECOVER_CONVERT); 5682 } 5683 5684 return 0; 5685} 5686 5687/* This lkb may have been recovered in a previous aborted recovery so we need 5688 to check if the rsb already has an lkb with the given remote nodeid/lkid. 5689 If so we just send back a standard reply. If not, we create a new lkb with 5690 the given values and send back our lkid. We send back our lkid by sending 5691 back the rcom_lock struct we got but with the remid field filled in. */ 5692 5693/* needs at least dlm_rcom + rcom_lock */ 5694int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5695{ 5696 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5697 struct dlm_rsb *r; 5698 struct dlm_lkb *lkb; 5699 uint32_t remid = 0; 5700 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5701 int error; 5702 5703 if (rl->rl_parent_lkid) { 5704 error = -EOPNOTSUPP; 5705 goto out; 5706 } 5707 5708 remid = le32_to_cpu(rl->rl_lkid); 5709 5710 /* In general we expect the rsb returned to be R_MASTER, but we don't 5711 have to require it. Recovery of masters on one node can overlap 5712 recovery of locks on another node, so one node can send us MSTCPY 5713 locks before we've made ourselves master of this rsb. We can still 5714 add new MSTCPY locks that we receive here without any harm; when 5715 we make ourselves master, dlm_recover_masters() won't touch the 5716 MSTCPY locks we've received early. */ 5717 5718 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 5719 from_nodeid, R_RECEIVE_RECOVER, &r); 5720 if (error) 5721 goto out; 5722 5723 lock_rsb(r); 5724 5725 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5726 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5727 from_nodeid, remid); 5728 error = -EBADR; 5729 goto out_unlock; 5730 } 5731 5732 lkb = search_remid(r, from_nodeid, remid); 5733 if (lkb) { 5734 error = -EEXIST; 5735 goto out_remid; 5736 } 5737 5738 error = create_lkb(ls, &lkb); 5739 if (error) 5740 goto out_unlock; 5741 5742 error = receive_rcom_lock_args(ls, lkb, r, rc); 5743 if (error) { 5744 __put_lkb(ls, lkb); 5745 goto out_unlock; 5746 } 5747 5748 attach_lkb(r, lkb); 5749 add_lkb(r, lkb, rl->rl_status); 5750 ls->ls_recover_locks_in++; 5751 5752 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 5753 rsb_set_flag(r, RSB_RECOVER_GRANT); 5754 5755 out_remid: 5756 /* this is the new value returned to the lock holder for 5757 saving in its process-copy lkb */ 5758 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 5759 5760 lkb->lkb_recover_seq = ls->ls_recover_seq; 5761 5762 out_unlock: 5763 unlock_rsb(r); 5764 put_rsb(r); 5765 out: 5766 if (error && error != -EEXIST) 5767 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", 5768 from_nodeid, remid, error); 5769 rl->rl_result = cpu_to_le32(error); 5770 return error; 5771} 5772 5773/* needs at least dlm_rcom + rcom_lock */ 5774int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5775{ 5776 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5777 struct dlm_rsb *r; 5778 struct dlm_lkb *lkb; 5779 uint32_t lkid, remid; 5780 int error, result; 5781 5782 lkid = le32_to_cpu(rl->rl_lkid); 5783 remid = le32_to_cpu(rl->rl_remid); 5784 result = le32_to_cpu(rl->rl_result); 5785 5786 error = find_lkb(ls, lkid, &lkb); 5787 if (error) { 5788 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 5789 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5790 result); 5791 return error; 5792 } 5793 5794 r = lkb->lkb_resource; 5795 hold_rsb(r); 5796 lock_rsb(r); 5797 5798 if (!is_process_copy(lkb)) { 5799 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 5800 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5801 result); 5802 dlm_dump_rsb(r); 5803 unlock_rsb(r); 5804 put_rsb(r); 5805 dlm_put_lkb(lkb); 5806 return -EINVAL; 5807 } 5808 5809 switch (result) { 5810 case -EBADR: 5811 /* There's a chance the new master received our lock before 5812 dlm_recover_master_reply(), this wouldn't happen if we did 5813 a barrier between recover_masters and recover_locks. */ 5814 5815 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 5816 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5817 result); 5818 5819 dlm_send_rcom_lock(r, lkb); 5820 goto out; 5821 case -EEXIST: 5822 case 0: 5823 lkb->lkb_remid = remid; 5824 break; 5825 default: 5826 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 5827 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5828 result); 5829 } 5830 5831 /* an ack for dlm_recover_locks() which waits for replies from 5832 all the locks it sends to new masters */ 5833 dlm_recovered_lock(r); 5834 out: 5835 unlock_rsb(r); 5836 put_rsb(r); 5837 dlm_put_lkb(lkb); 5838 5839 return 0; 5840} 5841 5842int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5843 int mode, uint32_t flags, void *name, unsigned int namelen, 5844 unsigned long timeout_cs) 5845{ 5846 struct dlm_lkb *lkb; 5847 struct dlm_args args; 5848 int error; 5849 5850 dlm_lock_recovery(ls); 5851 5852 error = create_lkb(ls, &lkb); 5853 if (error) { 5854 kfree(ua); 5855 goto out; 5856 } 5857 5858 if (flags & DLM_LKF_VALBLK) { 5859 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5860 if (!ua->lksb.sb_lvbptr) { 5861 kfree(ua); 5862 __put_lkb(ls, lkb); 5863 error = -ENOMEM; 5864 goto out; 5865 } 5866 } 5867 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, 5868 fake_astfn, ua, fake_bastfn, &args); 5869 if (error) { 5870 kfree(ua->lksb.sb_lvbptr); 5871 ua->lksb.sb_lvbptr = NULL; 5872 kfree(ua); 5873 __put_lkb(ls, lkb); 5874 goto out; 5875 } 5876 5877 /* After ua is attached to lkb it will be freed by dlm_free_lkb(). 5878 When DLM_IFL_USER is set, the dlm knows that this is a userspace 5879 lock and that lkb_astparam is the dlm_user_args structure. */ 5880 lkb->lkb_flags |= DLM_IFL_USER; 5881 error = request_lock(ls, lkb, name, namelen, &args); 5882 5883 switch (error) { 5884 case 0: 5885 break; 5886 case -EINPROGRESS: 5887 error = 0; 5888 break; 5889 case -EAGAIN: 5890 error = 0; 5891 fallthrough; 5892 default: 5893 __put_lkb(ls, lkb); 5894 goto out; 5895 } 5896 5897 /* add this new lkb to the per-process list of locks */ 5898 spin_lock(&ua->proc->locks_spin); 5899 hold_lkb(lkb); 5900 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5901 spin_unlock(&ua->proc->locks_spin); 5902 out: 5903 dlm_unlock_recovery(ls); 5904 return error; 5905} 5906 5907int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5908 int mode, uint32_t flags, uint32_t lkid, char *lvb_in, 5909 unsigned long timeout_cs) 5910{ 5911 struct dlm_lkb *lkb; 5912 struct dlm_args args; 5913 struct dlm_user_args *ua; 5914 int error; 5915 5916 dlm_lock_recovery(ls); 5917 5918 error = find_lkb(ls, lkid, &lkb); 5919 if (error) 5920 goto out; 5921 5922 /* user can change the params on its lock when it converts it, or 5923 add an lvb that didn't exist before */ 5924 5925 ua = lkb->lkb_ua; 5926 5927 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 5928 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5929 if (!ua->lksb.sb_lvbptr) { 5930 error = -ENOMEM; 5931 goto out_put; 5932 } 5933 } 5934 if (lvb_in && ua->lksb.sb_lvbptr) 5935 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5936 5937 ua->xid = ua_tmp->xid; 5938 ua->castparam = ua_tmp->castparam; 5939 ua->castaddr = ua_tmp->castaddr; 5940 ua->bastparam = ua_tmp->bastparam; 5941 ua->bastaddr = ua_tmp->bastaddr; 5942 ua->user_lksb = ua_tmp->user_lksb; 5943 5944 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, 5945 fake_astfn, ua, fake_bastfn, &args); 5946 if (error) 5947 goto out_put; 5948 5949 error = convert_lock(ls, lkb, &args); 5950 5951 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) 5952 error = 0; 5953 out_put: 5954 dlm_put_lkb(lkb); 5955 out: 5956 dlm_unlock_recovery(ls); 5957 kfree(ua_tmp); 5958 return error; 5959} 5960 5961/* 5962 * The caller asks for an orphan lock on a given resource with a given mode. 5963 * If a matching lock exists, it's moved to the owner's list of locks and 5964 * the lkid is returned. 5965 */ 5966 5967int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5968 int mode, uint32_t flags, void *name, unsigned int namelen, 5969 unsigned long timeout_cs, uint32_t *lkid) 5970{ 5971 struct dlm_lkb *lkb = NULL, *iter; 5972 struct dlm_user_args *ua; 5973 int found_other_mode = 0; 5974 int rv = 0; 5975 5976 mutex_lock(&ls->ls_orphans_mutex); 5977 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) { 5978 if (iter->lkb_resource->res_length != namelen) 5979 continue; 5980 if (memcmp(iter->lkb_resource->res_name, name, namelen)) 5981 continue; 5982 if (iter->lkb_grmode != mode) { 5983 found_other_mode = 1; 5984 continue; 5985 } 5986 5987 lkb = iter; 5988 list_del_init(&iter->lkb_ownqueue); 5989 iter->lkb_flags &= ~DLM_IFL_ORPHAN; 5990 *lkid = iter->lkb_id; 5991 break; 5992 } 5993 mutex_unlock(&ls->ls_orphans_mutex); 5994 5995 if (!lkb && found_other_mode) { 5996 rv = -EAGAIN; 5997 goto out; 5998 } 5999 6000 if (!lkb) { 6001 rv = -ENOENT; 6002 goto out; 6003 } 6004 6005 lkb->lkb_exflags = flags; 6006 lkb->lkb_ownpid = (int) current->pid; 6007 6008 ua = lkb->lkb_ua; 6009 6010 ua->proc = ua_tmp->proc; 6011 ua->xid = ua_tmp->xid; 6012 ua->castparam = ua_tmp->castparam; 6013 ua->castaddr = ua_tmp->castaddr; 6014 ua->bastparam = ua_tmp->bastparam; 6015 ua->bastaddr = ua_tmp->bastaddr; 6016 ua->user_lksb = ua_tmp->user_lksb; 6017 6018 /* 6019 * The lkb reference from the ls_orphans list was not 6020 * removed above, and is now considered the reference 6021 * for the proc locks list. 6022 */ 6023 6024 spin_lock(&ua->proc->locks_spin); 6025 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 6026 spin_unlock(&ua->proc->locks_spin); 6027 out: 6028 kfree(ua_tmp); 6029 return rv; 6030} 6031 6032int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 6033 uint32_t flags, uint32_t lkid, char *lvb_in) 6034{ 6035 struct dlm_lkb *lkb; 6036 struct dlm_args args; 6037 struct dlm_user_args *ua; 6038 int error; 6039 6040 dlm_lock_recovery(ls); 6041 6042 error = find_lkb(ls, lkid, &lkb); 6043 if (error) 6044 goto out; 6045 6046 ua = lkb->lkb_ua; 6047 6048 if (lvb_in && ua->lksb.sb_lvbptr) 6049 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 6050 if (ua_tmp->castparam) 6051 ua->castparam = ua_tmp->castparam; 6052 ua->user_lksb = ua_tmp->user_lksb; 6053 6054 error = set_unlock_args(flags, ua, &args); 6055 if (error) 6056 goto out_put; 6057 6058 error = unlock_lock(ls, lkb, &args); 6059 6060 if (error == -DLM_EUNLOCK) 6061 error = 0; 6062 /* from validate_unlock_args() */ 6063 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) 6064 error = 0; 6065 if (error) 6066 goto out_put; 6067 6068 spin_lock(&ua->proc->locks_spin); 6069 /* dlm_user_add_cb() may have already taken lkb off the proc list */ 6070 if (!list_empty(&lkb->lkb_ownqueue)) 6071 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 6072 spin_unlock(&ua->proc->locks_spin); 6073 out_put: 6074 dlm_put_lkb(lkb); 6075 out: 6076 dlm_unlock_recovery(ls); 6077 kfree(ua_tmp); 6078 return error; 6079} 6080 6081int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 6082 uint32_t flags, uint32_t lkid) 6083{ 6084 struct dlm_lkb *lkb; 6085 struct dlm_args args; 6086 struct dlm_user_args *ua; 6087 int error; 6088 6089 dlm_lock_recovery(ls); 6090 6091 error = find_lkb(ls, lkid, &lkb); 6092 if (error) 6093 goto out; 6094 6095 ua = lkb->lkb_ua; 6096 if (ua_tmp->castparam) 6097 ua->castparam = ua_tmp->castparam; 6098 ua->user_lksb = ua_tmp->user_lksb; 6099 6100 error = set_unlock_args(flags, ua, &args); 6101 if (error) 6102 goto out_put; 6103 6104 error = cancel_lock(ls, lkb, &args); 6105 6106 if (error == -DLM_ECANCEL) 6107 error = 0; 6108 /* from validate_unlock_args() */ 6109 if (error == -EBUSY) 6110 error = 0; 6111 out_put: 6112 dlm_put_lkb(lkb); 6113 out: 6114 dlm_unlock_recovery(ls); 6115 kfree(ua_tmp); 6116 return error; 6117} 6118 6119int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) 6120{ 6121 struct dlm_lkb *lkb; 6122 struct dlm_args args; 6123 struct dlm_user_args *ua; 6124 struct dlm_rsb *r; 6125 int error; 6126 6127 dlm_lock_recovery(ls); 6128 6129 error = find_lkb(ls, lkid, &lkb); 6130 if (error) 6131 goto out; 6132 6133 ua = lkb->lkb_ua; 6134 6135 error = set_unlock_args(flags, ua, &args); 6136 if (error) 6137 goto out_put; 6138 6139 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ 6140 6141 r = lkb->lkb_resource; 6142 hold_rsb(r); 6143 lock_rsb(r); 6144 6145 error = validate_unlock_args(lkb, &args); 6146 if (error) 6147 goto out_r; 6148 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; 6149 6150 error = _cancel_lock(r, lkb); 6151 out_r: 6152 unlock_rsb(r); 6153 put_rsb(r); 6154 6155 if (error == -DLM_ECANCEL) 6156 error = 0; 6157 /* from validate_unlock_args() */ 6158 if (error == -EBUSY) 6159 error = 0; 6160 out_put: 6161 dlm_put_lkb(lkb); 6162 out: 6163 dlm_unlock_recovery(ls); 6164 return error; 6165} 6166 6167/* lkb's that are removed from the waiters list by revert are just left on the 6168 orphans list with the granted orphan locks, to be freed by purge */ 6169 6170static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 6171{ 6172 struct dlm_args args; 6173 int error; 6174 6175 hold_lkb(lkb); /* reference for the ls_orphans list */ 6176 mutex_lock(&ls->ls_orphans_mutex); 6177 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); 6178 mutex_unlock(&ls->ls_orphans_mutex); 6179 6180 set_unlock_args(0, lkb->lkb_ua, &args); 6181 6182 error = cancel_lock(ls, lkb, &args); 6183 if (error == -DLM_ECANCEL) 6184 error = 0; 6185 return error; 6186} 6187 6188/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't 6189 granted. Regardless of what rsb queue the lock is on, it's removed and 6190 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated 6191 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */ 6192 6193static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 6194{ 6195 struct dlm_args args; 6196 int error; 6197 6198 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK, 6199 lkb->lkb_ua, &args); 6200 6201 error = unlock_lock(ls, lkb, &args); 6202 if (error == -DLM_EUNLOCK) 6203 error = 0; 6204 return error; 6205} 6206 6207/* We have to release clear_proc_locks mutex before calling unlock_proc_lock() 6208 (which does lock_rsb) due to deadlock with receiving a message that does 6209 lock_rsb followed by dlm_user_add_cb() */ 6210 6211static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, 6212 struct dlm_user_proc *proc) 6213{ 6214 struct dlm_lkb *lkb = NULL; 6215 6216 mutex_lock(&ls->ls_clear_proc_locks); 6217 if (list_empty(&proc->locks)) 6218 goto out; 6219 6220 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); 6221 list_del_init(&lkb->lkb_ownqueue); 6222 6223 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) 6224 lkb->lkb_flags |= DLM_IFL_ORPHAN; 6225 else 6226 lkb->lkb_flags |= DLM_IFL_DEAD; 6227 out: 6228 mutex_unlock(&ls->ls_clear_proc_locks); 6229 return lkb; 6230} 6231 6232/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which 6233 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, 6234 which we clear here. */ 6235 6236/* proc CLOSING flag is set so no more device_reads should look at proc->asts 6237 list, and no more device_writes should add lkb's to proc->locks list; so we 6238 shouldn't need to take asts_spin or locks_spin here. this assumes that 6239 device reads/writes/closes are serialized -- FIXME: we may need to serialize 6240 them ourself. */ 6241 6242void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) 6243{ 6244 struct dlm_lkb *lkb, *safe; 6245 6246 dlm_lock_recovery(ls); 6247 6248 while (1) { 6249 lkb = del_proc_lock(ls, proc); 6250 if (!lkb) 6251 break; 6252 del_timeout(lkb); 6253 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) 6254 orphan_proc_lock(ls, lkb); 6255 else 6256 unlock_proc_lock(ls, lkb); 6257 6258 /* this removes the reference for the proc->locks list 6259 added by dlm_user_request, it may result in the lkb 6260 being freed */ 6261 6262 dlm_put_lkb(lkb); 6263 } 6264 6265 mutex_lock(&ls->ls_clear_proc_locks); 6266 6267 /* in-progress unlocks */ 6268 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { 6269 list_del_init(&lkb->lkb_ownqueue); 6270 lkb->lkb_flags |= DLM_IFL_DEAD; 6271 dlm_put_lkb(lkb); 6272 } 6273 6274 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { 6275 memset(&lkb->lkb_callbacks, 0, 6276 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 6277 list_del_init(&lkb->lkb_cb_list); 6278 dlm_put_lkb(lkb); 6279 } 6280 6281 mutex_unlock(&ls->ls_clear_proc_locks); 6282 dlm_unlock_recovery(ls); 6283} 6284 6285static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) 6286{ 6287 struct dlm_lkb *lkb, *safe; 6288 6289 while (1) { 6290 lkb = NULL; 6291 spin_lock(&proc->locks_spin); 6292 if (!list_empty(&proc->locks)) { 6293 lkb = list_entry(proc->locks.next, struct dlm_lkb, 6294 lkb_ownqueue); 6295 list_del_init(&lkb->lkb_ownqueue); 6296 } 6297 spin_unlock(&proc->locks_spin); 6298 6299 if (!lkb) 6300 break; 6301 6302 lkb->lkb_flags |= DLM_IFL_DEAD; 6303 unlock_proc_lock(ls, lkb); 6304 dlm_put_lkb(lkb); /* ref from proc->locks list */ 6305 } 6306 6307 spin_lock(&proc->locks_spin); 6308 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { 6309 list_del_init(&lkb->lkb_ownqueue); 6310 lkb->lkb_flags |= DLM_IFL_DEAD; 6311 dlm_put_lkb(lkb); 6312 } 6313 spin_unlock(&proc->locks_spin); 6314 6315 spin_lock(&proc->asts_spin); 6316 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { 6317 memset(&lkb->lkb_callbacks, 0, 6318 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 6319 list_del_init(&lkb->lkb_cb_list); 6320 dlm_put_lkb(lkb); 6321 } 6322 spin_unlock(&proc->asts_spin); 6323} 6324 6325/* pid of 0 means purge all orphans */ 6326 6327static void do_purge(struct dlm_ls *ls, int nodeid, int pid) 6328{ 6329 struct dlm_lkb *lkb, *safe; 6330 6331 mutex_lock(&ls->ls_orphans_mutex); 6332 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) { 6333 if (pid && lkb->lkb_ownpid != pid) 6334 continue; 6335 unlock_proc_lock(ls, lkb); 6336 list_del_init(&lkb->lkb_ownqueue); 6337 dlm_put_lkb(lkb); 6338 } 6339 mutex_unlock(&ls->ls_orphans_mutex); 6340} 6341 6342static int send_purge(struct dlm_ls *ls, int nodeid, int pid) 6343{ 6344 struct dlm_message *ms; 6345 struct dlm_mhandle *mh; 6346 int error; 6347 6348 error = _create_message(ls, sizeof(struct dlm_message), nodeid, 6349 DLM_MSG_PURGE, &ms, &mh); 6350 if (error) 6351 return error; 6352 ms->m_nodeid = cpu_to_le32(nodeid); 6353 ms->m_pid = cpu_to_le32(pid); 6354 6355 return send_message(mh, ms); 6356} 6357 6358int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, 6359 int nodeid, int pid) 6360{ 6361 int error = 0; 6362 6363 if (nodeid && (nodeid != dlm_our_nodeid())) { 6364 error = send_purge(ls, nodeid, pid); 6365 } else { 6366 dlm_lock_recovery(ls); 6367 if (pid == current->pid) 6368 purge_proc_locks(ls, proc); 6369 else 6370 do_purge(ls, nodeid, pid); 6371 dlm_unlock_recovery(ls); 6372 } 6373 return error; 6374} 6375 6376/* debug functionality */ 6377int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len, 6378 int lkb_nodeid, unsigned int lkb_flags, int lkb_status) 6379{ 6380 struct dlm_lksb *lksb; 6381 struct dlm_lkb *lkb; 6382 struct dlm_rsb *r; 6383 int error; 6384 6385 /* we currently can't set a valid user lock */ 6386 if (lkb_flags & DLM_IFL_USER) 6387 return -EOPNOTSUPP; 6388 6389 lksb = kzalloc(sizeof(*lksb), GFP_NOFS); 6390 if (!lksb) 6391 return -ENOMEM; 6392 6393 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1); 6394 if (error) { 6395 kfree(lksb); 6396 return error; 6397 } 6398 6399 lkb->lkb_flags = lkb_flags; 6400 lkb->lkb_nodeid = lkb_nodeid; 6401 lkb->lkb_lksb = lksb; 6402 /* user specific pointer, just don't have it NULL for kernel locks */ 6403 if (~lkb_flags & DLM_IFL_USER) 6404 lkb->lkb_astparam = (void *)0xDEADBEEF; 6405 6406 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 6407 if (error) { 6408 kfree(lksb); 6409 __put_lkb(ls, lkb); 6410 return error; 6411 } 6412 6413 lock_rsb(r); 6414 attach_lkb(r, lkb); 6415 add_lkb(r, lkb, lkb_status); 6416 unlock_rsb(r); 6417 put_rsb(r); 6418 6419 return 0; 6420} 6421 6422int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, 6423 int mstype, int to_nodeid) 6424{ 6425 struct dlm_lkb *lkb; 6426 int error; 6427 6428 error = find_lkb(ls, lkb_id, &lkb); 6429 if (error) 6430 return error; 6431 6432 error = add_to_waiters(lkb, mstype, to_nodeid); 6433 dlm_put_lkb(lkb); 6434 return error; 6435} 6436