---
title: "DIY asyncio"
author: "Quasar"
date: "2025-02-28"
categories: [Python]
image: "cpp.jpg"
toc: true
toc-depth: 3
format:
html:
code-tools: true
code-block-border-left: true
code-annotations: below
highlight-style: pygments
---
## Basics
In single-core processors, the machine can only perform one task at a time, but can switch between many tasks many times per second. By doing a bit of one task and then a bit of another and so on, it appears that the tasks are happening concurrently. This is called *task switching*. Because the task switches are so fast, it provides an illusion of concurrency to both the user and the applications.
On a single-core machine doing task switching, chunks from each task are interleaved. But, they are also spaced out a bit; in order to do the interleaving, the operating system has to perform a *context switch* every time it changes from one task to another, and this takes time. In order to perform a context switch, the OS has to save the CPU state and the instruction pointer for the currently running task, work out which task to switch to, and reload the CPU state for the task being switched to.
Multi-core processors are genuinely capable of running more than one task in parallel. This is called *hardware concurrency*.
### Throughput and Latency
The rate of doing work (operations per second) is called *throughput*. The response time it takes for a system to process a request is called *latency*.
### Synchronous vs Asynchronous
Synchronous execution is sequential.
```{python}
def foo():
print(f"Inside foo.")
def main():
print(f"Starting work.")
foo()
print(f"Finishing work.")
main()
```
In the `main()` code-path, the call to `foo()` is a blocking call: execution jumps to `foo()`, and `main()` resumes when `foo()` returns.
Asynchronous (or *async*) execution refers to execution that doesn't block when invoking subroutines. It is a *fire-and-forget* technique. Any work package runs separately from the main application thread and notifies the calling thread of its completion, failure, or progress.
Usually, such methods return an entity called `future` or `promise` that is the representation of an in-progress computation. The calling thread can query for the status of the computation via the returned future or promise and retrieve the result once completed.
Another pattern is to pass a callback function to the asynchronous function call, which is invoked with the results when the asynchronous function is done processing.
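As a rough sketch of both patterns with the standard `concurrent.futures` module (the `slow_square` and `on_done` names are just placeholders for illustration):

```python
import concurrent.futures
import time

def slow_square(x):
    time.sleep(0.5)      # stand-in for slow I/O or computation
    return x * x

def on_done(future):
    # callback pattern: invoked by the executor when the computation finishes
    print(f"callback got result: {future.result()}")

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)   # returns a Future immediately
    future.add_done_callback(on_done)
    print(f"done yet? {future.done()}")        # query the in-progress computation
    print(f"result: {future.result()}")        # block until the result is available
```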
Asynchronous programming is an excellent choice for applications that do extensive network or disk I/O and spend most of their time waiting.
### I/O bound vs CPU bound
#### CPU bound
Programs that are compute-intensive are called CPU bound programs. This could involve numerical optimizations, Monte Carlo simulations, data crunching, etc.
#### I/O bound
I/O bound programs spend most of their time doing network or main memory and file I/O operations. Since the CPU and main memory are separate, a bus exists between the two to transfer bits. Similarly, data needs to be moved from the NIC to CPU/memory. Even though these physical distances are small, the time taken to transfer the data can waste a few thousand CPU cycles. This is why I/O bound programs show relatively lower CPU utilization than CPU bound programs.
### Data race-conditions and thread safety
The most common cause of bugs in concurrent code is a *race condition*.
```{python}
import concurrent.futures
import logging
import time
class Account:
def __init__(self):
self.value = 0
@property
def value(self):
return self._value
@value.setter
def value(self, x):
self._value = x
def credit(self, name : str, amount : float):
logging.info("Thread %s: starting update", name)
# ----- Critical section -----
local_copy = self.value
local_copy += amount
time.sleep(0.1)
self.value = local_copy
# ----- End of critical section -----
logging.info("Thread %s: finishing update", name)
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
account = Account()
logging.info("Testing update. Starting value is %d.", account.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for index in range(2):
executor.submit(account.credit, index, 100)
logging.info("Testing update. Ending value is %d", account.value)
```
The above logic can be made thread-safe by fencing off the critical section using a mutex and enforcing that only a single thread can enter at a time.
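As a sketch of that fix (reusing the structure of the `credit()` method above; the class name `SafeAccount` is made up here), the critical section can be wrapped in a `threading.Lock`:

```python
import threading
import time

class SafeAccount:
    def __init__(self):
        self._lock = threading.Lock()   # mutex guarding the critical section
        self._value = 0

    def credit(self, name, amount):
        with self._lock:                # only one thread at a time may enter
            local_copy = self._value
            local_copy += amount
            time.sleep(0.1)             # the interleaving window is now harmless
            self._value = local_copy
```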
### Deadlocks
Imagine that you have a toy that comes in two parts, and you need both parts to play with it - a toy drum and a drumstick, for example. Now, imagine that you have two small children, both of whom like playing with it. If one of them gets both the drum and the drumstick, that child can merrily play the drum until tiring of it. If the other child wants to play, they have to wait, however sad that makes them. Now, imagine one child has the drum and the other has the drumstick. They're stuck: unless one decides to be nice and lets the other play, each will hold on to whatever they have and demand that they be given the other piece, so neither gets to play. This is a deadlock.
Imagine two threads arguing over locks on mutexes: each of a pair of threads needs to lock both of a pair of mutexes to perform some operation, and each thread has one mutex and is waiting for the other. Neither thread can proceed, because each is waiting for the other to release its mutex. This scenario is called *deadlock*.
```{python}
import threading
import time
if __name__ == "__main__":
drum = threading.Lock()
drumstick = threading.Lock()
def child1_plays_drums():
print(f"\nChild-1 waiting for drums")
drum.acquire()
print(f"\nChild-1 acquired drums")
print(f"\nChild-1 waiting for drumstick")
drumstick.acquire()
print(f"\nChild-1 is playing drums")
def child2_plays_drums():
print(f"\nChild-2 waiting for drumstick")
drumstick.acquire()
print(f"\nChild-2 acquired drumstick")
print(f"\nChild-2 waiting for drums")
drum.acquire()
print(f"\nChild-2 acquired drums")
print(f"\nChild-2 is playing drums")
t1 = threading.Thread(target=child1_plays_drums)
t2 = threading.Thread(target=child2_plays_drums)
t1.start()
t2.start()
time.sleep(1)
```
### Mutexes and Semaphores
A mutex is a programming construct that allows only a single thread to access a shared resource or critical section. Once a thread acquires a mutex, all other threads attempting to acquire the same mutex are blocked until the thread releases the mutex.
A semaphore, on the other hand, is used to limit access to a collection of resources. Think of a semaphore as having a limited number of permits to give out. If a semaphore has given out all the permits it has, then any new thread that comes along requesting a permit will be blocked until an earlier thread with a permit returns it to the semaphore. A prototypical example is a `ConnectionPool` that hands out database connections to requesting threads.
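A minimal sketch of such a pool (the connection objects here are plain placeholders, not real database connections):

```python
import threading

class ConnectionPool:
    """Hands out at most `size` connections at a time (illustrative sketch)."""
    def __init__(self, size: int = 5):
        self._permits = threading.Semaphore(size)     # one permit per connection
        self._lock = threading.Lock()                 # guards the internal list
        self._connections = [object() for _ in range(size)]

    def acquire(self):
        self._permits.acquire()      # blocks when every permit has been handed out
        with self._lock:
            return self._connections.pop()

    def release(self, conn):
        with self._lock:
            self._connections.append(conn)
        self._permits.release()      # hand the permit back to the semaphore
```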
A semaphore with a single permit is called a *binary semaphore*. Semaphores can also be used for signaling among threads. This is an important distinction, as it allows threads to cooperatively work towards completing a task. A mutex, on the other hand, is strictly limited to serializing access to shared data among competing threads.
#### When can a semaphore masquerade as a mutex?
A semaphore can potentially act as a mutex if the number of permits it can give out is at most $1$. However, the most important difference is that the thread that calls `acquire()` on a mutex must subsequently `release()` the mutex. A mutex is *owned* by the thread acquiring it, up to the point the owning thread releases it. In the case of a binary semaphore, by contrast, different threads can call `acquire()` and `release()` on the semaphore.
### Semaphore for signaling
Another distinction between a semaphore and a mutex is that semaphores can be used for signaling amongst threads. For example, in the case of the classical [producer-consumer problem](https://quantdev.blog/posts/thread-safe-queues/), the producer thread can signal the consumer thread by incrementing the semaphore count to indicate to the consumer thread to read items from the queue. Threads can coordinate tasks using semaphores. A mutex, in contrast, only guards access to shared data.
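A minimal signaling sketch along those lines (the buffer, item count and sleeps are arbitrary):

```python
import threading
import time

items = []                                  # shared buffer
item_available = threading.Semaphore(0)     # starts with zero permits

def producer():
    for i in range(3):
        items.append(i)
        item_available.release()            # signal: one more item is ready
        time.sleep(0.05)

def consumer():
    for _ in range(3):
        item_available.acquire()            # block until the producer signals
        print(f"consumed {items.pop(0)}")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
```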
## `threading` module
Data-parallelism can be achieved using multi-threading.
```{python}
import numpy as np
import threading
def accumulate(a : np.ndarray, idx : int):
result = np.sum(a)
print(f"\nSum of the subarray {idx} = {result}")
if __name__ == "__main__":
data = np.random.rand(1000000)
num_chunks = 4
chunk_size = int(len(data) / num_chunks)
num_threads = num_chunks
threads = []
for i in range(num_threads):
start = i * chunk_size
end = start + chunk_size
        thread = threading.Thread(target=accumulate, args=(data[start:end], i))
threads.append(thread)
for t in threads:
t.start()
for t in threads:
t.join()
```
Another way to create threads is subclassing the `threading.Thread` class.
```{python}
from threading import Thread
from threading import current_thread
class MyTask(Thread):
def __init__(self):
Thread.__init__(self, name="subClassThread", args=(2,3))
def run(self):
print(f"{current_thread().name} is executing")
myTask = MyTask()
myTask.start() # start the thread
myTask.join() # wait for the thread to complete
```
The important caveats to remember when subclassing the `Thread` class are:
- We can only override the `run()` method and the constructor of the `Thread` class.
- `Thread.__init__()` must be invoked if the subclass chooses to override the constructor.
### Daemon Thread
*Daemon* threads are background threads. When the `main` thread is about to exit, it cycles through all regular non-daemon threads and waits for them to complete. In the implementation of the `threading` module, the [`_shutdown()`](https://github.com/python/cpython/blob/df5cdc11123a35065bbf1636251447d0bfe789a5/Lib/threading.py#L1263) method iterates through non-daemon threads and invokes `join()` on each of them. `join()` is a blocking call, which returns when a thread's work package is complete.
```{python}
import threading
import time
def daemon_task():
while(True):
print(f"Executing daemon task")
time.sleep(1)
print(f"Completed daemon task")
if __name__ == "__main__":
daemon_thread = threading.Thread(
target=daemon_task,
name="daemon thread",
daemon=True
)
daemon_thread.start()
```
### Implementation of a thread-safe LIFO stack
```{python}
import threading
import time
from typing import Any, Optional
class StackFull(Exception):
pass
class StackEmpty(Exception):
pass
class Stack:
    def __init__(self, maxsize : Optional[int] = None):
self._mutex = threading.RLock()
self.maxsize = maxsize
self._data = list()
@property
def maxsize(self):
with self._mutex:
value = self._maxsize
return value
@maxsize.setter
def maxsize(self, value : int):
with self._mutex:
self._maxsize = value
def size(self) -> int:
with self._mutex:
size = len(self._data)
return size
def empty(self) -> bool:
with self._mutex:
isEmpty = len(self._data) == 0
return isEmpty
def full(self) -> bool:
with self._mutex:
if(self.maxsize is not None):
isFull = len(self._data) == self.maxsize
else:
isFull = False
return isFull
def put(
self,
item : Any,
block : bool = True,
timeout : float = -1
) -> None:
        self._mutex.acquire(blocking=block, timeout=timeout)
print(f"\nPushing item {item} to the stack")
if self.full():
print("Stack full!")
self._mutex.release()
raise StackFull("Stack full!")
self._data.append(item)
print(f"\nPush complete")
print(f"stack : {self._data}")
self._mutex.release()
def put_nowait(self, item:Any):
self.put(item, block=False)
def get(self, block : bool = True, timeout : float = -1) -> Any:
self._mutex.acquire(blocking=block, timeout=timeout)
print(f"\nPopping from the stack")
if self.empty():
print("Stack empty!")
self._mutex.release()
raise StackEmpty("Stack empty!")
value = self._data[self.size() - 1]
del self._data[self.size() - 1]
print(f"\nPopped item {value} from the stack")
print(f"stack : {self._data}")
self._mutex.release()
return value
def get_no_wait(self):
return self.get(block=False)
def top(self) -> Any:
self._mutex.acquire()
if self.empty():
self._mutex.release()
print("Stack empty!")
raise StackEmpty("Stack empty!")
value = self._data[self.size() - 1]
self._mutex.release()
return value
def push_thread(stack : Stack):
for i in range(10):
try:
stack.put(i)
time.sleep(0.1)
except Exception:
pass
def pop_thread(stack: Stack):
for i in range(10):
try:
item = stack.get()
time.sleep(0.12)
except Exception:
pass
if __name__ == "__main__":
stack = Stack()
t1 = threading.Thread(target=push_thread, args=(stack,))
t2 = threading.Thread(target=pop_thread, args=(stack,))
t1.start()
t2.start()
t1.join()
t2.join()
print("main() thread finished.")
```
In the above implementation, I used `RLock` - a reentrant lock. If a thread acquires an `RLock` object, it can reacquire it as many times as it needs to. The lock is only released once the thread calls `release()` as many times as it called `acquire()`.
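Reentrancy matters here because `put()` and `get()` call `full()`, `empty()` and `size()`, which take the same lock the caller already holds. A tiny sketch of the reentrant behaviour, assuming nothing beyond the standard `threading` module:

```python
import threading

rlock = threading.RLock()

def outer():
    with rlock:      # first acquisition by this thread
        inner()      # a plain Lock would deadlock here

def inner():
    with rlock:      # same thread reacquires; the recursion count goes to 2
        print("inner still holds the lock")

outer()              # the lock is fully released once both with-blocks exit
```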
### Condition variables
We looked at various ways of protecting the data that's shared between threads. But, sometimes we don't just need to protect the data, we also need to synchronize actions on separate threads. One thread might need to wait for another thread to complete a task before the first thread can complete its own. In general, it's common to want a thread to wait for a specific event to happen or a condition to be `true`. Although it would be possible to do this by periodically checking a *task-complete* flag or something like that, it is far from ideal. The need to synchronize operations between threads like this is a common scenario, and the Python standard library provides facilities to handle it, in the form of *condition variables* and *futures*.
A condition variable is always associated with some kind of lock; this can be passed in, or one will be created on the fly. Passing one in is useful when several condition variables must share the same lock. The two important methods of a condition variable are:
- `wait()` - The `wait()` method releases the lock held, then blocks until another thread awakens it by calling `notify()` or `notify_all()`. Once awakened, `wait()` reacquires the lock and returns.
- `notify()` - The `notify()` method arbitrarily wakes up any one of the threads waiting on the condition variable. The `notify_all()` method wakes up all the threads.
The typical programming style using condition variables uses the lock to synchronize access to some shared state; threads that are interested in a particular change of state call `wait()` repeatedly until they see the desired state, while threads that modify the state call `notify()` or `notify_all()` when they change the state in such a way that it could possibly be a desired state for one of the waiters.
Note: The `notify()` and `notify_all()` methods don't release the lock; this means that the thread or threads awakened will not return from their `wait()` call immediately, but only when the notifying thread finally relinquishes ownership of the lock.
For example, the following code is a generic producer-consumer situation with unlimited buffer capacity:
```python
# consumer
with cond_var:
while item_is_not_available:
cond_var.wait()
get_the_available_item()
# producer
with cond_var:
produce_an_item()
cond_var.notify()
```
### Implementation of a thread-based SPSC bounded queue
```{python}
import threading
import time
from typing import Any, Optional
from threading import Condition
class QueueFull(Exception):
pass
class QueueEmpty(Exception):
pass
class Queue:
    def __init__(self, maxsize : Optional[int] = None):
self._lck = threading.RLock()
self._queue_not_empty_condition = Condition(self._lck)
self._queue_not_full_condition = Condition(self._lck)
self.maxsize = maxsize
self._data = list()
@property
def maxsize(self):
with self._lck:
value = self._maxsize
return value
@maxsize.setter
def maxsize(self, value : int):
with self._lck:
self._maxsize = value
def size(self) -> int:
with self._lck:
size = len(self._data)
return size
def empty(self) -> bool:
with self._lck:
isEmpty = len(self._data) == 0
return isEmpty
def full(self) -> bool:
with self._lck:
if(self.maxsize is not None):
isFull = len(self._data) == self.maxsize
else:
isFull = False
return isFull
def put(
self,
item : Any,
) -> None:
print(f"\nPushing item {item} to the queue")
self._queue_not_full_condition.acquire()
while (self.full()):
self._queue_not_full_condition.wait()
self._data.append(item)
print(f"\nPush complete")
print(f"queue : {self._data}")
self._queue_not_empty_condition.notify()
self._queue_not_full_condition.release()
return
def get(self) -> Any:
self._queue_not_empty_condition.acquire()
while (self.empty()):
self._queue_not_empty_condition.wait()
print(f"\nPopping from the queue")
value = self._data[0]
del self._data[0]
print(f"\nPopped item {value} from the queue")
print(f"queue : {self._data}")
self._queue_not_full_condition.notify()
self._queue_not_empty_condition.release()
return value
def top(self) -> Any:
self._lck.acquire()
if self.empty():
self._lck.release()
raise QueueEmpty("queue empty!")
value = self._data[self.size() - 1]
self._lck.release()
return value
def push_thread(queue : Queue):
for i in range(10):
try:
queue.put(i)
time.sleep(0.07)
except Exception:
pass
def pop_thread(queue: Queue):
for i in range(10):
try:
item = queue.get()
time.sleep(0.1)
except Exception:
pass
if __name__ == "__main__":
queue = Queue()
t1 = threading.Thread(target=push_thread, args=(queue,))
t2 = threading.Thread(target=pop_thread, args=(queue,))
t1.start()
t2.start()
t1.join()
t2.join()
print("main() thread finished.")
```
### Semaphores
This is one of the oldest synchronization primitives in the history of CS, invented by the Dutch computer scientist [Edsger W. Dijkstra](https://en.wikipedia.org/wiki/Edsger_W._Dijkstra). A semaphore manages an internal counter which is decremented by each `acquire()` and incremented by each `release()` call.
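A small sketch of that counter in action, limiting four workers to at most two concurrent entries (the worker count and sleep are arbitrary):

```python
import threading
import time

permits = threading.Semaphore(2)      # internal counter starts at 2

def worker(idx: int):
    with permits:                      # acquire() decrements, release() increments
        print(f"worker {idx} entered") # at most two workers are in here at once
        time.sleep(0.1)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```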
## Events
An `Event` object is one of the simplest primitives available for synchronization. Internally, the CPython [implementation](https://github.com/python/cpython/blob/df5cdc11123a35065bbf1636251447d0bfe789a5/Lib/threading.py#L488) manages a flag that can be set to `True` with the `set()` method and reset to `False` using the `clear()` method. The `wait()` method blocks until the flag is `True`.
When the internal flag is set to `True`, all threads waiting on the `Event` are awakened. Threads that call `wait()` once the flag is `True` will not block at all.
When the internal flag is reset to `False`, threads calling `wait()` will block until `set()` is called to set the internal flag to `True` again.
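A minimal sketch of one thread blocking on an `Event` until another thread sets it (the 0.5 second delay is arbitrary):

```python
import threading
import time

ready = threading.Event()

def waiter():
    print("waiting for the event...")
    ready.wait()                  # blocks until the internal flag is True
    print("event set, continuing")

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.5)
ready.set()                       # wakes every thread blocked in wait()
t.join()
```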
## The Global Interpreter Lock (GIL)
The Python interpreter maintains a reference count of each object in Python code. When references go out of scope, the reference count of the object is decremented, and if the reference count equals $0$, memory is deallocated (reclaimed). These reference counts are shared state, and executing Python bytecode requires acquiring an exclusive lock on the interpreter (the GIL) to protect that shared state. The implication is that the `threading` library does not offer true hardware concurrency even on multi-core CPUs.
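One way to see this is to time a CPU-bound function on one thread versus two; with the GIL, the two-thread version is not meaningfully faster (a rough sketch, exact timings vary by machine and Python build):

```python
import threading
import time

def count_down(n: int):
    while n > 0:
        n -= 1

N = 10_000_000

start = time.time()
count_down(N)
print(f"single thread: {time.time() - start:.2f}s")

start = time.time()
t1 = threading.Thread(target=count_down, args=(N // 2,))
t2 = threading.Thread(target=count_down, args=(N // 2,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"two threads : {time.time() - start:.2f}s")
```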
## `asyncio` from scratch
### Generators
```{python}
def fib(count: int):
a, b = 1, 0
for i in range(count):
a, b = b, a + b
yield b
def main():
gen = fib(5)
print(gen)
while True:
print(next(gen))
try:
main()
except StopIteration:
print("Stop Iteration.")
```
The Fibonacci sequence is a staple of generator examples. Each time through the loop we add the previous two numbers together and yield that value, resulting in the sequence $\{1, 1, 2, 3, 5, \ldots \}$. But, when we call this function, we don't get any of these values directly; instead we get a generator object. The actual code in our function hasn't even started executing yet.
The generator object can then be iterated over just like a list, and the built-in `next()` function can be used to advance it one step at a time. Each time we call `next()` on our generator object, execution re-enters the function where it left off, preserving its full state; if the function yields another value, we get that value as the return value of the `next()` call. When the generator function completes or returns, it raises a `StopIteration` exception, just like any other iterator would.
It's quite common to see generators that yield values out, but it's also possible to communicate or send values back into the generator from the outside. To do this, we have to replace the use of the `next()` function with the generator's `send()` function.
```{python}
def counter(start = 0, stop = 10, step = 1):
value = start
while value < stop:
value = yield value
value += step
yield value
def main():
gen = counter()
# prime the generator
# advance to the next yield statement
value = gen.send(None)
print(f"sent None, got {value}")
try:
while(True):
next_value = gen.send(value)
print(f"sent {value}, got {next_value}")
value = next_value
except StopIteration:
print("StopIteration.")
main()
```
Congratulations, you've just discovered coroutines. Python's had them hiding in plain sight for years. But, how do we actually use this to run concurrent tasks?
We are going to write an event loop that calls `send` on each generator object. Rather than checking a flag, we catch the `StopIteration` exception and mark those generator tasks as completed. The `StopIteration` exception itself carries the generator's return value, so we save it for the final result. Lastly, we also capture intermediate yielded values and send them back on the next iteration, which enables coroutines to call other coroutines.
```{python}
from typing import Generator, Any, List, Iterable
import time
def wait(tasks: Iterable[Generator]) -> List[Any]:
pending = list(tasks)
tasks = {task: None for task in pending}
before = time.time()
while pending:
        for gen in list(pending):  # iterate over a copy, since we remove completed tasks
try:
tasks[gen] = gen.send(tasks[gen])
except StopIteration as e:
                tasks[gen] = e.value  # the generator's return value
pending.remove(gen)
print(f"duration = {time.time() - before:.3}")
return list(tasks.values())
```
This means that we can now `yield from` another coroutine to call into it. Together, this makes our coroutines look and feel more like standard functions. But, they are still yielding control on their own terms, and they get to continue where they left off when it's their turn again.
```{python}
def sleep(duration: float):
now = time.time()
threshold = now + duration
while now < threshold:
yield
now = time.time()
def bar():
yield from sleep(0.1)
return 123
def foo():
value = yield from bar()
return value
def main():
tasks = [foo(), foo()]
print(wait(tasks))
main()
```
We can create a pair of coroutines from the `foo()` function and pass them to the event loop. It will follow execution from `foo` into `bar` and then into the `sleep` coroutine. In there, it will continue yielding back into the event loop until the time duration is up. Then, on the next iteration, it will return control to `bar()`, which returns the value back to `foo()`, which finally completes and returns the value.
To be clear, at each `yield`, our event loop is cycling to the next pending task, giving us the cooperative multitasking concurrency that we have been looking for.