Utilities

aiostream also provides utilities for general asynchronous iteration and asynchronous context management.

Asynchronous iteration

Utilities for asynchronous iteration.

aiostream.aiter_utils.aiter(obj: AsyncIterable[T]) → AsyncIterator[T]

Access the __aiter__ magic method of the given asynchronous iterable.

aiostream.aiter_utils.anext(obj: AsyncIterator[T]) → Awaitable[T]

Access the __anext__ magic method of the given asynchronous iterator.
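To make the two magic methods concrete, here is a minimal standard-library sketch that steps the asynchronous iteration protocol by hand. It does not import aiostream. The names countdown and first_two are hypothetical, and the comments mark where aiter and anext would be expected to fit.

```python
import asyncio


async def countdown(n):
    # Async generator: an asynchronous iterable that is also its own iterator.
    while n > 0:
        yield n
        n -= 1


async def first_two(aiterable):
    # Step the protocol by hand, which is what `async for` does implicitly.
    iterator = aiterable.__aiter__()    # the method that aiter(obj) gives access to
    first = await iterator.__anext__()  # the method that anext(obj) gives access to
    second = await iterator.__anext__()
    await iterator.aclose()             # release the suspended generator explicitly
    return first, second


result = asyncio.run(first_two(countdown(3)))
print(result)  # (3, 2)
```

Note that anext returns an awaitable, so its result has to be awaited to obtain the next item.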

aiostream.aiter_utils.await_(obj: Awaitable[T]) → T

Identity coroutine function: await the given object and return its result.

aiostream.aiter_utils.async_(fn: Callable[P, Awaitable[T]]) → Callable[P, Awaitable[T]]

Wrap the given function into a coroutine function.
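The following sketch illustrates what these two helpers are described as doing, using only the standard library. await_sketch and async_sketch are hypothetical stand-ins written from the one-line descriptions above, not the library's code, and delayed_double is an invented example function.

```python
import asyncio
import functools
import inspect


async def await_sketch(obj):
    # Identity coroutine: the call resolves to whatever `obj` resolves to.
    return await obj


def async_sketch(fn):
    # Turn a plain function that returns an awaitable into a coroutine function.
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        return await fn(*args, **kwargs)
    return wrapper


def delayed_double(x):
    # Hypothetical example: a regular function returning an awaitable (a Future).
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(future.set_result, 2 * x)
    return future


async def main():
    wrapped = async_sketch(delayed_double)
    a = await wrapped(21)
    b = await await_sketch(delayed_double(4))
    return a, b


is_coro_before = inspect.iscoroutinefunction(delayed_double)
is_coro_after = inspect.iscoroutinefunction(async_sketch(delayed_double))
result = asyncio.run(main())
print(is_coro_before, is_coro_after, result)  # False True (42, 8)
```

Wrapping matters when an API inspects its argument and only accepts genuine coroutine functions: delayed_double returns an awaitable but is not itself one.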

aiostream.aiter_utils.is_async_iterable(obj: object) → bool

Check if the given object is an asynchronous iterable.

aiostream.aiter_utils.assert_async_iterable(obj: object) → None

Raise a TypeError if the given object is not an asynchronous iterable.

aiostream.aiter_utils.is_async_iterator(obj: object) → bool

Check if the given object is an asynchronous iterator.

aiostream.aiter_utils.assert_async_iterator(obj: object) → None

Raise a TypeError if the given object is not an asynchronous iterator.
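The distinction between an asynchronous iterable and an asynchronous iterator can be shown with the standard-library abstract base classes. This is an approximation for illustration only: the library's own checks may differ in detail, and Feed, FeedIterator and check_async_iterator_sketch are hypothetical names.

```python
from collections.abc import AsyncIterable, AsyncIterator


class Feed:
    # Asynchronous iterable only: defines __aiter__ but not __anext__.
    def __aiter__(self):
        return FeedIterator()


class FeedIterator:
    # Asynchronous iterator: defines both __aiter__ and __anext__.
    def __aiter__(self):
        return self

    async def __anext__(self):
        raise StopAsyncIteration


def check_async_iterator_sketch(obj):
    # Stand-in for an assert-style helper: raise TypeError on a mismatch.
    if not isinstance(obj, AsyncIterator):
        raise TypeError(f"{type(obj).__name__!r} object is not an async iterator")


checks = {
    "Feed is iterable": isinstance(Feed(), AsyncIterable),                  # True
    "Feed is iterator": isinstance(Feed(), AsyncIterator),                  # False
    "FeedIterator is iterable": isinstance(FeedIterator(), AsyncIterable),  # True
    "FeedIterator is iterator": isinstance(FeedIterator(), AsyncIterator),  # True
    "list is iterable": isinstance([1, 2], AsyncIterable),                  # False
}
print(checks)
```

Every asynchronous iterator is also an asynchronous iterable, but not the other way round, which is why the two pairs of helpers are kept separate.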

class aiostream.aiter_utils.AsyncIteratorContext(aiterator: AsyncIterator[T])

Asynchronous iterator with context management.

The context management makes sure the aclose asynchronous method of the corresponding iterator has run before it exits. It also issues warnings and raises RuntimeError if it is used incorrectly.

Correct usage:

ait = some_asynchronous_iterable()
async with AsyncIteratorContext(ait) as safe_ait:
    async for item in safe_ait:
        <block>

It is nonetheless not meant to be used directly. Prefer the aitercontext helper instead.

aiostream.aiter_utils.aitercontext(aiterable: AsyncIterable[T]) → aiostream.aiter_utils.AsyncIteratorContext[T]

Return an asynchronous context manager from an asynchronous iterable.

The context management makes sure the aclose asynchronous method has run before it exits. It also issues warnings and raises RuntimeError if it is used incorrectly.

It is safe to use with any asynchronous iterable, and it prevents an asynchronous iterator context from being wrapped twice.

Correct usage:

ait = some_asynchronous_iterable()
async with aitercontext(ait) as safe_ait:
    async for item in safe_ait:
        <block>
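Why the guaranteed aclose call matters shows up when a loop is left early. The sketch below reproduces the idea with the standard library only. closing_sketch is a hypothetical stand-in for aitercontext, not its implementation: it assumes the iterator has an aclose method (as async generators do) and omits the warnings and double-wrapping protection described above.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


async def ticker():
    # Async generator with cleanup that should run when iteration stops.
    try:
        for i in range(5):
            yield i
    finally:
        events.append("cleanup")


@asynccontextmanager
async def closing_sketch(aiterable):
    # Hypothetical stand-in for aitercontext: close the iterator on exit.
    aiterator = aiterable.__aiter__()
    try:
        yield aiterator
    finally:
        await aiterator.aclose()


async def main():
    async with closing_sketch(ticker()) as safe_ait:
        async for item in safe_ait:
            events.append(item)
            if item == 1:
                break  # leave early; the generator is still suspended here
    events.append("after block")


asyncio.run(main())
print(events)  # [0, 1, 'cleanup', 'after block']
```

The cleanup entry is recorded before the code after the block runs. Without the context manager, the generator's finally clause would be deferred until the generator is finalized, which can happen at an unpredictable point.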