From e4ed1290535cdb5a6e7be7894949f9e62049270a Mon Sep 17 00:00:00 2001 From: Sam Lantinga Date: Tue, 25 Jan 2011 23:23:52 -0800 Subject: [PATCH] Added a FIFO test to the atomic test suite. This is really useful because we might be able to use something like this for the SDL event queue. --- include/SDL_atomic.h | 3 + test/testatomic.c | 393 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 396 insertions(+) diff --git a/include/SDL_atomic.h b/include/SDL_atomic.h index d45a78ce8..7259f7c44 100644 --- a/include/SDL_atomic.h +++ b/include/SDL_atomic.h @@ -44,6 +44,9 @@ * subtle issues that can arise here: * http://msdn.microsoft.com/en-us/library/ee418650%28v=vs.85%29.aspx * + * There's also lots of good information here: + * http://www.1024cores.net/home/lock-free-algorithms + * * These operations may or may not actually be implemented using * processor specific atomic operations. When possible they are * implemented as true processor specific atomic operations. When that diff --git a/test/testatomic.c b/test/testatomic.c index cce12aefa..065b1d2c6 100644 --- a/test/testatomic.c +++ b/test/testatomic.c @@ -231,10 +231,403 @@ void RunEpicTest() /* End atomic operation test */ /**************************************************************************/ +/**************************************************************************/ +/* Lock-free FIFO test */ + +#define NUM_READERS 4 +#define NUM_WRITERS 4 +#define EVENTS_PER_WRITER 1000000 + +/* A decent guess for the size of a cache line on this architecture */ +#define CACHELINE 64 + +/* The number of entries must be a power of 2 */ +#define MAX_ENTRIES 256 +#define WRAP_MASK (MAX_ENTRIES-1) + +typedef struct +{ + SDL_atomic_t sequence; + SDL_Event event; +} SDL_EventQueueEntry; + +typedef struct +{ + SDL_EventQueueEntry entries[MAX_ENTRIES]; + + char cache_pad1[CACHELINE-((sizeof(SDL_EventQueueEntry)*MAX_ENTRIES)%CACHELINE)]; + + SDL_atomic_t enqueue_pos; + + char cache_pad2[CACHELINE-sizeof(SDL_atomic_t)]; + + SDL_atomic_t dequeue_pos; + + char cache_pad3[CACHELINE-sizeof(SDL_atomic_t)]; + + SDL_bool active; + + /* Only needed for the mutex test */ + SDL_mutex *mutex; + +} SDL_EventQueue; + +static void InitEventQueue(SDL_EventQueue *queue) +{ + int i; + + for (i = 0; i < MAX_ENTRIES; ++i) { + SDL_AtomicSet(&queue->entries[i].sequence, i); + } + SDL_AtomicSet(&queue->enqueue_pos, 0); + SDL_AtomicSet(&queue->dequeue_pos, 0); + queue->active = SDL_TRUE; +} + +static SDL_bool EnqueueEvent_LockFree(SDL_EventQueue *queue, const SDL_Event *event) +{ + SDL_EventQueueEntry *entry; + unsigned queue_pos; + unsigned entry_seq; + int delta; + + queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos); + for ( ; ; ) { + entry = &queue->entries[queue_pos & WRAP_MASK]; + entry_seq = (unsigned)SDL_AtomicGet(&entry->sequence); + + delta = (int)(entry_seq - queue_pos); + if (delta == 0) { + /* The entry and the queue position match, try to increment the queue position */ + if (SDL_AtomicCAS(&queue->enqueue_pos, (int)queue_pos, (int)(queue_pos+1))) { + break; + } + } else if (delta < 0) { + /* We ran into an old queue entry, which means it still needs to be dequeued */ + return SDL_FALSE; + } else { + /* We ran into a new queue entry, get the new queue position */ + queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos); + } + } + + /* We own the object, fill it! */ + entry->event = *event; + SDL_AtomicSet(&entry->sequence, (int)(queue_pos + 1)); + + return SDL_TRUE; +} + +static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event) +{ + SDL_EventQueueEntry *entry; + unsigned queue_pos; + unsigned entry_seq; + int delta; + + queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos); + for ( ; ; ) { + entry = &queue->entries[queue_pos & WRAP_MASK]; + entry_seq = (unsigned)SDL_AtomicGet(&entry->sequence); + + delta = (int)(entry_seq - (queue_pos + 1)); + if (delta == 0) { + /* The entry and the queue position match, try to increment the queue position */ + if (SDL_AtomicCAS(&queue->dequeue_pos, (int)queue_pos, (int)(queue_pos+1))) { + break; + } + } else if (delta < 0) { + /* We ran into an old queue entry, which means we've hit empty */ + return SDL_FALSE; + } else { + /* We ran into a new queue entry, get the new queue position */ + queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos); + } + } + + /* We own the object, fill it! */ + *event = entry->event; + SDL_AtomicSet(&entry->sequence, (int)(queue_pos+MAX_ENTRIES)); + + return SDL_TRUE; +} + +static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event) +{ + SDL_EventQueueEntry *entry; + unsigned queue_pos; + unsigned entry_seq; + int delta; + + SDL_mutexP(queue->mutex); + + queue_pos = (unsigned)queue->enqueue_pos.value; + entry = &queue->entries[queue_pos & WRAP_MASK]; + entry_seq = (unsigned)entry->sequence.value; + + delta = (int)(entry_seq - queue_pos); + if (delta == 0) { + ++queue->enqueue_pos.value; + } else if (delta < 0) { + /* We ran into an old queue entry, which means it still needs to be dequeued */ + SDL_mutexV(queue->mutex); + return SDL_FALSE; + } else { + printf("ERROR: mutex failed!\n"); + } + + /* We own the object, fill it! */ + entry->event = *event; + entry->sequence.value = (int)(queue_pos + 1); + + SDL_mutexV(queue->mutex); + + return SDL_TRUE; +} + +static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event) +{ + SDL_EventQueueEntry *entry; + unsigned queue_pos; + unsigned entry_seq; + int delta; + + SDL_mutexP(queue->mutex); + + queue_pos = (unsigned)queue->dequeue_pos.value; + entry = &queue->entries[queue_pos & WRAP_MASK]; + entry_seq = (unsigned)entry->sequence.value; + + delta = (int)(entry_seq - (queue_pos + 1)); + if (delta == 0) { + ++queue->dequeue_pos.value; + } else if (delta < 0) { + /* We ran into an old queue entry, which means we've hit empty */ + SDL_mutexV(queue->mutex); + return SDL_FALSE; + } else { + printf("ERROR: mutex failed!\n"); + } + + /* We own the object, fill it! */ + *event = entry->event; + entry->sequence.value = (int)(queue_pos + MAX_ENTRIES); + + SDL_mutexV(queue->mutex); + + return SDL_TRUE; +} + +static SDL_sem *writersDone; +static SDL_sem *readersDone; +static SDL_atomic_t writersRunning; +static SDL_atomic_t readersRunning; + +typedef struct +{ + SDL_EventQueue *queue; + int index; + char padding1[CACHELINE-(sizeof(SDL_EventQueue*)+sizeof(int))%CACHELINE]; + int waits; + SDL_bool lock_free; + char padding2[CACHELINE-sizeof(int)-sizeof(SDL_bool)]; +} WriterData; + +typedef struct +{ + SDL_EventQueue *queue; + int counters[NUM_WRITERS]; + int waits; + SDL_bool lock_free; + char padding[CACHELINE-(sizeof(SDL_EventQueue*)+sizeof(int)*NUM_WRITERS+sizeof(int)+sizeof(SDL_bool))%CACHELINE]; +} ReaderData; + +static int FIFO_Writer(void* _data) +{ + WriterData *data = (WriterData *)_data; + SDL_EventQueue *queue = data->queue; + int index = data->index; + int i; + SDL_Event event; + + event.type = SDL_USEREVENT; + event.user.windowID = 0; + event.user.code = 0; + event.user.data1 = data; + event.user.data2 = NULL; + + if (data->lock_free) { + for (i = 0; i < EVENTS_PER_WRITER; ++i) { + event.user.code = i; + while (!EnqueueEvent_LockFree(queue, &event)) { + ++data->waits; + SDL_Delay(0); + } + } + } else { + for (i = 0; i < EVENTS_PER_WRITER; ++i) { + event.user.code = i; + while (!EnqueueEvent_Mutex(queue, &event)) { + ++data->waits; + SDL_Delay(0); + } + } + } + SDL_AtomicAdd(&writersRunning, -1); + SDL_SemPost(writersDone); + return 0; +} + +static int FIFO_Reader(void* _data) +{ + ReaderData *data = (ReaderData *)_data; + SDL_EventQueue *queue = data->queue; + SDL_Event event; + int index; + + if (data->lock_free) { + for ( ; ; ) { + if (DequeueEvent_LockFree(queue, &event)) { + WriterData *writer = (WriterData*)event.user.data1; + ++data->counters[writer->index]; + } else if (queue->active) { + ++data->waits; + SDL_Delay(0); + } else { + /* We drained the queue, we're done! */ + break; + } + } + } else { + for ( ; ; ) { + if (DequeueEvent_Mutex(queue, &event)) { + WriterData *writer = (WriterData*)event.user.data1; + ++data->counters[writer->index]; + } else if (queue->active) { + ++data->waits; + SDL_Delay(0); + } else { + /* We drained the queue, we're done! */ + break; + } + } + } + SDL_AtomicAdd(&readersRunning, -1); + SDL_SemPost(readersDone); + return 0; +} + +static void RunFIFOTest(SDL_bool lock_free) +{ + SDL_EventQueue queue; + WriterData writerData[NUM_WRITERS]; + ReaderData readerData[NUM_READERS]; + Uint32 start, end; + int i, j; + int grand_total; + + printf("\nFIFO test---------------------------------------\n\n"); + printf("Mode: %s\n", lock_free ? "LockFree" : "Mutex"); + + readersDone = SDL_CreateSemaphore(0); + writersDone = SDL_CreateSemaphore(0); + + SDL_memset(&queue, 0xff, sizeof(queue)); + + InitEventQueue(&queue); + if (!lock_free) { + queue.mutex = SDL_CreateMutex(); + } + + start = SDL_GetTicks(); + + /* Start the readers first */ + printf("Starting %d readers\n", NUM_READERS); + SDL_zero(readerData); + SDL_AtomicSet(&readersRunning, NUM_READERS); + for (i = 0; i < NUM_READERS; ++i) { + readerData[i].queue = &queue; + readerData[i].lock_free = lock_free; + SDL_CreateThread(FIFO_Reader, &readerData[i]); + } + + /* Start up the writers */ + printf("Starting %d writers\n", NUM_WRITERS); + SDL_zero(writerData); + SDL_AtomicSet(&writersRunning, NUM_WRITERS); + for (i = 0; i < NUM_WRITERS; ++i) { + writerData[i].queue = &queue; + writerData[i].index = i; + writerData[i].lock_free = lock_free; + SDL_CreateThread(FIFO_Writer, &writerData[i]); + } + + /* Wait for the writers */ + while (SDL_AtomicGet(&writersRunning) > 0) { + SDL_SemWait(writersDone); + } + + /* Shut down the queue so readers exit */ + queue.active = SDL_FALSE; + + /* Wait for the readers */ + while (SDL_AtomicGet(&readersRunning) > 0) { + SDL_SemWait(readersDone); + } + + end = SDL_GetTicks(); + + SDL_DestroySemaphore(readersDone); + SDL_DestroySemaphore(writersDone); + + if (!lock_free) { + SDL_DestroyMutex(queue.mutex); + } + + printf("Finished in %f sec\n", (end - start) / 1000.f); + + printf("\n"); + for (i = 0; i < NUM_WRITERS; ++i) { + printf("Writer %d wrote %d events, had %d waits\n", i, EVENTS_PER_WRITER, writerData[i].waits); + } + printf("Writers wrote %d total events\n", NUM_WRITERS*EVENTS_PER_WRITER); + + /* Print a breakdown of which readers read messages from which writer */ + printf("\n"); + grand_total = 0; + for (i = 0; i < NUM_READERS; ++i) { + int total = 0; + for (j = 0; j < NUM_WRITERS; ++j) { + total += readerData[i].counters[j]; + } + grand_total += total; + printf("Reader %d read %d events, had %d waits\n", i, total, readerData[i].waits); + printf(" { "); + for (j = 0; j < NUM_WRITERS; ++j) { + if (j > 0) { + printf(", "); + } + printf("%d", readerData[i].counters[j]); + } + printf(" }\n"); + } + printf("Readers read %d total events\n", grand_total); +} + +/* End FIFO test */ +/**************************************************************************/ + int main(int argc, char *argv[]) { RunBasicTest(); RunEpicTest(); +/* This test is really slow, so don't run it by default */ +#if 0 + RunFIFOTest(SDL_FALSE); +#endif + RunFIFOTest(SDL_TRUE); return 0; } + +/* vi: set ts=4 sw=4 expandtab: */