aio-posix.c (21293B)
1/* 2 * QEMU aio implementation 3 * 4 * Copyright IBM, Corp. 2008 5 * 6 * Authors: 7 * Anthony Liguori <aliguori@us.ibm.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2. See 10 * the COPYING file in the top-level directory. 11 * 12 * Contributions after 2012-01-13 are licensed under the terms of the 13 * GNU GPL, version 2 or (at your option) any later version. 14 */ 15 16#include "qemu/osdep.h" 17#include "block/block.h" 18#include "qemu/main-loop.h" 19#include "qemu/rcu.h" 20#include "qemu/rcu_queue.h" 21#include "qemu/sockets.h" 22#include "qemu/cutils.h" 23#include "trace.h" 24#include "aio-posix.h" 25 26/* Stop userspace polling on a handler if it isn't active for some time */ 27#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND) 28 29bool aio_poll_disabled(AioContext *ctx) 30{ 31 return qatomic_read(&ctx->poll_disable_cnt); 32} 33 34void aio_add_ready_handler(AioHandlerList *ready_list, 35 AioHandler *node, 36 int revents) 37{ 38 QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */ 39 node->pfd.revents = revents; 40 QLIST_INSERT_HEAD(ready_list, node, node_ready); 41} 42 43static AioHandler *find_aio_handler(AioContext *ctx, int fd) 44{ 45 AioHandler *node; 46 47 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 48 if (node->pfd.fd == fd) { 49 if (!QLIST_IS_INSERTED(node, node_deleted)) { 50 return node; 51 } 52 } 53 } 54 55 return NULL; 56} 57 58static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node) 59{ 60 /* If the GSource is in the process of being destroyed then 61 * g_source_remove_poll() causes an assertion failure. Skip 62 * removal in that case, because glib cleans up its state during 63 * destruction anyway. 64 */ 65 if (!g_source_is_destroyed(&ctx->source)) { 66 g_source_remove_poll(&ctx->source, &node->pfd); 67 } 68 69 node->pfd.revents = 0; 70 71 /* If the fd monitor has already marked it deleted, leave it alone */ 72 if (QLIST_IS_INSERTED(node, node_deleted)) { 73 return false; 74 } 75 76 /* If a read is in progress, just mark the node as deleted */ 77 if (qemu_lockcnt_count(&ctx->list_lock)) { 78 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); 79 return false; 80 } 81 /* Otherwise, delete it for real. We can't just mark it as 82 * deleted because deleted nodes are only cleaned up while 83 * no one is walking the handlers list. 84 */ 85 QLIST_SAFE_REMOVE(node, node_poll); 86 QLIST_REMOVE(node, node); 87 return true; 88} 89 90void aio_set_fd_handler(AioContext *ctx, 91 int fd, 92 bool is_external, 93 IOHandler *io_read, 94 IOHandler *io_write, 95 AioPollFn *io_poll, 96 void *opaque) 97{ 98 AioHandler *node; 99 AioHandler *new_node = NULL; 100 bool is_new = false; 101 bool deleted = false; 102 int poll_disable_change; 103 104 qemu_lockcnt_lock(&ctx->list_lock); 105 106 node = find_aio_handler(ctx, fd); 107 108 /* Are we deleting the fd handler? */ 109 if (!io_read && !io_write && !io_poll) { 110 if (node == NULL) { 111 qemu_lockcnt_unlock(&ctx->list_lock); 112 return; 113 } 114 /* Clean events in order to unregister fd from the ctx epoll. */ 115 node->pfd.events = 0; 116 117 poll_disable_change = -!node->io_poll; 118 } else { 119 poll_disable_change = !io_poll - (node && !node->io_poll); 120 if (node == NULL) { 121 is_new = true; 122 } 123 /* Alloc and insert if it's not already there */ 124 new_node = g_new0(AioHandler, 1); 125 126 /* Update handler with latest information */ 127 new_node->io_read = io_read; 128 new_node->io_write = io_write; 129 new_node->io_poll = io_poll; 130 new_node->opaque = opaque; 131 new_node->is_external = is_external; 132 133 if (is_new) { 134 new_node->pfd.fd = fd; 135 } else { 136 new_node->pfd = node->pfd; 137 } 138 g_source_add_poll(&ctx->source, &new_node->pfd); 139 140 new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0); 141 new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0); 142 143 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node); 144 } 145 146 /* No need to order poll_disable_cnt writes against other updates; 147 * the counter is only used to avoid wasting time and latency on 148 * iterated polling when the system call will be ultimately necessary. 149 * Changing handlers is a rare event, and a little wasted polling until 150 * the aio_notify below is not an issue. 151 */ 152 qatomic_set(&ctx->poll_disable_cnt, 153 qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change); 154 155 ctx->fdmon_ops->update(ctx, node, new_node); 156 if (node) { 157 deleted = aio_remove_fd_handler(ctx, node); 158 } 159 qemu_lockcnt_unlock(&ctx->list_lock); 160 aio_notify(ctx); 161 162 if (deleted) { 163 g_free(node); 164 } 165} 166 167void aio_set_fd_poll(AioContext *ctx, int fd, 168 IOHandler *io_poll_begin, 169 IOHandler *io_poll_end) 170{ 171 AioHandler *node = find_aio_handler(ctx, fd); 172 173 if (!node) { 174 return; 175 } 176 177 node->io_poll_begin = io_poll_begin; 178 node->io_poll_end = io_poll_end; 179} 180 181void aio_set_event_notifier(AioContext *ctx, 182 EventNotifier *notifier, 183 bool is_external, 184 EventNotifierHandler *io_read, 185 AioPollFn *io_poll) 186{ 187 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external, 188 (IOHandler *)io_read, NULL, io_poll, notifier); 189} 190 191void aio_set_event_notifier_poll(AioContext *ctx, 192 EventNotifier *notifier, 193 EventNotifierHandler *io_poll_begin, 194 EventNotifierHandler *io_poll_end) 195{ 196 aio_set_fd_poll(ctx, event_notifier_get_fd(notifier), 197 (IOHandler *)io_poll_begin, 198 (IOHandler *)io_poll_end); 199} 200 201static bool poll_set_started(AioContext *ctx, bool started) 202{ 203 AioHandler *node; 204 bool progress = false; 205 206 if (started == ctx->poll_started) { 207 return false; 208 } 209 210 ctx->poll_started = started; 211 212 qemu_lockcnt_inc(&ctx->list_lock); 213 QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) { 214 IOHandler *fn; 215 216 if (QLIST_IS_INSERTED(node, node_deleted)) { 217 continue; 218 } 219 220 if (started) { 221 fn = node->io_poll_begin; 222 } else { 223 fn = node->io_poll_end; 224 } 225 226 if (fn) { 227 fn(node->opaque); 228 } 229 230 /* Poll one last time in case ->io_poll_end() raced with the event */ 231 if (!started) { 232 progress = node->io_poll(node->opaque) || progress; 233 } 234 } 235 qemu_lockcnt_dec(&ctx->list_lock); 236 237 return progress; 238} 239 240 241bool aio_prepare(AioContext *ctx) 242{ 243 /* Poll mode cannot be used with glib's event loop, disable it. */ 244 poll_set_started(ctx, false); 245 246 return false; 247} 248 249bool aio_pending(AioContext *ctx) 250{ 251 AioHandler *node; 252 bool result = false; 253 254 /* 255 * We have to walk very carefully in case aio_set_fd_handler is 256 * called while we're walking. 257 */ 258 qemu_lockcnt_inc(&ctx->list_lock); 259 260 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 261 int revents; 262 263 revents = node->pfd.revents & node->pfd.events; 264 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read && 265 aio_node_check(ctx, node->is_external)) { 266 result = true; 267 break; 268 } 269 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write && 270 aio_node_check(ctx, node->is_external)) { 271 result = true; 272 break; 273 } 274 } 275 qemu_lockcnt_dec(&ctx->list_lock); 276 277 return result; 278} 279 280static void aio_free_deleted_handlers(AioContext *ctx) 281{ 282 AioHandler *node; 283 284 if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) { 285 return; 286 } 287 if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { 288 return; /* we are nested, let the parent do the freeing */ 289 } 290 291 while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) { 292 QLIST_REMOVE(node, node); 293 QLIST_REMOVE(node, node_deleted); 294 QLIST_SAFE_REMOVE(node, node_poll); 295 g_free(node); 296 } 297 298 qemu_lockcnt_inc_and_unlock(&ctx->list_lock); 299} 300 301static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node) 302{ 303 bool progress = false; 304 int revents; 305 306 revents = node->pfd.revents & node->pfd.events; 307 node->pfd.revents = 0; 308 309 /* 310 * Start polling AioHandlers when they become ready because activity is 311 * likely to continue. Note that starvation is theoretically possible when 312 * fdmon_supports_polling(), but only until the fd fires for the first 313 * time. 314 */ 315 if (!QLIST_IS_INSERTED(node, node_deleted) && 316 !QLIST_IS_INSERTED(node, node_poll) && 317 node->io_poll) { 318 trace_poll_add(ctx, node, node->pfd.fd, revents); 319 if (ctx->poll_started && node->io_poll_begin) { 320 node->io_poll_begin(node->opaque); 321 } 322 QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll); 323 } 324 325 if (!QLIST_IS_INSERTED(node, node_deleted) && 326 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && 327 aio_node_check(ctx, node->is_external) && 328 node->io_read) { 329 node->io_read(node->opaque); 330 331 /* aio_notify() does not count as progress */ 332 if (node->opaque != &ctx->notifier) { 333 progress = true; 334 } 335 } 336 if (!QLIST_IS_INSERTED(node, node_deleted) && 337 (revents & (G_IO_OUT | G_IO_ERR)) && 338 aio_node_check(ctx, node->is_external) && 339 node->io_write) { 340 node->io_write(node->opaque); 341 progress = true; 342 } 343 344 return progress; 345} 346 347/* 348 * If we have a list of ready handlers then this is more efficient than 349 * scanning all handlers with aio_dispatch_handlers(). 350 */ 351static bool aio_dispatch_ready_handlers(AioContext *ctx, 352 AioHandlerList *ready_list) 353{ 354 bool progress = false; 355 AioHandler *node; 356 357 while ((node = QLIST_FIRST(ready_list))) { 358 QLIST_REMOVE(node, node_ready); 359 progress = aio_dispatch_handler(ctx, node) || progress; 360 } 361 362 return progress; 363} 364 365/* Slower than aio_dispatch_ready_handlers() but only used via glib */ 366static bool aio_dispatch_handlers(AioContext *ctx) 367{ 368 AioHandler *node, *tmp; 369 bool progress = false; 370 371 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { 372 progress = aio_dispatch_handler(ctx, node) || progress; 373 } 374 375 return progress; 376} 377 378void aio_dispatch(AioContext *ctx) 379{ 380 qemu_lockcnt_inc(&ctx->list_lock); 381 aio_bh_poll(ctx); 382 aio_dispatch_handlers(ctx); 383 aio_free_deleted_handlers(ctx); 384 qemu_lockcnt_dec(&ctx->list_lock); 385 386 timerlistgroup_run_timers(&ctx->tlg); 387} 388 389static bool run_poll_handlers_once(AioContext *ctx, 390 int64_t now, 391 int64_t *timeout) 392{ 393 bool progress = false; 394 AioHandler *node; 395 AioHandler *tmp; 396 397 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { 398 if (aio_node_check(ctx, node->is_external) && 399 node->io_poll(node->opaque)) { 400 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; 401 402 /* 403 * Polling was successful, exit try_poll_mode immediately 404 * to adjust the next polling time. 405 */ 406 *timeout = 0; 407 if (node->opaque != &ctx->notifier) { 408 progress = true; 409 } 410 } 411 412 /* Caller handles freeing deleted nodes. Don't do it here. */ 413 } 414 415 return progress; 416} 417 418static bool fdmon_supports_polling(AioContext *ctx) 419{ 420 return ctx->fdmon_ops->need_wait != aio_poll_disabled; 421} 422 423static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) 424{ 425 AioHandler *node; 426 AioHandler *tmp; 427 bool progress = false; 428 429 /* 430 * File descriptor monitoring implementations without userspace polling 431 * support suffer from starvation when a subset of handlers is polled 432 * because fds will not be processed in a timely fashion. Don't remove 433 * idle poll handlers. 434 */ 435 if (!fdmon_supports_polling(ctx)) { 436 return false; 437 } 438 439 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { 440 if (node->poll_idle_timeout == 0LL) { 441 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; 442 } else if (now >= node->poll_idle_timeout) { 443 trace_poll_remove(ctx, node, node->pfd.fd); 444 node->poll_idle_timeout = 0LL; 445 QLIST_SAFE_REMOVE(node, node_poll); 446 if (ctx->poll_started && node->io_poll_end) { 447 node->io_poll_end(node->opaque); 448 449 /* 450 * Final poll in case ->io_poll_end() races with an event. 451 * Nevermind about re-adding the handler in the rare case where 452 * this causes progress. 453 */ 454 progress = node->io_poll(node->opaque) || progress; 455 } 456 } 457 } 458 459 return progress; 460} 461 462/* run_poll_handlers: 463 * @ctx: the AioContext 464 * @max_ns: maximum time to poll for, in nanoseconds 465 * 466 * Polls for a given time. 467 * 468 * Note that the caller must have incremented ctx->list_lock. 469 * 470 * Returns: true if progress was made, false otherwise 471 */ 472static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout) 473{ 474 bool progress; 475 int64_t start_time, elapsed_time; 476 477 assert(qemu_lockcnt_count(&ctx->list_lock) > 0); 478 479 trace_run_poll_handlers_begin(ctx, max_ns, *timeout); 480 481 /* 482 * Optimization: ->io_poll() handlers often contain RCU read critical 483 * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock() 484 * -> rcu_read_lock() -> ... sequences with expensive memory 485 * synchronization primitives. Make the entire polling loop an RCU 486 * critical section because nested rcu_read_lock()/rcu_read_unlock() calls 487 * are cheap. 488 */ 489 RCU_READ_LOCK_GUARD(); 490 491 start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); 492 do { 493 progress = run_poll_handlers_once(ctx, start_time, timeout); 494 elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time; 495 max_ns = qemu_soonest_timeout(*timeout, max_ns); 496 assert(!(max_ns && progress)); 497 } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx)); 498 499 if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) { 500 *timeout = 0; 501 progress = true; 502 } 503 504 /* If time has passed with no successful polling, adjust *timeout to 505 * keep the same ending time. 506 */ 507 if (*timeout != -1) { 508 *timeout -= MIN(*timeout, elapsed_time); 509 } 510 511 trace_run_poll_handlers_end(ctx, progress, *timeout); 512 return progress; 513} 514 515/* try_poll_mode: 516 * @ctx: the AioContext 517 * @timeout: timeout for blocking wait, computed by the caller and updated if 518 * polling succeeds. 519 * 520 * Note that the caller must have incremented ctx->list_lock. 521 * 522 * Returns: true if progress was made, false otherwise 523 */ 524static bool try_poll_mode(AioContext *ctx, int64_t *timeout) 525{ 526 int64_t max_ns; 527 528 if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) { 529 return false; 530 } 531 532 max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns); 533 if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) { 534 poll_set_started(ctx, true); 535 536 if (run_poll_handlers(ctx, max_ns, timeout)) { 537 return true; 538 } 539 } 540 541 if (poll_set_started(ctx, false)) { 542 *timeout = 0; 543 return true; 544 } 545 546 return false; 547} 548 549bool aio_poll(AioContext *ctx, bool blocking) 550{ 551 AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list); 552 int ret = 0; 553 bool progress; 554 bool use_notify_me; 555 int64_t timeout; 556 int64_t start = 0; 557 558 /* 559 * There cannot be two concurrent aio_poll calls for the same AioContext (or 560 * an aio_poll concurrent with a GSource prepare/check/dispatch callback). 561 * We rely on this below to avoid slow locked accesses to ctx->notify_me. 562 * 563 * aio_poll() may only be called in the AioContext's thread. iohandler_ctx 564 * is special in that it runs in the main thread, but that thread's context 565 * is qemu_aio_context. 566 */ 567 assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ? 568 qemu_get_aio_context() : ctx)); 569 570 qemu_lockcnt_inc(&ctx->list_lock); 571 572 if (ctx->poll_max_ns) { 573 start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); 574 } 575 576 timeout = blocking ? aio_compute_timeout(ctx) : 0; 577 progress = try_poll_mode(ctx, &timeout); 578 assert(!(timeout && progress)); 579 580 /* 581 * aio_notify can avoid the expensive event_notifier_set if 582 * everything (file descriptors, bottom halves, timers) will 583 * be re-evaluated before the next blocking poll(). This is 584 * already true when aio_poll is called with blocking == false; 585 * if blocking == true, it is only true after poll() returns, 586 * so disable the optimization now. 587 */ 588 use_notify_me = timeout != 0; 589 if (use_notify_me) { 590 qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2); 591 /* 592 * Write ctx->notify_me before reading ctx->notified. Pairs with 593 * smp_mb in aio_notify(). 594 */ 595 smp_mb(); 596 597 /* Don't block if aio_notify() was called */ 598 if (qatomic_read(&ctx->notified)) { 599 timeout = 0; 600 } 601 } 602 603 /* If polling is allowed, non-blocking aio_poll does not need the 604 * system call---a single round of run_poll_handlers_once suffices. 605 */ 606 if (timeout || ctx->fdmon_ops->need_wait(ctx)) { 607 ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout); 608 } 609 610 if (use_notify_me) { 611 /* Finish the poll before clearing the flag. */ 612 qatomic_store_release(&ctx->notify_me, 613 qatomic_read(&ctx->notify_me) - 2); 614 } 615 616 aio_notify_accept(ctx); 617 618 /* Adjust polling time */ 619 if (ctx->poll_max_ns) { 620 int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start; 621 622 if (block_ns <= ctx->poll_ns) { 623 /* This is the sweet spot, no adjustment needed */ 624 } else if (block_ns > ctx->poll_max_ns) { 625 /* We'd have to poll for too long, poll less */ 626 int64_t old = ctx->poll_ns; 627 628 if (ctx->poll_shrink) { 629 ctx->poll_ns /= ctx->poll_shrink; 630 } else { 631 ctx->poll_ns = 0; 632 } 633 634 trace_poll_shrink(ctx, old, ctx->poll_ns); 635 } else if (ctx->poll_ns < ctx->poll_max_ns && 636 block_ns < ctx->poll_max_ns) { 637 /* There is room to grow, poll longer */ 638 int64_t old = ctx->poll_ns; 639 int64_t grow = ctx->poll_grow; 640 641 if (grow == 0) { 642 grow = 2; 643 } 644 645 if (ctx->poll_ns) { 646 ctx->poll_ns *= grow; 647 } else { 648 ctx->poll_ns = 4000; /* start polling at 4 microseconds */ 649 } 650 651 if (ctx->poll_ns > ctx->poll_max_ns) { 652 ctx->poll_ns = ctx->poll_max_ns; 653 } 654 655 trace_poll_grow(ctx, old, ctx->poll_ns); 656 } 657 } 658 659 progress |= aio_bh_poll(ctx); 660 661 if (ret > 0) { 662 progress |= aio_dispatch_ready_handlers(ctx, &ready_list); 663 } 664 665 aio_free_deleted_handlers(ctx); 666 667 qemu_lockcnt_dec(&ctx->list_lock); 668 669 progress |= timerlistgroup_run_timers(&ctx->tlg); 670 671 return progress; 672} 673 674void aio_context_setup(AioContext *ctx) 675{ 676 ctx->fdmon_ops = &fdmon_poll_ops; 677 ctx->epollfd = -1; 678 679 /* Use the fastest fd monitoring implementation if available */ 680 if (fdmon_io_uring_setup(ctx)) { 681 return; 682 } 683 684 fdmon_epoll_setup(ctx); 685} 686 687void aio_context_destroy(AioContext *ctx) 688{ 689 fdmon_io_uring_destroy(ctx); 690 fdmon_epoll_disable(ctx); 691 aio_free_deleted_handlers(ctx); 692} 693 694void aio_context_use_g_source(AioContext *ctx) 695{ 696 /* 697 * Disable io_uring when the glib main loop is used because it doesn't 698 * support mixed glib/aio_poll() usage. It relies on aio_poll() being 699 * called regularly so that changes to the monitored file descriptors are 700 * submitted, otherwise a list of pending fd handlers builds up. 701 */ 702 fdmon_io_uring_destroy(ctx); 703 aio_free_deleted_handlers(ctx); 704} 705 706void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, 707 int64_t grow, int64_t shrink, Error **errp) 708{ 709 /* No thread synchronization here, it doesn't matter if an incorrect value 710 * is used once. 711 */ 712 ctx->poll_max_ns = max_ns; 713 ctx->poll_ns = 0; 714 ctx->poll_grow = grow; 715 ctx->poll_shrink = shrink; 716 717 aio_notify(ctx); 718} 719 720void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch, 721 Error **errp) 722{ 723 /* 724 * No thread synchronization here, it doesn't matter if an incorrect value 725 * is used once. 726 */ 727 ctx->aio_max_batch = max_batch; 728 729 aio_notify(ctx); 730}