rcom.c (17435B)
1// SPDX-License-Identifier: GPL-2.0-only 2/****************************************************************************** 3******************************************************************************* 4** 5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 7** 8** 9******************************************************************************* 10******************************************************************************/ 11 12#include "dlm_internal.h" 13#include "lockspace.h" 14#include "member.h" 15#include "lowcomms.h" 16#include "midcomms.h" 17#include "rcom.h" 18#include "recover.h" 19#include "dir.h" 20#include "config.h" 21#include "memory.h" 22#include "lock.h" 23#include "util.h" 24 25static int rcom_response(struct dlm_ls *ls) 26{ 27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags); 28} 29 30static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 31 struct dlm_rcom **rc_ret, char *mb, int mb_len) 32{ 33 struct dlm_rcom *rc; 34 35 rc = (struct dlm_rcom *) mb; 36 37 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 38 rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); 39 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 40 rc->rc_header.h_length = cpu_to_le16(mb_len); 41 rc->rc_header.h_cmd = DLM_RCOM; 42 43 rc->rc_type = cpu_to_le32(type); 44 45 spin_lock(&ls->ls_recover_lock); 46 rc->rc_seq = cpu_to_le64(ls->ls_recover_seq); 47 spin_unlock(&ls->ls_recover_lock); 48 49 *rc_ret = rc; 50} 51 52static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 53 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) 54{ 55 int mb_len = sizeof(struct dlm_rcom) + len; 56 struct dlm_mhandle *mh; 57 char *mb; 58 59 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb); 60 if (!mh) { 61 log_print("%s to %d type %d len %d ENOBUFS", 62 __func__, to_nodeid, type, len); 63 return -ENOBUFS; 64 } 65 66 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); 67 *mh_ret = mh; 68 return 0; 69} 70 71static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, 72 int len, struct dlm_rcom **rc_ret, 73 struct dlm_msg **msg_ret) 74{ 75 int mb_len = sizeof(struct dlm_rcom) + len; 76 struct dlm_msg *msg; 77 char *mb; 78 79 msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb, 80 NULL, NULL); 81 if (!msg) { 82 log_print("create_rcom to %d type %d len %d ENOBUFS", 83 to_nodeid, type, len); 84 return -ENOBUFS; 85 } 86 87 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); 88 *msg_ret = msg; 89 return 0; 90} 91 92static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc) 93{ 94 dlm_midcomms_commit_mhandle(mh); 95} 96 97static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc) 98{ 99 dlm_lowcomms_commit_msg(msg); 100 dlm_lowcomms_put_msg(msg); 101} 102 103static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs, 104 uint32_t flags) 105{ 106 rs->rs_flags = cpu_to_le32(flags); 107} 108 109/* When replying to a status request, a node also sends back its 110 configuration values. The requesting node then checks that the remote 111 node is configured the same way as itself. */ 112 113static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf, 114 uint32_t num_slots) 115{ 116 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 117 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 118 119 rf->rf_our_slot = cpu_to_le16(ls->ls_slot); 120 rf->rf_num_slots = cpu_to_le16(num_slots); 121 rf->rf_generation = cpu_to_le32(ls->ls_generation); 122} 123 124static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 125{ 126 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 127 128 if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) { 129 log_error(ls, "version mismatch: %x nodeid %d: %x", 130 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid, 131 le32_to_cpu(rc->rc_header.h_version)); 132 return -EPROTO; 133 } 134 135 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen || 136 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) { 137 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 138 ls->ls_lvblen, ls->ls_exflags, nodeid, 139 le32_to_cpu(rf->rf_lvblen), 140 le32_to_cpu(rf->rf_lsflags)); 141 return -EPROTO; 142 } 143 return 0; 144} 145 146static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq) 147{ 148 spin_lock(&ls->ls_rcom_spin); 149 *new_seq = cpu_to_le64(++ls->ls_rcom_seq); 150 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 151 spin_unlock(&ls->ls_rcom_spin); 152} 153 154static void disallow_sync_reply(struct dlm_ls *ls) 155{ 156 spin_lock(&ls->ls_rcom_spin); 157 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 158 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 159 spin_unlock(&ls->ls_rcom_spin); 160} 161 162/* 163 * low nodeid gathers one slot value at a time from each node. 164 * it sets need_slots=0, and saves rf_our_slot returned from each 165 * rcom_config. 166 * 167 * other nodes gather all slot values at once from the low nodeid. 168 * they set need_slots=1, and ignore the rf_our_slot returned from each 169 * rcom_config. they use the rf_num_slots returned from the low 170 * node's rcom_config. 171 */ 172 173int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) 174{ 175 struct dlm_rcom *rc; 176 struct dlm_msg *msg; 177 int error = 0; 178 179 ls->ls_recover_nodeid = nodeid; 180 181 if (nodeid == dlm_our_nodeid()) { 182 rc = ls->ls_recover_buf; 183 rc->rc_result = cpu_to_le32(dlm_recover_status(ls)); 184 goto out; 185 } 186 187retry: 188 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS, 189 sizeof(struct rcom_status), &rc, &msg); 190 if (error) 191 goto out; 192 193 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags); 194 195 allow_sync_reply(ls, &rc->rc_id); 196 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); 197 198 send_rcom_stateless(msg, rc); 199 200 error = dlm_wait_function(ls, &rcom_response); 201 disallow_sync_reply(ls); 202 if (error == -ETIMEDOUT) 203 goto retry; 204 if (error) 205 goto out; 206 207 rc = ls->ls_recover_buf; 208 209 if (rc->rc_result == cpu_to_le32(-ESRCH)) { 210 /* we pretend the remote lockspace exists with 0 status */ 211 log_debug(ls, "remote node %d not ready", nodeid); 212 rc->rc_result = 0; 213 error = 0; 214 } else { 215 error = check_rcom_config(ls, rc, nodeid); 216 } 217 218 /* the caller looks at rc_result for the remote recovery status */ 219 out: 220 return error; 221} 222 223static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) 224{ 225 struct dlm_rcom *rc; 226 struct rcom_status *rs; 227 uint32_t status; 228 int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 229 int len = sizeof(struct rcom_config); 230 struct dlm_msg *msg; 231 int num_slots = 0; 232 int error; 233 234 if (!dlm_slots_version(&rc_in->rc_header)) { 235 status = dlm_recover_status(ls); 236 goto do_create; 237 } 238 239 rs = (struct rcom_status *)rc_in->rc_buf; 240 241 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) { 242 status = dlm_recover_status(ls); 243 goto do_create; 244 } 245 246 spin_lock(&ls->ls_recover_lock); 247 status = ls->ls_recover_status; 248 num_slots = ls->ls_num_slots; 249 spin_unlock(&ls->ls_recover_lock); 250 len += num_slots * sizeof(struct rcom_slot); 251 252 do_create: 253 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY, 254 len, &rc, &msg); 255 if (error) 256 return; 257 258 rc->rc_id = rc_in->rc_id; 259 rc->rc_seq_reply = rc_in->rc_seq; 260 rc->rc_result = cpu_to_le32(status); 261 262 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots); 263 264 if (!num_slots) 265 goto do_send; 266 267 spin_lock(&ls->ls_recover_lock); 268 if (ls->ls_num_slots != num_slots) { 269 spin_unlock(&ls->ls_recover_lock); 270 log_debug(ls, "receive_rcom_status num_slots %d to %d", 271 num_slots, ls->ls_num_slots); 272 rc->rc_result = 0; 273 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0); 274 goto do_send; 275 } 276 277 dlm_slots_copy_out(ls, rc); 278 spin_unlock(&ls->ls_recover_lock); 279 280 do_send: 281 send_rcom_stateless(msg, rc); 282} 283 284static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 285{ 286 spin_lock(&ls->ls_rcom_spin); 287 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || 288 le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) { 289 log_debug(ls, "reject reply %d from %d seq %llx expect %llx", 290 le32_to_cpu(rc_in->rc_type), 291 le32_to_cpu(rc_in->rc_header.h_nodeid), 292 (unsigned long long)le64_to_cpu(rc_in->rc_id), 293 (unsigned long long)ls->ls_rcom_seq); 294 goto out; 295 } 296 memcpy(ls->ls_recover_buf, rc_in, 297 le16_to_cpu(rc_in->rc_header.h_length)); 298 set_bit(LSFL_RCOM_READY, &ls->ls_flags); 299 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 300 wake_up(&ls->ls_wait_general); 301 out: 302 spin_unlock(&ls->ls_rcom_spin); 303} 304 305int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) 306{ 307 struct dlm_rcom *rc; 308 struct dlm_msg *msg; 309 int error = 0; 310 311 ls->ls_recover_nodeid = nodeid; 312 313retry: 314 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len, 315 &rc, &msg); 316 if (error) 317 goto out; 318 memcpy(rc->rc_buf, last_name, last_len); 319 320 allow_sync_reply(ls, &rc->rc_id); 321 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); 322 323 send_rcom_stateless(msg, rc); 324 325 error = dlm_wait_function(ls, &rcom_response); 326 disallow_sync_reply(ls); 327 if (error == -ETIMEDOUT) 328 goto retry; 329 out: 330 return error; 331} 332 333static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) 334{ 335 struct dlm_rcom *rc; 336 int error, inlen, outlen, nodeid; 337 struct dlm_msg *msg; 338 339 nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 340 inlen = le16_to_cpu(rc_in->rc_header.h_length) - 341 sizeof(struct dlm_rcom); 342 outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); 343 344 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, 345 &rc, &msg); 346 if (error) 347 return; 348 rc->rc_id = rc_in->rc_id; 349 rc->rc_seq_reply = rc_in->rc_seq; 350 351 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 352 nodeid); 353 send_rcom_stateless(msg, rc); 354} 355 356int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) 357{ 358 struct dlm_rcom *rc; 359 struct dlm_mhandle *mh; 360 struct dlm_ls *ls = r->res_ls; 361 int error; 362 363 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, 364 &rc, &mh); 365 if (error) 366 goto out; 367 memcpy(rc->rc_buf, r->res_name, r->res_length); 368 rc->rc_id = cpu_to_le64(r->res_id); 369 370 send_rcom(mh, rc); 371 out: 372 return error; 373} 374 375static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) 376{ 377 struct dlm_rcom *rc; 378 struct dlm_mhandle *mh; 379 int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 380 int len = le16_to_cpu(rc_in->rc_header.h_length) - 381 sizeof(struct dlm_rcom); 382 383 /* Old code would send this special id to trigger a debug dump. */ 384 if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) { 385 log_error(ls, "receive_rcom_lookup dump from %d", nodeid); 386 dlm_dump_rsb_name(ls, rc_in->rc_buf, len); 387 return; 388 } 389 390 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); 391 if (error) 392 return; 393 394 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len, 395 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL); 396 if (error) 397 ret_nodeid = error; 398 rc->rc_result = cpu_to_le32(ret_nodeid); 399 rc->rc_id = rc_in->rc_id; 400 rc->rc_seq_reply = rc_in->rc_seq; 401 402 send_rcom(mh, rc); 403} 404 405static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 406{ 407 dlm_recover_master_reply(ls, rc_in); 408} 409 410static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, 411 struct rcom_lock *rl) 412{ 413 memset(rl, 0, sizeof(*rl)); 414 415 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid); 416 rl->rl_lkid = cpu_to_le32(lkb->lkb_id); 417 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags); 418 rl->rl_flags = cpu_to_le32(lkb->lkb_flags); 419 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 420 rl->rl_rqmode = lkb->lkb_rqmode; 421 rl->rl_grmode = lkb->lkb_grmode; 422 rl->rl_status = lkb->lkb_status; 423 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type); 424 425 if (lkb->lkb_bastfn) 426 rl->rl_asts |= DLM_CB_BAST; 427 if (lkb->lkb_astfn) 428 rl->rl_asts |= DLM_CB_CAST; 429 430 rl->rl_namelen = cpu_to_le16(r->res_length); 431 memcpy(rl->rl_name, r->res_name, r->res_length); 432 433 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ? 434 If so, receive_rcom_lock_args() won't take this copy. */ 435 436 if (lkb->lkb_lvbptr) 437 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 438} 439 440int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 441{ 442 struct dlm_ls *ls = r->res_ls; 443 struct dlm_rcom *rc; 444 struct dlm_mhandle *mh; 445 struct rcom_lock *rl; 446 int error, len = sizeof(struct rcom_lock); 447 448 if (lkb->lkb_lvbptr) 449 len += ls->ls_lvblen; 450 451 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); 452 if (error) 453 goto out; 454 455 rl = (struct rcom_lock *) rc->rc_buf; 456 pack_rcom_lock(r, lkb, rl); 457 rc->rc_id = cpu_to_le64((uintptr_t)r); 458 459 send_rcom(mh, rc); 460 out: 461 return error; 462} 463 464/* needs at least dlm_rcom + rcom_lock */ 465static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) 466{ 467 struct dlm_rcom *rc; 468 struct dlm_mhandle *mh; 469 int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 470 471 dlm_recover_master_copy(ls, rc_in); 472 473 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, 474 sizeof(struct rcom_lock), &rc, &mh); 475 if (error) 476 return; 477 478 /* We send back the same rcom_lock struct we received, but 479 dlm_recover_master_copy() has filled in rl_remid and rl_result */ 480 481 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 482 rc->rc_id = rc_in->rc_id; 483 rc->rc_seq_reply = rc_in->rc_seq; 484 485 send_rcom(mh, rc); 486} 487 488/* If the lockspace doesn't exist then still send a status message 489 back; it's possible that it just doesn't have its global_id yet. */ 490 491int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) 492{ 493 struct dlm_rcom *rc; 494 struct rcom_config *rf; 495 struct dlm_mhandle *mh; 496 char *mb; 497 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config); 498 499 mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb); 500 if (!mh) 501 return -ENOBUFS; 502 503 rc = (struct dlm_rcom *) mb; 504 505 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 506 rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace; 507 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 508 rc->rc_header.h_length = cpu_to_le16(mb_len); 509 rc->rc_header.h_cmd = DLM_RCOM; 510 511 rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY); 512 rc->rc_id = rc_in->rc_id; 513 rc->rc_seq_reply = rc_in->rc_seq; 514 rc->rc_result = cpu_to_le32(-ESRCH); 515 516 rf = (struct rcom_config *) rc->rc_buf; 517 rf->rf_lvblen = cpu_to_le32(~0U); 518 519 dlm_midcomms_commit_mhandle(mh); 520 521 return 0; 522} 523 524/* 525 * Ignore messages for stage Y before we set 526 * recover_status bit for stage X: 527 * 528 * recover_status = 0 529 * 530 * dlm_recover_members() 531 * - send nothing 532 * - recv nothing 533 * - ignore NAMES, NAMES_REPLY 534 * - ignore LOOKUP, LOOKUP_REPLY 535 * - ignore LOCK, LOCK_REPLY 536 * 537 * recover_status |= NODES 538 * 539 * dlm_recover_members_wait() 540 * 541 * dlm_recover_directory() 542 * - send NAMES 543 * - recv NAMES_REPLY 544 * - ignore LOOKUP, LOOKUP_REPLY 545 * - ignore LOCK, LOCK_REPLY 546 * 547 * recover_status |= DIR 548 * 549 * dlm_recover_directory_wait() 550 * 551 * dlm_recover_masters() 552 * - send LOOKUP 553 * - recv LOOKUP_REPLY 554 * 555 * dlm_recover_locks() 556 * - send LOCKS 557 * - recv LOCKS_REPLY 558 * 559 * recover_status |= LOCKS 560 * 561 * dlm_recover_locks_wait() 562 * 563 * recover_status |= DONE 564 */ 565 566/* Called by dlm_recv; corresponds to dlm_receive_message() but special 567 recovery-only comms are sent through here. */ 568 569void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 570{ 571 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 572 int stop, reply = 0, names = 0, lookup = 0, lock = 0; 573 uint32_t status; 574 uint64_t seq; 575 576 switch (rc->rc_type) { 577 case cpu_to_le32(DLM_RCOM_STATUS_REPLY): 578 reply = 1; 579 break; 580 case cpu_to_le32(DLM_RCOM_NAMES): 581 names = 1; 582 break; 583 case cpu_to_le32(DLM_RCOM_NAMES_REPLY): 584 names = 1; 585 reply = 1; 586 break; 587 case cpu_to_le32(DLM_RCOM_LOOKUP): 588 lookup = 1; 589 break; 590 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY): 591 lookup = 1; 592 reply = 1; 593 break; 594 case cpu_to_le32(DLM_RCOM_LOCK): 595 lock = 1; 596 break; 597 case cpu_to_le32(DLM_RCOM_LOCK_REPLY): 598 lock = 1; 599 reply = 1; 600 break; 601 } 602 603 spin_lock(&ls->ls_recover_lock); 604 status = ls->ls_recover_status; 605 stop = dlm_recovery_stopped(ls); 606 seq = ls->ls_recover_seq; 607 spin_unlock(&ls->ls_recover_lock); 608 609 if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS))) 610 goto ignore; 611 612 if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq)) 613 goto ignore; 614 615 if (!(status & DLM_RS_NODES) && (names || lookup || lock)) 616 goto ignore; 617 618 if (!(status & DLM_RS_DIR) && (lookup || lock)) 619 goto ignore; 620 621 switch (rc->rc_type) { 622 case cpu_to_le32(DLM_RCOM_STATUS): 623 receive_rcom_status(ls, rc); 624 break; 625 626 case cpu_to_le32(DLM_RCOM_NAMES): 627 receive_rcom_names(ls, rc); 628 break; 629 630 case cpu_to_le32(DLM_RCOM_LOOKUP): 631 receive_rcom_lookup(ls, rc); 632 break; 633 634 case cpu_to_le32(DLM_RCOM_LOCK): 635 if (le16_to_cpu(rc->rc_header.h_length) < lock_size) 636 goto Eshort; 637 receive_rcom_lock(ls, rc); 638 break; 639 640 case cpu_to_le32(DLM_RCOM_STATUS_REPLY): 641 receive_sync_reply(ls, rc); 642 break; 643 644 case cpu_to_le32(DLM_RCOM_NAMES_REPLY): 645 receive_sync_reply(ls, rc); 646 break; 647 648 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY): 649 receive_rcom_lookup_reply(ls, rc); 650 break; 651 652 case cpu_to_le32(DLM_RCOM_LOCK_REPLY): 653 if (le16_to_cpu(rc->rc_header.h_length) < lock_size) 654 goto Eshort; 655 dlm_recover_process_copy(ls, rc); 656 break; 657 658 default: 659 log_error(ls, "receive_rcom bad type %d", 660 le32_to_cpu(rc->rc_type)); 661 } 662 return; 663 664ignore: 665 log_limit(ls, "dlm_receive_rcom ignore msg %d " 666 "from %d %llu %llu recover seq %llu sts %x gen %u", 667 le32_to_cpu(rc->rc_type), 668 nodeid, 669 (unsigned long long)le64_to_cpu(rc->rc_seq), 670 (unsigned long long)le64_to_cpu(rc->rc_seq_reply), 671 (unsigned long long)seq, 672 status, ls->ls_generation); 673 return; 674Eshort: 675 log_error(ls, "recovery message %d from %d is too short", 676 le32_to_cpu(rc->rc_type), nodeid); 677} 678