dlmlock.c (19089B)
1// SPDX-License-Identifier: GPL-2.0-or-later 2/* 3 * dlmlock.c 4 * 5 * underlying calls for lock creation 6 * 7 * Copyright (C) 2004 Oracle. All rights reserved. 8 */ 9 10 11#include <linux/module.h> 12#include <linux/fs.h> 13#include <linux/types.h> 14#include <linux/slab.h> 15#include <linux/highmem.h> 16#include <linux/init.h> 17#include <linux/sysctl.h> 18#include <linux/random.h> 19#include <linux/blkdev.h> 20#include <linux/socket.h> 21#include <linux/inet.h> 22#include <linux/spinlock.h> 23#include <linux/delay.h> 24 25 26#include "../cluster/heartbeat.h" 27#include "../cluster/nodemanager.h" 28#include "../cluster/tcp.h" 29 30#include "dlmapi.h" 31#include "dlmcommon.h" 32 33#include "dlmconvert.h" 34 35#define MLOG_MASK_PREFIX ML_DLM 36#include "../cluster/masklog.h" 37 38static struct kmem_cache *dlm_lock_cache; 39 40static DEFINE_SPINLOCK(dlm_cookie_lock); 41static u64 dlm_next_cookie = 1; 42 43static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, 44 struct dlm_lock_resource *res, 45 struct dlm_lock *lock, int flags); 46static void dlm_init_lock(struct dlm_lock *newlock, int type, 47 u8 node, u64 cookie); 48static void dlm_lock_release(struct kref *kref); 49static void dlm_lock_detach_lockres(struct dlm_lock *lock); 50 51int dlm_init_lock_cache(void) 52{ 53 dlm_lock_cache = kmem_cache_create("o2dlm_lock", 54 sizeof(struct dlm_lock), 55 0, SLAB_HWCACHE_ALIGN, NULL); 56 if (dlm_lock_cache == NULL) 57 return -ENOMEM; 58 return 0; 59} 60 61void dlm_destroy_lock_cache(void) 62{ 63 kmem_cache_destroy(dlm_lock_cache); 64} 65 66/* Tell us whether we can grant a new lock request. 67 * locking: 68 * caller needs: res->spinlock 69 * taken: none 70 * held on exit: none 71 * returns: 1 if the lock can be granted, 0 otherwise. 72 */ 73static int dlm_can_grant_new_lock(struct dlm_lock_resource *res, 74 struct dlm_lock *lock) 75{ 76 struct dlm_lock *tmplock; 77 78 list_for_each_entry(tmplock, &res->granted, list) { 79 if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type)) 80 return 0; 81 } 82 83 list_for_each_entry(tmplock, &res->converting, list) { 84 if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type)) 85 return 0; 86 if (!dlm_lock_compatible(tmplock->ml.convert_type, 87 lock->ml.type)) 88 return 0; 89 } 90 91 return 1; 92} 93 94/* performs lock creation at the lockres master site 95 * locking: 96 * caller needs: none 97 * taken: takes and drops res->spinlock 98 * held on exit: none 99 * returns: DLM_NORMAL, DLM_NOTQUEUED 100 */ 101static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, 102 struct dlm_lock_resource *res, 103 struct dlm_lock *lock, int flags) 104{ 105 int call_ast = 0, kick_thread = 0; 106 enum dlm_status status = DLM_NORMAL; 107 108 mlog(0, "type=%d\n", lock->ml.type); 109 110 spin_lock(&res->spinlock); 111 /* if called from dlm_create_lock_handler, need to 112 * ensure it will not sleep in dlm_wait_on_lockres */ 113 status = __dlm_lockres_state_to_status(res); 114 if (status != DLM_NORMAL && 115 lock->ml.node != dlm->node_num) { 116 /* erf. state changed after lock was dropped. */ 117 spin_unlock(&res->spinlock); 118 dlm_error(status); 119 return status; 120 } 121 __dlm_wait_on_lockres(res); 122 __dlm_lockres_reserve_ast(res); 123 124 if (dlm_can_grant_new_lock(res, lock)) { 125 mlog(0, "I can grant this lock right away\n"); 126 /* got it right away */ 127 lock->lksb->status = DLM_NORMAL; 128 status = DLM_NORMAL; 129 dlm_lock_get(lock); 130 list_add_tail(&lock->list, &res->granted); 131 132 /* for the recovery lock, we can't allow the ast 133 * to be queued since the dlmthread is already 134 * frozen. but the recovery lock is always locked 135 * with LKM_NOQUEUE so we do not need the ast in 136 * this special case */ 137 if (!dlm_is_recovery_lock(res->lockname.name, 138 res->lockname.len)) { 139 kick_thread = 1; 140 call_ast = 1; 141 } else { 142 mlog(0, "%s: returning DLM_NORMAL to " 143 "node %u for reco lock\n", dlm->name, 144 lock->ml.node); 145 } 146 } else { 147 /* for NOQUEUE request, unless we get the 148 * lock right away, return DLM_NOTQUEUED */ 149 if (flags & LKM_NOQUEUE) { 150 status = DLM_NOTQUEUED; 151 if (dlm_is_recovery_lock(res->lockname.name, 152 res->lockname.len)) { 153 mlog(0, "%s: returning NOTQUEUED to " 154 "node %u for reco lock\n", dlm->name, 155 lock->ml.node); 156 } 157 } else { 158 status = DLM_NORMAL; 159 dlm_lock_get(lock); 160 list_add_tail(&lock->list, &res->blocked); 161 kick_thread = 1; 162 } 163 } 164 165 spin_unlock(&res->spinlock); 166 wake_up(&res->wq); 167 168 /* either queue the ast or release it */ 169 if (call_ast) 170 dlm_queue_ast(dlm, lock); 171 else 172 dlm_lockres_release_ast(dlm, res); 173 174 dlm_lockres_calc_usage(dlm, res); 175 if (kick_thread) 176 dlm_kick_thread(dlm, res); 177 178 return status; 179} 180 181void dlm_revert_pending_lock(struct dlm_lock_resource *res, 182 struct dlm_lock *lock) 183{ 184 /* remove from local queue if it failed */ 185 list_del_init(&lock->list); 186 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 187} 188 189 190/* 191 * locking: 192 * caller needs: none 193 * taken: takes and drops res->spinlock 194 * held on exit: none 195 * returns: DLM_DENIED, DLM_RECOVERING, or net status 196 */ 197static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, 198 struct dlm_lock_resource *res, 199 struct dlm_lock *lock, int flags) 200{ 201 enum dlm_status status = DLM_DENIED; 202 int lockres_changed = 1; 203 204 mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n", 205 lock->ml.type, res->lockname.len, 206 res->lockname.name, flags); 207 208 /* 209 * Wait if resource is getting recovered, remastered, etc. 210 * If the resource was remastered and new owner is self, then exit. 211 */ 212 spin_lock(&res->spinlock); 213 __dlm_wait_on_lockres(res); 214 if (res->owner == dlm->node_num) { 215 spin_unlock(&res->spinlock); 216 return DLM_RECOVERING; 217 } 218 res->state |= DLM_LOCK_RES_IN_PROGRESS; 219 220 /* add lock to local (secondary) queue */ 221 dlm_lock_get(lock); 222 list_add_tail(&lock->list, &res->blocked); 223 lock->lock_pending = 1; 224 spin_unlock(&res->spinlock); 225 226 /* spec seems to say that you will get DLM_NORMAL when the lock 227 * has been queued, meaning we need to wait for a reply here. */ 228 status = dlm_send_remote_lock_request(dlm, res, lock, flags); 229 230 spin_lock(&res->spinlock); 231 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 232 lock->lock_pending = 0; 233 if (status != DLM_NORMAL) { 234 if (status == DLM_RECOVERING && 235 dlm_is_recovery_lock(res->lockname.name, 236 res->lockname.len)) { 237 /* recovery lock was mastered by dead node. 238 * we need to have calc_usage shoot down this 239 * lockres and completely remaster it. */ 240 mlog(0, "%s: recovery lock was owned by " 241 "dead node %u, remaster it now.\n", 242 dlm->name, res->owner); 243 } else if (status != DLM_NOTQUEUED) { 244 /* 245 * DO NOT call calc_usage, as this would unhash 246 * the remote lockres before we ever get to use 247 * it. treat as if we never made any change to 248 * the lockres. 249 */ 250 lockres_changed = 0; 251 dlm_error(status); 252 } 253 dlm_revert_pending_lock(res, lock); 254 dlm_lock_put(lock); 255 } else if (dlm_is_recovery_lock(res->lockname.name, 256 res->lockname.len)) { 257 /* special case for the $RECOVERY lock. 258 * there will never be an AST delivered to put 259 * this lock on the proper secondary queue 260 * (granted), so do it manually. */ 261 mlog(0, "%s: $RECOVERY lock for this node (%u) is " 262 "mastered by %u; got lock, manually granting (no ast)\n", 263 dlm->name, dlm->node_num, res->owner); 264 list_move_tail(&lock->list, &res->granted); 265 } 266 spin_unlock(&res->spinlock); 267 268 if (lockres_changed) 269 dlm_lockres_calc_usage(dlm, res); 270 271 wake_up(&res->wq); 272 return status; 273} 274 275 276/* for remote lock creation. 277 * locking: 278 * caller needs: none, but need res->state & DLM_LOCK_RES_IN_PROGRESS 279 * taken: none 280 * held on exit: none 281 * returns: DLM_NOLOCKMGR, or net status 282 */ 283static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, 284 struct dlm_lock_resource *res, 285 struct dlm_lock *lock, int flags) 286{ 287 struct dlm_create_lock create; 288 int tmpret, status = 0; 289 enum dlm_status ret; 290 291 memset(&create, 0, sizeof(create)); 292 create.node_idx = dlm->node_num; 293 create.requested_type = lock->ml.type; 294 create.cookie = lock->ml.cookie; 295 create.namelen = res->lockname.len; 296 create.flags = cpu_to_be32(flags); 297 memcpy(create.name, res->lockname.name, create.namelen); 298 299 tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, 300 sizeof(create), res->owner, &status); 301 if (tmpret >= 0) { 302 ret = status; 303 if (ret == DLM_REJECTED) { 304 mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer " 305 "owned by node %u. That node is coming back up " 306 "currently.\n", dlm->name, create.namelen, 307 create.name, res->owner); 308 dlm_print_one_lock_resource(res); 309 BUG(); 310 } 311 } else { 312 mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to " 313 "node %u\n", dlm->name, create.namelen, create.name, 314 tmpret, res->owner); 315 if (dlm_is_host_down(tmpret)) 316 ret = DLM_RECOVERING; 317 else 318 ret = dlm_err_to_dlm_status(tmpret); 319 } 320 321 return ret; 322} 323 324void dlm_lock_get(struct dlm_lock *lock) 325{ 326 kref_get(&lock->lock_refs); 327} 328 329void dlm_lock_put(struct dlm_lock *lock) 330{ 331 kref_put(&lock->lock_refs, dlm_lock_release); 332} 333 334static void dlm_lock_release(struct kref *kref) 335{ 336 struct dlm_lock *lock; 337 338 lock = container_of(kref, struct dlm_lock, lock_refs); 339 340 BUG_ON(!list_empty(&lock->list)); 341 BUG_ON(!list_empty(&lock->ast_list)); 342 BUG_ON(!list_empty(&lock->bast_list)); 343 BUG_ON(lock->ast_pending); 344 BUG_ON(lock->bast_pending); 345 346 dlm_lock_detach_lockres(lock); 347 348 if (lock->lksb_kernel_allocated) { 349 mlog(0, "freeing kernel-allocated lksb\n"); 350 kfree(lock->lksb); 351 } 352 kmem_cache_free(dlm_lock_cache, lock); 353} 354 355/* associate a lock with it's lockres, getting a ref on the lockres */ 356void dlm_lock_attach_lockres(struct dlm_lock *lock, 357 struct dlm_lock_resource *res) 358{ 359 dlm_lockres_get(res); 360 lock->lockres = res; 361} 362 363/* drop ref on lockres, if there is still one associated with lock */ 364static void dlm_lock_detach_lockres(struct dlm_lock *lock) 365{ 366 struct dlm_lock_resource *res; 367 368 res = lock->lockres; 369 if (res) { 370 lock->lockres = NULL; 371 mlog(0, "removing lock's lockres reference\n"); 372 dlm_lockres_put(res); 373 } 374} 375 376static void dlm_init_lock(struct dlm_lock *newlock, int type, 377 u8 node, u64 cookie) 378{ 379 INIT_LIST_HEAD(&newlock->list); 380 INIT_LIST_HEAD(&newlock->ast_list); 381 INIT_LIST_HEAD(&newlock->bast_list); 382 spin_lock_init(&newlock->spinlock); 383 newlock->ml.type = type; 384 newlock->ml.convert_type = LKM_IVMODE; 385 newlock->ml.highest_blocked = LKM_IVMODE; 386 newlock->ml.node = node; 387 newlock->ml.pad1 = 0; 388 newlock->ml.list = 0; 389 newlock->ml.flags = 0; 390 newlock->ast = NULL; 391 newlock->bast = NULL; 392 newlock->astdata = NULL; 393 newlock->ml.cookie = cpu_to_be64(cookie); 394 newlock->ast_pending = 0; 395 newlock->bast_pending = 0; 396 newlock->convert_pending = 0; 397 newlock->lock_pending = 0; 398 newlock->unlock_pending = 0; 399 newlock->cancel_pending = 0; 400 newlock->lksb_kernel_allocated = 0; 401 402 kref_init(&newlock->lock_refs); 403} 404 405struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, 406 struct dlm_lockstatus *lksb) 407{ 408 struct dlm_lock *lock; 409 int kernel_allocated = 0; 410 411 lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS); 412 if (!lock) 413 return NULL; 414 415 if (!lksb) { 416 /* zero memory only if kernel-allocated */ 417 lksb = kzalloc(sizeof(*lksb), GFP_NOFS); 418 if (!lksb) { 419 kmem_cache_free(dlm_lock_cache, lock); 420 return NULL; 421 } 422 kernel_allocated = 1; 423 } 424 425 dlm_init_lock(lock, type, node, cookie); 426 if (kernel_allocated) 427 lock->lksb_kernel_allocated = 1; 428 lock->lksb = lksb; 429 lksb->lockid = lock; 430 return lock; 431} 432 433/* handler for lock creation net message 434 * locking: 435 * caller needs: none 436 * taken: takes and drops res->spinlock 437 * held on exit: none 438 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED 439 */ 440int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data, 441 void **ret_data) 442{ 443 struct dlm_ctxt *dlm = data; 444 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; 445 struct dlm_lock_resource *res = NULL; 446 struct dlm_lock *newlock = NULL; 447 struct dlm_lockstatus *lksb = NULL; 448 enum dlm_status status = DLM_NORMAL; 449 char *name; 450 unsigned int namelen; 451 452 BUG_ON(!dlm); 453 454 if (!dlm_grab(dlm)) 455 return DLM_REJECTED; 456 457 name = create->name; 458 namelen = create->namelen; 459 status = DLM_REJECTED; 460 if (!dlm_domain_fully_joined(dlm)) { 461 mlog(ML_ERROR, "Domain %s not fully joined, but node %u is " 462 "sending a create_lock message for lock %.*s!\n", 463 dlm->name, create->node_idx, namelen, name); 464 dlm_error(status); 465 goto leave; 466 } 467 468 status = DLM_IVBUFLEN; 469 if (namelen > DLM_LOCKID_NAME_MAX) { 470 dlm_error(status); 471 goto leave; 472 } 473 474 status = DLM_SYSERR; 475 newlock = dlm_new_lock(create->requested_type, 476 create->node_idx, 477 be64_to_cpu(create->cookie), NULL); 478 if (!newlock) { 479 dlm_error(status); 480 goto leave; 481 } 482 483 lksb = newlock->lksb; 484 485 if (be32_to_cpu(create->flags) & LKM_GET_LVB) { 486 lksb->flags |= DLM_LKSB_GET_LVB; 487 mlog(0, "set DLM_LKSB_GET_LVB flag\n"); 488 } 489 490 status = DLM_IVLOCKID; 491 res = dlm_lookup_lockres(dlm, name, namelen); 492 if (!res) { 493 dlm_error(status); 494 goto leave; 495 } 496 497 spin_lock(&res->spinlock); 498 status = __dlm_lockres_state_to_status(res); 499 spin_unlock(&res->spinlock); 500 501 if (status != DLM_NORMAL) { 502 mlog(0, "lockres recovering/migrating/in-progress\n"); 503 goto leave; 504 } 505 506 dlm_lock_attach_lockres(newlock, res); 507 508 status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags)); 509leave: 510 if (status != DLM_NORMAL) 511 if (newlock) 512 dlm_lock_put(newlock); 513 514 if (res) 515 dlm_lockres_put(res); 516 517 dlm_put(dlm); 518 519 return status; 520} 521 522 523/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */ 524static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie) 525{ 526 u64 tmpnode = node_num; 527 528 /* shift single byte of node num into top 8 bits */ 529 tmpnode <<= 56; 530 531 spin_lock(&dlm_cookie_lock); 532 *cookie = (dlm_next_cookie | tmpnode); 533 if (++dlm_next_cookie & 0xff00000000000000ull) { 534 mlog(0, "This node's cookie will now wrap!\n"); 535 dlm_next_cookie = 1; 536 } 537 spin_unlock(&dlm_cookie_lock); 538} 539 540enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode, 541 struct dlm_lockstatus *lksb, int flags, 542 const char *name, int namelen, dlm_astlockfunc_t *ast, 543 void *data, dlm_bastlockfunc_t *bast) 544{ 545 enum dlm_status status; 546 struct dlm_lock_resource *res = NULL; 547 struct dlm_lock *lock = NULL; 548 int convert = 0, recovery = 0; 549 550 /* yes this function is a mess. 551 * TODO: clean this up. lots of common code in the 552 * lock and convert paths, especially in the retry blocks */ 553 if (!lksb) { 554 dlm_error(DLM_BADARGS); 555 return DLM_BADARGS; 556 } 557 558 status = DLM_BADPARAM; 559 if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) { 560 dlm_error(status); 561 goto error; 562 } 563 564 if (flags & ~LKM_VALID_FLAGS) { 565 dlm_error(status); 566 goto error; 567 } 568 569 convert = (flags & LKM_CONVERT); 570 recovery = (flags & LKM_RECOVERY); 571 572 if (recovery && 573 (!dlm_is_recovery_lock(name, namelen) || convert) ) { 574 dlm_error(status); 575 goto error; 576 } 577 if (convert && (flags & LKM_LOCAL)) { 578 mlog(ML_ERROR, "strange LOCAL convert request!\n"); 579 goto error; 580 } 581 582 if (convert) { 583 /* CONVERT request */ 584 585 /* if converting, must pass in a valid dlm_lock */ 586 lock = lksb->lockid; 587 if (!lock) { 588 mlog(ML_ERROR, "NULL lock pointer in convert " 589 "request\n"); 590 goto error; 591 } 592 593 res = lock->lockres; 594 if (!res) { 595 mlog(ML_ERROR, "NULL lockres pointer in convert " 596 "request\n"); 597 goto error; 598 } 599 dlm_lockres_get(res); 600 601 /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 602 * static after the original lock call. convert requests will 603 * ensure that everything is the same, or return DLM_BADARGS. 604 * this means that DLM_DENIED_NOASTS will never be returned. 605 */ 606 if (lock->lksb != lksb || lock->ast != ast || 607 lock->bast != bast || lock->astdata != data) { 608 status = DLM_BADARGS; 609 mlog(ML_ERROR, "new args: lksb=%p, ast=%p, bast=%p, " 610 "astdata=%p\n", lksb, ast, bast, data); 611 mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, " 612 "astdata=%p\n", lock->lksb, lock->ast, 613 lock->bast, lock->astdata); 614 goto error; 615 } 616retry_convert: 617 dlm_wait_for_recovery(dlm); 618 619 if (res->owner == dlm->node_num) 620 status = dlmconvert_master(dlm, res, lock, flags, mode); 621 else 622 status = dlmconvert_remote(dlm, res, lock, flags, mode); 623 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 624 status == DLM_FORWARD) { 625 /* for now, see how this works without sleeping 626 * and just retry right away. I suspect the reco 627 * or migration will complete fast enough that 628 * no waiting will be necessary */ 629 mlog(0, "retrying convert with migration/recovery/" 630 "in-progress\n"); 631 msleep(100); 632 goto retry_convert; 633 } 634 } else { 635 u64 tmpcookie; 636 637 /* LOCK request */ 638 status = DLM_BADARGS; 639 if (!name) { 640 dlm_error(status); 641 goto error; 642 } 643 644 status = DLM_IVBUFLEN; 645 if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) { 646 dlm_error(status); 647 goto error; 648 } 649 650 dlm_get_next_cookie(dlm->node_num, &tmpcookie); 651 lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb); 652 if (!lock) { 653 dlm_error(status); 654 goto error; 655 } 656 657 if (!recovery) 658 dlm_wait_for_recovery(dlm); 659 660 /* find or create the lock resource */ 661 res = dlm_get_lock_resource(dlm, name, namelen, flags); 662 if (!res) { 663 status = DLM_IVLOCKID; 664 dlm_error(status); 665 goto error; 666 } 667 668 mlog(0, "type=%d, flags = 0x%x\n", mode, flags); 669 mlog(0, "creating lock: lock=%p res=%p\n", lock, res); 670 671 dlm_lock_attach_lockres(lock, res); 672 lock->ast = ast; 673 lock->bast = bast; 674 lock->astdata = data; 675 676retry_lock: 677 if (flags & LKM_VALBLK) { 678 mlog(0, "LKM_VALBLK passed by caller\n"); 679 680 /* LVB requests for non PR, PW or EX locks are 681 * ignored. */ 682 if (mode < LKM_PRMODE) 683 flags &= ~LKM_VALBLK; 684 else { 685 flags |= LKM_GET_LVB; 686 lock->lksb->flags |= DLM_LKSB_GET_LVB; 687 } 688 } 689 690 if (res->owner == dlm->node_num) 691 status = dlmlock_master(dlm, res, lock, flags); 692 else 693 status = dlmlock_remote(dlm, res, lock, flags); 694 695 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 696 status == DLM_FORWARD) { 697 msleep(100); 698 if (recovery) { 699 if (status != DLM_RECOVERING) 700 goto retry_lock; 701 /* wait to see the node go down, then 702 * drop down and allow the lockres to 703 * get cleaned up. need to remaster. */ 704 dlm_wait_for_node_death(dlm, res->owner, 705 DLM_NODE_DEATH_WAIT_MAX); 706 } else { 707 dlm_wait_for_recovery(dlm); 708 goto retry_lock; 709 } 710 } 711 712 /* Inflight taken in dlm_get_lock_resource() is dropped here */ 713 spin_lock(&res->spinlock); 714 dlm_lockres_drop_inflight_ref(dlm, res); 715 spin_unlock(&res->spinlock); 716 717 dlm_lockres_calc_usage(dlm, res); 718 dlm_kick_thread(dlm, res); 719 720 if (status != DLM_NORMAL) { 721 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 722 if (status != DLM_NOTQUEUED) 723 dlm_error(status); 724 goto error; 725 } 726 } 727 728error: 729 if (status != DLM_NORMAL) { 730 if (lock && !convert) 731 dlm_lock_put(lock); 732 // this is kind of unnecessary 733 lksb->status = status; 734 } 735 736 /* put lockres ref from the convert path 737 * or from dlm_get_lock_resource */ 738 if (res) 739 dlm_lockres_put(res); 740 741 return status; 742} 743EXPORT_SYMBOL_GPL(dlmlock);