Utilities

aiostream also provides utilites for general asynchronous iteration and asynchronous context management.

Asynchronous iteration

Utilities for asynchronous iteration.

class aiostream.aiter_utils.AsyncIteratorContext(aiterator: AsyncIterator[T])

Asynchronous iterator with context management.

The context management makes sure the aclose asynchronous method of the corresponding iterator has run before it exits. It also issues warnings and RuntimeError if it is used incorrectly.

Correct usage:

ait = some_asynchronous_iterable()
async with AsyncIteratorContext(ait) as safe_ait:
    async for item in safe_ait:
        <block>

It is nonetheless not meant to use directly. Prefer aitercontext helper instead.

aiostream.aiter_utils.aiter(obj: AsyncIterable[T]) AsyncIterator[T]

Access aiter magic method.

aiostream.aiter_utils.aitercontext(aiterable: AsyncIterable[T]) AsyncIteratorContext[T]

Return an asynchronous context manager from an asynchronous iterable.

The context management makes sure the aclose asynchronous method has run before it exits. It also issues warnings and RuntimeError if it is used incorrectly.

It is safe to use with any asynchronous iterable and prevent asynchronous iterator context to be wrapped twice.

Correct usage:

ait = some_asynchronous_iterable()
async with aitercontext(ait) as safe_ait:
    async for item in safe_ait:
        <block>
aiostream.aiter_utils.anext(obj: AsyncIterator[T]) Awaitable[T]

Access anext magic method.

aiostream.aiter_utils.assert_async_iterable(obj: object) None

Raise a TypeError if the given object is not an asynchronous iterable.

aiostream.aiter_utils.assert_async_iterator(obj: object) None

Raise a TypeError if the given object is not an asynchronous iterator.

aiostream.aiter_utils.async_(fn: Callable[P, Awaitable[T]]) Callable[P, Awaitable[T]]

Wrap the given function into a coroutine function.

async aiostream.aiter_utils.await_(obj: Awaitable[T]) T

Identity coroutine function.

aiostream.aiter_utils.is_async_iterable(obj: object) bool

Check if the given object is an asynchronous iterable.

aiostream.aiter_utils.is_async_iterator(obj: object) bool

Check if the given object is an asynchronous iterator.