iothread.c (3298B)
1/* 2 * Event loop thread implementation for unit tests 3 * 4 * Copyright Red Hat Inc., 2013, 2016 5 * 6 * Authors: 7 * Stefan Hajnoczi <stefanha@redhat.com> 8 * Paolo Bonzini <pbonzini@redhat.com> 9 * 10 * This work is licensed under the terms of the GNU GPL, version 2 or later. 11 * See the COPYING file in the top-level directory. 12 * 13 */ 14 15#include "qemu/osdep.h" 16#include "qapi/error.h" 17#include "block/aio.h" 18#include "qemu/main-loop.h" 19#include "qemu/rcu.h" 20#include "iothread.h" 21 22struct IOThread { 23 AioContext *ctx; 24 GMainContext *worker_context; 25 GMainLoop *main_loop; 26 27 QemuThread thread; 28 QemuMutex init_done_lock; 29 QemuCond init_done_cond; /* is thread initialization done? */ 30 bool stopping; 31}; 32 33static void iothread_init_gcontext(IOThread *iothread) 34{ 35 GSource *source; 36 37 iothread->worker_context = g_main_context_new(); 38 source = aio_get_g_source(iothread_get_aio_context(iothread)); 39 g_source_attach(source, iothread->worker_context); 40 g_source_unref(source); 41 iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE); 42} 43 44static void *iothread_run(void *opaque) 45{ 46 IOThread *iothread = opaque; 47 48 rcu_register_thread(); 49 50 qemu_mutex_lock(&iothread->init_done_lock); 51 iothread->ctx = aio_context_new(&error_abort); 52 qemu_set_current_aio_context(iothread->ctx); 53 54 /* 55 * We must connect the ctx to a GMainContext, because in older versions 56 * of glib the g_source_ref()/unref() functions are not threadsafe 57 * on sources without a context. 58 */ 59 iothread_init_gcontext(iothread); 60 61 /* 62 * g_main_context_push_thread_default() must be called before anything 63 * in this new thread uses glib. 64 */ 65 g_main_context_push_thread_default(iothread->worker_context); 66 67 qemu_cond_signal(&iothread->init_done_cond); 68 qemu_mutex_unlock(&iothread->init_done_lock); 69 70 while (!qatomic_read(&iothread->stopping)) { 71 aio_poll(iothread->ctx, true); 72 } 73 74 g_main_context_pop_thread_default(iothread->worker_context); 75 rcu_unregister_thread(); 76 return NULL; 77} 78 79static void iothread_stop_bh(void *opaque) 80{ 81 IOThread *iothread = opaque; 82 83 iothread->stopping = true; 84} 85 86void iothread_join(IOThread *iothread) 87{ 88 aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread); 89 qemu_thread_join(&iothread->thread); 90 g_main_context_unref(iothread->worker_context); 91 g_main_loop_unref(iothread->main_loop); 92 qemu_cond_destroy(&iothread->init_done_cond); 93 qemu_mutex_destroy(&iothread->init_done_lock); 94 aio_context_unref(iothread->ctx); 95 g_free(iothread); 96} 97 98IOThread *iothread_new(void) 99{ 100 IOThread *iothread = g_new0(IOThread, 1); 101 102 qemu_mutex_init(&iothread->init_done_lock); 103 qemu_cond_init(&iothread->init_done_cond); 104 qemu_thread_create(&iothread->thread, NULL, iothread_run, 105 iothread, QEMU_THREAD_JOINABLE); 106 107 /* Wait for initialization to complete */ 108 qemu_mutex_lock(&iothread->init_done_lock); 109 while (iothread->ctx == NULL) { 110 qemu_cond_wait(&iothread->init_done_cond, 111 &iothread->init_done_lock); 112 } 113 qemu_mutex_unlock(&iothread->init_done_lock); 114 return iothread; 115} 116 117AioContext *iothread_get_aio_context(IOThread *iothread) 118{ 119 return iothread->ctx; 120}