Core objects

Stream base class

class aiostream.core.Stream(factory: Callable[[], AsyncIterable[T]])

Enhanced asynchronous iterable.

It provides the following features:

  • Operator pipelining - using the pipe symbol |

  • Repeatability - every iteration creates a different iterator

  • Safe iteration context - using async with and the stream method

  • Simplified execution - get the last element from a stream using await

  • Slicing and indexing - using square brackets []

  • Concatenation - using addition symbol +

It is not meant to be instantiated directly. Use the stream operators instead.


xs = stream.count()    # xs is a stream object
ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements
zs = ys[5:10:2]        # slice ys using start, stop and step

async with zs.stream() as streamer:  # stream zs in a safe context
    async for z in streamer:         # iterate the zs streamer
        print(z)                     # Prints 10, 12, 14

result = await zs  # await zs and return its last element
print(result)      # Prints 14
result = await zs  # zs can be used several times
print(result)      # Prints 14

stream() → Streamer[T]

Return a streamer context for safe iteration.


xs = stream.count()
async with xs.stream() as streamer:
    async for item in streamer:
        ...  # process each item

Stream context manager

aiostream.core.streamcontext(aiterable: AsyncIterable[T]) → Streamer[T]

Return a stream context manager from an asynchronous iterable.

The context manager makes sure the aclose asynchronous method of the corresponding iterator has run before it exits. It also issues warnings and raises RuntimeError when it is used incorrectly.

It is safe to use with any asynchronous iterable, and it prevents an asynchronous iterator context from being wrapped twice.

Correct usage:

ait = some_asynchronous_iterable()
async with streamcontext(ait) as streamer:
    async for item in streamer:
        ...  # process each item

For stream objects, it is possible to use the stream method instead:

xs = stream.count()
async with xs.stream() as streamer:
    async for item in streamer:
        ...  # process each item

Operator decorators

aiostream.core.operator(func: Callable[[P], AsyncIterator[T]] | None = None, pipable: bool | None = None) → Operator[P, T]

Create a stream operator from an asynchronous generator (or any function returning an asynchronous iterable).

Decorator usage:

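import random as random_module  # aliased: the operator below reuses the name "random"

@operator
async def random(offset=0., width=1.):
    while True:
        yield offset + width * random_module.random()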

The return value is a dynamically created callable. It has the same name, module and documentation as the original function.

A new stream is created by simply calling the operator:

xs = random()

The original function is called right away to check that the signatures match. Other methods are available:

  • original: the original function as a static method

  • raw: same as original with extra checking

The pipable argument is deprecated; use pipable_operator instead.

aiostream.core.pipable_operator(func: Callable[[Concatenate[AsyncIterable[X], P]], AsyncIterator[T]]) → PipableOperator[X, P, T]

Create a pipable stream operator from an asynchronous generator (or any function returning an asynchronous iterable).

Decorator usage:

@pipable_operator
async def multiply(source, factor):
    async with streamcontext(source) as streamer:
        async for item in streamer:
            yield factor * item

The first argument is expected to be the asynchronous iterable used for piping.

The return value is a dynamically created callable. It has the same name, module and documentation as the original function.

A new stream is created by simply calling the operator:

xs = random()
ys = multiply(xs, 2)

The original function is called right away (but not awaited) to check that signatures match. The sources are also checked for asynchronous iteration.

The operator also has a pipe method that can be used with the pipe syntax:

xs = random()
ys = xs | multiply.pipe(2)

This is strictly equivalent to the previous example.

Other methods are available:

  • original: the original function as a static method

  • raw: same as original with extra checking

The raw method is useful to create new operators from existing ones:

@pipable_operator
def double(source):
    return multiply.raw(source, 2)

aiostream.core.sources_operator(func: Callable[[P], AsyncIterator[T]]) → SourcesOperator[P, T]

Create a pipable stream operator from an asynchronous generator (or any function returning an asynchronous iterable) that takes a variable number of sources as positional arguments (*args).

Decorator usage:

@sources_operator
async def chain(*sources, repeat=1):
    for source in (sources * repeat):
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item

Positional arguments are expected to be asynchronous iterables.

When used in a pipable context, the asynchronous iterable injected by the pipe operator is used as the first argument.

The return value is a dynamically created callable. It has the same name, module and documentation as the original function.

A new stream is created by simply calling the operator:

xs = chain()
ys = chain(random())
zs = chain(stream.just(0.0), stream.just(1.0), random())

The original function is called right away (but not awaited) to check that signatures match. The sources are also checked for asynchronous iteration.

The operator also has a pipe method that can be used with the pipe syntax:

just_zero = stream.just(0.0)
zs = just_zero | chain.pipe(stream.just(1.0), random())

This is strictly equivalent to the previous zs example.

Other methods are available:

  • original: the original function as a static method

  • raw: same as original with extra checking

The raw method is useful to create new operators from existing ones:

@sources_operator
def chain_twice(*sources):
    return chain.raw(*sources, repeat=2)