mon_client.c (39150B)
1// SPDX-License-Identifier: GPL-2.0 2#include <linux/ceph/ceph_debug.h> 3 4#include <linux/module.h> 5#include <linux/types.h> 6#include <linux/slab.h> 7#include <linux/random.h> 8#include <linux/sched.h> 9 10#include <linux/ceph/ceph_features.h> 11#include <linux/ceph/mon_client.h> 12#include <linux/ceph/libceph.h> 13#include <linux/ceph/debugfs.h> 14#include <linux/ceph/decode.h> 15#include <linux/ceph/auth.h> 16 17/* 18 * Interact with Ceph monitor cluster. Handle requests for new map 19 * versions, and periodically resend as needed. Also implement 20 * statfs() and umount(). 21 * 22 * A small cluster of Ceph "monitors" are responsible for managing critical 23 * cluster configuration and state information. An odd number (e.g., 3, 5) 24 * of cmon daemons use a modified version of the Paxos part-time parliament 25 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 26 * list of clients who have mounted the file system. 27 * 28 * We maintain an open, active session with a monitor at all times in order to 29 * receive timely MDSMap updates. We periodically send a keepalive byte on the 30 * TCP socket to ensure we detect a failure. If the connection does break, we 31 * randomly hunt for a new monitor. Once the connection is reestablished, we 32 * resend any outstanding requests. 33 */ 34 35static const struct ceph_connection_operations mon_con_ops; 36 37static int __validate_auth(struct ceph_mon_client *monc); 38 39static int decode_mon_info(void **p, void *end, bool msgr2, 40 struct ceph_entity_addr *addr) 41{ 42 void *mon_info_end; 43 u32 struct_len; 44 u8 struct_v; 45 int ret; 46 47 ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v, 48 &struct_len); 49 if (ret) 50 return ret; 51 52 mon_info_end = *p + struct_len; 53 ceph_decode_skip_string(p, end, e_inval); /* skip mon name */ 54 ret = ceph_decode_entity_addrvec(p, end, msgr2, addr); 55 if (ret) 56 return ret; 57 58 *p = mon_info_end; 59 return 0; 60 61e_inval: 62 return -EINVAL; 63} 64 65/* 66 * Decode a monmap blob (e.g., during mount). 67 * 68 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC). 69 */ 70static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2) 71{ 72 struct ceph_monmap *monmap = NULL; 73 struct ceph_fsid fsid; 74 u32 struct_len; 75 int blob_len; 76 int num_mon; 77 u8 struct_v; 78 u32 epoch; 79 int ret; 80 int i; 81 82 ceph_decode_32_safe(p, end, blob_len, e_inval); 83 ceph_decode_need(p, end, blob_len, e_inval); 84 85 ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len); 86 if (ret) 87 goto fail; 88 89 dout("%s struct_v %d\n", __func__, struct_v); 90 ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval); 91 ceph_decode_32_safe(p, end, epoch, e_inval); 92 if (struct_v >= 6) { 93 u32 feat_struct_len; 94 u8 feat_struct_v; 95 96 *p += sizeof(struct ceph_timespec); /* skip last_changed */ 97 *p += sizeof(struct ceph_timespec); /* skip created */ 98 99 ret = ceph_start_decoding(p, end, 1, "mon_feature_t", 100 &feat_struct_v, &feat_struct_len); 101 if (ret) 102 goto fail; 103 104 *p += feat_struct_len; /* skip persistent_features */ 105 106 ret = ceph_start_decoding(p, end, 1, "mon_feature_t", 107 &feat_struct_v, &feat_struct_len); 108 if (ret) 109 goto fail; 110 111 *p += feat_struct_len; /* skip optional_features */ 112 } 113 ceph_decode_32_safe(p, end, num_mon, e_inval); 114 115 dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch, 116 num_mon); 117 if (num_mon > CEPH_MAX_MON) 118 goto e_inval; 119 120 monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO); 121 if (!monmap) { 122 ret = -ENOMEM; 123 goto fail; 124 } 125 monmap->fsid = fsid; 126 monmap->epoch = epoch; 127 monmap->num_mon = num_mon; 128 129 /* legacy_mon_addr map or mon_info map */ 130 for (i = 0; i < num_mon; i++) { 131 struct ceph_entity_inst *inst = &monmap->mon_inst[i]; 132 133 ceph_decode_skip_string(p, end, e_inval); /* skip mon name */ 134 inst->name.type = CEPH_ENTITY_TYPE_MON; 135 inst->name.num = cpu_to_le64(i); 136 137 if (struct_v >= 6) 138 ret = decode_mon_info(p, end, msgr2, &inst->addr); 139 else 140 ret = ceph_decode_entity_addr(p, end, &inst->addr); 141 if (ret) 142 goto fail; 143 144 dout("%s mon%d addr %s\n", __func__, i, 145 ceph_pr_addr(&inst->addr)); 146 } 147 148 return monmap; 149 150e_inval: 151 ret = -EINVAL; 152fail: 153 kfree(monmap); 154 return ERR_PTR(ret); 155} 156 157/* 158 * return true if *addr is included in the monmap. 159 */ 160int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 161{ 162 int i; 163 164 for (i = 0; i < m->num_mon; i++) { 165 if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr)) 166 return 1; 167 } 168 169 return 0; 170} 171 172/* 173 * Send an auth request. 174 */ 175static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 176{ 177 monc->pending_auth = 1; 178 monc->m_auth->front.iov_len = len; 179 monc->m_auth->hdr.front_len = cpu_to_le32(len); 180 ceph_msg_revoke(monc->m_auth); 181 ceph_msg_get(monc->m_auth); /* keep our ref */ 182 ceph_con_send(&monc->con, monc->m_auth); 183} 184 185/* 186 * Close monitor session, if any. 187 */ 188static void __close_session(struct ceph_mon_client *monc) 189{ 190 dout("__close_session closing mon%d\n", monc->cur_mon); 191 ceph_msg_revoke(monc->m_auth); 192 ceph_msg_revoke_incoming(monc->m_auth_reply); 193 ceph_msg_revoke(monc->m_subscribe); 194 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 195 ceph_con_close(&monc->con); 196 197 monc->pending_auth = 0; 198 ceph_auth_reset(monc->auth); 199} 200 201/* 202 * Pick a new monitor at random and set cur_mon. If we are repicking 203 * (i.e. cur_mon is already set), be sure to pick a different one. 204 */ 205static void pick_new_mon(struct ceph_mon_client *monc) 206{ 207 int old_mon = monc->cur_mon; 208 209 BUG_ON(monc->monmap->num_mon < 1); 210 211 if (monc->monmap->num_mon == 1) { 212 monc->cur_mon = 0; 213 } else { 214 int max = monc->monmap->num_mon; 215 int o = -1; 216 int n; 217 218 if (monc->cur_mon >= 0) { 219 if (monc->cur_mon < monc->monmap->num_mon) 220 o = monc->cur_mon; 221 if (o >= 0) 222 max--; 223 } 224 225 n = prandom_u32() % max; 226 if (o >= 0 && n >= o) 227 n++; 228 229 monc->cur_mon = n; 230 } 231 232 dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon, 233 monc->cur_mon, monc->monmap->num_mon); 234} 235 236/* 237 * Open a session with a new monitor. 238 */ 239static void __open_session(struct ceph_mon_client *monc) 240{ 241 int ret; 242 243 pick_new_mon(monc); 244 245 monc->hunting = true; 246 if (monc->had_a_connection) { 247 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; 248 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT) 249 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT; 250 } 251 252 monc->sub_renew_after = jiffies; /* i.e., expired */ 253 monc->sub_renew_sent = 0; 254 255 dout("%s opening mon%d\n", __func__, monc->cur_mon); 256 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, 257 &monc->monmap->mon_inst[monc->cur_mon].addr); 258 259 /* 260 * Queue a keepalive to ensure that in case of an early fault 261 * the messenger doesn't put us into STANDBY state and instead 262 * retries. This also ensures that our timestamp is valid by 263 * the time we finish hunting and delayed_work() checks it. 264 */ 265 ceph_con_keepalive(&monc->con); 266 if (ceph_msgr2(monc->client)) { 267 monc->pending_auth = 1; 268 return; 269 } 270 271 /* initiate authentication handshake */ 272 ret = ceph_auth_build_hello(monc->auth, 273 monc->m_auth->front.iov_base, 274 monc->m_auth->front_alloc_len); 275 BUG_ON(ret <= 0); 276 __send_prepared_auth_request(monc, ret); 277} 278 279static void reopen_session(struct ceph_mon_client *monc) 280{ 281 if (!monc->hunting) 282 pr_info("mon%d %s session lost, hunting for new mon\n", 283 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr)); 284 285 __close_session(monc); 286 __open_session(monc); 287} 288 289void ceph_monc_reopen_session(struct ceph_mon_client *monc) 290{ 291 mutex_lock(&monc->mutex); 292 reopen_session(monc); 293 mutex_unlock(&monc->mutex); 294} 295 296static void un_backoff(struct ceph_mon_client *monc) 297{ 298 monc->hunt_mult /= 2; /* reduce by 50% */ 299 if (monc->hunt_mult < 1) 300 monc->hunt_mult = 1; 301 dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult); 302} 303 304/* 305 * Reschedule delayed work timer. 306 */ 307static void __schedule_delayed(struct ceph_mon_client *monc) 308{ 309 unsigned long delay; 310 311 if (monc->hunting) 312 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult; 313 else 314 delay = CEPH_MONC_PING_INTERVAL; 315 316 dout("__schedule_delayed after %lu\n", delay); 317 mod_delayed_work(system_wq, &monc->delayed_work, 318 round_jiffies_relative(delay)); 319} 320 321const char *ceph_sub_str[] = { 322 [CEPH_SUB_MONMAP] = "monmap", 323 [CEPH_SUB_OSDMAP] = "osdmap", 324 [CEPH_SUB_FSMAP] = "fsmap.user", 325 [CEPH_SUB_MDSMAP] = "mdsmap", 326}; 327 328/* 329 * Send subscribe request for one or more maps, according to 330 * monc->subs. 331 */ 332static void __send_subscribe(struct ceph_mon_client *monc) 333{ 334 struct ceph_msg *msg = monc->m_subscribe; 335 void *p = msg->front.iov_base; 336 void *const end = p + msg->front_alloc_len; 337 int num = 0; 338 int i; 339 340 dout("%s sent %lu\n", __func__, monc->sub_renew_sent); 341 342 BUG_ON(monc->cur_mon < 0); 343 344 if (!monc->sub_renew_sent) 345 monc->sub_renew_sent = jiffies | 1; /* never 0 */ 346 347 msg->hdr.version = cpu_to_le16(2); 348 349 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 350 if (monc->subs[i].want) 351 num++; 352 } 353 BUG_ON(num < 1); /* monmap sub is always there */ 354 ceph_encode_32(&p, num); 355 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 356 char buf[32]; 357 int len; 358 359 if (!monc->subs[i].want) 360 continue; 361 362 len = sprintf(buf, "%s", ceph_sub_str[i]); 363 if (i == CEPH_SUB_MDSMAP && 364 monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE) 365 len += sprintf(buf + len, ".%d", monc->fs_cluster_id); 366 367 dout("%s %s start %llu flags 0x%x\n", __func__, buf, 368 le64_to_cpu(monc->subs[i].item.start), 369 monc->subs[i].item.flags); 370 ceph_encode_string(&p, end, buf, len); 371 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); 372 p += sizeof(monc->subs[i].item); 373 } 374 375 BUG_ON(p > end); 376 msg->front.iov_len = p - msg->front.iov_base; 377 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 378 ceph_msg_revoke(msg); 379 ceph_con_send(&monc->con, ceph_msg_get(msg)); 380} 381 382static void handle_subscribe_ack(struct ceph_mon_client *monc, 383 struct ceph_msg *msg) 384{ 385 unsigned int seconds; 386 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 387 388 if (msg->front.iov_len < sizeof(*h)) 389 goto bad; 390 seconds = le32_to_cpu(h->duration); 391 392 mutex_lock(&monc->mutex); 393 if (monc->sub_renew_sent) { 394 /* 395 * This is only needed for legacy (infernalis or older) 396 * MONs -- see delayed_work(). 397 */ 398 monc->sub_renew_after = monc->sub_renew_sent + 399 (seconds >> 1) * HZ - 1; 400 dout("%s sent %lu duration %d renew after %lu\n", __func__, 401 monc->sub_renew_sent, seconds, monc->sub_renew_after); 402 monc->sub_renew_sent = 0; 403 } else { 404 dout("%s sent %lu renew after %lu, ignoring\n", __func__, 405 monc->sub_renew_sent, monc->sub_renew_after); 406 } 407 mutex_unlock(&monc->mutex); 408 return; 409bad: 410 pr_err("got corrupt subscribe-ack msg\n"); 411 ceph_msg_dump(msg); 412} 413 414/* 415 * Register interest in a map 416 * 417 * @sub: one of CEPH_SUB_* 418 * @epoch: X for "every map since X", or 0 for "just the latest" 419 */ 420static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub, 421 u32 epoch, bool continuous) 422{ 423 __le64 start = cpu_to_le64(epoch); 424 u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0; 425 426 dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub], 427 epoch, continuous); 428 429 if (monc->subs[sub].want && 430 monc->subs[sub].item.start == start && 431 monc->subs[sub].item.flags == flags) 432 return false; 433 434 monc->subs[sub].item.start = start; 435 monc->subs[sub].item.flags = flags; 436 monc->subs[sub].want = true; 437 438 return true; 439} 440 441bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, 442 bool continuous) 443{ 444 bool need_request; 445 446 mutex_lock(&monc->mutex); 447 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous); 448 mutex_unlock(&monc->mutex); 449 450 return need_request; 451} 452EXPORT_SYMBOL(ceph_monc_want_map); 453 454/* 455 * Keep track of which maps we have 456 * 457 * @sub: one of CEPH_SUB_* 458 */ 459static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub, 460 u32 epoch) 461{ 462 dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch); 463 464 if (monc->subs[sub].want) { 465 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME) 466 monc->subs[sub].want = false; 467 else 468 monc->subs[sub].item.start = cpu_to_le64(epoch + 1); 469 } 470 471 monc->subs[sub].have = epoch; 472} 473 474void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) 475{ 476 mutex_lock(&monc->mutex); 477 __ceph_monc_got_map(monc, sub, epoch); 478 mutex_unlock(&monc->mutex); 479} 480EXPORT_SYMBOL(ceph_monc_got_map); 481 482void ceph_monc_renew_subs(struct ceph_mon_client *monc) 483{ 484 mutex_lock(&monc->mutex); 485 __send_subscribe(monc); 486 mutex_unlock(&monc->mutex); 487} 488EXPORT_SYMBOL(ceph_monc_renew_subs); 489 490/* 491 * Wait for an osdmap with a given epoch. 492 * 493 * @epoch: epoch to wait for 494 * @timeout: in jiffies, 0 means "wait forever" 495 */ 496int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 497 unsigned long timeout) 498{ 499 unsigned long started = jiffies; 500 long ret; 501 502 mutex_lock(&monc->mutex); 503 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) { 504 mutex_unlock(&monc->mutex); 505 506 if (timeout && time_after_eq(jiffies, started + timeout)) 507 return -ETIMEDOUT; 508 509 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 510 monc->subs[CEPH_SUB_OSDMAP].have >= epoch, 511 ceph_timeout_jiffies(timeout)); 512 if (ret < 0) 513 return ret; 514 515 mutex_lock(&monc->mutex); 516 } 517 518 mutex_unlock(&monc->mutex); 519 return 0; 520} 521EXPORT_SYMBOL(ceph_monc_wait_osdmap); 522 523/* 524 * Open a session with a random monitor. Request monmap and osdmap, 525 * which are waited upon in __ceph_open_session(). 526 */ 527int ceph_monc_open_session(struct ceph_mon_client *monc) 528{ 529 mutex_lock(&monc->mutex); 530 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true); 531 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false); 532 __open_session(monc); 533 __schedule_delayed(monc); 534 mutex_unlock(&monc->mutex); 535 return 0; 536} 537EXPORT_SYMBOL(ceph_monc_open_session); 538 539static void ceph_monc_handle_map(struct ceph_mon_client *monc, 540 struct ceph_msg *msg) 541{ 542 struct ceph_client *client = monc->client; 543 struct ceph_monmap *monmap; 544 void *p, *end; 545 546 mutex_lock(&monc->mutex); 547 548 dout("handle_monmap\n"); 549 p = msg->front.iov_base; 550 end = p + msg->front.iov_len; 551 552 monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client)); 553 if (IS_ERR(monmap)) { 554 pr_err("problem decoding monmap, %d\n", 555 (int)PTR_ERR(monmap)); 556 ceph_msg_dump(msg); 557 goto out; 558 } 559 560 if (ceph_check_fsid(client, &monmap->fsid) < 0) { 561 kfree(monmap); 562 goto out; 563 } 564 565 kfree(monc->monmap); 566 monc->monmap = monmap; 567 568 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch); 569 client->have_fsid = true; 570 571out: 572 mutex_unlock(&monc->mutex); 573 wake_up_all(&client->auth_wq); 574} 575 576/* 577 * generic requests (currently statfs, mon_get_version) 578 */ 579DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node) 580 581static void release_generic_request(struct kref *kref) 582{ 583 struct ceph_mon_generic_request *req = 584 container_of(kref, struct ceph_mon_generic_request, kref); 585 586 dout("%s greq %p request %p reply %p\n", __func__, req, req->request, 587 req->reply); 588 WARN_ON(!RB_EMPTY_NODE(&req->node)); 589 590 if (req->reply) 591 ceph_msg_put(req->reply); 592 if (req->request) 593 ceph_msg_put(req->request); 594 595 kfree(req); 596} 597 598static void put_generic_request(struct ceph_mon_generic_request *req) 599{ 600 if (req) 601 kref_put(&req->kref, release_generic_request); 602} 603 604static void get_generic_request(struct ceph_mon_generic_request *req) 605{ 606 kref_get(&req->kref); 607} 608 609static struct ceph_mon_generic_request * 610alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) 611{ 612 struct ceph_mon_generic_request *req; 613 614 req = kzalloc(sizeof(*req), gfp); 615 if (!req) 616 return NULL; 617 618 req->monc = monc; 619 kref_init(&req->kref); 620 RB_CLEAR_NODE(&req->node); 621 init_completion(&req->completion); 622 623 dout("%s greq %p\n", __func__, req); 624 return req; 625} 626 627static void register_generic_request(struct ceph_mon_generic_request *req) 628{ 629 struct ceph_mon_client *monc = req->monc; 630 631 WARN_ON(req->tid); 632 633 get_generic_request(req); 634 req->tid = ++monc->last_tid; 635 insert_generic_request(&monc->generic_request_tree, req); 636} 637 638static void send_generic_request(struct ceph_mon_client *monc, 639 struct ceph_mon_generic_request *req) 640{ 641 WARN_ON(!req->tid); 642 643 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 644 req->request->hdr.tid = cpu_to_le64(req->tid); 645 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 646} 647 648static void __finish_generic_request(struct ceph_mon_generic_request *req) 649{ 650 struct ceph_mon_client *monc = req->monc; 651 652 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 653 erase_generic_request(&monc->generic_request_tree, req); 654 655 ceph_msg_revoke(req->request); 656 ceph_msg_revoke_incoming(req->reply); 657} 658 659static void finish_generic_request(struct ceph_mon_generic_request *req) 660{ 661 __finish_generic_request(req); 662 put_generic_request(req); 663} 664 665static void complete_generic_request(struct ceph_mon_generic_request *req) 666{ 667 if (req->complete_cb) 668 req->complete_cb(req); 669 else 670 complete_all(&req->completion); 671 put_generic_request(req); 672} 673 674static void cancel_generic_request(struct ceph_mon_generic_request *req) 675{ 676 struct ceph_mon_client *monc = req->monc; 677 struct ceph_mon_generic_request *lookup_req; 678 679 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 680 681 mutex_lock(&monc->mutex); 682 lookup_req = lookup_generic_request(&monc->generic_request_tree, 683 req->tid); 684 if (lookup_req) { 685 WARN_ON(lookup_req != req); 686 finish_generic_request(req); 687 } 688 689 mutex_unlock(&monc->mutex); 690} 691 692static int wait_generic_request(struct ceph_mon_generic_request *req) 693{ 694 int ret; 695 696 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 697 ret = wait_for_completion_interruptible(&req->completion); 698 if (ret) 699 cancel_generic_request(req); 700 else 701 ret = req->result; /* completed */ 702 703 return ret; 704} 705 706static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 707 struct ceph_msg_header *hdr, 708 int *skip) 709{ 710 struct ceph_mon_client *monc = con->private; 711 struct ceph_mon_generic_request *req; 712 u64 tid = le64_to_cpu(hdr->tid); 713 struct ceph_msg *m; 714 715 mutex_lock(&monc->mutex); 716 req = lookup_generic_request(&monc->generic_request_tree, tid); 717 if (!req) { 718 dout("get_generic_reply %lld dne\n", tid); 719 *skip = 1; 720 m = NULL; 721 } else { 722 dout("get_generic_reply %lld got %p\n", tid, req->reply); 723 *skip = 0; 724 m = ceph_msg_get(req->reply); 725 /* 726 * we don't need to track the connection reading into 727 * this reply because we only have one open connection 728 * at a time, ever. 729 */ 730 } 731 mutex_unlock(&monc->mutex); 732 return m; 733} 734 735/* 736 * statfs 737 */ 738static void handle_statfs_reply(struct ceph_mon_client *monc, 739 struct ceph_msg *msg) 740{ 741 struct ceph_mon_generic_request *req; 742 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 743 u64 tid = le64_to_cpu(msg->hdr.tid); 744 745 dout("%s msg %p tid %llu\n", __func__, msg, tid); 746 747 if (msg->front.iov_len != sizeof(*reply)) 748 goto bad; 749 750 mutex_lock(&monc->mutex); 751 req = lookup_generic_request(&monc->generic_request_tree, tid); 752 if (!req) { 753 mutex_unlock(&monc->mutex); 754 return; 755 } 756 757 req->result = 0; 758 *req->u.st = reply->st; /* struct */ 759 __finish_generic_request(req); 760 mutex_unlock(&monc->mutex); 761 762 complete_generic_request(req); 763 return; 764 765bad: 766 pr_err("corrupt statfs reply, tid %llu\n", tid); 767 ceph_msg_dump(msg); 768} 769 770/* 771 * Do a synchronous statfs(). 772 */ 773int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool, 774 struct ceph_statfs *buf) 775{ 776 struct ceph_mon_generic_request *req; 777 struct ceph_mon_statfs *h; 778 int ret = -ENOMEM; 779 780 req = alloc_generic_request(monc, GFP_NOFS); 781 if (!req) 782 goto out; 783 784 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 785 true); 786 if (!req->request) 787 goto out; 788 789 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); 790 if (!req->reply) 791 goto out; 792 793 req->u.st = buf; 794 req->request->hdr.version = cpu_to_le16(2); 795 796 mutex_lock(&monc->mutex); 797 register_generic_request(req); 798 /* fill out request */ 799 h = req->request->front.iov_base; 800 h->monhdr.have_version = 0; 801 h->monhdr.session_mon = cpu_to_le16(-1); 802 h->monhdr.session_mon_tid = 0; 803 h->fsid = monc->monmap->fsid; 804 h->contains_data_pool = (data_pool != CEPH_NOPOOL); 805 h->data_pool = cpu_to_le64(data_pool); 806 send_generic_request(monc, req); 807 mutex_unlock(&monc->mutex); 808 809 ret = wait_generic_request(req); 810out: 811 put_generic_request(req); 812 return ret; 813} 814EXPORT_SYMBOL(ceph_monc_do_statfs); 815 816static void handle_get_version_reply(struct ceph_mon_client *monc, 817 struct ceph_msg *msg) 818{ 819 struct ceph_mon_generic_request *req; 820 u64 tid = le64_to_cpu(msg->hdr.tid); 821 void *p = msg->front.iov_base; 822 void *end = p + msg->front_alloc_len; 823 u64 handle; 824 825 dout("%s msg %p tid %llu\n", __func__, msg, tid); 826 827 ceph_decode_need(&p, end, 2*sizeof(u64), bad); 828 handle = ceph_decode_64(&p); 829 if (tid != 0 && tid != handle) 830 goto bad; 831 832 mutex_lock(&monc->mutex); 833 req = lookup_generic_request(&monc->generic_request_tree, handle); 834 if (!req) { 835 mutex_unlock(&monc->mutex); 836 return; 837 } 838 839 req->result = 0; 840 req->u.newest = ceph_decode_64(&p); 841 __finish_generic_request(req); 842 mutex_unlock(&monc->mutex); 843 844 complete_generic_request(req); 845 return; 846 847bad: 848 pr_err("corrupt mon_get_version reply, tid %llu\n", tid); 849 ceph_msg_dump(msg); 850} 851 852static struct ceph_mon_generic_request * 853__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 854 ceph_monc_callback_t cb, u64 private_data) 855{ 856 struct ceph_mon_generic_request *req; 857 858 req = alloc_generic_request(monc, GFP_NOIO); 859 if (!req) 860 goto err_put_req; 861 862 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 863 sizeof(u64) + sizeof(u32) + strlen(what), 864 GFP_NOIO, true); 865 if (!req->request) 866 goto err_put_req; 867 868 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, 869 true); 870 if (!req->reply) 871 goto err_put_req; 872 873 req->complete_cb = cb; 874 req->private_data = private_data; 875 876 mutex_lock(&monc->mutex); 877 register_generic_request(req); 878 { 879 void *p = req->request->front.iov_base; 880 void *const end = p + req->request->front_alloc_len; 881 882 ceph_encode_64(&p, req->tid); /* handle */ 883 ceph_encode_string(&p, end, what, strlen(what)); 884 WARN_ON(p != end); 885 } 886 send_generic_request(monc, req); 887 mutex_unlock(&monc->mutex); 888 889 return req; 890 891err_put_req: 892 put_generic_request(req); 893 return ERR_PTR(-ENOMEM); 894} 895 896/* 897 * Send MMonGetVersion and wait for the reply. 898 * 899 * @what: one of "mdsmap", "osdmap" or "monmap" 900 */ 901int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 902 u64 *newest) 903{ 904 struct ceph_mon_generic_request *req; 905 int ret; 906 907 req = __ceph_monc_get_version(monc, what, NULL, 0); 908 if (IS_ERR(req)) 909 return PTR_ERR(req); 910 911 ret = wait_generic_request(req); 912 if (!ret) 913 *newest = req->u.newest; 914 915 put_generic_request(req); 916 return ret; 917} 918EXPORT_SYMBOL(ceph_monc_get_version); 919 920/* 921 * Send MMonGetVersion, 922 * 923 * @what: one of "mdsmap", "osdmap" or "monmap" 924 */ 925int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, 926 ceph_monc_callback_t cb, u64 private_data) 927{ 928 struct ceph_mon_generic_request *req; 929 930 req = __ceph_monc_get_version(monc, what, cb, private_data); 931 if (IS_ERR(req)) 932 return PTR_ERR(req); 933 934 put_generic_request(req); 935 return 0; 936} 937EXPORT_SYMBOL(ceph_monc_get_version_async); 938 939static void handle_command_ack(struct ceph_mon_client *monc, 940 struct ceph_msg *msg) 941{ 942 struct ceph_mon_generic_request *req; 943 void *p = msg->front.iov_base; 944 void *const end = p + msg->front_alloc_len; 945 u64 tid = le64_to_cpu(msg->hdr.tid); 946 947 dout("%s msg %p tid %llu\n", __func__, msg, tid); 948 949 ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) + 950 sizeof(u32), bad); 951 p += sizeof(struct ceph_mon_request_header); 952 953 mutex_lock(&monc->mutex); 954 req = lookup_generic_request(&monc->generic_request_tree, tid); 955 if (!req) { 956 mutex_unlock(&monc->mutex); 957 return; 958 } 959 960 req->result = ceph_decode_32(&p); 961 __finish_generic_request(req); 962 mutex_unlock(&monc->mutex); 963 964 complete_generic_request(req); 965 return; 966 967bad: 968 pr_err("corrupt mon_command ack, tid %llu\n", tid); 969 ceph_msg_dump(msg); 970} 971 972static __printf(2, 0) 973int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt, 974 va_list ap) 975{ 976 struct ceph_mon_generic_request *req; 977 struct ceph_mon_command *h; 978 int ret = -ENOMEM; 979 int len; 980 981 req = alloc_generic_request(monc, GFP_NOIO); 982 if (!req) 983 goto out; 984 985 req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true); 986 if (!req->request) 987 goto out; 988 989 req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO, 990 true); 991 if (!req->reply) 992 goto out; 993 994 mutex_lock(&monc->mutex); 995 register_generic_request(req); 996 h = req->request->front.iov_base; 997 h->monhdr.have_version = 0; 998 h->monhdr.session_mon = cpu_to_le16(-1); 999 h->monhdr.session_mon_tid = 0; 1000 h->fsid = monc->monmap->fsid; 1001 h->num_strs = cpu_to_le32(1); 1002 len = vsprintf(h->str, fmt, ap); 1003 h->str_len = cpu_to_le32(len); 1004 send_generic_request(monc, req); 1005 mutex_unlock(&monc->mutex); 1006 1007 ret = wait_generic_request(req); 1008out: 1009 put_generic_request(req); 1010 return ret; 1011} 1012 1013static __printf(2, 3) 1014int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...) 1015{ 1016 va_list ap; 1017 int ret; 1018 1019 va_start(ap, fmt); 1020 ret = do_mon_command_vargs(monc, fmt, ap); 1021 va_end(ap); 1022 return ret; 1023} 1024 1025int ceph_monc_blocklist_add(struct ceph_mon_client *monc, 1026 struct ceph_entity_addr *client_addr) 1027{ 1028 int ret; 1029 1030 ret = do_mon_command(monc, 1031 "{ \"prefix\": \"osd blocklist\", \ 1032 \"blocklistop\": \"add\", \ 1033 \"addr\": \"%pISpc/%u\" }", 1034 &client_addr->in_addr, 1035 le32_to_cpu(client_addr->nonce)); 1036 if (ret == -EINVAL) { 1037 /* 1038 * The monitor returns EINVAL on an unrecognized command. 1039 * Try the legacy command -- it is exactly the same except 1040 * for the name. 1041 */ 1042 ret = do_mon_command(monc, 1043 "{ \"prefix\": \"osd blacklist\", \ 1044 \"blacklistop\": \"add\", \ 1045 \"addr\": \"%pISpc/%u\" }", 1046 &client_addr->in_addr, 1047 le32_to_cpu(client_addr->nonce)); 1048 } 1049 if (ret) 1050 return ret; 1051 1052 /* 1053 * Make sure we have the osdmap that includes the blocklist 1054 * entry. This is needed to ensure that the OSDs pick up the 1055 * new blocklist before processing any future requests from 1056 * this client. 1057 */ 1058 return ceph_wait_for_latest_osdmap(monc->client, 0); 1059} 1060EXPORT_SYMBOL(ceph_monc_blocklist_add); 1061 1062/* 1063 * Resend pending generic requests. 1064 */ 1065static void __resend_generic_request(struct ceph_mon_client *monc) 1066{ 1067 struct ceph_mon_generic_request *req; 1068 struct rb_node *p; 1069 1070 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 1071 req = rb_entry(p, struct ceph_mon_generic_request, node); 1072 ceph_msg_revoke(req->request); 1073 ceph_msg_revoke_incoming(req->reply); 1074 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 1075 } 1076} 1077 1078/* 1079 * Delayed work. If we haven't mounted yet, retry. Otherwise, 1080 * renew/retry subscription as needed (in case it is timing out, or we 1081 * got an ENOMEM). And keep the monitor connection alive. 1082 */ 1083static void delayed_work(struct work_struct *work) 1084{ 1085 struct ceph_mon_client *monc = 1086 container_of(work, struct ceph_mon_client, delayed_work.work); 1087 1088 dout("monc delayed_work\n"); 1089 mutex_lock(&monc->mutex); 1090 if (monc->hunting) { 1091 dout("%s continuing hunt\n", __func__); 1092 reopen_session(monc); 1093 } else { 1094 int is_auth = ceph_auth_is_authenticated(monc->auth); 1095 if (ceph_con_keepalive_expired(&monc->con, 1096 CEPH_MONC_PING_TIMEOUT)) { 1097 dout("monc keepalive timeout\n"); 1098 is_auth = 0; 1099 reopen_session(monc); 1100 } 1101 1102 if (!monc->hunting) { 1103 ceph_con_keepalive(&monc->con); 1104 __validate_auth(monc); 1105 un_backoff(monc); 1106 } 1107 1108 if (is_auth && 1109 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) { 1110 unsigned long now = jiffies; 1111 1112 dout("%s renew subs? now %lu renew after %lu\n", 1113 __func__, now, monc->sub_renew_after); 1114 if (time_after_eq(now, monc->sub_renew_after)) 1115 __send_subscribe(monc); 1116 } 1117 } 1118 __schedule_delayed(monc); 1119 mutex_unlock(&monc->mutex); 1120} 1121 1122/* 1123 * On startup, we build a temporary monmap populated with the IPs 1124 * provided by mount(2). 1125 */ 1126static int build_initial_monmap(struct ceph_mon_client *monc) 1127{ 1128 __le32 my_type = ceph_msgr2(monc->client) ? 1129 CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY; 1130 struct ceph_options *opt = monc->client->options; 1131 int num_mon = opt->num_mon; 1132 int i; 1133 1134 /* build initial monmap */ 1135 monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon), 1136 GFP_KERNEL); 1137 if (!monc->monmap) 1138 return -ENOMEM; 1139 1140 for (i = 0; i < num_mon; i++) { 1141 struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i]; 1142 1143 memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr, 1144 sizeof(inst->addr.in_addr)); 1145 inst->addr.type = my_type; 1146 inst->addr.nonce = 0; 1147 inst->name.type = CEPH_ENTITY_TYPE_MON; 1148 inst->name.num = cpu_to_le64(i); 1149 } 1150 monc->monmap->num_mon = num_mon; 1151 return 0; 1152} 1153 1154int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 1155{ 1156 int err; 1157 1158 dout("init\n"); 1159 memset(monc, 0, sizeof(*monc)); 1160 monc->client = cl; 1161 mutex_init(&monc->mutex); 1162 1163 err = build_initial_monmap(monc); 1164 if (err) 1165 goto out; 1166 1167 /* connection */ 1168 /* authentication */ 1169 monc->auth = ceph_auth_init(cl->options->name, cl->options->key, 1170 cl->options->con_modes); 1171 if (IS_ERR(monc->auth)) { 1172 err = PTR_ERR(monc->auth); 1173 goto out_monmap; 1174 } 1175 monc->auth->want_keys = 1176 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 1177 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 1178 1179 /* msgs */ 1180 err = -ENOMEM; 1181 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 1182 sizeof(struct ceph_mon_subscribe_ack), 1183 GFP_KERNEL, true); 1184 if (!monc->m_subscribe_ack) 1185 goto out_auth; 1186 1187 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, 1188 GFP_KERNEL, true); 1189 if (!monc->m_subscribe) 1190 goto out_subscribe_ack; 1191 1192 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, 1193 GFP_KERNEL, true); 1194 if (!monc->m_auth_reply) 1195 goto out_subscribe; 1196 1197 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true); 1198 monc->pending_auth = 0; 1199 if (!monc->m_auth) 1200 goto out_auth_reply; 1201 1202 ceph_con_init(&monc->con, monc, &mon_con_ops, 1203 &monc->client->msgr); 1204 1205 monc->cur_mon = -1; 1206 monc->had_a_connection = false; 1207 monc->hunt_mult = 1; 1208 1209 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 1210 monc->generic_request_tree = RB_ROOT; 1211 monc->last_tid = 0; 1212 1213 monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE; 1214 1215 return 0; 1216 1217out_auth_reply: 1218 ceph_msg_put(monc->m_auth_reply); 1219out_subscribe: 1220 ceph_msg_put(monc->m_subscribe); 1221out_subscribe_ack: 1222 ceph_msg_put(monc->m_subscribe_ack); 1223out_auth: 1224 ceph_auth_destroy(monc->auth); 1225out_monmap: 1226 kfree(monc->monmap); 1227out: 1228 return err; 1229} 1230EXPORT_SYMBOL(ceph_monc_init); 1231 1232void ceph_monc_stop(struct ceph_mon_client *monc) 1233{ 1234 dout("stop\n"); 1235 cancel_delayed_work_sync(&monc->delayed_work); 1236 1237 mutex_lock(&monc->mutex); 1238 __close_session(monc); 1239 monc->cur_mon = -1; 1240 mutex_unlock(&monc->mutex); 1241 1242 /* 1243 * flush msgr queue before we destroy ourselves to ensure that: 1244 * - any work that references our embedded con is finished. 1245 * - any osd_client or other work that may reference an authorizer 1246 * finishes before we shut down the auth subsystem. 1247 */ 1248 ceph_msgr_flush(); 1249 1250 ceph_auth_destroy(monc->auth); 1251 1252 WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); 1253 1254 ceph_msg_put(monc->m_auth); 1255 ceph_msg_put(monc->m_auth_reply); 1256 ceph_msg_put(monc->m_subscribe); 1257 ceph_msg_put(monc->m_subscribe_ack); 1258 1259 kfree(monc->monmap); 1260} 1261EXPORT_SYMBOL(ceph_monc_stop); 1262 1263static void finish_hunting(struct ceph_mon_client *monc) 1264{ 1265 if (monc->hunting) { 1266 dout("%s found mon%d\n", __func__, monc->cur_mon); 1267 monc->hunting = false; 1268 monc->had_a_connection = true; 1269 un_backoff(monc); 1270 __schedule_delayed(monc); 1271 } 1272} 1273 1274static void finish_auth(struct ceph_mon_client *monc, int auth_err, 1275 bool was_authed) 1276{ 1277 dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed); 1278 WARN_ON(auth_err > 0); 1279 1280 monc->pending_auth = 0; 1281 if (auth_err) { 1282 monc->client->auth_err = auth_err; 1283 wake_up_all(&monc->client->auth_wq); 1284 return; 1285 } 1286 1287 if (!was_authed && ceph_auth_is_authenticated(monc->auth)) { 1288 dout("%s authenticated, starting session global_id %llu\n", 1289 __func__, monc->auth->global_id); 1290 1291 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 1292 monc->client->msgr.inst.name.num = 1293 cpu_to_le64(monc->auth->global_id); 1294 1295 __send_subscribe(monc); 1296 __resend_generic_request(monc); 1297 1298 pr_info("mon%d %s session established\n", monc->cur_mon, 1299 ceph_pr_addr(&monc->con.peer_addr)); 1300 } 1301} 1302 1303static void handle_auth_reply(struct ceph_mon_client *monc, 1304 struct ceph_msg *msg) 1305{ 1306 bool was_authed; 1307 int ret; 1308 1309 mutex_lock(&monc->mutex); 1310 was_authed = ceph_auth_is_authenticated(monc->auth); 1311 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 1312 msg->front.iov_len, 1313 monc->m_auth->front.iov_base, 1314 monc->m_auth->front_alloc_len); 1315 if (ret > 0) { 1316 __send_prepared_auth_request(monc, ret); 1317 } else { 1318 finish_auth(monc, ret, was_authed); 1319 finish_hunting(monc); 1320 } 1321 mutex_unlock(&monc->mutex); 1322} 1323 1324static int __validate_auth(struct ceph_mon_client *monc) 1325{ 1326 int ret; 1327 1328 if (monc->pending_auth) 1329 return 0; 1330 1331 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 1332 monc->m_auth->front_alloc_len); 1333 if (ret <= 0) 1334 return ret; /* either an error, or no need to authenticate */ 1335 __send_prepared_auth_request(monc, ret); 1336 return 0; 1337} 1338 1339int ceph_monc_validate_auth(struct ceph_mon_client *monc) 1340{ 1341 int ret; 1342 1343 mutex_lock(&monc->mutex); 1344 ret = __validate_auth(monc); 1345 mutex_unlock(&monc->mutex); 1346 return ret; 1347} 1348EXPORT_SYMBOL(ceph_monc_validate_auth); 1349 1350static int mon_get_auth_request(struct ceph_connection *con, 1351 void *buf, int *buf_len, 1352 void **authorizer, int *authorizer_len) 1353{ 1354 struct ceph_mon_client *monc = con->private; 1355 int ret; 1356 1357 mutex_lock(&monc->mutex); 1358 ret = ceph_auth_get_request(monc->auth, buf, *buf_len); 1359 mutex_unlock(&monc->mutex); 1360 if (ret < 0) 1361 return ret; 1362 1363 *buf_len = ret; 1364 *authorizer = NULL; 1365 *authorizer_len = 0; 1366 return 0; 1367} 1368 1369static int mon_handle_auth_reply_more(struct ceph_connection *con, 1370 void *reply, int reply_len, 1371 void *buf, int *buf_len, 1372 void **authorizer, int *authorizer_len) 1373{ 1374 struct ceph_mon_client *monc = con->private; 1375 int ret; 1376 1377 mutex_lock(&monc->mutex); 1378 ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len, 1379 buf, *buf_len); 1380 mutex_unlock(&monc->mutex); 1381 if (ret < 0) 1382 return ret; 1383 1384 *buf_len = ret; 1385 *authorizer = NULL; 1386 *authorizer_len = 0; 1387 return 0; 1388} 1389 1390static int mon_handle_auth_done(struct ceph_connection *con, 1391 u64 global_id, void *reply, int reply_len, 1392 u8 *session_key, int *session_key_len, 1393 u8 *con_secret, int *con_secret_len) 1394{ 1395 struct ceph_mon_client *monc = con->private; 1396 bool was_authed; 1397 int ret; 1398 1399 mutex_lock(&monc->mutex); 1400 WARN_ON(!monc->hunting); 1401 was_authed = ceph_auth_is_authenticated(monc->auth); 1402 ret = ceph_auth_handle_reply_done(monc->auth, global_id, 1403 reply, reply_len, 1404 session_key, session_key_len, 1405 con_secret, con_secret_len); 1406 finish_auth(monc, ret, was_authed); 1407 if (!ret) 1408 finish_hunting(monc); 1409 mutex_unlock(&monc->mutex); 1410 return 0; 1411} 1412 1413static int mon_handle_auth_bad_method(struct ceph_connection *con, 1414 int used_proto, int result, 1415 const int *allowed_protos, int proto_cnt, 1416 const int *allowed_modes, int mode_cnt) 1417{ 1418 struct ceph_mon_client *monc = con->private; 1419 bool was_authed; 1420 1421 mutex_lock(&monc->mutex); 1422 WARN_ON(!monc->hunting); 1423 was_authed = ceph_auth_is_authenticated(monc->auth); 1424 ceph_auth_handle_bad_method(monc->auth, used_proto, result, 1425 allowed_protos, proto_cnt, 1426 allowed_modes, mode_cnt); 1427 finish_auth(monc, -EACCES, was_authed); 1428 mutex_unlock(&monc->mutex); 1429 return 0; 1430} 1431 1432/* 1433 * handle incoming message 1434 */ 1435static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 1436{ 1437 struct ceph_mon_client *monc = con->private; 1438 int type = le16_to_cpu(msg->hdr.type); 1439 1440 switch (type) { 1441 case CEPH_MSG_AUTH_REPLY: 1442 handle_auth_reply(monc, msg); 1443 break; 1444 1445 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1446 handle_subscribe_ack(monc, msg); 1447 break; 1448 1449 case CEPH_MSG_STATFS_REPLY: 1450 handle_statfs_reply(monc, msg); 1451 break; 1452 1453 case CEPH_MSG_MON_GET_VERSION_REPLY: 1454 handle_get_version_reply(monc, msg); 1455 break; 1456 1457 case CEPH_MSG_MON_COMMAND_ACK: 1458 handle_command_ack(monc, msg); 1459 break; 1460 1461 case CEPH_MSG_MON_MAP: 1462 ceph_monc_handle_map(monc, msg); 1463 break; 1464 1465 case CEPH_MSG_OSD_MAP: 1466 ceph_osdc_handle_map(&monc->client->osdc, msg); 1467 break; 1468 1469 default: 1470 /* can the chained handler handle it? */ 1471 if (monc->client->extra_mon_dispatch && 1472 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1473 break; 1474 1475 pr_err("received unknown message type %d %s\n", type, 1476 ceph_msg_type_name(type)); 1477 } 1478 ceph_msg_put(msg); 1479} 1480 1481/* 1482 * Allocate memory for incoming message 1483 */ 1484static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1485 struct ceph_msg_header *hdr, 1486 int *skip) 1487{ 1488 struct ceph_mon_client *monc = con->private; 1489 int type = le16_to_cpu(hdr->type); 1490 int front_len = le32_to_cpu(hdr->front_len); 1491 struct ceph_msg *m = NULL; 1492 1493 *skip = 0; 1494 1495 switch (type) { 1496 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1497 m = ceph_msg_get(monc->m_subscribe_ack); 1498 break; 1499 case CEPH_MSG_STATFS_REPLY: 1500 case CEPH_MSG_MON_COMMAND_ACK: 1501 return get_generic_reply(con, hdr, skip); 1502 case CEPH_MSG_AUTH_REPLY: 1503 m = ceph_msg_get(monc->m_auth_reply); 1504 break; 1505 case CEPH_MSG_MON_GET_VERSION_REPLY: 1506 if (le64_to_cpu(hdr->tid) != 0) 1507 return get_generic_reply(con, hdr, skip); 1508 1509 /* 1510 * Older OSDs don't set reply tid even if the original 1511 * request had a non-zero tid. Work around this weirdness 1512 * by allocating a new message. 1513 */ 1514 fallthrough; 1515 case CEPH_MSG_MON_MAP: 1516 case CEPH_MSG_MDS_MAP: 1517 case CEPH_MSG_OSD_MAP: 1518 case CEPH_MSG_FS_MAP_USER: 1519 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1520 if (!m) 1521 return NULL; /* ENOMEM--return skip == 0 */ 1522 break; 1523 } 1524 1525 if (!m) { 1526 pr_info("alloc_msg unknown type %d\n", type); 1527 *skip = 1; 1528 } else if (front_len > m->front_alloc_len) { 1529 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1530 front_len, m->front_alloc_len, 1531 (unsigned int)con->peer_name.type, 1532 le64_to_cpu(con->peer_name.num)); 1533 ceph_msg_put(m); 1534 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1535 } 1536 1537 return m; 1538} 1539 1540/* 1541 * If the monitor connection resets, pick a new monitor and resubmit 1542 * any pending requests. 1543 */ 1544static void mon_fault(struct ceph_connection *con) 1545{ 1546 struct ceph_mon_client *monc = con->private; 1547 1548 mutex_lock(&monc->mutex); 1549 dout("%s mon%d\n", __func__, monc->cur_mon); 1550 if (monc->cur_mon >= 0) { 1551 if (!monc->hunting) { 1552 dout("%s hunting for new mon\n", __func__); 1553 reopen_session(monc); 1554 __schedule_delayed(monc); 1555 } else { 1556 dout("%s already hunting\n", __func__); 1557 } 1558 } 1559 mutex_unlock(&monc->mutex); 1560} 1561 1562/* 1563 * We can ignore refcounting on the connection struct, as all references 1564 * will come from the messenger workqueue, which is drained prior to 1565 * mon_client destruction. 1566 */ 1567static struct ceph_connection *mon_get_con(struct ceph_connection *con) 1568{ 1569 return con; 1570} 1571 1572static void mon_put_con(struct ceph_connection *con) 1573{ 1574} 1575 1576static const struct ceph_connection_operations mon_con_ops = { 1577 .get = mon_get_con, 1578 .put = mon_put_con, 1579 .alloc_msg = mon_alloc_msg, 1580 .dispatch = mon_dispatch, 1581 .fault = mon_fault, 1582 .get_auth_request = mon_get_auth_request, 1583 .handle_auth_reply_more = mon_handle_auth_reply_more, 1584 .handle_auth_done = mon_handle_auth_done, 1585 .handle_auth_bad_method = mon_handle_auth_bad_method, 1586};