Coroutines
You’ve likely heard about this new C++20 feature, coroutines. I think this is a really important subject, and there are several cool use-cases for coroutines. A coroutine, in the simplest terms, is just a function that you can pause in the middle. Imagine that you are executing a function top-down, and at some point in between you say: I am going to hit the pause button here and return control to the caller. At a later point, the caller decides to resume the execution of the function right where you left off. Unlike a function, therefore, a coroutine is always stateful: at the very least, it needs to remember where it left off in the function body. Usually it needs to remember more than that, namely the values of all the local variables at the point where it paused. As a consequence, you can think of the coroutine you are calling not as a function in the classical sense of the term, but more like a factory function that returns a coroutine object back to you. This coroutine object is the one that holds all the state, and it is what you resume to continue the computation at a future time.
Coroutines can simplify our code! They are a great tool, for example, when it comes to implementing parsers.
Compared to functions, the control flow of coroutines looks as follows:
%%itikz --temp-dir --tex-packages=tikz --tikz-libraries=arrows.meta --implicit-standalone
\begin{tikzpicture}[scale=1.5]
\draw[blue, rounded corners] (0,0) rectangle (3,5);
\node(A) at (1.5,4.75) {Caller};
\node(B) at (1.5,3.50) {};
\node(C) at (1.5,3.20) {};
\node(call) at (2.20,4.20) {call};
\node(D) at (1.5,0.20) {};
\node(F) at (5.5,4.75) {Function};
\node(Func) at (5.55,4.50) {};
\node(Return) at (5.55,0.25) {};
\node(return_back) at (6.2,0.50) {\texttt{return}};
\draw[dashed, -{Stealth[length=3mm]}] (A) -- (B);
\draw[dashed, -{Stealth[length=3mm]}] (B) -- (Func);
\draw[dashed, -{Stealth[length=3mm]}] (Func) -- (Return);
\draw[dashed, -{Stealth[length=3mm]}] (Return) -- (C);
\draw[dashed, -{Stealth[length=3mm]}] (C) -- (D);
\draw[blue, rounded corners] (4,0) rectangle (7,5);
\end{tikzpicture}

Fig. Functions - control flow
%%itikz --temp-dir --tex-packages=tikz --tikz-libraries=arrows.meta --implicit-standalone
\begin{tikzpicture}[scale=1.5]
\draw[blue, rounded corners] (0,0) rectangle (3,7);
\node(F) at (5.5,6.75) {Coroutine};
\node(A) at (1.5,6.75) {Caller};
\node(B) at (1.5,5.50) {};
\node(coro_A) at (5.55,6.50) {};
\node(coro_B) at (5.55,5.20) {};
\node(suspend_1) at (4.75,5.70) {suspend};
\node(coyield_1) at (6.50,5.25) {\texttt{co\_yield}};
\node(call_1) at (2.20,6.20) {call};
\node(C) at (1.5,5.20) {};
\node(D) at (1.5,3.70) {};
\node(coro_C) at (5.55,3.70) {};
\node(resume_1) at (4.75,4.10) {resume};
\node(coro_D) at (5.55,2.20) {};
\node(suspend_2) at (4.75,2.70) {suspend};
\node(coyield_2) at (6.50,2.25) {\texttt{co\_yield}};
\node(E) at (1.5, 2.20) {};
\node(F) at (1.5, 1.25) {};
\node(resume_2) at (4.75,1.55) {resume};
\node(coro_F) at (5.55, 1.25) {};
\node(coro_G) at (5.55, 0.25) {};
\node(return_back) at (6.2,0.25) {\texttt{co\_return}};
\node(H) at (1.5, 0.25) {};
\draw[dashed, -{Stealth[length=3mm]}] (A) -- (B);
\draw[dashed, -{Stealth[length=3mm]}] (B) -- (coro_A);
\draw[dashed, -{Stealth[length=3mm]}] (coro_A) -- (coro_B);
\draw[dashed, -{Stealth[length=3mm]}] (coro_B) -- (C);
\draw[dashed, -{Stealth[length=3mm]}] (C) -- (D);
\draw[dashed, -{Stealth[length=3mm]}] (D) -- (coro_C);
\draw[dashed, -{Stealth[length=3mm]}] (coro_C) -- (coro_D);
\draw[dashed, -{Stealth[length=3mm]}] (coro_D) -- (E);
\draw[dashed, -{Stealth[length=3mm]}] (E) -- (F);
\draw[dashed, -{Stealth[length=3mm]}] (F) -- (coro_F);
\draw[dashed, -{Stealth[length=3mm]}] (coro_F) -- (coro_G);
\draw[dashed, -{Stealth[length=3mm]}] (coro_G) -- (H);
\draw[blue, rounded corners] (4,0) rectangle (7,7);
\end{tikzpicture}

Fig. Coroutines - control flow
Coroutine frames (the state of all the local variables at the point where the coroutine paused, plus the resume point) may be stored on the stack or on the heap. C++ implements the latter approach: stackless coroutines, whose frames are dynamically allocated.
When using coroutines, we are talking about cooperative multitasking. As a quick 101: in preemptive multitasking, a special hardware timer interrupt is sent to the CPU periodically, the register state of the currently running process is saved to main memory (in a data structure called the process context), and the OS kernel acquires control of the CPU. The OS scheduler uses a scheduling policy such as round-robin to pick another process from the process queue, copies its context from main memory into the CPU registers and transfers control to it. This is called a context switch. In cooperative multitasking, each process/thread, once running, decides for how long to keep the CPU and (crucially) when it is time to give it up so that another thread can use it.
A simple example
Let’s start with a simple example. Say you want to compute the Fibonacci sequence. The first thing you’d do is code up a function fibo(int) that returns the sequence in a vector.
// Returns a vector containing the
// first n elements of the Fibonacci
// series:
// 1, 1, 2, 3, 5, 8, 13, 21, ...
std::vector<int> fibo(int n)
{
std::vector<int> results{};
int a_prev{1}, a_curr{1};
int a_next{0};
results.push_back(a_prev);
results.push_back(a_curr);
for(int i = 2; i < n; ++i)
{
a_next = a_prev + a_curr;
results.push_back(a_next);
a_prev = a_curr;
a_curr = a_next;
}
return results;
}

This naive implementation has a number of disadvantages. For example, this function always requires \(O(n)\) storage. Fibonacci has a simple recurrence, but if I have a complex computation and I am only ever processing the numbers sequentially, one at a time, then it makes no sense to pay for the entire \(O(n)\) storage. Another problem: if you are dealing with an infinite range, this approach becomes much harder to handle.
A different way of implementing this would be as follows. Instead of returning the whole range in a vector, we just return a generator object; this generator object has a next() member function, and every call to next() gives us back the next number in the sequence.
struct FiboGenerator
{
// Successive calls to next()
// return the numbers from the
// Fibonacci series
int next();
};
// Returns a new FiboGenerator object
// that will start from the first
// Fibonacci number
FiboGenerator makeFiboGenerator();

The interesting question is: is makeFiboGenerator() a coroutine? We really can’t tell; it’s an implementation detail. All we see is an interface. This is actually the most important thing about coroutines. From the outside, they just look and behave like functions. There is no way to tell whether they’ve been implemented as a coroutine or not. The only thing we need to know as users of this function is the interface of the FiboGenerator object.
The coroutine return type
The initial call to the coroutine function produces a return object of a certain ReturnType and hands it back to the caller. The interface of this type determines what the coroutine is capable of. Since coroutines are super-flexible, we can do a whole lot with this return object. If you have some coroutine and you want to understand what it’s doing, the first thing you should look at is the ReturnType and what its interface is. The important thing here is that we design this ReturnType: if you are writing a coroutine, you decide what goes into this interface.
How to turn a function into a coroutine?
The compiler looks for one of three keywords in the implementation: co_yield, co_await and co_return.
| Keyword | Action | State |
|---|---|---|
| `co_yield` | Output | Suspended |
| `co_return` | Output | Ended |
| `co_await` | Input | Suspended |
In the preceding table, we see that after co_yield and co_await the coroutine suspends itself, and after co_return it terminates (co_return is the equivalent of the return statement in a classical C++ function).
A classical function starts executing when it’s called and normally terminates with a return statement, or when the end of the function body is reached. It may call another function (or even itself, if it is recursive), it may throw exceptions, and it may have several return points, but it always runs from beginning to end in one go.
A coroutine is different. A coroutine is a function that can suspend itself. The flow for a coroutine may be like the following pseudo-code:
ReturnType coroutine(){
do_something();
co_yield;
do_something_else();
co_yield;
do_more_work();
co_return;
}

You should think of the coroutine function not as a function in the classical sense, but rather as a factory function that returns a coroutine object; this coroutine object holds all the state, and it is the one you resume to continue the computation later on.
Use-cases for coroutines
Asynchronous computation. Suppose we are tasked with designing a simple echo server. We listen for incoming data from a client socket and we simply send it back to the client. At some point in our code for the echo server, we will have a piece of logic like below:
void session(Socket sock){
char buffer[1024];
int len = sock.read({buffer});
sock.write({buffer,len});
log(buffer);
}

We create a buffer, then call read(), which reads packets from the NIC and stores them in the buffer. The return value len tells us how many bytes we read. Then we write back only the first len bytes of the buffer. Finally, we perform some logging. If we run it, it will probably work, but we certainly don’t want to write a server like this. Say one of the clients requests communication and we are in a session. They say they are ready to send the data, so we block on the read, but maybe they send us this data in 2 minutes, or 5 minutes, or even more. Meanwhile, other clients keep waiting.
What we could do is start this session and run it in a separate thread. This would work, because if this thread is blocked and there are other threads in the server, they run in the meantime. If we have a thread pool, we’ll just grab a thread from the pool and tell whatever system manages the pool to run this session(Socket) function there.
This solution is still far from ideal. These are OS-managed threads, and the OS has to decide which threads get CPU time: it will preempt one thread and run another. This preemption occurs at random moments, when we least expect it. We could have data races, and we will likely need to protect critical sections/resources with mutexes.
One alternative is to use an asynchronous framework and rewrite our code as follows:
void session(Socket sock){
    struct State{ Socket sock; char buffer[1024]; };
    // Heap-allocate the state, so that it outlives this call
    auto state = std::make_shared<State>();
    state->sock = std::move(sock);
    auto on_read_finished_callback = /* ... */
    // Perform an asynchronous read
    state->sock.async_read( state->buffer,
                            on_read_finished_callback );
}

The async_read() call is a request to the framework: we are interested in data being read into our buffer at some point convenient to the framework, and when that happens, it should call our on_read_finished_callback. We are just registering this association at this point, and the server can move on to doing other things.
The read operation may fail for a number of reasons, so a good approach is to supply two arguments to the on_read_finished_callback: (i) an error code, and (ii) the number of bytes received, if there was no error. We can implement this by checking the error code.
void session(Socket sock){
    struct State{ Socket sock; char buffer[1024]; };
    // Heap-allocate the state
    auto state = std::make_shared<State>();
    state->sock = std::move(sock);
    // Capture the shared_ptr by value: the lambda outlives
    // this function call, so a reference would dangle
    auto on_read_finished_callback = [state](
        error_code ec,
        size_t len
    )
    {
        auto done = /* ... */
        if(!ec)
        {
            // Perform an asynchronous write
            state->sock.async_write(
                state->buffer,
                done
            );
        }
    };
    // Perform an asynchronous read
    state->sock.async_read( state->buffer,
                            on_read_finished_callback );
}