import asyncio
from typing import AsyncIterator, Iterable, TypeVar
# Module-level type variable.
# NOTE(review): aiter_sync declares its own PEP 695 type parameter that is also
# named T, so the annotations in that function do not refer to this TypeVar.
# It appears unused in the visible code -- confirm before removing.
T = TypeVar("T")
async def aiter_sync[T](source: Iterable[T]) -> AsyncIterator[T]:
"""Pump a sync iterable through to_thread without blocking the loop."""
it = iter(source)
sentinel = object()
while True:
item = await asyncio.to_thread(next, it, sentinel)
if item is sentinel:
return
yield item
async def main(path: str = "/etc/hosts") -> None:
    """Print every line of a text file with trailing whitespace removed.

    Demonstrates streaming a (blocking) file object line-by-line from an
    async context by wrapping it in ``aiter_sync``.

    Args:
        path: File to read. Defaults to ``/etc/hosts``, the path this demo
            originally hard-coded.

    Raises:
        OSError: If ``path`` cannot be opened.
    """
    # The file is opened synchronously here; the per-line reads are what
    # aiter_sync pushes onto a worker thread.
    with open(path) as f:
        async for line in aiter_sync(f):
            print(line.rstrip())
# Run the demo only when this file is executed as a script, so that importing
# the module does not start an event loop and read the file as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
# Create a free account and build your private vault. Share publicly whenever you want.