Producer-consumer problem
In the producer-consumer problem, we have two classes of threads, producers and consumers, and a buffer containing a fixed number of slots. A producer thread attempts to put an item into the next empty buffer slot, while a consumer thread attempts to take an item out of the next occupied buffer slot. The synchronization conditions are that producers cannot proceed unless there are empty slots, and consumers cannot proceed unless there are occupied slots. The problem arises because producers deposit data and consumers exhaust it at different rates.
This is a classic and frequently occurring synchronization problem. For example, the heart of the implementation of UNIX pipes is an instance of this problem.
Ring buffer
Consider a single, fixed-size buffer used as if it were connected end-to-end, such that the oldest entry is processed first. This is a circular FIFO queue.
What do we use SPSC FIFO queues for? In industry, you often have a pipeline of processes. For example, one thread reads from sockets, another thread handles the messages from the sockets, processes them, and produces a result, and a third thread writes a response to the network. These stages can be connected by SPSC FIFO queues. There are a couple of advantages to this; all of these advantages and disadvantages are subject to measurement, so always measure. It may improve throughput over a single thread doing all \(3\) of these operations; in fact, I'd be surprised if it didn't. It should also improve the resiliency of the application to spikes in message traffic. The disadvantages are that you have to manage \(3\) threads, and it probably uses more memory, because each FIFO queue needs space to store its messages.
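As an illustration, here is a minimal sketch of such a \(3\)-stage pipeline, wired together with the dev::threadsafe_queue developed below; the stage functions are hypothetical stand-ins for real socket I/O:
#include <string>
#include <thread>
#include "threadsafe_queue.h" // the dev::threadsafe_queue developed below
// Hypothetical stand-ins for real socket I/O and message handling.
std::string read_from_socket() { return "msg"; }
std::string process(const std::string& msg) { return "handled: " + msg; }
void write_to_network(const std::string&) { /* send the response */ }
int main() {
    dev::threadsafe_queue<std::string> inbound;  // socket reader -> worker
    dev::threadsafe_queue<std::string> outbound; // worker -> network writer
    // Each stage runs on its own thread; the queues decouple the rates
    // at which the stages produce and consume messages.
    std::thread reader([&] { for (int i = 0; i < 100; ++i) inbound.push(read_from_socket()); });
    std::thread worker([&] { for (int i = 0; i < 100; ++i) outbound.push(process(inbound.pop())); });
    std::thread writer([&] { for (int i = 0; i < 100; ++i) write_to_network(outbound.pop()); });
    reader.join(); worker.join(); writer.join();
}
Note that each queue has exactly one producer and one consumer, which is what makes this an SPSC arrangement.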
Basic functionalities to expect from a thread-safe queue
#include <gtest/gtest.h>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>
#include "threadsafe_queue.h"
// Test default constructor
TEST(ThreadSafeQueueTest, DefaultConstructorTest) {
dev::threadsafe_queue<int> queue;
EXPECT_EQ(queue.empty(), true);
EXPECT_EQ(queue.size(), 0);
}
// Test push and front
TEST(ThreadSafeQueueTest, PushAndFrontTest) {
dev::threadsafe_queue<int> queue;
queue.push(42);
EXPECT_EQ(queue.front(), 42);
EXPECT_EQ(queue.size(), 1);
EXPECT_EQ(queue.empty(), false);
}
// Test push and back
TEST(ThreadSafeQueueTest, PushAndBackTest) {
dev::threadsafe_queue<int> queue;
queue.push(10);
queue.push(20);
EXPECT_EQ(queue.back(), 20);
EXPECT_EQ(queue.size(), 2);
}
// Test try_pop (non-blocking)
TEST(ThreadSafeQueueTest, TryPopTest) {
dev::threadsafe_queue<int> queue;
queue.push(42);
auto item = queue.try_pop();
EXPECT_TRUE(item.has_value());
EXPECT_EQ(item.value(), 42);
EXPECT_EQ(queue.empty(), true);
}
// Test pop (blocking)
TEST(ThreadSafeQueueTest, BlockingPopTest) {
dev::threadsafe_queue<int> queue;
std::thread producer([&queue]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
queue.push(42);
});
auto item = queue.pop();
EXPECT_EQ(item, 42);
EXPECT_EQ(queue.empty(), true);
producer.join();
}
// Test emplace
TEST(ThreadSafeQueueTest, EmplaceTest) {
struct Point {
int x, y;
Point(int a, int b) : x(a), y(b) {}
};
dev::threadsafe_queue<Point> queue;
queue.emplace(1, 2);
EXPECT_EQ(queue.front().x, 1);
EXPECT_EQ(queue.front().y, 2);
}
// Test thread safety with multiple producers and consumers
TEST(ThreadSafeQueueTest, MultiThreadedTest) {
dev::threadsafe_queue<int> queue;
const int num_items = 100;
std::thread producer1([&queue]() {
for (int i = 0; i < num_items; ++i) {
queue.push(i);
}
});
std::thread producer2([&queue]() {
for (int i = num_items; i < 2 * num_items; ++i) {
queue.push(i);
}
});
std::vector<int> consumed_items;
std::mutex mtx;
std::thread consumer1([&queue, &consumed_items, &mtx]() {
for (int i = 0; i < num_items; ++i) {
{
std::unique_lock<std::mutex> unique_lck(mtx);
consumed_items.push_back(queue.pop());
}
}
});
std::thread consumer2([&queue, &consumed_items, &mtx]() {
for (int i = 0; i < num_items; ++i) {
{
std::unique_lock<std::mutex> unique_lck(mtx);
consumed_items.push_back(queue.pop());
}
}
});
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
EXPECT_EQ(consumed_items.size(), 2 * num_items);
EXPECT_EQ(queue.empty(), true);
}
// Test empty queue behavior
TEST(ThreadSafeQueueTest, EmptyQueueTest) {
dev::threadsafe_queue<int> queue;
EXPECT_EQ(queue.empty(), true);
EXPECT_EQ(queue.size(), 0);
auto item = queue.try_pop();
EXPECT_FALSE(item.has_value());
}
// Test copy constructor
TEST(ThreadSafeQueueTest, CopyConstructorTest) {
dev::threadsafe_queue<int> queue1;
queue1.push(42);
queue1.push(17);
dev::threadsafe_queue<int> queue2(queue1);
EXPECT_EQ(queue2.size(), 2);
EXPECT_EQ(queue2.front(), 42);
EXPECT_EQ(queue2.back(), 17);
}
// Test size
TEST(ThreadSafeQueueTest, SizeTest) {
dev::threadsafe_queue<int> queue;
queue.push(1);
queue.push(2);
queue.push(3);
EXPECT_EQ(queue.size(), 3);
}
Basic threadsafe_queue implementation
// Ref: Asynchronous Programming with C++
// Javier Reguera-Salgado, Juan Antonio Rufes
#include <iostream>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <optional>
#include <type_traits>
namespace dev{
template<typename T>
class threadsafe_queue{
private:
std::queue<T> m_queue;
mutable std::mutex m_mutex;
std::condition_variable not_empty_;
public:
using value_type = T;
using reference = T&;
using const_reference = const T&;
threadsafe_queue() = default;
threadsafe_queue(const threadsafe_queue& other){
std::unique_lock<std::mutex> unique_lck(other.m_mutex);
m_queue = other.m_queue;
}
threadsafe_queue& operator=(const threadsafe_queue&) = delete;
value_type front(){
std::unique_lock<std::mutex> unique_lck(m_mutex);
return m_queue.front();
}
value_type back(){
std::unique_lock<std::mutex> unique_lck(m_mutex);
return m_queue.back();
}
bool empty(){
std::unique_lock<std::mutex> unique_lck(m_mutex);
return m_queue.empty();
}
std::size_t size(){
std::unique_lock<std::mutex> unique_lck(m_mutex);
return m_queue.size();
}
// non-blocking
bool try_push(const_reference item){
std::unique_lock<std::mutex> unique_lck(m_mutex, std::try_to_lock);
if(!unique_lck)
return false;
// Note: item is a const reference, so std::move(item) would still
// copy; push the item directly.
m_queue.push(item);
unique_lck.unlock();
not_empty_.notify_one();
return true;
}
// blocking
void push(const_reference item){
std::unique_lock<std::mutex> unique_lck(m_mutex);
m_queue.push(item);
unique_lck.unlock();
not_empty_.notify_one();
}
// non-blocking
std::optional<T> try_pop()
{
std::unique_lock<std::mutex> unique_lck(m_mutex, std::try_to_lock);
if(!unique_lck || m_queue.empty())
return std::nullopt;
value_type item;
if constexpr(std::is_nothrow_move_assignable_v<T>){
item = std::move(m_queue.front());
}
else{
item = m_queue.front();
}
m_queue.pop();
return item;
}
// blocking
value_type pop(){
std::unique_lock<std::mutex> unique_lck(m_mutex);
not_empty_.wait(unique_lck, [this](){ return !m_queue.empty(); });
value_type item;
if constexpr(std::is_nothrow_move_assignable_v<T>){
item = std::move(m_queue.front());
}
else{
item = m_queue.front();
}
m_queue.pop();
return item;
}
// blocking
template<typename... Args>
void emplace(Args&&... args){
std::unique_lock<std::mutex> unique_lck(m_mutex);
m_queue.emplace(std::forward<Args>(args)...);
unique_lck.unlock();
not_empty_.notify_one();
}
};
}
The use of a mutex to protect the entire queue data structure limits the concurrency supported by this queue; although multiple threads might be blocked on the queue in various member functions, only one thread can be doing any work at a time. This restriction comes from the use of std::queue<T> in the implementation. If we write a detailed implementation of the data structure, we can provide more fine-grained locking and allow a higher level of concurrency.
Semaphores
C++20 introduces new synchronization primitives for writing multi-threaded applications.
A semaphore is a counter that manages the number of permits available for accessing a shared resource. Semaphores can be classified into two main types:
- Binary Semaphore. It has only \(2\) states: \(0\) and \(1\). Even though a binary semaphore is conceptually like a mutex, there are some differences between a binary semaphore and a mutex that we'll explore later.
- Counting Semaphore. It can have a value greater than \(1\) and is used to control access to a resource that has a limited number of instances.
C++20 implements both binary and counting semaphores.
Binary Semaphores
A binary semaphore is a synchronization primitive that can be used to control access to a shared resource. It has two states: \(0\) and \(1\). A semaphore with a value of \(0\) indicates that the resource is unavailable, while a semaphore with a value of \(1\) indicates that the resource is available.
The most significant difference between mutexes and semaphores is that threads that have acquired a mutex have exclusive ownership of it: only the thread owning the mutex can release it. Semaphores, by contrast, can be signaled by any thread. A mutex is a locking mechanism for a critical section, whereas a semaphore is more like a signaling mechanism. For this reason, semaphores are commonly used for signaling rather than for mutual exclusion.
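A minimal sketch of semaphore-based signaling between two threads, using C++20's std::binary_semaphore (the names are illustrative):
#include <iostream>
#include <semaphore>
#include <thread>
std::binary_semaphore ready{0}; // starts unavailable, so acquire() blocks
int main() {
    std::thread worker([] {
        ready.acquire(); // wait for the signal from another thread
        std::cout << "signal received, doing work\n";
    });
    // Unlike a mutex, a semaphore has no owning thread: any thread may release it.
    ready.release(); // send the signal
    worker.join();
}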
In C++20, std::binary_semaphore is an alias for the specialization of std::counting_semaphore with LeastMaxValue being \(1\).
Binary semaphores must be initialized with either \(1\) or \(0\), as follows (the variable names are illustrative):
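#include <semaphore>
std::binary_semaphore sem_unavailable{0}; // the resource starts out unavailable
std::binary_semaphore sem_available{1};   // the resource starts out available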
If the initial value is \(0\), acquiring the semaphore will block the thread trying to acquire it, and before it can be acquired, it must be released by another thread. Acquiring the semaphore decreases the counter, and releasing it increases the counter.
Counting semaphores
A counting semaphore allows access to a shared resource by more than one thread. Its counter can be initialized to an arbitrary non-negative number, and it is decreased every time a thread acquires the semaphore.
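For example, here is a minimal sketch in which a counting semaphore limits a hypothetical resource to \(3\) concurrent users:
#include <iostream>
#include <semaphore>
#include <thread>
#include <vector>
std::counting_semaphore<3> permits{3}; // up to 3 threads may hold a permit at once
void worker(int id) {
    permits.acquire(); // decreases the counter; blocks when it reaches 0
    std::cout << "thread " << id << " is using the resource\n";
    permits.release(); // increases the counter, handing the permit back
}
int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i)
        threads.emplace_back(worker, i);
    for (auto& t : threads)
        t.join();
}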
We can design a thread-safe queue using semaphores instead of condition variables to synchronize access to the queue.
We code up an unbounded queue implemented as a circular queue with doubling.
#include <iostream>
#include <utility> // std::swap
#include <semaphore>
namespace dev {
/*
A queue implements a first-in-first-out data-structure allowing
enqueuing (adding) items to the rear and dequeuing(removing)
them from the front.
My implementation uses a circular queue with doubling - the simplest
and reasonably efficient choice.
The interface design conforms to the standard library std::queue<T>
specification.
*/
template<typename T>
class queue {
private:
enum{min_capacity = 8};
T* m_ring_buffer;
int m_front;
int m_rear;
int m_capacity;
public:
using value_type = T;
using reference = T&;
/* Constructors */
// Default constructor
queue() : queue(min_capacity){}
// Parametrized Constructor
queue(int capacity)
: m_ring_buffer{ nullptr }
, m_front{ 0 }
, m_rear{ 0 }
, m_capacity{ 0 }
{
m_ring_buffer = static_cast<T*>(operator new(capacity * sizeof(T)));
m_capacity = capacity;
}
/* Destructor */
~queue() {
clear();
operator delete(m_ring_buffer);
}
/* Copy constructor
* Perform a deep-copy of the contents of the queue
*/
queue(const queue& other)
: queue(other.m_capacity) // Allocation step
{
/* Call the copy constructor of T
* placing it directly into the pre-allocated
* storage at memory address &m_ring_buffer[i]
*/
for (int i{ 0 };i < other.size();++i) {
new (&m_ring_buffer[i]) T(other[i]);
++m_rear;
}
}
/* Swap the contents of lhs and rhs member-by-member */
friend void swap(queue& lhs, queue& rhs) {
std::swap(lhs.m_ring_buffer, rhs.m_ring_buffer);
std::swap(lhs.m_front, rhs.m_front);
std::swap(lhs.m_rear, rhs.m_rear);
std::swap(lhs.m_capacity, rhs.m_capacity);
}
/* Copy-assignment */
queue& operator=(const queue& other) {
queue temp{ other }; // Copy-construct
swap(*this, temp); // and swap idiom
return (*this);
}
/* Move constructor */
queue(queue&& other)
    : m_ring_buffer{ other.m_ring_buffer }
    , m_front{ other.m_front }
    , m_rear{ other.m_rear }
    , m_capacity{ other.m_capacity }
{
    /* Leave the moved-from queue in a valid empty state,
     * so that its destructor does not touch the stolen buffer.
     */
    other.m_ring_buffer = nullptr;
    other.m_front = 0;
    other.m_rear = 0;
}
/* Move assignment */
queue& operator=(queue&& other) {
    queue temp{ std::move(other) };
    swap(*this, temp); // use our member-wise swap, not std::swap,
                       // which would recurse into this operator
    return (*this);
}
/* Capacity*/
/* Checks whether the queue is empty */
bool empty() const
{
    return (m_rear == m_front);
}
bool full() const {
    return (m_rear == m_front + m_capacity);
}
/* Returns the number of elements in the queue*/
int size() const {
return (m_rear - m_front);
}
int capacity() const {
return m_capacity;
}
/* Modifiers */
/* Double the capacity of the queue */
void resize()
{
std::cout << "\n" << "Resizing the queue from " << m_capacity
<< " to " << 2 * m_capacity << " elements.";
// 1. Allocation step
T* new_array = static_cast<T*>(operator new(2 * m_capacity * sizeof(T)));
std::cout << "\n" << "Allocation complete.";
// 2. Copy over the elements of the queue to the newly
// allocated storage.
int new_size = size();
for (int i{ 0 };i < new_size;++i) {
new (&new_array[i]) T(std::move(m_ring_buffer[(i + m_front) % m_capacity]));
}
std::cout << "\n" << "Copy-construction complete";
// 3. Destroy the old array
clear();
operator delete(m_ring_buffer);
std::cout << "\n" << "Destruction of old array complete";
// Re-wire m_ring_buffer, set front, rear and capacity
m_ring_buffer = new_array;
m_front = 0;
m_rear = new_size;
m_capacity *= 2;
}
/* Push the given value to the end of the queue */
void push(const T& value) {
std::cout << "\n" << "Pushing " << value << " to the queue";
if (full())
resize();
new (&m_ring_buffer[(m_rear % m_capacity)]) T(value);
std::cout << "\n" << "Pushed " << value << " to the queue";
m_rear++;
}
void push(T&& value) {
std::cout << "\n" << "Pushing " << value << " to the queue";
if (full())
resize();
new(&m_ring_buffer[(m_rear % m_capacity)]) T(std::move(value));
m_rear++;
}
/* Removes an element from the front of the queue */
void pop() {
m_ring_buffer[m_front % m_capacity].~T();
m_front++;
}
/* Element access */
T& operator[](int i) {
return m_ring_buffer[(m_front + i) % m_capacity];
}
const T& operator[](int i) const {
    return m_ring_buffer[(m_front + i) % m_capacity];
}
/* Returns a reference to the first element in the queue */
T& front() {
return m_ring_buffer[m_front % m_capacity];
}
/* Return a reference to the last element in the queue */
T& back() {
return m_ring_buffer[(m_rear - 1) % m_capacity];
}
private:
/* Helper function to clear the queue */
void clear() {
for (int i{ 0 }; i < size(); ++i) {
m_ring_buffer[(m_front + i) % m_capacity].~T();
}
}
};
}
int main()
{
dev::queue<double> q;
std::cout << "\n" << "Queue capacity = " << q.capacity();
for (double i{ 1.0 };i <= 10.0;i++)
q.push(i);
std::cout << "\n" << "Queue capacity = " << q.capacity();
}
Next, we code up the semaphore_queue class, which uses semaphores instead of condition variables as the synchronization mechanism.
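Here is a minimal sketch of the idea, assuming the dev::queue above as the underlying container and a single counting semaphore that counts the items available to consumers (the class and member names are illustrative):
#include <mutex>
#include <semaphore>
// Assumes the dev::queue defined above.
namespace dev {
template<typename T>
class semaphore_queue {
private:
    queue<T> m_queue;   // the unbounded circular queue with doubling
    std::mutex m_mutex; // protects the underlying queue
    // Counts the items available to consumers; it plays the role
    // that the not_empty_ condition variable played earlier.
    std::counting_semaphore<> m_items{0};
public:
    void push(const T& item) {
        {
            std::unique_lock<std::mutex> unique_lck(m_mutex);
            m_queue.push(item);
        }
        m_items.release(); // signal one waiting consumer
    }
    T pop() {
        m_items.acquire(); // blocks until at least one item is available
        std::unique_lock<std::mutex> unique_lck(m_mutex);
        T item = std::move(m_queue.front());
        m_queue.pop();
        return item;
    }
};
}
Because the underlying queue is unbounded, a single semaphore suffices; a bounded variant would add a second semaphore counting the empty slots, acquired by producers before pushing.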
SPMC queues and coding up a ThreadPool
A thread pool is a group of pre-instantiated, idle threads that stand ready to be given work. A pool is preferred over instantiating a new thread for each task whenever there are a large number of short tasks to be done rather than a small number of long ones, since it avoids incurring the overhead of thread creation a large number of times and prevents starvation of threads.
The ThreadPool class has a container for the worker threads as one of its member-variables. When the thread pool is handed a Task, the task is added to an SPMC queue. If a thread from the ThreadPool is idle, it can pop() the next task off the TaskQueue and execute it. Once the execution is complete, the thread hands itself back to the pool for reuse, waits until the queue_not_empty_cond condition is signaled again, and the cycle repeats.
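Here is a minimal sketch of such a ThreadPool, assuming the dev::threadsafe_queue from earlier as the task queue, so that the queue's internal condition variable plays the role of queue_not_empty_cond; an empty task is used as a hypothetical shutdown signal:
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>
#include "threadsafe_queue.h" // the dev::threadsafe_queue from earlier
namespace dev {
class ThreadPool {
private:
    using Task = std::function<void()>;
    threadsafe_queue<Task> m_tasks;     // the task queue
    std::vector<std::thread> m_workers; // the container of worker threads
public:
    explicit ThreadPool(std::size_t num_threads) {
        for (std::size_t i = 0; i < num_threads; ++i) {
            m_workers.emplace_back([this] {
                // Each worker blocks in pop() until a task arrives,
                // executes it, and loops back for the next one.
                for (;;) {
                    Task task = m_tasks.pop();
                    if (!task) return; // empty task: shut this worker down
                    task();
                }
            });
        }
    }
    void submit(Task task) { m_tasks.push(task); }
    ~ThreadPool() {
        // Push one empty task per worker so that every thread wakes up and exits.
        for (std::size_t i = 0; i < m_workers.size(); ++i)
            m_tasks.push(Task{});
        for (auto& w : m_workers)
            w.join();
    }
};
}
Usage is then as simple as constructing dev::ThreadPool pool{4}; and calling pool.submit(...) with any callable.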