Multithreading: Lockless thread-safe SPSC ring-buffer queue

1. Introduction: We need various thread-safe data structures/containers in multithreaded programming. Even though many applications use MPMC (multi-producer multi-consumer) queues, sometimes we need single-producer single-consumer solutions.

In this article, I will show an SPSC thread-safe queue implementation, based on the ring buffer data structure, that does not use mutual exclusion/critical sections. This implementation is based on a video I watched at this address: https://skillsmatter.com/skillscasts/6163-high-performance-single-producer-single-consumer-in-memory-queue

As explained in the video, the original implementation is based on the LMAX Disruptor project, an open-source high-performance library used in the LMAX trading system: https://lmax-exchange.github.io/disruptor/

You can find the source code for this article at:

https://github.com/akhin/benchmarks/tree/master/concurrency_spsc_ringbuffer_queue

2. What is a ring buffer?: A ring buffer is a cyclic/circular buffer. It can easily be used as the underlying data structure for FIFO (first in, first out) queue implementations.

Below you can see its structure:

[Figure: ring buffer structure]

Below you can see how it is laid out in memory:

[Figure: ring buffer memory layout]

For the implementation you need two indices pointing to the head and the tail, and you increment them as you perform enqueue and dequeue operations. Below you can see a non-thread-safe ring buffer operating on 64-bit integers:
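The listing below is an illustrative sketch of such a non-thread-safe ring buffer; the class and member names (ring_buffer, m_head, m_tail, CAPACITY) are placeholders of my own rather than the exact ones used in the repository.

#include <cstdint>
#include <cstddef>

// A minimal, non-thread-safe ring buffer of 64-bit integers.
// No fullness check: writing more than CAPACITY unread items overwrites old data.
class ring_buffer
{
    public:
        void push(int64_t value)
        {
            m_buffer[m_head % CAPACITY] = value;         // write at the head, wrapping around
            ++m_head;
        }

        int64_t pop()
        {
            int64_t value = m_buffer[m_tail % CAPACITY]; // read at the tail, wrapping around
            ++m_tail;
            return value;
        }

    private:
        static constexpr std::size_t CAPACITY = 1024;    // capacity chosen arbitrarily for this sketch
        int64_t m_buffer[CAPACITY] = {};
        std::size_t m_head = 0;    // next slot to write
        std::size_t m_tail = 0;    // next slot to read
};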

A ring buffer is quite advantageous under memory constraints, as you never need to allocate more memory. On the other hand, one disadvantage is that you have to know your maximum queue size up front; if that size is exceeded, existing data gets overwritten.

3. Lock-based thread-safe queue implementation: For a simple lock-based thread-safe implementation, we can use mutual exclusion. For this one we add locks to the push and pop functions:
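A minimal sketch of such a lock-based queue is shown below: it simply guards the same head/tail logic with a std::mutex. The class and member names are again illustrative, not the exact ones from the repository.

#include <cstdint>
#include <cstddef>
#include <mutex>

// Illustrative lock-based ring buffer queue: push and pop simply take a mutex.
class locked_ring_buffer_queue
{
    public:
        void push(int64_t value)
        {
            std::lock_guard<std::mutex> guard(m_lock);
            m_buffer[m_head % CAPACITY] = value;   // same wrap-around logic as before
            ++m_head;
        }

        int64_t pop()
        {
            for (;;)
            {
                std::lock_guard<std::mutex> guard(m_lock);
                if (m_tail != m_head)              // something to consume
                {
                    int64_t value = m_buffer[m_tail % CAPACITY];
                    ++m_tail;
                    return value;
                }
                // Queue is empty: the lock is released at the end of this
                // iteration and we try again.
            }
        }

    private:
        static constexpr std::size_t CAPACITY = 1000;
        int64_t m_buffer[CAPACITY] = {};
        std::size_t m_head = 0;
        std::size_t m_tail = 0;
        std::mutex m_lock;
};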


4. Lockless thread-safe queue implementation: Rather than using locks, we will get help from atomic variables. To make the queue thread safe, we need to ensure that the head pointer never catches the tail pointer from behind, and vice versa. For the implementation we introduce two more helper functions, try_push and try_pop:
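A sketch of this lockless version, following the try_push/try_pop idea described here, could look like the listing below. The names spsc_ring_buffer_queue, m_head, m_tail and CAPACITY are my own illustrative choices, not necessarily those of the repository.

#include <atomic>
#include <cstdint>
#include <cstddef>

// Illustrative lockless SPSC ring buffer queue. The producer is the only
// writer of m_head and the consumer is the only writer of m_tail.
class spsc_ring_buffer_queue
{
    public:
        bool try_push(int64_t value)
        {
            const std::size_t head = m_head.load();
            if (head - m_tail.load() >= CAPACITY)   // head would catch the tail from behind
            {
                return false;                       // queue is full
            }
            m_buffer[head % CAPACITY] = value;
            m_head.store(head + 1);
            return true;
        }

        bool try_pop(int64_t& value)
        {
            const std::size_t tail = m_tail.load();
            if (tail == m_head.load())              // nothing has been produced yet
            {
                return false;                       // queue is empty
            }
            value = m_buffer[tail % CAPACITY];
            m_tail.store(tail + 1);
            return true;
        }

        void push(int64_t value) { while (!try_push(value)) {} }   // spin until there is room

        int64_t pop()
        {
            int64_t value = 0;
            while (!try_pop(value)) {}                             // spin until there is data
            return value;
        }

    private:
        static constexpr std::size_t CAPACITY = 1000;
        int64_t m_buffer[CAPACITY] = {};
        std::atomic<std::size_t> m_head{0};   // total number of pushes so far
        std::atomic<std::size_t> m_tail{0};   // total number of pops so far
};

Note that only one thread ever writes m_head and only one thread ever writes m_tail, which is exactly why this scheme is limited to the single-producer single-consumer case.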

As you can see above, try_push checks whether the head is about to catch the tail from behind, and for synchronisation we perform an atomic load for the comparison. The same logic applies to try_pop. By doing this we effectively introduce a capacity limit on the ring buffer queue: instead of overwriting data on wrap-around, it behaves as a bounded queue.

5. Benchmark code:

For the benchmark we create only one producer and one consumer thread; however, we can perform as many operations as we want. For benchmarking I set the ring buffer size to 1000 and each thread performs 1 million operations:
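A sketch of such a benchmark driver is shown below, assuming the lockless queue sketch above and an illustrative iteration count named ITERATIONS.

#include <cstddef>
#include <cstdint>
#include <thread>

// Illustrative benchmark driver: one producer thread and one consumer thread,
// each performing ITERATIONS operations. spsc_ring_buffer_queue refers to the
// lockless sketch from section 4; swap in the lock-based sketch to compare.
static constexpr std::size_t ITERATIONS = 1000000;

int main()
{
    spsc_ring_buffer_queue queue;

    std::thread producer([&queue]() {
        for (std::size_t i = 0; i < ITERATIONS; ++i)
        {
            queue.push(static_cast<int64_t>(i));   // producer fills the queue
        }
    });

    std::thread consumer([&queue]() {
        for (std::size_t i = 0; i < ITERATIONS; ++i)
        {
            queue.pop();                           // consumer drains the queue
        }
    });

    producer.join();
    consumer.join();
    return 0;
}

Each variant can then be timed with perf stat, which is how the numbers in the next section were obtained.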

 

6. Benchmark results – lockless vs lock-based queue: Here is my test system:

uname -a
Linux akhin-GS60-2PC-Ghost 3.16.0-40-generic #54~14.04.1-Ubuntu SMP Wed Jun 10 17:30:45 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

cat /proc/cpuinfo

model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz

For benchmarking, I have used “perf stat” on my Ubuntu 14.04 LTS system:

Lock-based queue: 0.437855869 seconds time elapsed

Lockless queue: 0.109763995 seconds time elapsed

Note: Please note that the example classes are only for demonstrating the technique; they are not production ready, as they do not even free the memory they allocate. For a more complete example, please see:

https://github.com/akhin/cpp_multithreaded_order_matching_engine/tree/master/source/concurrent


9 thoughts on “Multithreading: Lockless thread-safe SPSC ring-buffer queue”

    1. Hi,

      It is just a general convention when implementing a thread-safe container; there is no specific goal for this post. You provide both “push” and “try_push”, and the caller picks whichever they want. Obviously the non-try version is not mandatory, as an external caller can wrap their own version. So if you just want to try to see whether you can push at a specific moment, or whether another thread has locked the container, you use the try version. For example: this is not C++, it is .NET, but you can see Microsoft's try and non-try method implementations in their thread-safe dictionary: https://msdn.microsoft.com/en-us/library/dd287191(v=vs.110).aspx

  1. Great article, however I think there is a slight problem with this lock-free SPSC. The actual capacity is only ringbuffersize - 1 instead of ringbuffersize,
    i.e. if you only have one thread pushing and no thread popping, you can only push items into buffer[0] ~ buffer[ringbuffersize-2].

    I have modified the code so the size now is ringbuffersize:

    #include <atomic>
    #include <thread>
    #include <cstdint>
    #include <cstdio>

    #define RING_BUFFER_SIZE 200

    class ring_buffer_queue {

    private:
        std::atomic<uint64_t> write;
        std::atomic<uint64_t> read;
        uint64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

    public:

        ring_buffer_queue()
        {
            write.store(0);
            read.store(0);
        }

        void push(int64_t val)
        {
            while (!tryPush(val));
        }

        int64_t pop()
        {
            int64_t ret = 0;
            while (!tryPop(&ret));
            return ret;
        }

        bool tryPush(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = current_tail + 1;
            if (current_tail - read.load() > size - 1)
                return false;
            printf("push %lld\n", (long long)val);
            buffer[current_tail % size] = val;
            write.store(next_tail);
            return true;
        }

        bool tryPop(int64_t *p)
        {
            const auto current_head = read.load();
            const auto next_head = current_head + 1;

            if (current_head == write.load())
                return false;
            *p = buffer[current_head];
            printf("pop %lld\n", (long long)*p);
            read.store(next_head);
            return true;
        }
    };

    int main(int argc, char** argv)
    {
        ring_buffer_queue queue;

        std::thread write_thread([&]() {
            for (int i = 0; i < RING_BUFFER_SIZE; i++)
            {
                queue.push(i);
            }
        } // End of lambda expression
        );
        std::thread read_thread([&]() {
            for (int i = 0; i < RING_BUFFER_SIZE; i++)
            {
                queue.pop();
            }
        }
        );
        write_thread.join();
        read_thread.join();

        return 0;
    }

    1. Hi youchao,

      Many thanks for your correction. As you said, it wasn't using the buffer to full capacity, so I've updated the try_push and try_pop implementations. One thing about your try_pop: for the *p = buffer[current_head]; line, I used *p = buffer[current_head % size]; instead.

  2. Sorry, but this code doesn't work with multiple producers and consumers.
    For example, in tryPush(): every producer loads the head into currentTail, which is 0 for all producers, since the increment comes later but the other producers are already past the loading part. So all producers push to slot 0 and all consumers pop slot 0. Atomic only makes the reads/writes of the value atomic, not the whole function.

    Atomic ring buffers like this only work if you have 1 producer and 1 consumer.

    1. Hi Raphael, yes, it will not work with multiple producers or consumers, as it is meant only for the single-producer single-consumer case, as in SPSC.

    1. You can use the tryPop method in order to avoid deadlocks and implement your own waiting or yielding. Regarding the benchmark code, yes, you are right: as it calls pop instead of tryPop, it would cause the consumer to hang if no messages were pushed to the ring buffer.
