A thread-safe queue implementation

C++
Author: Quasar

Published: February 23, 2025

Producer-consumer problem

In the producer-consumer problem, we have two classes of threads, producers and consumers, and a buffer containing a fixed number of slots. A producer thread attempts to put something into the next empty buffer slot; a consumer thread attempts to take something out of the next occupied buffer slot. The synchronization conditions are that producers cannot proceed unless there are empty slots, and consumers cannot proceed unless there are occupied slots. The problem arises because producers deposit data and consumers drain it at different rates: a faster producer eventually fills the buffer, and a faster consumer eventually empties it.

This is a classic and frequently occurring synchronization problem. For example, a UNIX pipe is, at its heart, an instance of it: the writing process is the producer, the reading process is the consumer, and the kernel's pipe buffer is the bounded buffer.
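A minimal sketch of the two synchronization conditions, assuming a std::deque guarded by one std::mutex and two std::condition_variable objects (bounded_buffer, run_spsc and run_stats are hypothetical names, not from the article). The producer waits while there is no empty slot, and the consumer waits while there is no occupied slot. The harness runs one fast producer against one consumer and records whether items came out in order and how full the buffer ever got.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical bounded buffer: a fixed number of slots shared by a producer and a consumer.
template <typename T>
class bounded_buffer {
public:
    explicit bounded_buffer(std::size_t capacity) : capacity_{capacity} {
        assert(capacity > 0);
    }

    void put(T value) {
        std::unique_lock<std::mutex> lock{mtx_};
        // Producers cannot proceed unless there is an empty slot.
        has_empty_slot_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
        max_seen_ = std::max<std::size_t>(max_seen_, items_.size());
        has_occupied_slot_.notify_one();
    }

    T take() {
        std::unique_lock<std::mutex> lock{mtx_};
        // Consumers cannot proceed unless there is an occupied slot.
        has_occupied_slot_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop_front();
        has_empty_slot_.notify_one();
        return value;
    }

    std::size_t max_occupancy() const {
        std::lock_guard<std::mutex> lock{mtx_};
        return max_seen_;
    }

private:
    const std::size_t capacity_;
    std::deque<T> items_;
    std::size_t max_seen_ = 0;  // largest number of items ever buffered at once
    mutable std::mutex mtx_;
    std::condition_variable has_empty_slot_;
    std::condition_variable has_occupied_slot_;
};

struct run_stats {
    bool in_order = true;           // did the consumer see 1, 2, ..., n?
    std::size_t max_occupancy = 0;  // never exceeds the capacity if the conditions hold
};

// One producer pushing 1..n as fast as it can, one consumer taking n items.
run_stats run_spsc(int n, std::size_t capacity) {
    bounded_buffer<int> buffer{capacity};
    run_stats stats;
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) buffer.put(i);
    });
    std::thread consumer([&] {
        for (int expected = 1; expected <= n; ++expected)
            if (buffer.take() != expected) stats.in_order = false;
    });
    producer.join();
    consumer.join();
    stats.max_occupancy = buffer.max_occupancy();
    return stats;
}
```

Calling run_spsc(10000, 8) should report in_order == true and a max_occupancy of at most 8: the fast producer is throttled by the "empty slot" condition rather than overrunning the buffer.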

Ring buffer

Consider a single, fixed-size buffer as if it were connected end-to-end, such that the oldest entry is processed first. This is a circular FIFO queue.

What do we use single-producer, single-consumer (SPSC) FIFO queues for? In industry, you often have a pipeline of processing stages. For example, one thread reads messages from sockets, a second thread handles those messages and produces a result, and a third thread writes a response back to the network. The stages can be connected by SPSC FIFO queues. This has a couple of advantages, though all of these advantages and disadvantages are subject to measurement, so always measure. It may improve throughput over a single thread performing all 3 operations; in fact, I'd be surprised if it didn't. It should also make the application more resilient to spikes in message traffic, because the queues absorb short bursts. The disadvantages are that you have to manage 3 threads, and the design probably uses more memory, because each FIFO queue needs space to store its messages.
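A sketch of the three-stage pipeline described above, under loudly stated assumptions: the "socket reader" just generates the integers 1..n, the "handler" squares them, and the "writer" sums the results instead of touching the network. The stages are connected by a lock-free SPSC ring buffer (spsc_queue, push_blocking, pop_blocking and run_pipeline are hypothetical names). It relies on each cursor being written by exactly one thread and on acquire/release ordering; this is a common SPSC technique, not the mutex-based design this article builds later. Blocked stages spin with std::this_thread::yield(), which is fine for a demo but burns CPU.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Lock-free SPSC ring buffer: exactly one thread calls try_push, exactly one other calls try_pop.
template <typename T, std::size_t N>
class spsc_queue {
    static_assert(N >= 2 && (N & (N - 1)) == 0, "N must be a power of two");

public:
    bool try_push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        // Acquire pairs with the consumer's release of head_: a freed slot may be reused.
        if (tail - head_.load(std::memory_order_acquire) == N) return false;  // full
        slots_[tail & (N - 1)] = value;
        tail_.store(tail + 1, std::memory_order_release);  // publish the element
        return true;
    }

    bool try_pop(T& out) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        // Acquire pairs with the producer's release of tail_: the slot is fully written.
        if (head == tail_.load(std::memory_order_acquire)) return false;  // empty
        out = slots_[head & (N - 1)];
        head_.store(head + 1, std::memory_order_release);  // hand the slot back
        return true;
    }

private:
    std::array<T, N> slots_{};
    std::atomic<std::size_t> head_{0};  // read cursor, written only by the consumer
    std::atomic<std::size_t> tail_{0};  // write cursor, written only by the producer
};

template <typename T, std::size_t N>
void push_blocking(spsc_queue<T, N>& q, const T& value) {
    while (!q.try_push(value)) std::this_thread::yield();
}

template <typename T, std::size_t N>
T pop_blocking(spsc_queue<T, N>& q) {
    T value{};
    while (!q.try_pop(value)) std::this_thread::yield();
    return value;
}

// reader -> handler -> writer, connected by two SPSC queues; returns 1^2 + 2^2 + ... + n^2.
long long run_pipeline(int n) {
    assert(n >= 0);
    constexpr long long end_of_stream = -1;  // sentinel: real messages are never negative
    spsc_queue<long long, 64> requests;      // reader  -> handler
    spsc_queue<long long, 64> responses;     // handler -> writer
    long long total = 0;

    std::thread reader([&] {
        for (long long i = 1; i <= n; ++i) push_blocking(requests, i);
        push_blocking(requests, end_of_stream);
    });
    std::thread handler([&] {
        for (long long m; (m = pop_blocking(requests)) != end_of_stream;)
            push_blocking(responses, m * m);
        push_blocking(responses, end_of_stream);
    });
    std::thread writer([&] {
        for (long long r; (r = pop_blocking(responses)) != end_of_stream;) total += r;
    });

    reader.join();
    handler.join();
    writer.join();
    return total;
}
```

For example, run_pipeline(100) returns 338350, the sum of the first 100 squares. Each queue has exactly one writer of tail_ and one writer of head_, which is what makes the lock-free design valid; with several producers or consumers it would need a different scheme.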

We have all come across circular FIFO queues. We usually keep two cursors, rear and front: items are pushed at the rear of the queue and popped off the front.

[Figure: Circular FIFO Queue]

Each push, for example push(42), stores the value at the rear cursor and increments the rear cursor; each pop() increments the front cursor. The queue is empty exactly when the two cursors are equal, so as soon as they differ, the queue is no longer empty. If we keep pushing without popping, the queue eventually fills up: at that point the rear cursor is exactly capacity greater than the front cursor, i.e. rear - front == capacity. Because the cursors only ever increase, a cursor is mapped onto a physical slot with cursor % capacity.
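A single-threaded sketch of these cursor rules, assuming an int buffer with a capacity of 4 (int_fifo is a hypothetical name). The cursors only ever increase; the slot is cursor % capacity, the queue is empty when the cursors are equal, and it is full when rear - front == capacity.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Fixed-capacity FIFO of ints with monotonically increasing cursors.
struct int_fifo {
    static constexpr std::size_t capacity = 4;
    std::array<int, capacity> slots{};
    std::size_t front = 0;  // cursor of the oldest element
    std::size_t rear = 0;   // cursor one past the newest element

    bool empty() const { return front == rear; }
    bool full() const { return rear - front == capacity; }
    std::size_t size() const { return rear - front; }

    void push(int value) {
        assert(!full());                 // precondition
        slots[rear % capacity] = value;  // write into the rear cursor's slot
        ++rear;
    }

    int pop() {
        assert(!empty());                     // precondition
        int value = slots[front % capacity];  // read from the front cursor's slot
        ++front;
        return value;
    }
};
```

After pushing 1, 2, 3, 4 the queue is full with front == 0 and rear == 4. Popping once and pushing 5 writes 5 into slot 4 % 4 == 0, and the queue is full again with front == 1 and rear == 5; the remaining pops return 2, 3, 4, 5.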

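Mapping a cursor onto a slot uses the remainder operator %. Integer division takes roughly 20 to 30 cycles on typical CPUs (the exact cost depends on the processor and operand width), so it is comparatively expensive. Another approach is to constrain the buffer size to an integral power of 2 and use the bitwise & operator instead: for capacity == 2^k, cursor % capacity == (cursor & (capacity - 1)), and & is a 1-cycle operation.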
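A sketch checking that, for a power-of-two capacity, the mask picks the same slot as the remainder, plus a helper that rounds a requested capacity up to a power of two (is_power_of_two, next_power_of_two, slot_mod, slot_mask, mask_matches_mod and occupancy are hypothetical names). It also shows a side benefit of unsigned std::size_t cursors: even if a cursor wraps past SIZE_MAX, rear - front is still the correct count, and with a power-of-two capacity the masked slot index continues without a jump, because the capacity divides 2^w, where w is the bit width of std::size_t.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

constexpr bool is_power_of_two(std::size_t n) { return n != 0 && (n & (n - 1)) == 0; }

// Smallest power of two >= n (n must be small enough that the result does not overflow).
constexpr std::size_t next_power_of_two(std::size_t n) {
    std::size_t p = 1;
    while (p < n) p <<= 1;
    return p;
}

// Slot selection: the general remainder form and the power-of-two mask form.
constexpr std::size_t slot_mod(std::size_t cursor, std::size_t capacity) { return cursor % capacity; }
constexpr std::size_t slot_mask(std::size_t cursor, std::size_t capacity) { return cursor & (capacity - 1); }

// True if mask and remainder agree for every cursor in [0, limit).
bool mask_matches_mod(std::size_t capacity, std::size_t limit) {
    assert(is_power_of_two(capacity));
    for (std::size_t c = 0; c < limit; ++c)
        if (slot_mod(c, capacity) != slot_mask(c, capacity)) return false;
    return true;
}

// Unsigned subtraction is modulo 2^w, so the element count survives cursor wraparound.
constexpr std::size_t occupancy(std::size_t front, std::size_t rear) { return rear - front; }

static_assert(next_power_of_two(100) == 128, "a request for 100 slots rounds up to 128");
static_assert(slot_mask(130, 128) == 2, "130 & 127 == 2");
static_assert(occupancy(std::numeric_limits<std::size_t>::max() - 1, 1) == 3,
              "rear has wrapped past SIZE_MAX, yet rear - front is still 3");
```

When the capacity is a compile-time power-of-two constant and the cursor is unsigned, compilers typically turn % into & on their own; the explicit mask matters most when the capacity is only known at run time, as in the ring_buffer below.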

Implementation notes

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <new>
#include <shared_mutex>
#include <stdexcept>
#include <thread>
#include <utility>

namespace dev {
    template<typename T>
    class ring_buffer {
    private:
        enum : std::size_t { min_capacity = 128 };
        T* ring;                 // raw storage for m_capacity objects of type T
        std::size_t m_front;     // read cursor: only ever incremented
        std::size_t m_rear;      // write cursor: only ever incremented
        std::size_t m_capacity;

        // Map a cursor onto a physical slot
        std::size_t slot(std::size_t cursor) const { return cursor % m_capacity; }

    public:
        /* Default constructor */
        ring_buffer() : ring_buffer(min_capacity) {}

        /* Allocate uninitialized storage for `capacity` elements */
        explicit ring_buffer(std::size_t capacity)
            : ring{ nullptr }
            , m_front{ 0 }
            , m_rear{ 0 }
            , m_capacity{ capacity }
        {
            if (capacity == 0)
                throw std::invalid_argument("capacity must be positive");

            // operator new takes a size in bytes, not a number of elements
            ring = static_cast<T*>(::operator new(capacity * sizeof(T)));
        }

        /* Copy constructor - Perform a deep copy */
        ring_buffer(const ring_buffer<T>& other)
            : ring_buffer(other.m_capacity)   // Allocation
        {
            /* Construction: copy the live elements, oldest first, into slots 0..size-1 */
            for (std::size_t i{ 0 }; i < other.size(); ++i)
            {
                new (&ring[i]) T(other[i]);
                ++m_rear;   // count each element as soon as it exists, so ~ring_buffer() cleans up if a later copy throws
            }
        }

        /* Move constructor - take over the storage, leave `other` empty and without storage */
        ring_buffer(ring_buffer<T>&& other) noexcept
            : ring{ nullptr }
            , m_front{ 0 }
            , m_rear{ 0 }
            , m_capacity{ 0 }
        {
            swap(*this, other);
        }

        /* Destructor - destroy the live elements, then release the raw storage */
        ~ring_buffer()
        {
            while (!empty())
                pop();
            ::operator delete(ring);
        }

        /* Swap */
        friend void swap(ring_buffer<T>& lhs, ring_buffer<T>& rhs) noexcept
        {
            std::swap(lhs.m_front, rhs.m_front);
            std::swap(lhs.m_rear, rhs.m_rear);
            std::swap(lhs.m_capacity, rhs.m_capacity);
            std::swap(lhs.ring, rhs.ring);
        }

        /* Copy and move assignment (copy-and-swap): the parameter is built by copy or move */
        ring_buffer<T>& operator=(ring_buffer<T> other) noexcept
        {
            swap(*this, other);
            return *this;
        }

        T& front() {
            if (empty())
                throw std::runtime_error("buffer is empty!");

            return ring[slot(m_front)];
        }

        T& back() {
            if (empty())
                throw std::runtime_error("buffer is empty!");

            return ring[slot(m_rear - 1)];
        }

        /* i-th element counting from the front (0 is the oldest) */
        T& operator[](std::size_t i) {
            if (i >= size())
                throw std::out_of_range("index out of range!");

            return ring[slot(m_front + i)];
        }

        const T& operator[](std::size_t i) const {
            if (i >= size())
                throw std::out_of_range("index out of range!");

            return ring[slot(m_front + i)];
        }

        bool empty() const {
            return m_front == m_rear;
        }

        bool full() const {
            return size() == capacity();
        }

        void push(const T& value) {
            if (full())
                throw std::runtime_error("buffer is full!");

            new (&ring[slot(m_rear)]) T(value);   // construct in the raw slot
            ++m_rear;                              // advance only after construction succeeded
        }

        void push(T&& value) {
            if (full())
                throw std::runtime_error("buffer is full!");

            new (&ring[slot(m_rear)]) T(std::move(value));
            ++m_rear;
        }

        template<typename... Args>
        void emplace(Args&&... args) {
            if (full())
                throw std::runtime_error("buffer is full!");

            new (&ring[slot(m_rear)]) T(std::forward<Args>(args)...);
            ++m_rear;
        }

        void pop() {
            if (empty())
                throw std::runtime_error("buffer is empty!");

            ring[slot(m_front)].~T();   // end the element's lifetime; the slot is raw storage again
            ++m_front;
        }

        std::size_t capacity() const {
            return m_capacity;
        }

        std::size_t size() const {
            return m_rear - m_front;
        }

        void print() const {
            for (std::size_t i{ 0 }; i < size(); ++i) {
                std::cout << "\n" << "ring[" << i << "] = " << (*this)[i];
            }
        }
    };
}

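Running the above ring_buffer from a producer thread and a consumer thread that share one buffer, we find that there are several occasions when the buffer overflows or underflows: nothing makes the producer wait for an empty slot or the consumer wait for an occupied one, so push() and pop() throw their buffer-full and buffer-empty exceptions. There are also data races, because both threads read and write m_front, m_rear, and the slots without synchronization, which is undefined behavior in C++.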
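The ring_buffer class above separates allocation from construction: ::operator new hands back raw bytes, placement new constructs an element in a slot, an explicit ~T() call ends its lifetime, and ::operator delete releases the bytes. A self-contained sketch of that lifecycle, assuming a hypothetical tracked type that counts its live instances (tracked, roundtrip_result and raw_storage_roundtrip are illustrative names), makes two easy-to-miss details checkable: ::operator new takes a size in bytes, so the request is capacity * sizeof(T), and every constructed element must be destroyed before the storage is freed.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <string>
#include <utility>

// Hypothetical element type that counts how many of its instances are alive.
struct tracked {
    static int& live_count() { static int n = 0; return n; }

    std::string payload;

    explicit tracked(std::string p) : payload(std::move(p)) { ++live_count(); }
    tracked(const tracked& other) : payload(other.payload) { ++live_count(); }
    ~tracked() { --live_count(); }
};

struct roundtrip_result {
    std::string payloads;       // what was read back from the slots, in order
    int live_while_stored = 0;  // live tracked objects right after construction
};

// The four steps ring_buffer relies on: allocate, construct, destroy, deallocate.
roundtrip_result raw_storage_roundtrip(std::size_t capacity, std::size_t count) {
    assert(count <= capacity);
    roundtrip_result result;

    // 1. Allocation: raw bytes for `capacity` objects; operator new counts BYTES,
    //    so the request is capacity * sizeof(tracked). No constructor runs here.
    auto* slots = static_cast<tracked*>(::operator new(capacity * sizeof(tracked)));

    // 2. Construction: placement new starts each element's lifetime in its slot.
    for (std::size_t i = 0; i < count; ++i)
        new (&slots[i]) tracked(std::to_string(i));
    result.live_while_stored = tracked::live_count();

    for (std::size_t i = 0; i < count; ++i) result.payloads += slots[i].payload;

    // 3. Destruction: end each lifetime explicitly; ::operator delete will not do it.
    for (std::size_t i = 0; i < count; ++i) slots[i].~tracked();

    // 4. Deallocation: give the raw bytes back.
    ::operator delete(slots);
    return result;
}
```

raw_storage_roundtrip(8, 5) should read back "01234" with 5 live objects while stored, and tracked::live_count() should be 0 afterwards. Skipping step 3 would leave the count at 5, which is the leak a ring_buffer without a destructor has.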

Building a thread-safe queue using condition variables

You essentially have three groups of operations: those that query the state of the whole queue (empty() and size()), those that query the elements of the queue (front() and back()), and those that modify the queue (push(), pop(), and emplace()). This is the same as we've seen for the stack container adapter, and we have the same race conditions inherent in the interface: even if every call locks a mutex internally, another consumer can pop the element between one thread's call to front() and its call to pop(). Consequently, we need to combine front() and pop() into a single function call, much as we combined top() and pop() for the stack. The threadsafe_ring_buffer class below offers both a blocking flavor (wait_and_pop()) and a non-blocking one (try_pop()).
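To make the front()/pop() race concrete: with two consumers, both can call front() and read the same element, then both call pop(), so one element is processed twice and another is silently dropped. Doing both steps under one lock closes that window. A sketch, assuming a std::queue guarded by a std::mutex and using std::optional<T> (C++17) as the "nothing available" result; this is an alternative signature to the T& and std::shared_ptr<T> overloads the article uses, and locked_queue and drain_concurrently are hypothetical names. The harness lets several consumers race on try_pop() and counts how often each value was taken.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class locked_queue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock{mtx_};
        items_.push(std::move(value));
    }

    // Read and remove the front element in ONE critical section, so no other
    // thread can pop between "look at the front" and "remove it".
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock{mtx_};
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

private:
    std::queue<T> items_;
    std::mutex mtx_;
};

// Pre-fills the queue with 0..n-1, then lets `consumers` threads drain it.
// Returns how many times each value was taken (exactly once each if pop is atomic).
std::vector<int> drain_concurrently(int n, int consumers) {
    locked_queue<int> q;
    for (int i = 0; i < n; ++i) q.push(i);

    std::vector<std::vector<int>> taken(static_cast<std::size_t>(consumers));  // one log per thread
    std::vector<std::thread> threads;
    for (int c = 0; c < consumers; ++c) {
        threads.emplace_back([&q, &taken, c] {
            while (auto v = q.try_pop()) taken[static_cast<std::size_t>(c)].push_back(*v);
        });
    }
    for (auto& t : threads) t.join();

    std::vector<int> counts(static_cast<std::size_t>(n), 0);
    for (const auto& log : taken)
        for (int v : log) ++counts[static_cast<std::size_t>(v)];
    return counts;
}
```

With the combined try_pop(), drain_concurrently(10000, 4) should report a count of exactly 1 for every value. Returning std::optional<T> by value is convenient when T is cheap to move; the std::shared_ptr<T> overload used in the article keeps the element in the queue if allocating the result fails.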

namespace dev {
    template<typename T>
    class threadsafe_ring_buffer {
    public:
        threadsafe_ring_buffer() : m_ring_buffer{ ring_buffer<T>() }
        {}

        explicit threadsafe_ring_buffer(std::size_t capacity) : m_ring_buffer{ ring_buffer<T>(capacity) }
        {}

        threadsafe_ring_buffer(const threadsafe_ring_buffer<T>& other)
        {
            // Lock the source (not *this) for reading while its contents are copied
            std::shared_lock<std::shared_mutex> lck(other.mtx);
            m_ring_buffer = other.m_ring_buffer;
        }
        threadsafe_ring_buffer<T>& operator=(const threadsafe_ring_buffer<T>& other) = delete;

        void wait_and_push(T new_value) {
            std::unique_lock<std::shared_mutex> lck(mtx);
            // Producers wait until there is an empty slot
            queue_not_full_cond.wait(lck, [this]() { return !m_ring_buffer.full(); });
            m_ring_buffer.push(std::move(new_value));
            queue_not_empty_cond.notify_one();
        }

        bool try_push(T new_value) {
            std::unique_lock<std::shared_mutex> lck(mtx);
            if (m_ring_buffer.full())
                return false;

            m_ring_buffer.push(std::move(new_value));
            queue_not_empty_cond.notify_one();
            return true;
        }

        void wait_and_pop(T& value) {
            std::unique_lock<std::shared_mutex> lck(mtx);
            // Consumers wait until there is an occupied slot
            queue_not_empty_cond.wait(lck, [this]() { return !m_ring_buffer.empty(); });
            value = std::move(m_ring_buffer.front());
            m_ring_buffer.pop();
            queue_not_full_cond.notify_one();
        }

        bool try_pop(T& value) {
            std::unique_lock<std::shared_mutex> lck(mtx);
            if (m_ring_buffer.empty())
                return false;

            value = std::move(m_ring_buffer.front());
            m_ring_buffer.pop();
            queue_not_full_cond.notify_one();
            return true;
        }

        std::shared_ptr<T> wait_and_pop() {
            std::unique_lock<std::shared_mutex> lck(mtx);
            // Test m_ring_buffer directly: calling this->empty() would lock mtx a second
            // time from the same thread, which is undefined behavior (in practice, a deadlock)
            queue_not_empty_cond.wait(lck, [this]() { return !m_ring_buffer.empty(); });
            // Build the result before pop(): if make_shared throws, the element stays queued
            std::shared_ptr<T> result{ std::make_shared<T>(std::move(m_ring_buffer.front())) };
            m_ring_buffer.pop();
            queue_not_full_cond.notify_one();
            return result;
        }

        std::shared_ptr<T> try_pop() {
            std::unique_lock<std::shared_mutex> lck(mtx);
            if (m_ring_buffer.empty())
                return nullptr;

            std::shared_ptr<T> result{ std::make_shared<T>(std::move(m_ring_buffer.front())) };
            m_ring_buffer.pop();
            queue_not_full_cond.notify_one();
            return result;
        }

        bool empty() const {
            std::shared_lock<std::shared_mutex> lck(mtx);
            return m_ring_buffer.empty();
        }

        bool full() const {
            std::shared_lock<std::shared_mutex> lck(mtx);
            return m_ring_buffer.full();
        }

        std::size_t size() const {
            std::shared_lock<std::shared_mutex> lck(mtx);
            return m_ring_buffer.size();
        }

        std::size_t capacity() const {
            std::shared_lock<std::shared_mutex> lck(mtx);
            return m_ring_buffer.capacity();
        }

    private:
        ring_buffer<T> m_ring_buffer;
        mutable std::shared_mutex mtx;
        // condition_variable_any works with any lockable type; std::condition_variable
        // only accepts std::unique_lock<std::mutex>
        std::condition_variable_any queue_not_empty_cond;
        std::condition_variable_any queue_not_full_cond;
    };
}

int main()
{
    dev::threadsafe_ring_buffer<int> buffer(64);

    std::thread producer(
        [&]() {
            for (int i{ 1 };i <= 1000;++i)
            {
                try {
                    buffer.wait_and_push(i);
                }
                catch (const std::exception& e) {
                    std::cout << "\n" << "producer error: " << e.what();
                }
            }
        }
    );

    std::thread consumer(
        [&]() {
            for (int i{ 1 };i <= 1000;++i)
            {
                try {
                    int value;
                    buffer.wait_and_pop(value);
                    std::this_thread::sleep_for(std::chrono::microseconds(1));
                }
                catch (const std::exception& e) {
                    std::cout << "\n" << "consumer error: " << e.what();
                }
            }
        }
    );

    producer.join();
    consumer.join();
    
    std::cout << "\nFinished execution";
}