channel.c (17411B)
1/* 2 * QEMU I/O channels 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21#include "qemu/osdep.h" 22#include "io/channel.h" 23#include "qapi/error.h" 24#include "qemu/main-loop.h" 25#include "qemu/module.h" 26#include "qemu/iov.h" 27 28bool qio_channel_has_feature(QIOChannel *ioc, 29 QIOChannelFeature feature) 30{ 31 return ioc->features & (1 << feature); 32} 33 34 35void qio_channel_set_feature(QIOChannel *ioc, 36 QIOChannelFeature feature) 37{ 38 ioc->features |= (1 << feature); 39} 40 41 42void qio_channel_set_name(QIOChannel *ioc, 43 const char *name) 44{ 45 g_free(ioc->name); 46 ioc->name = g_strdup(name); 47} 48 49 50ssize_t qio_channel_readv_full(QIOChannel *ioc, 51 const struct iovec *iov, 52 size_t niov, 53 int **fds, 54 size_t *nfds, 55 Error **errp) 56{ 57 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 58 59 if ((fds || nfds) && 60 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 61 error_setg_errno(errp, EINVAL, 62 "Channel does not support file descriptor passing"); 63 return -1; 64 } 65 66 return klass->io_readv(ioc, iov, niov, fds, nfds, errp); 67} 68 69 70ssize_t qio_channel_writev_full(QIOChannel *ioc, 71 const struct iovec *iov, 72 size_t niov, 73 int *fds, 74 size_t nfds, 75 Error **errp) 76{ 77 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 78 79 if ((fds || nfds) && 80 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { 81 error_setg_errno(errp, EINVAL, 82 "Channel does not support file descriptor passing"); 83 return -1; 84 } 85 86 return klass->io_writev(ioc, iov, niov, fds, nfds, errp); 87} 88 89 90int qio_channel_readv_all_eof(QIOChannel *ioc, 91 const struct iovec *iov, 92 size_t niov, 93 Error **errp) 94{ 95 return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp); 96} 97 98int qio_channel_readv_all(QIOChannel *ioc, 99 const struct iovec *iov, 100 size_t niov, 101 Error **errp) 102{ 103 return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp); 104} 105 106int qio_channel_readv_full_all_eof(QIOChannel *ioc, 107 const struct iovec *iov, 108 size_t niov, 109 int **fds, size_t *nfds, 110 Error **errp) 111{ 112 int ret = -1; 113 struct iovec *local_iov = g_new(struct iovec, niov); 114 struct iovec *local_iov_head = local_iov; 115 unsigned int nlocal_iov = niov; 116 int **local_fds = fds; 117 size_t *local_nfds = nfds; 118 bool partial = false; 119 120 if (nfds) { 121 *nfds = 0; 122 } 123 124 if (fds) { 125 *fds = NULL; 126 } 127 128 nlocal_iov = iov_copy(local_iov, nlocal_iov, 129 iov, niov, 130 0, iov_size(iov, niov)); 131 132 while ((nlocal_iov > 0) || local_fds) { 133 ssize_t len; 134 len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds, 135 local_nfds, errp); 136 if (len == QIO_CHANNEL_ERR_BLOCK) { 137 if (qemu_in_coroutine()) { 138 qio_channel_yield(ioc, G_IO_IN); 139 } else { 140 qio_channel_wait(ioc, G_IO_IN); 141 } 142 continue; 143 } 144 145 if (len == 0) { 146 if (local_nfds && *local_nfds) { 147 /* 148 * Got some FDs, but no data yet. This isn't an EOF 149 * scenario (yet), so carry on to try to read data 150 * on next loop iteration 151 */ 152 goto next_iter; 153 } else if (!partial) { 154 /* No fds and no data - EOF before any data read */ 155 ret = 0; 156 goto cleanup; 157 } else { 158 len = -1; 159 error_setg(errp, 160 "Unexpected end-of-file before all data were read"); 161 /* Fallthrough into len < 0 handling */ 162 } 163 } 164 165 if (len < 0) { 166 /* Close any FDs we previously received */ 167 if (nfds && fds) { 168 size_t i; 169 for (i = 0; i < (*nfds); i++) { 170 close((*fds)[i]); 171 } 172 g_free(*fds); 173 *fds = NULL; 174 *nfds = 0; 175 } 176 goto cleanup; 177 } 178 179 if (nlocal_iov) { 180 iov_discard_front(&local_iov, &nlocal_iov, len); 181 } 182 183next_iter: 184 partial = true; 185 local_fds = NULL; 186 local_nfds = NULL; 187 } 188 189 ret = 1; 190 191 cleanup: 192 g_free(local_iov_head); 193 return ret; 194} 195 196int qio_channel_readv_full_all(QIOChannel *ioc, 197 const struct iovec *iov, 198 size_t niov, 199 int **fds, size_t *nfds, 200 Error **errp) 201{ 202 int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp); 203 204 if (ret == 0) { 205 error_setg(errp, "Unexpected end-of-file before all data were read"); 206 return -1; 207 } 208 if (ret == 1) { 209 return 0; 210 } 211 212 return ret; 213} 214 215int qio_channel_writev_all(QIOChannel *ioc, 216 const struct iovec *iov, 217 size_t niov, 218 Error **errp) 219{ 220 return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp); 221} 222 223int qio_channel_writev_full_all(QIOChannel *ioc, 224 const struct iovec *iov, 225 size_t niov, 226 int *fds, size_t nfds, 227 Error **errp) 228{ 229 int ret = -1; 230 struct iovec *local_iov = g_new(struct iovec, niov); 231 struct iovec *local_iov_head = local_iov; 232 unsigned int nlocal_iov = niov; 233 234 nlocal_iov = iov_copy(local_iov, nlocal_iov, 235 iov, niov, 236 0, iov_size(iov, niov)); 237 238 while (nlocal_iov > 0) { 239 ssize_t len; 240 len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds, 241 errp); 242 if (len == QIO_CHANNEL_ERR_BLOCK) { 243 if (qemu_in_coroutine()) { 244 qio_channel_yield(ioc, G_IO_OUT); 245 } else { 246 qio_channel_wait(ioc, G_IO_OUT); 247 } 248 continue; 249 } 250 if (len < 0) { 251 goto cleanup; 252 } 253 254 iov_discard_front(&local_iov, &nlocal_iov, len); 255 256 fds = NULL; 257 nfds = 0; 258 } 259 260 ret = 0; 261 cleanup: 262 g_free(local_iov_head); 263 return ret; 264} 265 266ssize_t qio_channel_readv(QIOChannel *ioc, 267 const struct iovec *iov, 268 size_t niov, 269 Error **errp) 270{ 271 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp); 272} 273 274 275ssize_t qio_channel_writev(QIOChannel *ioc, 276 const struct iovec *iov, 277 size_t niov, 278 Error **errp) 279{ 280 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp); 281} 282 283 284ssize_t qio_channel_read(QIOChannel *ioc, 285 char *buf, 286 size_t buflen, 287 Error **errp) 288{ 289 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 290 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp); 291} 292 293 294ssize_t qio_channel_write(QIOChannel *ioc, 295 const char *buf, 296 size_t buflen, 297 Error **errp) 298{ 299 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 300 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp); 301} 302 303 304int qio_channel_read_all_eof(QIOChannel *ioc, 305 char *buf, 306 size_t buflen, 307 Error **errp) 308{ 309 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 310 return qio_channel_readv_all_eof(ioc, &iov, 1, errp); 311} 312 313 314int qio_channel_read_all(QIOChannel *ioc, 315 char *buf, 316 size_t buflen, 317 Error **errp) 318{ 319 struct iovec iov = { .iov_base = buf, .iov_len = buflen }; 320 return qio_channel_readv_all(ioc, &iov, 1, errp); 321} 322 323 324int qio_channel_write_all(QIOChannel *ioc, 325 const char *buf, 326 size_t buflen, 327 Error **errp) 328{ 329 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen }; 330 return qio_channel_writev_all(ioc, &iov, 1, errp); 331} 332 333 334int qio_channel_set_blocking(QIOChannel *ioc, 335 bool enabled, 336 Error **errp) 337{ 338 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 339 return klass->io_set_blocking(ioc, enabled, errp); 340} 341 342 343int qio_channel_close(QIOChannel *ioc, 344 Error **errp) 345{ 346 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 347 return klass->io_close(ioc, errp); 348} 349 350 351GSource *qio_channel_create_watch(QIOChannel *ioc, 352 GIOCondition condition) 353{ 354 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 355 GSource *ret = klass->io_create_watch(ioc, condition); 356 357 if (ioc->name) { 358 g_source_set_name(ret, ioc->name); 359 } 360 361 return ret; 362} 363 364 365void qio_channel_set_aio_fd_handler(QIOChannel *ioc, 366 AioContext *ctx, 367 IOHandler *io_read, 368 IOHandler *io_write, 369 void *opaque) 370{ 371 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 372 373 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); 374} 375 376guint qio_channel_add_watch_full(QIOChannel *ioc, 377 GIOCondition condition, 378 QIOChannelFunc func, 379 gpointer user_data, 380 GDestroyNotify notify, 381 GMainContext *context) 382{ 383 GSource *source; 384 guint id; 385 386 source = qio_channel_create_watch(ioc, condition); 387 388 g_source_set_callback(source, (GSourceFunc)func, user_data, notify); 389 390 id = g_source_attach(source, context); 391 g_source_unref(source); 392 393 return id; 394} 395 396guint qio_channel_add_watch(QIOChannel *ioc, 397 GIOCondition condition, 398 QIOChannelFunc func, 399 gpointer user_data, 400 GDestroyNotify notify) 401{ 402 return qio_channel_add_watch_full(ioc, condition, func, 403 user_data, notify, NULL); 404} 405 406GSource *qio_channel_add_watch_source(QIOChannel *ioc, 407 GIOCondition condition, 408 QIOChannelFunc func, 409 gpointer user_data, 410 GDestroyNotify notify, 411 GMainContext *context) 412{ 413 GSource *source; 414 guint id; 415 416 id = qio_channel_add_watch_full(ioc, condition, func, 417 user_data, notify, context); 418 source = g_main_context_find_source_by_id(context, id); 419 g_source_ref(source); 420 return source; 421} 422 423 424int qio_channel_shutdown(QIOChannel *ioc, 425 QIOChannelShutdown how, 426 Error **errp) 427{ 428 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 429 430 if (!klass->io_shutdown) { 431 error_setg(errp, "Data path shutdown not supported"); 432 return -1; 433 } 434 435 return klass->io_shutdown(ioc, how, errp); 436} 437 438 439void qio_channel_set_delay(QIOChannel *ioc, 440 bool enabled) 441{ 442 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 443 444 if (klass->io_set_delay) { 445 klass->io_set_delay(ioc, enabled); 446 } 447} 448 449 450void qio_channel_set_cork(QIOChannel *ioc, 451 bool enabled) 452{ 453 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 454 455 if (klass->io_set_cork) { 456 klass->io_set_cork(ioc, enabled); 457 } 458} 459 460 461off_t qio_channel_io_seek(QIOChannel *ioc, 462 off_t offset, 463 int whence, 464 Error **errp) 465{ 466 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); 467 468 if (!klass->io_seek) { 469 error_setg(errp, "Channel does not support random access"); 470 return -1; 471 } 472 473 return klass->io_seek(ioc, offset, whence, errp); 474} 475 476 477static void qio_channel_restart_read(void *opaque) 478{ 479 QIOChannel *ioc = opaque; 480 Coroutine *co = ioc->read_coroutine; 481 482 /* Assert that aio_co_wake() reenters the coroutine directly */ 483 assert(qemu_get_current_aio_context() == 484 qemu_coroutine_get_aio_context(co)); 485 aio_co_wake(co); 486} 487 488static void qio_channel_restart_write(void *opaque) 489{ 490 QIOChannel *ioc = opaque; 491 Coroutine *co = ioc->write_coroutine; 492 493 /* Assert that aio_co_wake() reenters the coroutine directly */ 494 assert(qemu_get_current_aio_context() == 495 qemu_coroutine_get_aio_context(co)); 496 aio_co_wake(co); 497} 498 499static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) 500{ 501 IOHandler *rd_handler = NULL, *wr_handler = NULL; 502 AioContext *ctx; 503 504 if (ioc->read_coroutine) { 505 rd_handler = qio_channel_restart_read; 506 } 507 if (ioc->write_coroutine) { 508 wr_handler = qio_channel_restart_write; 509 } 510 511 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); 512 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); 513} 514 515void qio_channel_attach_aio_context(QIOChannel *ioc, 516 AioContext *ctx) 517{ 518 assert(!ioc->read_coroutine); 519 assert(!ioc->write_coroutine); 520 ioc->ctx = ctx; 521} 522 523void qio_channel_detach_aio_context(QIOChannel *ioc) 524{ 525 ioc->read_coroutine = NULL; 526 ioc->write_coroutine = NULL; 527 qio_channel_set_aio_fd_handlers(ioc); 528 ioc->ctx = NULL; 529} 530 531void coroutine_fn qio_channel_yield(QIOChannel *ioc, 532 GIOCondition condition) 533{ 534 assert(qemu_in_coroutine()); 535 if (condition == G_IO_IN) { 536 assert(!ioc->read_coroutine); 537 ioc->read_coroutine = qemu_coroutine_self(); 538 } else if (condition == G_IO_OUT) { 539 assert(!ioc->write_coroutine); 540 ioc->write_coroutine = qemu_coroutine_self(); 541 } else { 542 abort(); 543 } 544 qio_channel_set_aio_fd_handlers(ioc); 545 qemu_coroutine_yield(); 546 547 /* Allow interrupting the operation by reentering the coroutine other than 548 * through the aio_fd_handlers. */ 549 if (condition == G_IO_IN && ioc->read_coroutine) { 550 ioc->read_coroutine = NULL; 551 qio_channel_set_aio_fd_handlers(ioc); 552 } else if (condition == G_IO_OUT && ioc->write_coroutine) { 553 ioc->write_coroutine = NULL; 554 qio_channel_set_aio_fd_handlers(ioc); 555 } 556} 557 558 559static gboolean qio_channel_wait_complete(QIOChannel *ioc, 560 GIOCondition condition, 561 gpointer opaque) 562{ 563 GMainLoop *loop = opaque; 564 565 g_main_loop_quit(loop); 566 return FALSE; 567} 568 569 570void qio_channel_wait(QIOChannel *ioc, 571 GIOCondition condition) 572{ 573 GMainContext *ctxt = g_main_context_new(); 574 GMainLoop *loop = g_main_loop_new(ctxt, TRUE); 575 GSource *source; 576 577 source = qio_channel_create_watch(ioc, condition); 578 579 g_source_set_callback(source, 580 (GSourceFunc)qio_channel_wait_complete, 581 loop, 582 NULL); 583 584 g_source_attach(source, ctxt); 585 586 g_main_loop_run(loop); 587 588 g_source_unref(source); 589 g_main_loop_unref(loop); 590 g_main_context_unref(ctxt); 591} 592 593 594static void qio_channel_finalize(Object *obj) 595{ 596 QIOChannel *ioc = QIO_CHANNEL(obj); 597 598 g_free(ioc->name); 599 600#ifdef _WIN32 601 if (ioc->event) { 602 CloseHandle(ioc->event); 603 } 604#endif 605} 606 607static const TypeInfo qio_channel_info = { 608 .parent = TYPE_OBJECT, 609 .name = TYPE_QIO_CHANNEL, 610 .instance_size = sizeof(QIOChannel), 611 .instance_finalize = qio_channel_finalize, 612 .abstract = true, 613 .class_size = sizeof(QIOChannelClass), 614}; 615 616 617static void qio_channel_register_types(void) 618{ 619 type_register_static(&qio_channel_info); 620} 621 622 623type_init(qio_channel_register_types);