Producer-consumer problem
In the producer-consumer problem, we have two classes of threads, producers and consumers, and a buffer containing a fixed number of slots. A producer thread attempts to put something into the next empty buffer slot; a consumer thread attempts to take something out of the next occupied buffer slot. The synchronization conditions are that producers cannot proceed unless there are empty slots and consumers cannot proceed unless there are occupied slots. The problem arises because producers deposit data and consumers remove it at different rates.
This is a classic and frequently occurring synchronization problem. For example, the heart of the implementation of UNIX pipes is an instance of this problem.
Ring buffer
Consider a single, fixed-size buffer as if it were connected end-to-end, such that the oldest entry is processed first. This is a circular FIFO queue.
What do we use SPSC (single-producer, single-consumer) FIFO queues for? In industry, you often have a pipeline of stages. For example, one thread reads from sockets, a second thread handles the messages from the sockets, processes them and produces a result, and a third thread writes a response to the network. These stages can be connected by SPSC FIFO queues. There are a couple of advantages to this, although every advantage and disadvantage is subject to measurement, so always measure. It may improve throughput over a single thread doing all \(3\) of these operations; in fact, I'd be surprised if it didn't. It should also improve the resiliency of the application to spikes in message traffic. The disadvantages are that you have to manage \(3\) threads and that it probably uses more memory, because each FIFO queue needs space to store its messages.
We have all come across circular FIFO queues. We usually have two cursors: rear and front. Items are pushed to the rear of the queue and popped off the front of the queue.
When we push(42) into the FIFO queue, the rear cursor is incremented, and each time we pop(), the front cursor is incremented. When the front cursor and the rear cursor are no longer equal, the FIFO queue is no longer empty. Eventually, we push so many values that the FIFO queue fills up. At this point, the rear cursor is exactly capacity greater than the front cursor.
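The cursor bookkeeping described above can be sketched without any element storage. This is a minimal illustration, assuming cursors that only ever increase; the names cursor_pair and cursor_demo are hypothetical and do not appear in the listings that follow.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: tracks only the two cursors and the capacity.
struct cursor_pair {
    std::size_t front{0};     // index of the next element to pop
    std::size_t rear{0};      // index of the next free slot
    std::size_t capacity{4};

    bool empty() const { return front == rear; }
    bool full() const { return rear - front == capacity; }
    std::size_t size() const { return rear - front; }

    // Each operation advances exactly one cursor, or fails.
    bool push() { if (full()) return false; ++rear; return true; }
    bool pop() { if (empty()) return false; ++front; return true; }
};

// Walks through the states described in the text.
inline void cursor_demo() {
    cursor_pair c;
    assert(c.empty());                 // front == rear
    for (int i = 0; i < 4; ++i)
        c.push();
    assert(c.full());                  // rear == front + capacity
    c.pop();
    assert(!c.full() && !c.empty());
}
```

Because the cursors are never wrapped back to zero here, the empty and full tests are plain comparisons; only the mapping from a cursor to a storage slot needs modular arithmetic.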
Mapping a cursor to a slot in the buffer uses the remainder operator %. Integer division typically costs on the order of \(20\) to \(30\) cycles, so it is a bit expensive. Another approach is to constrain the buffer size to an integral power of \(2\) and use the bitwise & operator, which is typically a \(1\) cycle operation.
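As a small sketch of the power-of-two trick, assuming unsigned cursors, the remainder can be replaced by a mask with capacity - 1. The helper names is_power_of_two, slot_index and mask_demo are illustrative and not part of the ring buffer below.

```cpp
#include <cassert>
#include <cstddef>

// True when n has exactly one bit set.
constexpr bool is_power_of_two(std::size_t n) {
    return n != 0 && (n & (n - 1)) == 0;
}

// Precondition: capacity is a power of two.
// Equivalent to cursor % capacity, but uses a single bitwise AND.
constexpr std::size_t slot_index(std::size_t cursor, std::size_t capacity) {
    return cursor & (capacity - 1);
}

// Checks that masking and the remainder operator agree for every cursor tried.
inline void mask_demo() {
    constexpr std::size_t capacity = 128;
    static_assert(is_power_of_two(capacity), "capacity must be a power of two");
    for (std::size_t cursor = 0; cursor < 1000; ++cursor)
        assert(slot_index(cursor, capacity) == cursor % capacity);
}
```

The cycle counts are hardware dependent, so it is worth measuring before committing to the power-of-two constraint.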
Implementation notes
#include <iostream>
#include <queue>
#include <thread>
#include <array>
#include <numeric>
#include <memory>
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <chrono>
#include <new>
#include <stdexcept>
#include <utility>
namespace dev {
template<typename T>
class ring_buffer {
private:
enum {min_capacity = 128};
T* ring;
int m_front;
int m_rear;
int m_capacity;
public:
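/* Default constructor */
ring_buffer() : ring_buffer(min_capacity) {}
/* Allocate raw, uninitialized storage for capacity elements.
   operator new takes a size in bytes, hence the sizeof(T) factor. */
ring_buffer(int capacity)
: ring{ static_cast<T*>(operator new(capacity * sizeof(T))) }
, m_front{ 0 }
, m_rear{ 0 }
, m_capacity{ capacity }
{}
/* Destructor - destroy the live elements, then release the storage */
~ring_buffer() {
for (int i{ m_front }; i != m_rear; ++i)
ring[i % m_capacity].~T();
operator delete(ring);
}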
/* Copy constructor - Perform a deep copy */
ring_buffer(const ring_buffer<T>& other)
: ring{ nullptr }
, m_front{ 0 }
, m_rear{ 0 }
, m_capacity{ 0 }
{
/* Allocation: operator new takes a size in bytes */
ring = static_cast<T*>(operator new(other.m_capacity * sizeof(T)));
m_capacity = other.m_capacity;
/* Construction: copy the live elements into slots 0..size-1 */
for (int i{0}; i < other.size(); ++i)
{
new (&ring[i]) T(other[i]);
++m_rear; /* keep size() in step with the constructed elements */
}
}
/* Swap */
friend void swap(ring_buffer<T>& lhs, ring_buffer<T>& rhs)
{
std::swap(lhs.m_front, rhs.m_front);
std::swap(lhs.m_rear, rhs.m_rear);
std::swap(lhs.m_capacity, rhs.m_capacity);
std::swap(lhs.ring, rhs.ring);
}
/* Copy assignment */
ring_buffer<T>& operator=(const ring_buffer<T>& other)
{
ring_buffer<T> temp{ other }; //Copy-construct
swap(*this, temp);
return *this;
}
T& front() {
if (empty())
throw std::runtime_error("buffer is empty!");
return ring[m_front % m_capacity];
}
T& back() {
if (empty())
throw std::runtime_error("buffer is empty!");
return ring[(m_rear - 1) % m_capacity];
}
T& operator[](int i) const {
if (empty())
throw std::runtime_error("buffer is empty!");
return ring[(m_front + i) % m_capacity];
}
bool empty() const {
return m_front == m_rear;
}
bool full() const {
return size() == capacity();
}
void push(const T& value) {
if (full())
throw std::runtime_error("buffer is full!");
new (&ring[m_rear % m_capacity]) T(value);
std::cout << "\n" << "pushed " << value << " to buffer";
++m_rear;
}
void push(T&& value) {
if (full())
throw std::runtime_error("buffer is full!");
/* Log before moving: a moved-from value is left in an unspecified state */
std::cout << "\n" << "pushing " << value << " to buffer";
new (&ring[m_rear % m_capacity]) T(std::move(value));
++m_rear;
}
void pop() {
if (empty())
throw std::runtime_error("buffer is empty!");
/* Log the element, then destroy it in place */
std::cout << "\n" << "popped " << front() << " off buffer";
ring[m_front % m_capacity].~T();
++m_front;
}
int capacity() const{
return m_capacity;
}
int size() const {
return (m_rear - m_front);
}
void print() {
for (int i{ 0 };i < size();++i) {
std::cout << "\n" << "ring[" << i << "] = " << (*this)[i];
}
}
};
}
Running the above code snippet from multiple threads, we find several occasions where the buffer overflows or underflows, and there are also data races, because ring_buffer performs no synchronization of its own.
Building a thread-safe queue using condition variables
You essentially have three groups of operations: those that query the state of the whole queue (empty() and size()), those that query the elements of the queue (front() and back()) and those that modify the queue (push(), pop() and emplace()). This is the same as we've seen for the stack container adapter, and we have the same issues regarding race conditions inherent in the interface. Consequently, we need to combine front() and pop() into a single function call, much as we combined pop() and top() for the stack.
namespace dev {
template<typename T>
class threadsafe_ring_buffer {
public:
threadsafe_ring_buffer() : m_ring_buffer{ring_buffer<T>()}
{}
threadsafe_ring_buffer(int capacity) : m_ring_buffer{ring_buffer<T>(capacity)}
{}
threadsafe_ring_buffer(const threadsafe_ring_buffer<T>& other)
{
/* Lock the source object: it is the one being read */
std::shared_lock<std::shared_mutex> lck(other.mtx);
m_ring_buffer = other.m_ring_buffer;
}
threadsafe_ring_buffer<T>& operator=(const threadsafe_ring_buffer<T>& other) = delete;
void wait_and_push(T new_value) {
std::unique_lock<std::shared_mutex> lck(mtx);
queue_not_full_cond.wait(lck, [this]() {return !m_ring_buffer.full();});
m_ring_buffer.push(new_value);
queue_not_empty_cond.notify_one();
}
bool try_push(T new_value) {
std::unique_lock<std::shared_mutex> lck(mtx);
/* Fail only when there is no free slot */
if (m_ring_buffer.full())
return false;
m_ring_buffer.push(new_value);
queue_not_empty_cond.notify_one();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::shared_mutex> lck(mtx);
queue_not_empty_cond.wait(lck, [this]() { return !m_ring_buffer.empty();});
value = m_ring_buffer.front();
m_ring_buffer.pop();
queue_not_full_cond.notify_one();
}
bool try_pop(T& value) {
std::unique_lock<std::shared_mutex> lck(mtx);
if (m_ring_buffer.empty())
return false;
value = m_ring_buffer.front();
m_ring_buffer.pop();
queue_not_full_cond.notify_one();
return true;
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::shared_mutex> lck(mtx);
/* Test the underlying buffer directly: calling empty() here would try to lock mtx again */
queue_not_empty_cond.wait(lck, [this]() { return !m_ring_buffer.empty(); });
std::shared_ptr<T> result{ std::make_shared<T>(m_ring_buffer.front()) };
m_ring_buffer.pop();
queue_not_full_cond.notify_one();
return result;
}
std::shared_ptr<T> try_pop() {
std::unique_lock<std::shared_mutex> lck(mtx);
if (m_ring_buffer.empty())
return nullptr;
std::shared_ptr<T> result{ std::make_shared<T>(m_ring_buffer.front()) };
m_ring_buffer.pop();
queue_not_full_cond.notify_one();
return result;
}
bool empty() const {
std::shared_lock<std::shared_mutex> lck(mtx);
return m_ring_buffer.empty();
}
bool full() const {
std::shared_lock<std::shared_mutex> lck(mtx);
return m_ring_buffer.full();
}
int size() const {
std::shared_lock<std::shared_mutex> lck(mtx);
return m_ring_buffer.size();
}
int capacity() const {
std::shared_lock<std::shared_mutex> lck(mtx);
return m_ring_buffer.capacity();
}
private:
ring_buffer<T> m_ring_buffer;
mutable std::shared_mutex mtx;
std::condition_variable_any queue_not_empty_cond;
std::condition_variable_any queue_not_full_cond;
};
}
int main()
{
dev::threadsafe_ring_buffer<int> buffer(64);
std::thread producer(
[&]() {
for (int i{ 1 };i <= 1000;++i)
{
try {
buffer.wait_and_push(i);
}
catch (const std::exception& e) {
std::cout << "\n" << "buffer full!";
}
}
}
);
std::thread consumer(
[&]() {
for (int i{ 1 };i <= 1000;++i)
{
try {
int value;
buffer.wait_and_pop(value);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
catch (const std::exception& e) {
std::cout << "\n" << "buffer empty!";
}
}
}
);
producer.join();
consumer.join();
std::cout << "\nFinished execution";
}
Semaphores
C++20 introduces new synchronization primitives for writing multi-threaded applications.
A semaphore is a counter that manages the number of permits available for accessing a shared resource. Semaphores can be classified into two main types:
- Binary Semaphore. It has only \(2\) states: \(0\) and \(1\). Even though a binary semaphore is conceptually like a mutex, there are some differences between a binary semaphore and a mutex that we'll explore later.
- Counting Semaphore. It can have a value greater than \(1\) and is used to control access to a resource that has a limited number of instances.
C++20 implements both binary and counting semaphores.
Binary Semaphores
A binary semaphore is a synchronization primitive that can be used to control access to a shared resource. It has two states: \(0\) and \(1\). A semaphore with a value of \(0\) indicates that the resource is unavailable, while a semaphore with a value of \(1\) indicates that the resource is available.
The most significant difference between mutexes and semaphores is that a thread that has acquired a mutex has exclusive ownership of it. Only the thread owning the mutex can release it. Semaphores can be signaled by any thread. A mutex is a locking mechanism for a critical section, whereas a semaphore is more like a signaling mechanism. For this reason, semaphores are commonly used for signaling rather than for mutual exclusion.
In C++20, std::binary_semaphore is an alias for the specialization of std::counting_semaphore with LeastMaxValue being \(1\).
Binary semaphores must be initialized with either \(1\) or \(0\), passed to the constructor, for example std::binary_semaphore sem{0}.
If the initial value is 0, acquiring the semaphore blocks the calling thread until another thread releases it. Acquiring the semaphore decreases the counter, and releasing it increases the counter.
Counting semaphores
A counting semaphore allows access to a shared resource by more than one thread. The counter can be initialized to an arbitrary number, and it will be decreased every time a thread acquires the semaphore.
We can design a thread-safe queue using semaphores instead of condition variables to synchronize access to the queue.
We code up an unbounded queue implemented as a circular queue with doubling.
#include <iostream>
#include <shared_mutex>
#include <semaphore>
#include <new>
#include <utility>
namespace dev {
/*
A queue implements a first-in-first-out data-structure allowing
enqueuing (adding) items to the rear and dequeuing(removing)
them from the front.
My implementation uses a circular queue with doubling - the simplest
and reasonably efficient choice.
The interface design conforms to the standard library std::queue<T>
specification.
*/
template<typename T>
class queue {
private:
enum{min_capacity = 8};
T* m_ring_buffer;
int m_front;
int m_rear;
int m_capacity;
public:
using value_type = T;
using reference = T&;
/* Constructors */
// Default constructor
queue() : queue(min_capacity){}
// Parametrized Constructor
queue(int capacity)
: m_ring_buffer{ nullptr }
, m_front{ 0 }
, m_rear{ 0 }
, m_capacity{ 0 }
{
m_ring_buffer = static_cast<T*>(operator new(capacity * sizeof(T)));
m_capacity = capacity;
}
/* Destructor */
~queue() {
clear();
operator delete(m_ring_buffer);
}
/* Copy constructor
* Perform a deep-copy of the contents of the queue
*/
queue(const queue& other)
: queue(other.m_capacity) // Allocation step
{
/* Call the copy constructor of T
* placing it directly into the pre-allocated
* storage at memory address &m_ring_buffer[i]
*/
for (int i{ 0 };i < other.size();++i) {
new (&m_ring_buffer[i]) T(other[i]);
++m_rear;
}
}
/* Swap the contents of lhs and rhs member-by-member */
friend void swap(queue& lhs, queue& rhs) {
std::swap(lhs.m_ring_buffer, rhs.m_ring_buffer);
std::swap(lhs.m_front, rhs.m_front);
std::swap(lhs.m_rear, rhs.m_rear);
std::swap(lhs.m_capacity, rhs.m_capacity);
}
/* Copy-assignment */
queue& operator=(const queue& other) {
queue temp{ other }; // Copy-construct
swap(*this, temp); // and swap idiom
return (*this);
}
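/* Move constructor */
queue(queue&& other) noexcept
: m_ring_buffer{ other.m_ring_buffer }
, m_front{ other.m_front }
, m_rear{ other.m_rear }
, m_capacity{ other.m_capacity }
{
/* Leave other empty so that its destructor has nothing to destroy.
   A moved-from queue should only be destroyed or assigned to. */
other.m_ring_buffer = nullptr;
other.m_front = 0;
other.m_rear = 0;
other.m_capacity = 0;
}
/* Move assignment */
queue& operator=(queue&& other) noexcept {
queue temp{ std::move(other) };
/* Use the member-wise friend swap: std::swap would call this operator again */
swap(*this, temp);
return (*this);
}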
/* Capacity*/
/* Checks whether the queue is empty */
bool empty() const
{
return (m_rear == m_front);
}
bool full() const {
return (m_rear == m_front + m_capacity);
}
/* Returns the number of elements in the queue*/
int size() const {
return (m_rear - m_front);
}
int capacity() const {
return m_capacity;
}
/* Modifiers */
/* Double the capacity of the queue */
void resize()
{
std::cout << "\n" << "Resizing the queue from " << m_capacity
<< " to " << 2 * m_capacity << " elements.";
// 1. Allocation step
T* new_array = static_cast<T*>(operator new(2 * m_capacity * sizeof(T)));
std::cout << "\n" << "Allocation complete.";
// 2. Copy over the elements of the queue to the newly
// allocated storage.
int new_size = size();
for (int i{ 0 };i < new_size;++i) {
new (&new_array[i]) T(std::move(m_ring_buffer[(i + m_front) % m_capacity]));
}
std::cout << "\n" << "Copy-construction complete";
// 3. Destroy the old array
clear();
operator delete(m_ring_buffer);
std::cout << "\n" << "Destruction of old array complete";
// Re-wire m_ring_buffer, set front, rear and capacity
m_ring_buffer = new_array;
m_front = 0;
m_rear = new_size;
m_capacity *= 2;
}
/* Push the given value to the end of the queue */
void push(const T& value) {
std::cout << "\n" << "Pushing " << value << " to the queue";
if (full())
resize();
new (&m_ring_buffer[(m_rear % m_capacity)]) T(value);
std::cout << "\n" << "Pushed " << value << " to the queue";
m_rear++;
}
void push(T&& value) {
std::cout << "\n" << "Pushing " << value << " to the queue";
if (full())
resize();
new(&m_ring_buffer[(m_rear % m_capacity)]) T(std::move(value));
m_rear++;
}
/* Removes an element from the front of the queue */
void pop() {
m_ring_buffer[m_front % m_capacity].~T();
m_front++;
}
/* Element access */
T& operator[](int i) {
return m_ring_buffer[(m_front + i) % m_capacity];
}
const T& operator[](int i) const {
return m_ring_buffer[(m_front + i) % m_capacity];
}
/* Returns a reference to the first element in the queue */
T& front() {
return m_ring_buffer[m_front % m_capacity];
}
/* Return a reference to the last element in the queue */
T& back() {
return m_ring_buffer[(m_rear - 1) % m_capacity];
}
private:
/* Helper function to clear the queue */
void clear() {
for (int i{ 0 }; i < size(); ++i) {
m_ring_buffer[(m_front + i) % m_capacity].~T();
}
}
};
}
int main()
{
dev::queue<double> q;
std::cout << "\n" << "Queue capacity = " << q.capacity();
for (double i{ 1.0 };i <= 10.0;i++)
q.push(i);
std::cout << "\n" << "Queue capacity = " << q.capacity();
}
Next, we code up the semaphore_queue class that uses semaphores instead of condition variables as the synchronization mechanism.
SPMC queues and coding up a ThreadPool
A thread pool is a group of pre-instantiated, idle threads which stand ready to be given work. It is preferred over instantiating a new thread for each task whenever there are a large number of short tasks to be done rather than a small number of long ones. This avoids incurring the overhead of creating a thread a large number of times and helps prevent thread starvation.
The ThreadPool class has a container for the worker threads as one of its member variables.
When a thread pool is handed a Task, it is added to an SPMC (single-producer, multiple-consumer) queue. If a thread from the ThreadPool is idle, it can pop() the next task off the TaskQueue and execute it. Once execution is complete, the thread returns to the pool for reuse and waits until the queue_not_empty_cond condition is met again, and the cycle repeats.
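The worker cycle described above might look like the following minimal sketch. It assumes a Task is a std::function<void()> and uses std::queue with a mutex and a condition variable as the task queue. The member names and the submit function are illustrative choices, not a fixed API.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal thread pool.
class ThreadPool {
public:
    using Task = std::function<void()>;

    explicit ThreadPool(unsigned worker_count) {
        for (unsigned i = 0; i < worker_count; ++i)
            m_workers.emplace_back([this] { worker_loop(); });
    }

    // Lets the workers drain the remaining tasks, then joins them.
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lck(m_mtx);
            m_stopping = true;
        }
        m_queue_not_empty_cond.notify_all();
        for (auto& w : m_workers)
            w.join();
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    void submit(Task task) {
        {
            std::lock_guard<std::mutex> lck(m_mtx);
            m_tasks.push(std::move(task));
        }
        m_queue_not_empty_cond.notify_one();   // wake one idle worker
    }

private:
    void worker_loop() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lck(m_mtx);
                m_queue_not_empty_cond.wait(
                    lck, [this] { return m_stopping || !m_tasks.empty(); });
                if (m_tasks.empty())
                    return;                    // stopping and nothing left to run
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();                            // run the task outside the lock
        }
    }

    std::queue<Task> m_tasks;
    std::mutex m_mtx;
    std::condition_variable m_queue_not_empty_cond;
    bool m_stopping{false};
    std::vector<std::thread> m_workers;        // container of worker threads
};

// Submits task_count increments and returns the final counter value.
int thread_pool_demo(int task_count) {
    std::atomic<int> counter{0};
    {
        ThreadPool pool(4);
        for (int i = 0; i < task_count; ++i)
            pool.submit([&counter] { ++counter; });
    }   // the destructor drains the queue and joins the workers
    return counter.load();
}
```

Running each task outside the lock keeps the critical section short, so other workers can pick up tasks while one is executing.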