test-aio-multithread.c (10867B)
1/* 2 * AioContext multithreading tests 3 * 4 * Copyright Red Hat, Inc. 2016 5 * 6 * Authors: 7 * Paolo Bonzini <pbonzini@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU LGPL, version 2 or later. 10 * See the COPYING.LIB file in the top-level directory. 11 */ 12 13#include "qemu/osdep.h" 14#include "block/aio.h" 15#include "qemu/coroutine.h" 16#include "qemu/thread.h" 17#include "qemu/error-report.h" 18#include "iothread.h" 19 20/* AioContext management */ 21 22#define NUM_CONTEXTS 5 23 24static IOThread *threads[NUM_CONTEXTS]; 25static AioContext *ctx[NUM_CONTEXTS]; 26static __thread int id = -1; 27 28static QemuEvent done_event; 29 30/* Run a function synchronously on a remote iothread. */ 31 32typedef struct CtxRunData { 33 QEMUBHFunc *cb; 34 void *arg; 35} CtxRunData; 36 37static void ctx_run_bh_cb(void *opaque) 38{ 39 CtxRunData *data = opaque; 40 41 data->cb(data->arg); 42 qemu_event_set(&done_event); 43} 44 45static void ctx_run(int i, QEMUBHFunc *cb, void *opaque) 46{ 47 CtxRunData data = { 48 .cb = cb, 49 .arg = opaque 50 }; 51 52 qemu_event_reset(&done_event); 53 aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data); 54 qemu_event_wait(&done_event); 55} 56 57/* Starting the iothreads. */ 58 59static void set_id_cb(void *opaque) 60{ 61 int *i = opaque; 62 63 id = *i; 64} 65 66static void create_aio_contexts(void) 67{ 68 int i; 69 70 for (i = 0; i < NUM_CONTEXTS; i++) { 71 threads[i] = iothread_new(); 72 ctx[i] = iothread_get_aio_context(threads[i]); 73 } 74 75 qemu_event_init(&done_event, false); 76 for (i = 0; i < NUM_CONTEXTS; i++) { 77 ctx_run(i, set_id_cb, &i); 78 } 79} 80 81/* Stopping the iothreads. 
*/ 82 83static void join_aio_contexts(void) 84{ 85 int i; 86 87 for (i = 0; i < NUM_CONTEXTS; i++) { 88 aio_context_ref(ctx[i]); 89 } 90 for (i = 0; i < NUM_CONTEXTS; i++) { 91 iothread_join(threads[i]); 92 } 93 for (i = 0; i < NUM_CONTEXTS; i++) { 94 aio_context_unref(ctx[i]); 95 } 96 qemu_event_destroy(&done_event); 97} 98 99/* Basic test for the stuff above. */ 100 101static void test_lifecycle(void) 102{ 103 create_aio_contexts(); 104 join_aio_contexts(); 105} 106 107/* aio_co_schedule test. */ 108 109static Coroutine *to_schedule[NUM_CONTEXTS]; 110 111static bool now_stopping; 112 113static int count_retry; 114static int count_here; 115static int count_other; 116 117static bool schedule_next(int n) 118{ 119 Coroutine *co; 120 121 co = qatomic_xchg(&to_schedule[n], NULL); 122 if (!co) { 123 qatomic_inc(&count_retry); 124 return false; 125 } 126 127 if (n == id) { 128 qatomic_inc(&count_here); 129 } else { 130 qatomic_inc(&count_other); 131 } 132 133 aio_co_schedule(ctx[n], co); 134 return true; 135} 136 137static void finish_cb(void *opaque) 138{ 139 schedule_next(id); 140} 141 142static coroutine_fn void test_multi_co_schedule_entry(void *opaque) 143{ 144 g_assert(to_schedule[id] == NULL); 145 146 while (!qatomic_mb_read(&now_stopping)) { 147 int n; 148 149 n = g_test_rand_int_range(0, NUM_CONTEXTS); 150 schedule_next(n); 151 152 qatomic_mb_set(&to_schedule[id], qemu_coroutine_self()); 153 qemu_coroutine_yield(); 154 g_assert(to_schedule[id] == NULL); 155 } 156} 157 158 159static void test_multi_co_schedule(int seconds) 160{ 161 int i; 162 163 count_here = count_other = count_retry = 0; 164 now_stopping = false; 165 166 create_aio_contexts(); 167 for (i = 0; i < NUM_CONTEXTS; i++) { 168 Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL); 169 aio_co_schedule(ctx[i], co1); 170 } 171 172 g_usleep(seconds * 1000000); 173 174 qatomic_mb_set(&now_stopping, true); 175 for (i = 0; i < NUM_CONTEXTS; i++) { 176 ctx_run(i, finish_cb, NULL); 177 
to_schedule[i] = NULL; 178 } 179 180 join_aio_contexts(); 181 g_test_message("scheduled %d, queued %d, retry %d, total %d", 182 count_other, count_here, count_retry, 183 count_here + count_other + count_retry); 184} 185 186static void test_multi_co_schedule_1(void) 187{ 188 test_multi_co_schedule(1); 189} 190 191static void test_multi_co_schedule_10(void) 192{ 193 test_multi_co_schedule(10); 194} 195 196/* CoMutex thread-safety. */ 197 198static uint32_t atomic_counter; 199static uint32_t running; 200static uint32_t counter; 201static CoMutex comutex; 202 203static void coroutine_fn test_multi_co_mutex_entry(void *opaque) 204{ 205 while (!qatomic_mb_read(&now_stopping)) { 206 qemu_co_mutex_lock(&comutex); 207 counter++; 208 qemu_co_mutex_unlock(&comutex); 209 210 /* Increase atomic_counter *after* releasing the mutex. Otherwise 211 * there is a chance (it happens about 1 in 3 runs) that the iothread 212 * exits before the coroutine is woken up, causing a spurious 213 * assertion failure. 214 */ 215 qatomic_inc(&atomic_counter); 216 } 217 qatomic_dec(&running); 218} 219 220static void test_multi_co_mutex(int threads, int seconds) 221{ 222 int i; 223 224 qemu_co_mutex_init(&comutex); 225 counter = 0; 226 atomic_counter = 0; 227 now_stopping = false; 228 229 create_aio_contexts(); 230 assert(threads <= NUM_CONTEXTS); 231 running = threads; 232 for (i = 0; i < threads; i++) { 233 Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL); 234 aio_co_schedule(ctx[i], co1); 235 } 236 237 g_usleep(seconds * 1000000); 238 239 qatomic_mb_set(&now_stopping, true); 240 while (running > 0) { 241 g_usleep(100000); 242 } 243 244 join_aio_contexts(); 245 g_test_message("%d iterations/second", counter / seconds); 246 g_assert_cmpint(counter, ==, atomic_counter); 247} 248 249/* Testing with NUM_CONTEXTS threads focuses on the queue. The mutex however 250 * is too contended (and the threads spend too much time in aio_poll) 251 * to actually stress the handoff protocol. 
252 */ 253static void test_multi_co_mutex_1(void) 254{ 255 test_multi_co_mutex(NUM_CONTEXTS, 1); 256} 257 258static void test_multi_co_mutex_10(void) 259{ 260 test_multi_co_mutex(NUM_CONTEXTS, 10); 261} 262 263/* Testing with fewer threads stresses the handoff protocol too. Still, the 264 * case where the locker _can_ pick up a handoff is very rare, happening 265 * about 10 times in 1 million, so increase the runtime a bit compared to 266 * other "quick" testcases that only run for 1 second. 267 */ 268static void test_multi_co_mutex_2_3(void) 269{ 270 test_multi_co_mutex(2, 3); 271} 272 273static void test_multi_co_mutex_2_30(void) 274{ 275 test_multi_co_mutex(2, 30); 276} 277 278/* Same test with fair mutexes, for performance comparison. */ 279 280#ifdef CONFIG_LINUX 281#include "qemu/futex.h" 282 283/* The nodes for the mutex reside in this structure (on which we try to avoid 284 * false sharing). The head of the mutex is in the "mutex_head" variable. 285 */ 286static struct { 287 int next, locked; 288 int padding[14]; 289} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64))); 290 291static int mutex_head = -1; 292 293static void mcs_mutex_lock(void) 294{ 295 int prev; 296 297 nodes[id].next = -1; 298 nodes[id].locked = 1; 299 prev = qatomic_xchg(&mutex_head, id); 300 if (prev != -1) { 301 qatomic_set(&nodes[prev].next, id); 302 qemu_futex_wait(&nodes[id].locked, 1); 303 } 304} 305 306static void mcs_mutex_unlock(void) 307{ 308 int next; 309 if (qatomic_read(&nodes[id].next) == -1) { 310 if (qatomic_read(&mutex_head) == id && 311 qatomic_cmpxchg(&mutex_head, id, -1) == id) { 312 /* Last item in the list, exit. */ 313 return; 314 } 315 while (qatomic_read(&nodes[id].next) == -1) { 316 /* mcs_mutex_lock did the xchg, but has not updated 317 * nodes[prev].next yet. 318 */ 319 } 320 } 321 322 /* Wake up the next in line. 
*/ 323 next = qatomic_read(&nodes[id].next); 324 nodes[next].locked = 0; 325 qemu_futex_wake(&nodes[next].locked, 1); 326} 327 328static void test_multi_fair_mutex_entry(void *opaque) 329{ 330 while (!qatomic_mb_read(&now_stopping)) { 331 mcs_mutex_lock(); 332 counter++; 333 mcs_mutex_unlock(); 334 qatomic_inc(&atomic_counter); 335 } 336 qatomic_dec(&running); 337} 338 339static void test_multi_fair_mutex(int threads, int seconds) 340{ 341 int i; 342 343 assert(mutex_head == -1); 344 counter = 0; 345 atomic_counter = 0; 346 now_stopping = false; 347 348 create_aio_contexts(); 349 assert(threads <= NUM_CONTEXTS); 350 running = threads; 351 for (i = 0; i < threads; i++) { 352 Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL); 353 aio_co_schedule(ctx[i], co1); 354 } 355 356 g_usleep(seconds * 1000000); 357 358 qatomic_mb_set(&now_stopping, true); 359 while (running > 0) { 360 g_usleep(100000); 361 } 362 363 join_aio_contexts(); 364 g_test_message("%d iterations/second", counter / seconds); 365 g_assert_cmpint(counter, ==, atomic_counter); 366} 367 368static void test_multi_fair_mutex_1(void) 369{ 370 test_multi_fair_mutex(NUM_CONTEXTS, 1); 371} 372 373static void test_multi_fair_mutex_10(void) 374{ 375 test_multi_fair_mutex(NUM_CONTEXTS, 10); 376} 377#endif 378 379/* Same test with pthread mutexes, for performance comparison and 380 * portability. 
*/ 381 382static QemuMutex mutex; 383 384static void test_multi_mutex_entry(void *opaque) 385{ 386 while (!qatomic_mb_read(&now_stopping)) { 387 qemu_mutex_lock(&mutex); 388 counter++; 389 qemu_mutex_unlock(&mutex); 390 qatomic_inc(&atomic_counter); 391 } 392 qatomic_dec(&running); 393} 394 395static void test_multi_mutex(int threads, int seconds) 396{ 397 int i; 398 399 qemu_mutex_init(&mutex); 400 counter = 0; 401 atomic_counter = 0; 402 now_stopping = false; 403 404 create_aio_contexts(); 405 assert(threads <= NUM_CONTEXTS); 406 running = threads; 407 for (i = 0; i < threads; i++) { 408 Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL); 409 aio_co_schedule(ctx[i], co1); 410 } 411 412 g_usleep(seconds * 1000000); 413 414 qatomic_mb_set(&now_stopping, true); 415 while (running > 0) { 416 g_usleep(100000); 417 } 418 419 join_aio_contexts(); 420 g_test_message("%d iterations/second", counter / seconds); 421 g_assert_cmpint(counter, ==, atomic_counter); 422} 423 424static void test_multi_mutex_1(void) 425{ 426 test_multi_mutex(NUM_CONTEXTS, 1); 427} 428 429static void test_multi_mutex_10(void) 430{ 431 test_multi_mutex(NUM_CONTEXTS, 10); 432} 433 434/* End of tests. 
*/

/*
 * Test entry point.  Registers the lifecycle test unconditionally, then one
 * variant of every stress test: the short one when g_test_quick() is true,
 * the long one otherwise.  Returns the result of g_test_run().
 */
int main(int argc, char **argv)
{
    /* One row per stress test: GTest path, quick-mode body, long-mode body. */
    static const struct {
        const char *path;
        void (*quick)(void);
        void (*slow)(void);
    } stress_tests[] = {
        { "/aio/multi/schedule",
          test_multi_co_schedule_1, test_multi_co_schedule_10 },
        { "/aio/multi/mutex/contended",
          test_multi_co_mutex_1, test_multi_co_mutex_10 },
        { "/aio/multi/mutex/handoff",
          test_multi_co_mutex_2_3, test_multi_co_mutex_2_30 },
#ifdef CONFIG_LINUX
        { "/aio/multi/mutex/mcs",
          test_multi_fair_mutex_1, test_multi_fair_mutex_10 },
#endif
        { "/aio/multi/mutex/pthread",
          test_multi_mutex_1, test_multi_mutex_10 },
    };
    bool quick;
    size_t k;

    init_clocks(NULL);

    g_test_init(&argc, &argv, NULL);
    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);

    /* Only meaningful once g_test_init() has parsed the command line. */
    quick = g_test_quick();
    for (k = 0; k < sizeof(stress_tests) / sizeof(stress_tests[0]); k++) {
        g_test_add_func(stress_tests[k].path,
                        quick ? stress_tests[k].quick : stress_tests[k].slow);
    }

    return g_test_run();
}