Python
Async programming
- Single thread, concurrent execution, it uses cooperative multitasking.
- Coroutines are functions that can be paused and resumed.
- Coroutines are repurposed generators that take advantage of the peculiarities of generator methods.
- A coroutine is just the very fancy term for the thing returned by an async def function.
- Old
@asyncio.coroutinealso exist.
- Event loop is the main loop that drives the execution of coroutines.
- You can think of an event loop as something like a while True loop that monitors coroutines, taking feedback on what’s idle, and looking around for things that can be executed in the meantime. It is able to wake up an idle coroutine when whatever that coroutine is waiting on becomes available.
- Tasks are used to schedule coroutines concurrently.
- TaskGroup combines Tasks cleanly.
asyncio.sleep()is used to stand in for a non-blocking call (but one that also takes some time to complete)- A few methods to work it out
- Method 1: Nonblocking but total time 3 seconds cz they are not started simultaneously.
- Method 2: Two seconds run using Task.
- Method 3: Use TaskGroups to implicitly call await.
- Method 4:
asyncio.gather. Gather waits until all the coroutines are completed. However, useasyncio.as_completed()to get tasks as they are completed, in the order of completion.
- Method 1
- Method 2
- Method 3
- Method 4
import time
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
async def main():
task1 = asyncio.create_task(say_after(1, "hello"))
task2 = asyncio.create_task(say_after(2, "world"))
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(say_after(1, "hello"))
tg.create_task(say_after(2, "world"))
print(f"started at {time.strftime('%X')}")
print(f"finished at {time.strftime('%X')}")
async def main():
print(f"started at {time.strftime('%X')}")
await asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world'),
)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
-
Some AIO libs:
aiohttp aiofiles aiodns -
Async IO Design Patterns
- Chaining Coroutines
- Using a Queue
- Pattern 1: Chaining Coroutines
- Pattern 2: Using a Queue
import time
import asyncio
import random
async def step1(n):
t = random.randint(0, 10)
print(f" Step 1: {n} waiting {t} seconds")
await asyncio.sleep(t)
result = f"result{n}-1"
# print(f" Step 1: {n} | Returning ==> {result}.")
return result
async def step2(n, arg):
t = random.randint(0, 10)
print(f" Step 2: {n} waiting {t} seconds")
await asyncio.sleep(t)
result = f"result{n}-2 derived from {arg}"
# print(f" Step 2: {n} | Returning == {result}.")
return result
async def chain(n):
start = time.perf_counter()
value1 = await step1(n)
value2 = await step2(n, value1)
end = time.perf_counter()
print(f"> Chained result {n} took {end - start} seconds.")
return value2
async def main():
start = time.perf_counter()
values = await asyncio.gather(*[chain(n) for n in [3, 6, 9]])
end = time.perf_counter()
print("-------")
print(f"Total time: {end - start} seconds")
print(values)
asyncio.run(main())
import asyncio
import itertools as it
import os
import random
import time
async def makeitem(size: int = 5) -> str:
return os.urandom(size).hex()
async def randsleep(caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(f"{caller} sleeping for {i} seconds.")
await asyncio.sleep(i)
async def produce(name: int, q: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n): # Synchronous loop for each single producer
await randsleep(caller=f"Producer {name}")
i = await makeitem()
t = time.perf_counter()
await q.put((i, t))
print(f"Producer {name} added <{i}> to queue.")
async def consume(name: int, q: asyncio.Queue) -> None:
while True:
await randsleep(caller=f"Consumer {name}")
i, t = await q.get()
now = time.perf_counter()
print(f"Consumer {name} got element <{i}>"
f" in {now-t:0.5f} seconds.")
q.task_done()
async def main(nprod: int, ncon: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
await asyncio.gather(*producers)
await q.join()
for c in consumers:
c.cancel()
if __name__ == "__main__":
import argparse
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--nprod", type=int, default=5)
parser.add_argument("-c", "--ncon", type=int, default=10)
ns = parser.parse_args()
start = time.perf_counter()
asyncio.run(main(**ns.__dict__))
elapsed = time.perf_counter() - start
print(f"Program completed in {elapsed:0.5f} seconds.")
- Consumer Initiation: Consumers are initiated by
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]. Theasyncio.create_task()function schedules theconsumecoroutine to run on the event loop immediately and concurrently. You do notawaitcreate_task()because it returns aTaskobject right away;awaitis used to wait for the completion of a task or coroutine, not its initiation. - Producer Completion:
await asyncio.gather(*producers)ensures that themainfunction waits until all producer tasks have completed adding items to the queue. - Queue Synchronization:
await q.join()is vital; it ensures thatmainwaits until all items added to the queue by producers have been received and processed by consumers, with eachq.task_done()call decrementing the count of unfinished tasks. This guarantees that all producer-generated work is fully handled before the program proceeds. - Consumer Cancellation:
for c in consumers: c.cancel()is necessary to gracefully stop the consumer tasks, as they run in an infinitewhile Trueloop and would otherwise continue indefinitely, even after all items are processed.
Neither asynchronous generators nor comprehensions make the iteration concurrent.
async def mygen(u: int = 10):
"""Yield powers of 2."""
i = 0
while i < u:
yield 2 ** i
i += 1
await asyncio.sleep(0.1)
async def main():
# This does *not* introduce concurrent execution
# It is meant to show syntax only
g = [i async for i in mygen()]
f = [j async for j in mygen() if not (j // 3 % 5)]
return g, f
References
Testing
Types of Testing
- Unit Testing
- Integration Testing
- System Testing
- Acceptance Testing
References
- Made with ML testing
- Pytest CI
- Fast API
- Fast API file structure
- fastapi pytest github template.