sync_stress_consumer.c (5380B)
1/* 2 * sync stress test: producer/consumer 3 * Copyright 2015-2016 Collabora Ltd. 4 * 5 * Based on the implementation from the Android Open Source Project, 6 * 7 * Copyright 2012 Google, Inc 8 * 9 * Permission is hereby granted, free of charge, to any person obtaining a 10 * copy of this software and associated documentation files (the "Software"), 11 * to deal in the Software without restriction, including without limitation 12 * the rights to use, copy, modify, merge, publish, distribute, sublicense, 13 * and/or sell copies of the Software, and to permit persons to whom the 14 * Software is furnished to do so, subject to the following conditions: 15 * 16 * The above copyright notice and this permission notice shall be included in 17 * all copies or substantial portions of the Software. 18 * 19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 22 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 23 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 24 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 25 * OTHER DEALINGS IN THE SOFTWARE. 26 */ 27 28#include <pthread.h> 29 30#include "sync.h" 31#include "sw_sync.h" 32#include "synctest.h" 33 34/* IMPORTANT NOTE: if you see this test failing on your system, it may be 35 * due to a shortage of file descriptors. Please ensure your system has 36 * a sensible limit for this test to finish correctly. 37 */ 38 39/* Returns 1 on error, 0 on success */ 40static int busy_wait_on_fence(int fence) 41{ 42 int error, active; 43 44 do { 45 error = sync_fence_count_with_status(fence, FENCE_STATUS_ERROR); 46 ASSERT(error == 0, "Error occurred on fence\n"); 47 active = sync_fence_count_with_status(fence, 48 FENCE_STATUS_ACTIVE); 49 } while (active); 50 51 return 0; 52} 53 54static struct { 55 int iterations; 56 int threads; 57 int counter; 58 int consumer_timeline; 59 int *producer_timelines; 60 pthread_mutex_t lock; 61} test_data_mpsc; 62 63static int mpsc_producer_thread(void *d) 64{ 65 int id = (long)d; 66 int fence, valid, i; 67 int *producer_timelines = test_data_mpsc.producer_timelines; 68 int consumer_timeline = test_data_mpsc.consumer_timeline; 69 int iterations = test_data_mpsc.iterations; 70 71 for (i = 0; i < iterations; i++) { 72 fence = sw_sync_fence_create(consumer_timeline, "fence", i); 73 valid = sw_sync_fence_is_valid(fence); 74 ASSERT(valid, "Failure creating fence\n"); 75 76 /* 77 * Wait for the consumer to finish. Use alternate 78 * means of waiting on the fence 79 */ 80 81 if ((iterations + id) % 8 != 0) { 82 ASSERT(sync_wait(fence, -1) > 0, 83 "Failure waiting on fence\n"); 84 } else { 85 ASSERT(busy_wait_on_fence(fence) == 0, 86 "Failure waiting on fence\n"); 87 } 88 89 /* 90 * Every producer increments the counter, the consumer 91 * checks and erases it 92 */ 93 pthread_mutex_lock(&test_data_mpsc.lock); 94 test_data_mpsc.counter++; 95 pthread_mutex_unlock(&test_data_mpsc.lock); 96 97 ASSERT(sw_sync_timeline_inc(producer_timelines[id], 1) == 0, 98 "Error advancing producer timeline\n"); 99 100 sw_sync_fence_destroy(fence); 101 } 102 103 return 0; 104} 105 106static int mpcs_consumer_thread(void) 107{ 108 int fence, merged, tmp, valid, it, i; 109 int *producer_timelines = test_data_mpsc.producer_timelines; 110 int consumer_timeline = test_data_mpsc.consumer_timeline; 111 int iterations = test_data_mpsc.iterations; 112 int n = test_data_mpsc.threads; 113 114 for (it = 1; it <= iterations; it++) { 115 fence = sw_sync_fence_create(producer_timelines[0], "name", it); 116 for (i = 1; i < n; i++) { 117 tmp = sw_sync_fence_create(producer_timelines[i], 118 "name", it); 119 merged = sync_merge("name", tmp, fence); 120 sw_sync_fence_destroy(tmp); 121 sw_sync_fence_destroy(fence); 122 fence = merged; 123 } 124 125 valid = sw_sync_fence_is_valid(fence); 126 ASSERT(valid, "Failure merging fences\n"); 127 128 /* 129 * Make sure we see an increment from every producer thread. 130 * Vary the means by which we wait. 131 */ 132 if (iterations % 8 != 0) { 133 ASSERT(sync_wait(fence, -1) > 0, 134 "Producers did not increment as expected\n"); 135 } else { 136 ASSERT(busy_wait_on_fence(fence) == 0, 137 "Producers did not increment as expected\n"); 138 } 139 140 ASSERT(test_data_mpsc.counter == n * it, 141 "Counter value mismatch!\n"); 142 143 /* Release the producer threads */ 144 ASSERT(sw_sync_timeline_inc(consumer_timeline, 1) == 0, 145 "Failure releasing producer threads\n"); 146 147 sw_sync_fence_destroy(fence); 148 } 149 150 return 0; 151} 152 153int test_consumer_stress_multi_producer_single_consumer(void) 154{ 155 int iterations = 1 << 12; 156 int n = 5; 157 long i, ret; 158 int producer_timelines[n]; 159 int consumer_timeline; 160 pthread_t threads[n]; 161 162 consumer_timeline = sw_sync_timeline_create(); 163 for (i = 0; i < n; i++) 164 producer_timelines[i] = sw_sync_timeline_create(); 165 166 test_data_mpsc.producer_timelines = producer_timelines; 167 test_data_mpsc.consumer_timeline = consumer_timeline; 168 test_data_mpsc.iterations = iterations; 169 test_data_mpsc.threads = n; 170 test_data_mpsc.counter = 0; 171 pthread_mutex_init(&test_data_mpsc.lock, NULL); 172 173 for (i = 0; i < n; i++) { 174 pthread_create(&threads[i], NULL, (void * (*)(void *)) 175 mpsc_producer_thread, (void *)i); 176 } 177 178 /* Consumer thread runs here */ 179 ret = mpcs_consumer_thread(); 180 181 for (i = 0; i < n; i++) 182 pthread_join(threads[i], NULL); 183 184 return ret; 185}