A bounded SPSC lock-free queue implementation

C++

Author: Quasar

Published: June 8, 2025

Designing the spsc_queue data-structure

There are several important performance considerations when implementing an SPSC (single-producer, single-consumer) lock-free queue.

read_index and write_index indices

A queue needs to keep track of the read_index (the head of the queue, where elements are popped) and the write_index (the tail, where new elements are written). In a concurrent queue, these indices must be atomic so that each thread reliably observes the other's updates.

Why are growing lock-free queues much harder to implement?

Lock-free queues are generally bounded queues. Imagine a scenario where the underlying data structure of a lock-free queue is a std::vector<T>, which can grow dynamically in size.

template<typename T>
class spsc_queue{
    std::vector<T> m_buffer;
    std::atomic<std::size_t> m_write_index{0};
    std::atomic<std::size_t> m_read_index{0};
};

Now, a writer thread W that wants to write an element to the queue performs push_back() on the vector. Assume that the vector's size() == capacity(), so it is full. Internally, the vector performs the following steps:

  • A new, larger heap allocation is performed, typically with around twice the original capacity.

  • Move- or copy-construct the elements of type T from the old memory block into the new one.

  • Free the old memory block.

After these steps, m_buffer's internal pointer refers to the new memory block. Since there is now room for the new element, it will be copied/moved into m_buffer[write_index], and write_index is then incremented.

At this point a reader thread R wants to pop an element off the queue. The writer thread W, however, hasn't synchronized any of these steps with R.

So the reader thread R accesses m_buffer[read_index]. While W is inside push_back(), R may read through the vector's old internal pointer, which refers to a memory block that has been (or is being) freed. This is undefined behavior (UB): an unsynchronized data race on the vector's internals, and a potential use-after-free.

Consequently, lock-free queues are usually bounded. Growing lock-free queues can be implemented, but doing so is much harder and potentially incurs higher overhead than a fixed-capacity queue.
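To make the hazard concrete, here is a minimal sketch of the race described above; the bad_growing_queue type and the writer/reader functions are hypothetical, invented only for illustration:

#include <atomic>
#include <cstddef>
#include <vector>

// Hypothetical growable "queue" used only to illustrate the race.
struct bad_growing_queue {
    std::vector<int> m_buffer;
    std::atomic<std::size_t> m_write_index{0};
    std::atomic<std::size_t> m_read_index{0};
};

void writer(bad_growing_queue& q, int value) {
    // If the vector is full, push_back allocates a new block, moves the
    // elements over, and frees the old block.
    q.m_buffer.push_back(value);
    q.m_write_index.fetch_add(1, std::memory_order_release);
}

void reader(bad_growing_queue& q) {
    const auto read_index = q.m_read_index.load(std::memory_order_relaxed);
    if (read_index != q.m_write_index.load(std::memory_order_acquire)) {
        // Data race: the writer may be inside push_back right now,
        // reallocating the buffer and freeing the old block while this
        // operator[] call walks through it. Ordering the *indices* cannot
        // fix that, because the vector's internal pointer is itself
        // mutated without synchronization.
        int value = q.m_buffer[read_index];  // potential use-after-free (UB)
        (void)value;
        q.m_read_index.fetch_add(1, std::memory_order_release);
    }
}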

Cache lines and cache coherency

A cache line is typically \(64\) bytes on modern machines.

If cache line X is present in the caches of multiple cores, and one of these cores mutates its data, then the change must be propagated to all the cores holding X via a cache-coherency protocol.

For simplicity, suppose there are \(2\) cores, A and B. Further assume that core A reads and writes a variable a, core B reads and writes a variable b, and a and b are adjacent in main memory, so they reside on the same cache line.

Core A first reads the value of a from main memory. It loads the entire cache line and marks it as exclusive, since it is the only core holding this line. Core B then decides to read the value of b. Because a and b are close and reside on the same cache line, core B loads the same line, and both cores now tag their copies as shared.

Now, suppose core A decides to change the value of a. Core A first places this change in its store buffer and marks its copy of the cache line as modified. It also communicates the change to core B, which in turn marks its copy of the line as invalid.

That’s how different cores ensure their caches are coherent with each other.
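The protocol sketched above is essentially MESI (real CPUs often run a variant such as MOESI or MESIF), named after the four states a cache line can be in:

  • Modified - this core holds the only copy, and it differs from main memory.

  • Exclusive - this core holds the only copy, and it matches main memory.

  • Shared - several cores hold identical, clean copies.

  • Invalid - the local copy is stale and must be re-fetched before use.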

[Figure: CPU cache line]

False Sharing

False sharing occurs when threads on different cores modify different variables that reside on the same cache line.

What happens now when core B decides to re-read the value of b? Since core B's copy of the cache line is invalid, it must fetch the line again. This forces core A to flush its store buffer and write the line back, after which core B re-fetches the up-to-date line from main memory. Both cores end up holding the latest version of the line in the shared state again.

So this imposes a cache miss on one core and an early store-buffer flush on the other, even though the two cores were not operating on the same memory location. This is called false sharing.

Keeping objects close in memory is often very desirable for performance reasons. False sharing is a potential risk for multithreaded applications that mutate data.
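As a minimal sketch of the effect (the struct names and iteration count below are arbitrary choices, not from any particular benchmark): two threads hammer two independent atomic counters. When the counters share a cache line, the cores continuously invalidate each other's copy; padding the counters onto separate cache lines removes the interference, and timing the two cases (e.g. with std::chrono) typically shows a severalfold difference.

#include <atomic>
#include <cstddef>
#include <thread>

// Both counters packed together: they share one cache line.
struct shared_counters {
    std::atomic<std::size_t> a{0};
    std::atomic<std::size_t> b{0};
};

// Each counter aligned to its own cache line. 64 is an assumed line size;
// the next section introduces the portable constant for this.
struct padded_counters {
    alignas(64) std::atomic<std::size_t> a{0};
    alignas(64) std::atomic<std::size_t> b{0};
};

// Two threads, each incrementing its own independent counter.
template<typename Counters>
void hammer(Counters& c){
    std::thread t1{[&c]{ for(int i = 0; i < 10'000'000; ++i) c.a.fetch_add(1, std::memory_order_relaxed); }};
    std::thread t2{[&c]{ for(int i = 0; i < 10'000'000; ++i) c.b.fetch_add(1, std::memory_order_relaxed); }};
    t1.join();
    t2.join();
}

int main(){
    shared_counters s;
    padded_counters p;
    hammer(s);   // cores repeatedly invalidate each other's cache line
    hammer(p);   // no interference: each counter owns its own line
}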

Avoiding false sharing

To avoid false sharing, we can use the std::hardware_destructive_interference_size constant, defined in the <new> header: it gives the minimum offset between two objects needed to avoid destructive interference (in practice, the cache-line size) and is meant to be combined with alignas().
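Support for this constant is still uneven across standard libraries, so a common pattern (the cache_line_size name below is my own) is to guard it with the feature-test macro and fall back to an assumed \(64\) bytes:

#include <cstddef>
#include <new>

#ifdef __cpp_lib_hardware_interference_size
inline constexpr std::size_t cache_line_size = std::hardware_destructive_interference_size;
#else
inline constexpr std::size_t cache_line_size = 64;   // assumption: typical x86-64/ARM cache line
#endif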

So, we define our spsc_queue class as follows:

template<Queueable T, std::size_t N>
class spsc_queue{
    private:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;

    static constexpr std::size_t m_capacity {std::size_t{1} << N};
    T m_buffer[m_capacity];

    // Each index lives on its own cache line, so the producer's stores to
    // m_write_index never invalidate the consumer's cached m_read_index,
    // and vice versa.
    alignas(std::hardware_destructive_interference_size) std::atomic<std::size_t> m_read_index{ 0 };
    alignas(std::hardware_destructive_interference_size) std::atomic<std::size_t> m_write_index{ 0 };
};
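Note that m_capacity is deliberately a power of two, \(2^N\). For such capacities the wrap-around \((i + 1) \bmod 2^N\) equals the much cheaper bitwise operation \((i + 1) \mathbin{\&} (2^N - 1)\), which is exactly how try_push and try_pop below advance the indices without an integer division.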

Implementing try_push and try_pop methods

#include <atomic>
#include <concepts>
#include <cstddef>      // std::size_t
#include <new>          // std::hardware_destructive_interference_size
#include <optional>
#include <thread>
#include <type_traits>
#include <utility>      // std::forward, std::move

namespace dev{
    
    // Elements must be default-initializable (the ring buffer is a plain
    // array) and movable (values are moved in and out of the buffer).
    template<typename T>
    concept Queueable = std::default_initializable<T> && std::movable<T>;

    /**
     * @brief The `spsc_queue` class provides a bounded, lock-free,
     * single-producer, single-consumer (SPSC) FIFO queue.
     */
    template<Queueable T, std::size_t N>
    class spsc_queue{
        private:
        using size_type = std::size_t;
        using value_type = T;
        using reference = T&;

        static constexpr std::size_t m_capacity {std::size_t{1} << N};
        T m_buffer[m_capacity];
        alignas(std::hardware_destructive_interference_size) std::atomic<std::size_t> m_read_index{ 0 };
        alignas(std::hardware_destructive_interference_size) std::atomic<std::size_t> m_write_index{ 0 };

        public:
        spsc_queue() = default;
        spsc_queue(const spsc_queue&) = delete;
        spsc_queue& operator=(const spsc_queue&) = delete;
        spsc_queue(spsc_queue&&) = delete;
        spsc_queue& operator=(spsc_queue&&) = delete;

        /**
         * @brief Pushes an element onto the ring buffer.
         * @param element the value to push; forwarded into the buffer
         * @return true if the element was pushed, false if the queue was full
         */
        template<typename U>
        requires std::is_convertible_v<U, T>
        bool try_push(U&& element){
            // Only the producer mutates m_write_index, so a relaxed load suffices.
            const std::size_t write_index = m_write_index.load(std::memory_order_relaxed);
            const std::size_t next_write_index = (write_index + 1) & (m_capacity - 1);

            // Acquire pairs with the consumer's release store: if the slot is
            // free, the consumer is guaranteed to be done reading it.
            if(next_write_index != m_read_index.load(std::memory_order_acquire))
            {
                m_buffer[write_index] = std::forward<U>(element);
                // Release publishes the write to m_buffer before the index bump.
                m_write_index.store(next_write_index, std::memory_order_release);
                return true;
            }
            return false;
        }
        
        std::optional<T> try_pop(){
            std::optional<T> result{std::nullopt};
            // Only the consumer mutates m_read_index, so a relaxed load suffices.
            const std::size_t read_index = m_read_index.load(std::memory_order_relaxed);

            // Acquire pairs with the producer's release store: if the queue is
            // non-empty, the element at read_index is fully constructed.
            if(read_index == m_write_index.load(std::memory_order_acquire))
                return result;

            result = std::move(m_buffer[read_index]);
            // Release publishes that the slot has been consumed and may be reused.
            m_read_index.store((read_index + 1) & (m_capacity - 1), std::memory_order_release);
            return result;
        }
    };
}
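Finally, a minimal usage sketch, compiled together with the listing above; the element type, queue size, and element count are arbitrary. One producer pushes integers while one consumer pops them, each spinning on failure, since try_push and try_pop never block:

int main(){
    dev::spsc_queue<int, 10> queue;   // capacity 2^10 = 1024 slots

    std::thread producer{[&queue]{
        for(int i = 0; i < 100000; ++i)
            while(!queue.try_push(i))       // spin until a slot frees up
                ;
    }};

    std::thread consumer{[&queue]{
        for(int i = 0; i < 100000; ++i){
            std::optional<int> value;
            while(!(value = queue.try_pop()))   // spin until an element arrives
                ;
            // with one producer and one consumer, *value == i here:
            // the queue preserves FIFO order
        }
    }};

    producer.join();
    consumer.join();
}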