DIY asyncio

Python
Author

Quasar

Published

February 28, 2025

Basics

On a single-core processor, the machine can perform only one task at a time, but it can switch between tasks many times per second. By doing a bit of one task, then a bit of another, and so on, it makes the tasks appear to run concurrently. This is called task switching. Because the switches are so fast, they give both the user and the applications an illusion of concurrency.

On a single-core machine doing task switching, chunks from each task are interleaved. They are also spaced out a bit: to do the interleaving, the operating system has to perform a context switch every time it changes from one task to another, and this takes time. To perform a context switch, the OS has to save the CPU state and the instruction pointer for the currently running task, work out which task to switch to, and reload the CPU state for the task being switched to.

Multi-core processors are genuinely capable of running more than one task in parallel. This is called hardware concurrency.

Throughput and Latency

The rate of doing work (operations per second) is called throughput. The time a system takes to process a single request is called latency.

Synchronous vs Asynchronous

Synchronous execution is sequential.

def foo():
    print(f"Inside foo.")

def main():
    print(f"Starting work.")
    foo()
    print(f"Finishing work.")

main()
Starting work.
Inside foo.
Finishing work.

In the main() code path, the call to foo() is a blocking call: execution jumps to foo(), and main() resumes only when foo() returns.

Asynchronous (or async) execution refers to execution that doesn't block when invoking subroutines. It is a fire-and-forget technique: the work package runs separately from the main application thread and notifies the calling thread of its completion, failure or progress.

Usually, such methods return an entity called future or promise that is the representation of an in-progress computation. The calling thread can query for the status of the computation via the returned future or promise and retrieve the result once completed.

Another pattern is to pass a callback function to the asynchronous functional call, which is invoked with the results when the asynchronous function is done processing.
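Both patterns can be tried with the standard library's concurrent.futures module. The following is a minimal sketch, not part of the original walkthrough: slow_square and on_done are hypothetical names, and the short sleep merely stands in for real I/O.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple


def slow_square(x: int) -> int:
    """Hypothetical work package that pretends to wait on I/O."""
    time.sleep(0.05)
    return x * x


def run_future_demo() -> Tuple[int, List[int]]:
    callback_results: List[int] = []

    def on_done(fut: Future) -> None:
        # Callback pattern: invoked with the finished future as its argument.
        callback_results.append(fut.result())

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_square, 7)  # returns a Future immediately
        future.add_done_callback(on_done)
        value = future.result()                   # future pattern: block until done

    # Leaving the with-block waits for the worker, so the callback has run.
    return value, callback_results


print(run_future_demo())
```

submit() returns before the work is finished; the caller decides whether to poll the future, block on result(), or rely on the callback.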

Asynchronous programming is an excellent choice for applications that do extensive network or disk I/O and spend most of their time waiting.

I/O bound vs CPU bound

CPU bound

Programs that are compute-intensive are called CPU bound programs. This could involve numerical optimizations, Monte-Carlo simulations, data-crunching etc.

I/O bound

I/O bound programs spend most of their time doing network, main-memory and file I/O operations. Since the CPU and main memory are separate, a bus exists between the two to transfer bits. Similarly, data needs to be moved from the NIC to the CPU and memory. Even though these physical distances are small, the time taken to transfer the data can waste a few thousand CPU cycles. This is why I/O bound programs show relatively lower CPU utilization than CPU bound programs.
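To see why waiting-dominated work benefits from concurrency, here is a rough sketch that uses time.sleep() as a stand-in for a network or disk call. The function names and the delay are assumptions chosen for illustration; exact timings will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay: float) -> float:
    """Stand-in for an I/O call: the thread simply waits."""
    time.sleep(delay)
    return delay


def timed_sequential(n: int, delay: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        fake_io(delay)
    return time.perf_counter() - start


def timed_threaded(n: int, delay: float) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as executor:
        # All n waits overlap, so the total is close to a single delay.
        list(executor.map(fake_io, [delay] * n))
    return time.perf_counter() - start


sequential = timed_sequential(4, 0.1)
threaded = timed_threaded(4, 0.1)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

The sequential version pays for every wait in turn, while the threaded version overlaps them because a sleeping thread does not occupy the CPU.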

Data race-conditions and thread safety

The most common cause of bugs in concurrent code is a race-condition.

import concurrent.futures
import logging
import time

class Account:
    def __init__(self):
        self.value = 0

    @property
    def value(self):
        return self._value
    
    @value.setter
    def value(self, x):
        self._value = x
    
    def credit(self, name : str, amount : float):
        logging.info("Thread %s: starting update", name)
        
        # ----- Critical section -----
        local_copy = self.value     
        local_copy += amount
        time.sleep(0.1)
        self.value = local_copy
        # ----- End of critical section -----

        logging.info("Thread %s: finishing update", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    account = Account()
    logging.info("Testing update. Starting value is %d.", account.value)

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(account.credit, index, 100)

    logging.info("Testing update. Ending value is %d", account.value)
06:36:06: Testing update. Starting value is 0.
06:36:06: Thread 0: starting update
06:36:06: Thread 1: starting update
06:36:07: Thread 0: finishing update
06:36:07: Thread 1: finishing update
06:36:07: Testing update. Ending value is 100

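Both threads read the starting value 0 before either writes back, so the second write overwrites the first and the final balance is 100 instead of 200. The above logic can be made thread-safe by fencing off the critical section using a mutex and enforcing that only a single thread can enter at a time.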
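A minimal sketch of that fix, assuming a threading.Lock as the mutex. SafeAccount and run_credits are hypothetical names introduced here; the critical section is the same read-modify-write sequence as in the racy version.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SafeAccount:
    """Hypothetical lock-protected variant of the Account class above."""

    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def credit(self, amount: float) -> None:
        # Only one thread at a time may run the read-modify-write sequence.
        with self._lock:
            local_copy = self.value
            local_copy += amount
            time.sleep(0.05)      # same kind of artificial delay as before
            self.value = local_copy


def run_credits(n_threads: int = 2, amount: float = 100) -> float:
    account = SafeAccount()
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        for _ in range(n_threads):
            executor.submit(account.credit, amount)
    return account.value


print(run_credits())
```

With the lock in place, the second thread cannot read the balance until the first has written its update, so both credits are applied.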

Deadlocks

Imagine that you have a toy that comes in two parts, and you need both parts to play with it - a toy drum and a drumstick, for example. Now, imagine that you have two small children, both of whom like playing with it. If one of them gets both the drum and the drumstick, that child can merrily play the drum until tiring of it. If the other child wants to play, they have to wait, however sad that makes them. Now, imagine one child has the drum and the other has the drumstick. They’re stuck: unless one decides to be nice and let the other play, each will hold on to whatever they have and demand that they be given the other piece, so neither gets to play. This is a deadlock.

Imagine two threads arguing over locks on mutexes: each of a pair of threads needs to lock both of a pair of mutexes to perform some operation, and each thread has one mutex and is waiting for the other. Neither thread can proceed, because each is waiting for the other to release its mutex. This scenario is called deadlock.

import threading
import time

if __name__ == "__main__":
    drum = threading.Lock()
    drumstick = threading.Lock()

    def child1_plays_drums():
        print(f"\nChild-1 waiting for drums")
        drum.acquire()
        print(f"\nChild-1 acquired drums")
        print(f"\nChild-1 waiting for drumstick")
        drumstick.acquire()
        print(f"\nChild-1 is playing drums")

    def child2_plays_drums():
        print(f"\nChild-2 waiting for drumstick")
        drumstick.acquire()
        print(f"\nChild-2 acquired drumstick")
        print(f"\nChild-2 waiting for drums")
        drum.acquire()
        print(f"\nChild-2 acquired drums")
        print(f"\nChild-2 is playing drums")

    t1 = threading.Thread(target=child1_plays_drums)
    t2 = threading.Thread(target=child2_plays_drums)
    
    t1.start()
    t2.start()

    time.sleep(1)

Child-1 waiting for drums
Child-2 waiting for drumstick

Child-2 acquired drumstick

Child-2 waiting for drums

Child-2 acquired drums

Child-2 is playing drums
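One common remedy, which the listing above does not use, is to make every thread acquire the locks in the same order. The sketch below assumes a fixed "drum first, drumstick second" rule; play and run_children are hypothetical helpers.

```python
import threading
from typing import List

drum = threading.Lock()
drumstick = threading.Lock()
played: List[str] = []


def play(child: str) -> None:
    # Every child takes the drum first and the drumstick second, so no child
    # can hold the drumstick while waiting for the drum.
    with drum:
        with drumstick:
            played.append(child)


def run_children() -> List[str]:
    played.clear()
    threads = [threading.Thread(target=play, args=(name,))
               for name in ("child-1", "child-2")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(played)


print(run_children())
```

Because no thread can hold the second lock without already holding the first, the circular wait that defines a deadlock cannot form.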

Mutexes and Semaphores

A mutex is a programming construct that allows only a single thread to access a shared resource or critical section. Once a thread acquires a mutex, all other threads attempting to acquire the same mutex are blocked until the thread releases the mutex.

A semaphore, on the other hand, is used to limit access to a collection of resources. Think of a semaphore as having a limited number of permits to give out. If a semaphore has given out all the permits it has, then any new thread that comes along requesting a permit will be blocked until an earlier thread with a permit returns it to the semaphore. A prototypical example is a ConnectionPool that hands out database connections to requesting threads.

A semaphore with a single permit is called a binary semaphore. Semaphores can also be used for signaling among threads. This is an important distinction, as it allows threads to cooperatively work towards completing a task. A mutex, on the other hand, is strictly limited to serializing access to shared data among competing threads.

When can a semaphore masquerade as a mutex?

A semaphore can potentially act as a mutex if the number of permits it can give out is at most \(1\). However, the most important difference is that the thread that calls acquire() on a mutex must subsequently release() it. A mutex is owned by the thread acquiring it, up to the point the owning thread releases it. In the case of a binary semaphore, by contrast, different threads can call acquire() and release() on the semaphore.

Semaphore for signaling

Another distinction between a semaphore and a mutex is that semaphores can be used for signaling amongst threads. For example, in case of the classical producer-consumer problem, the producer thread can signal the consumer thread by incrementing the semaphore count to indicate to the consumer thread to read items from the queue. Threads can coordinate tasks using semaphores. A mutex, in contrast, only guards access to shared data.
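The signaling use can be sketched with threading.Semaphore started at zero. This is an illustrative example, not a full producer-consumer implementation; items_available, producer, consumer and run_signaling are names assumed for the sketch.

```python
import threading
from collections import deque
from typing import List

items = deque()
items_available = threading.Semaphore(0)  # counts items ready to be consumed
consumed: List[int] = []


def producer(n: int) -> None:
    for i in range(n):
        items.append(i)            # deque appends are thread-safe
        items_available.release()  # signal: one more item is ready


def consumer(n: int) -> None:
    for _ in range(n):
        items_available.acquire()  # block until the producer signals
        consumed.append(items.popleft())


def run_signaling(n: int = 5) -> List[int]:
    consumed.clear()
    c = threading.Thread(target=consumer, args=(n,))
    p = threading.Thread(target=producer, args=(n,))
    c.start()
    p.start()
    p.join()
    c.join()
    return list(consumed)


print(run_signaling())
```

Note that release() is always called by the producer and acquire() by the consumer, which is exactly what a mutex with thread ownership would not allow.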

threading module

Data-parallelism can be achieved using multi-threading.

import numpy as np
import threading

def accumulate(a : np.ndarray, idx : int):
    result = np.sum(a)
    print(f"\nSum of the subarray {idx} = {result}")

if __name__ == "__main__":
    data = np.random.rand(1000000)
    num_chunks = 4
    chunk_size = int(len(data) / num_chunks)
    num_threads = num_chunks

    threads = []
    for i in range(num_threads):
        start = i * chunk_size
        end = start + chunk_size
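        # Pass the callable and its arguments separately; writing
        # target=accumulate(...) would run the sum here, in the main thread.
        thread = threading.Thread(target=accumulate, args=(data[start:end], i))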
        threads.append(thread)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

Sum of the subarray 0 = 125132.31791379408

Sum of the subarray 1 = 125003.60839306995

Sum of the subarray 2 = 125042.88767142383

Sum of the subarray 3 = 125115.72197185869

Another way to create threads is subclassing the threading.Thread class.

from threading import Thread
from threading import current_thread

class MyTask(Thread):

    def __init__(self):
        Thread.__init__(self, name="subClassThread", args=(2,3))

    def run(self):
        print(f"{current_thread().name} is executing")

myTask = MyTask()
myTask.start()  # start the thread
myTask.join()   # wait for the thread to complete

The important caveats to remember when subclassing Thread class are:

  • A subclass should override only the run() method and the constructor of the Thread class.
  • Thread.__init__() must be invoked if the subclass chooses to override the constructor.

Daemon Thread

Daemon threads are background threads. When the main thread is about to exit, it cycles through all regular non-daemon threads and waits for them to complete. In the implementation of the threading module, the _shutdown() method iterates through non-daemon threads and invokes join() on each of them. join() is a blocking call, which returns when a thread’s work package is complete. Daemon threads are not joined; they are stopped abruptly when the interpreter exits, which is why the example below prints only once.

import threading
import time

def daemon_task():
    while(True):
        print(f"Executing daemon task")
        time.sleep(1)
    print(f"Completed daemon task")

if __name__ == "__main__":
    daemon_thread = threading.Thread(
        target=daemon_task,
        name="daemon thread",
        daemon=True
    )

    daemon_thread.start()
Executing daemon task

Implementation of a thread-safe LIFO stack

import threading
import time
from typing import Any, Optional

class StackFull(Exception):
    pass

class StackEmpty(Exception):
    pass
    
class Stack:
    def __init__(self, maxsize : int = None):
        self._mutex = threading.RLock()
        self.maxsize = maxsize
        self._data = list()

    @property
    def maxsize(self):
        with self._mutex:
            value = self._maxsize

        return value

    @maxsize.setter
    def maxsize(self, value : int):
        with self._mutex:
            self._maxsize = value

    def size(self) -> int:
        with self._mutex:
            size = len(self._data)
        
        return size

    def empty(self) -> bool:
        with self._mutex:
            isEmpty = len(self._data) == 0
        
        return isEmpty

    def full(self) -> bool:
        with self._mutex:
            if(self.maxsize is not None):
                isFull = len(self._data) == self.maxsize
            else:
                isFull = False
        
        return isFull

    def put(
        self,
        item : Any, 
        block : bool = True, 
        timeout : float = -1
    ) -> None:
        # Honour the caller's block flag instead of always blocking.
        self._mutex.acquire(blocking=block, timeout=timeout)
        print(f"\nPushing item {item} to the stack")
        if self.full():
            print("Stack full!")
            self._mutex.release()
            raise StackFull("Stack full!")
        
        self._data.append(item)
        print(f"\nPush complete")
        print(f"stack : {self._data}")
        self._mutex.release()
    
    def put_nowait(self, item:Any):
        self.put(item, block=False)

    def get(self, block : bool = True, timeout : float = -1) -> Any:
        self._mutex.acquire(blocking=block, timeout=timeout)
        print(f"\nPopping from the stack")
        if self.empty():
            print("Stack empty!")
            self._mutex.release()
            raise StackEmpty("Stack empty!")
        
        value = self._data[self.size() - 1]
        del self._data[self.size() - 1]
        print(f"\nPopped item {value} from the stack")
        print(f"stack : {self._data}")
        self._mutex.release()

        return value

    def get_no_wait(self):
        return self.get(block=False)

    def top(self) -> Any:
        self._mutex.acquire()
        if self.empty():
            self._mutex.release()
            print("Stack empty!")
            raise StackEmpty("Stack empty!")  

        value = self._data[self.size() - 1]
        self._mutex.release()
        return value

def push_thread(stack : Stack):
    
    for i in range(10):
        try:
            stack.put(i)
            time.sleep(0.1)
        except Exception:
            pass

def pop_thread(stack: Stack):
    for i in range(10):
        try:
            item = stack.get()
            time.sleep(0.12)
        except Exception:
            pass

if __name__ == "__main__":
    stack = Stack()
    
    t1 = threading.Thread(target=push_thread, args=(stack,))
    t2 = threading.Thread(target=pop_thread, args=(stack,))
    
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    print("main() thread finished.")

Pushing item 0 to the stack

Push complete
stack : [0]

Popping from the stack

Popped item 0 from the stack
stack : []

Pushing item 1 to the stack

Push complete
stack : [1]

Popping from the stack

Popped item 1 from the stack
stack : []

Pushing item 2 to the stack

Push complete
stack : [2]

Popping from the stack

Popped item 2 from the stack
stack : []

Pushing item 3 to the stack

Push complete
stack : [3]

Popping from the stack

Popped item 3 from the stack
stack : []

Pushing item 4 to the stack

Push complete
stack : [4]

Popping from the stack

Popped item 4 from the stack
stack : []

Pushing item 5 to the stack

Push complete
stack : [5]

Popping from the stack

Popped item 5 from the stack
stack : []

Pushing item 6 to the stack

Push complete
stack : [6]

Pushing item 7 to the stack

Push complete
stack : [6, 7]

Popping from the stack

Popped item 7 from the stack
stack : [6]

Pushing item 8 to the stack

Push complete
stack : [6, 8]

Popping from the stack

Popped item 8 from the stack
stack : [6]

Pushing item 9 to the stack

Push complete
stack : [6, 9]

Popping from the stack

Popped item 9 from the stack
stack : [6]

Popping from the stack

Popped item 6 from the stack
stack : []
main() thread finished.

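In the above implementation, I used RLock, a reentrant lock. A thread that holds an RLock can acquire it again as many times as it needs without blocking, but it must call release() once for every acquire(). Reentrancy matters here because put() and get() call full(), empty() and size() while already holding the lock; with a plain Lock those nested acquisitions would deadlock.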
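The difference between the two lock types can be checked directly. In this small sketch, reacquire is a hypothetical helper that tries to take a lock a second time from the thread that already holds it, using a non-blocking attempt so that the plain Lock reports failure instead of hanging.

```python
import threading


def reacquire(lock) -> bool:
    """Try to take `lock` again from the thread that already holds it."""
    with lock:
        got_it = lock.acquire(blocking=False)
        if got_it:
            lock.release()  # balance the inner acquire
        return got_it


print(reacquire(threading.RLock()))  # True: the owner may re-enter
print(reacquire(threading.Lock()))   # False: a plain Lock is not reentrant
```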

Condition variables

We looked at various ways of protecting the data that’s shared between threads. But sometimes we don’t just need to protect the data; we also need to synchronize actions on separate threads. One thread might need to wait for another thread to complete a task before the first thread can complete its own. In general, it’s common to want a thread to wait for a specific event to happen or a condition to be true. Although it would be possible to do this by periodically checking a task-complete flag or something like that, it is far from ideal. The need to synchronize operations between threads like this is a common scenario, and the Python standard library provides facilities to handle it in the form of condition variables and futures.

A condition variable is always associated with some kind of lock; this can be passed in, or one will be created on the fly. Passing one in is useful when several condition variables must share the same lock. The two important methods of a condition variable are:

  • wait() - The wait() method releases the lock held, then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() reacquires the lock and returns.
  • notify() - The notify() method arbitrarily wakes up any one of the threads waiting on the condition variable. The notify_all() method wakes up all the threads.

The typical programming style using condition variables uses the lock to synchronize access to some shared state; threads that are interested in a particular change of state call wait() repeatedly until they see the desired state, while threads that modify the state call notify() or notify_all() when they change the state in such a way that it could possibly be a desired state for one of the waiters.

Note: The notify() and notify_all() methods don’t release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notify_all() finally relinquishes ownership of the lock.

For example, the following code is a generic producer-consumer situation with unlimited buffer capacity:

# consumer
with cond_var:
    while item_is_not_available:
        cond_var.wait()
    
    get_the_available_item()

# producer
with cond_var:
    produce_an_item()
    cond_var.notify()
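The pseudocode above can be turned into a small runnable sketch. The buffer is assumed to be a collections.deque, and produce_and_consume is a hypothetical driver that runs one producer and one consumer.

```python
import threading
from collections import deque
from typing import List


def produce_and_consume(n: int = 3) -> List[int]:
    buffer = deque()
    cond_var = threading.Condition()  # creates its own lock internally
    received: List[int] = []

    def consumer() -> None:
        for _ in range(n):
            with cond_var:
                while not buffer:     # re-check the predicate after every wakeup
                    cond_var.wait()
                received.append(buffer.popleft())

    def producer() -> None:
        for i in range(n):
            with cond_var:
                buffer.append(i)
                cond_var.notify()     # the lock is released on leaving the block

    c = threading.Thread(target=consumer)
    p = threading.Thread(target=producer)
    c.start()
    p.start()
    p.join()
    c.join()
    return received


print(produce_and_consume())
```

The while loop around wait() is the important detail: the consumer trusts the shared state, not the wakeup itself.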

Implementation of a thread-based SPSC bounded ring-buffer

import threading
import time
from typing import Any, Optional
from threading import Condition

class QueueFull(Exception):
    pass

class QueueEmpty(Exception):
    pass
    
class Queue:
    def __init__(self, maxsize : int = None):
        self._lck = threading.RLock()
        self._queue_not_empty_condition = Condition(self._lck)
        self._queue_not_full_condition = Condition(self._lck)
        self.maxsize = maxsize
        self._data = list()

    @property
    def maxsize(self):
        with self._lck:
            value = self._maxsize

        return value

    @maxsize.setter
    def maxsize(self, value : int):
        with self._lck:
            self._maxsize = value

    def size(self) -> int:
        with self._lck:
            size = len(self._data)
        
        return size

    def empty(self) -> bool:
        with self._lck:
            isEmpty = len(self._data) == 0
        
        return isEmpty

    def full(self) -> bool:
        with self._lck:
            if(self.maxsize is not None):
                isFull = len(self._data) == self.maxsize
            else:
                isFull = False
        
        return isFull

    def put(
        self,
        item : Any, 
    ) -> None:
        print(f"\nPushing item {item} to the queue")
        
        self._queue_not_full_condition.acquire()
        
        while (self.full()):
            self._queue_not_full_condition.wait()
        
        self._data.append(item)

        print(f"\nPush complete")
        print(f"queue : {self._data}")
        
        self._queue_not_empty_condition.notify()

        self._queue_not_full_condition.release()
        return


    def get(self) -> Any:
        
        self._queue_not_empty_condition.acquire()

        while (self.empty()):
            self._queue_not_empty_condition.wait()

        print(f"\nPopping from the queue")
        
        value = self._data[0]
        del self._data[0]
        print(f"\nPopped item {value} from the queue")
        print(f"queue : {self._data}")

        self._queue_not_full_condition.notify()

        self._queue_not_empty_condition.release()
        return value

    def top(self) -> Any:
        self._lck.acquire()
        if self.empty():
            self._lck.release()
            raise QueueEmpty("queue empty!")  

        value = self._data[0]  # head of the queue, the item get() would return
        self._lck.release()
        return value

def push_thread(queue : Queue):
    
    for i in range(10):
        try:
            queue.put(i)
            time.sleep(0.07)
        except Exception:
            pass

def pop_thread(queue: Queue):
    for i in range(10):
        try:
            item = queue.get()
            time.sleep(0.1)
        except Exception:
            pass

if __name__ == "__main__":
    queue = Queue()
   
    
    t1 = threading.Thread(target=push_thread, args=(queue,))
    t2 = threading.Thread(target=pop_thread, args=(queue,))
    
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    print("main() thread finished.")

Pushing item 0 to the queue

Push complete
queue : [0]

Popping from the queue

Popped item 0 from the queue
queue : []

Pushing item 1 to the queue

Push complete
queue : [1]

Popping from the queue

Popped item 1 from the queue
queue : []

Pushing item 2 to the queue

Push complete
queue : [2]

Popping from the queue

Popped item 2 from the queue
queue : []

Pushing item 3 to the queue

Push complete
queue : [3]

Pushing item 4 to the queue

Push complete
queue : [3, 4]

Popping from the queue

Popped item 3 from the queue
queue : [4]

Pushing item 5 to the queue

Push complete
queue : [4, 5]

Popping from the queue

Popped item 4 from the queue
queue : [5]

Pushing item 6 to the queue

Push complete
queue : [5, 6]

Popping from the queue
Pushing item 7 to the queue


Popped item 5 from the queue
queue : [6]

Push complete
queue : [6, 7]

Pushing item 8 to the queue

Push complete
queue : [6, 7, 8]

Popping from the queue

Popped item 6 from the queue
queue : [7, 8]

Pushing item 9 to the queue

Push complete
queue : [7, 8, 9]

Popping from the queue

Popped item 7 from the queue
queue : [8, 9]

Popping from the queue

Popped item 8 from the queue
queue : [9]

Popping from the queue

Popped item 9 from the queue
queue : []
main() thread finished.

Semaphores

This is one of the oldest synchronization primitives in the history of CS, invented by the Dutch computer scientist Edsger W. Dijkstra. A semaphore manages an internal counter which is decremented by each acquire() and incremented by each release() call. The counter can never go below zero; when acquire() finds it at zero, it blocks until another thread calls release().
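As a rough illustration of the counter at work, the sketch below lets six threads compete for two permits and records how many were ever inside the guarded region at once. run_pool and use_connection are hypothetical names, and the sleep stands in for using a pooled resource such as a database connection.

```python
import threading
import time


def run_pool(n_threads: int = 6, max_connections: int = 2) -> int:
    """Return the highest number of threads seen inside the guarded region."""
    permits = threading.Semaphore(max_connections)
    state_lock = threading.Lock()
    active = 0
    peak = 0

    def use_connection() -> None:
        nonlocal active, peak
        with permits:                 # acquire(): counter - 1, blocks at zero
            with state_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.02)          # pretend to use the resource
            with state_lock:
                active -= 1
        # Leaving the with-block calls release(): counter + 1.

    threads = [threading.Thread(target=use_connection) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


print(run_pool())
```

However many threads are started, the recorded peak never exceeds the number of permits.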

Events

An Event object is one of the simplest primitives available for synchronization. Internally, the CPython implementation manages a flag that can be set to True with the set() method and reset to False using the clear() method. The wait() method blocks until the flag is True.

When the internal flag is set to True, all threads waiting on the Event are awakened. Threads that call wait() once the flag is True will not block at all.

When the internal flag is reset to False, threads calling wait() will block until set() is called to set the internal flag to True again.
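The behaviour described above can be observed with a single waiting thread. This is a small sketch; run_event_demo and the log messages are made up for the example.

```python
import threading
from typing import List, Tuple


def run_event_demo() -> Tuple[List[str], bool, bool]:
    ready = threading.Event()
    log: List[str] = []

    def waiter() -> None:
        ready.wait()                     # blocks while the flag is False
        log.append("waiter resumed")

    t = threading.Thread(target=waiter)
    t.start()
    log.append("main sets the flag")
    ready.set()                          # wakes every thread blocked in wait()
    t.join()

    after_set = ready.wait(timeout=0)    # flag is True: returns True at once
    ready.clear()
    after_clear = ready.wait(timeout=0)  # flag is False: times out, returns False
    return log, after_set, after_clear


print(run_event_demo())
```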

The Global Interpreter Lock (GIL)

The CPython interpreter maintains a reference count for each object. When references go out of scope, the reference count of the object is decremented, and when it reaches \(0\) the memory is deallocated (reclaimed). These reference counts are shared state, so executing Python bytecode requires acquiring an exclusive lock on the interpreter, the GIL. The implication is that only one thread executes Python bytecode at a time, so the threading library does not offer true hardware concurrency for CPU bound work even on multi-core CPUs. Threads still help with I/O bound work, because the lock is released while a thread waits on blocking I/O.
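The reference counts that the GIL protects can be inspected with sys.getrefcount(). The sketch below assumes CPython; the absolute numbers are an implementation detail, so only the differences are meaningful.

```python
import sys

data = []                            # a fresh list object
before = sys.getrefcount(data)       # includes the temporary reference held by the call
alias = data                         # bind a second name to the same list
after_alias = sys.getrefcount(data)
del alias                            # dropping the name decrements the count again
after_del = sys.getrefcount(data)

print(after_alias - before, after_del - before)
```

Every such increment and decrement is a write to shared state, which is why unsynchronized threads could corrupt the counts without a lock.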

asyncio from scratch

Generators

def fib(count: int):
    a, b = 1, 0
    for i in range(count):
        a, b = b, a + b
        yield b

def main():
    gen = fib(5)
    print(gen)
    while True:
        print(next(gen))

try:
    main()        
except StopIteration:
    print("Stop Iteration.")
<generator object fib at 0x10e793ca0>
1
1
2
3
5
Stop Iteration.

The Fibonacci sequence is a staple of generator examples. Each time through the loop we add the previous two numbers together and yield that value, resulting in the sequence \(\{1, 1, 2, 3, 5, \ldots \}\). But when we call this function, we don’t get any of these values directly; instead we get a generator object. The actual code in our function hasn’t even started executing yet.

The generator object can then be iterated over just like a list, and the built-in next() function can be used to advance it one step at a time. Each time we call next() on our generator object, it re-enters the function where we left off, preserving the full state, and if the function yields another value we get that value as the return value of the next() call. When the generator function completes or returns, it raises a StopIteration exception, just like any other iterator would.
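A generator's return value travels inside that StopIteration exception, in its value attribute. Here is a short sketch with a hypothetical generator named two_then_done:

```python
def two_then_done():
    yield 1
    yield 2
    return "done"  # becomes StopIteration.value


gen = two_then_done()
seen = [next(gen), next(gen)]
try:
    next(gen)
    result = None
except StopIteration as stop:
    result = stop.value

print(seen, result)           # [1, 2] done
print(list(two_then_done()))  # [1, 2]
```

Iteration constructs such as list() and for loops catch StopIteration silently, so the return value is visible only when the generator is driven by hand.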

It’s quite common to see generators that yield values out, but it’s also possible to send values back into the generator from the outside. To do this, we replace the use of the next() function with the generator’s send() method.

def counter(start = 0, stop = 10, step = 1):
    value = start
    while value < stop:
        value = yield value
        value += step
    yield value

def main():
    gen = counter()
    
    # prime the generator
    # advance to the next yield statement
    value = gen.send(None)
    print(f"sent None, got {value}")

    try:
        while(True):
            next_value = gen.send(value)
            print(f"sent {value}, got {next_value}")
            value = next_value
    except StopIteration:
        print("StopIteration.")

main()
sent None, got 0
sent 0, got 1
sent 1, got 2
sent 2, got 3
sent 3, got 4
sent 4, got 5
sent 5, got 6
sent 6, got 7
sent 7, got 8
sent 8, got 9
sent 9, got 10
StopIteration.

Congratulations, you’ve just discovered coroutines. Python has had them hiding in plain sight for years. But how do we actually use this to run concurrent tasks?

We are going to write an event loop that calls send on each generator object. And rather than looking for a flag, we catch the StopIteration exception and mark those generators and tasks as completed. The StopIteration itself contains the return value from these generators. So, we save those for the final result. Lastly, we also capture intermediate yielded values and send them back on the next iteration, which enables coroutines to call other coroutines.

from typing import Generator, Any, List, Iterable
import time


def wait(tasks: Iterable[Generator]) -> List[Any]:
    pending = list(tasks)
    tasks = {task: None for task in pending}
    before = time.time()

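    while pending:
        # Iterate over a copy, because finished tasks are removed from pending.
        for gen in list(pending):
            try:
                tasks[gen] = gen.send(tasks[gen])
            except StopIteration as e:
                # e.value holds the generator's return value (None if absent).
                tasks[gen] = e.value
                pending.remove(gen)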

    print(f"duration = {time.time() - before:.3}")
    return list(tasks.values())

This means that we can now yield from another coroutine to call into it. Together, this makes our coroutines look and feel more like standard functions. But they are still yielding control on their own terms, and they get to continue where they left off when it’s their turn again.

def sleep(duration: float):
    now = time.time()
    threshold = now + duration

    while now < threshold:
        yield
        now = time.time()

def bar():
    yield from sleep(0.1)
    return 123

def foo():
    value = yield from bar()
    return value

def main():
    tasks = [foo(), foo()]
    print(wait(tasks))

main()
duration = 0.1
[123, 123]

We can create a pair of coroutines from the foo() functions and pass them to the event loop. It will follow execution from foo into bar and then into the sleep coroutine. In there, it will continue yielding back into the event loop until the time duration is up. Then, on the next iteration, it will return control to bar() which returns the value back to foo() which finally completes and returns the value.

To be clear, at each yield, our event loop cycles to the next pending task, giving us the cooperative multitasking that we have been looking for.
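The interleaving can be made visible by having each task record when it runs. The sketch below uses a stripped-down round-robin loop rather than the wait() function above; worker and run_round_robin are hypothetical names.

```python
from typing import Generator, Iterable, List


def worker(name: str, steps: int, log: List[str]) -> Generator[None, None, str]:
    for i in range(steps):
        log.append(f"{name}{i}")
        yield  # hand control back to the loop
    return name


def run_round_robin(tasks: Iterable[Generator]) -> List[str]:
    """Minimal round-robin loop; returns results in completion order."""
    pending = list(tasks)
    results: List[str] = []
    while pending:
        for gen in list(pending):  # iterate over a copy; pending is mutated
            try:
                next(gen)
            except StopIteration as stop:
                results.append(stop.value)
                pending.remove(gen)
    return results


log: List[str] = []
print(run_round_robin([worker("a", 2, log), worker("b", 2, log)]))
print(log)
```

The log comes out as a0, b0, a1, b1: each task runs until its next yield and then waits for its turn, with no threads involved.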