Stream operators

The stream operators produce objects of the Stream class.

They are divided into 8 categories:

creation

iterate, preserve, just, call, empty, throw, never, repeat, count, range

transformation

map, enumerate, starmap, cycle, chunks

selection

take, takelast, skip, skiplast, getitem, filter, until, takewhile, dropwhile

combination

map, zip, merge, chain, ziplatest

aggregation

accumulate, reduce, list

advanced

concat, flatten, switch, concatmap, flatmap, switchmap

timing

spaceout, timeout, delay

miscellaneous

action, print

They can be found in the aiostream.stream module.

Custom stream operators can be created using the @operator decorator.

Pipe-lining

Most of the operators have a pipe() method corresponding to their equivalent pipe operator. They are also gathered and accessible through the aiostream.pipe module. The pipe operators allow a two-step instantiation: the operator arguments are given first, and the source is supplied afterwards.

For instance, the following stream:

ys = stream.map(xs, lambda x: x**2)

is strictly equivalent to:

ys = pipe.map(lambda x: x**2)(xs)

and can be written as:

ys = xs | pipe.map(lambda x: x**2)

This syntax comes in handy when several operators are chained:

ys = (xs
    | pipe.operator1(*args1)
    | pipe.operator2(*args2)
    | pipe.operator3(*args3))
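The equivalence between the three spellings above can be illustrated with a minimal plain-Python sketch of the two-step instantiation. The names fake_stream_map, fake_pipe_map and Stage are hypothetical stand-ins written for this illustration only; they operate on plain lists and are not aiostream's implementation.

```python
def fake_stream_map(xs, func):
    """Stand-in for stream.map: takes the source first, then the function."""
    return [func(x) for x in xs]


class Stage:
    """An operator whose arguments are known but whose source is not yet."""

    def __init__(self, operator, *args):
        self.operator = operator
        self.args = args

    def __call__(self, source):
        # Second step of the instantiation: supply the source.
        return self.operator(source, *self.args)

    def __ror__(self, source):
        # `source | stage` ends up here because lists do not implement `|`.
        return self(source)


def fake_pipe_map(func):
    """Stand-in for pipe.map: same operator, with the source left out."""
    return Stage(fake_stream_map, func)


xs = [1, 2, 3]
direct = fake_stream_map(xs, lambda x: x**2)
two_step = fake_pipe_map(lambda x: x**2)(xs)
piped = xs | fake_pipe_map(lambda x: x**2)
chained = xs | fake_pipe_map(lambda x: x + 1) | fake_pipe_map(lambda x: x * 10)
```

In this sketch the three spellings build the same result, and chaining simply feeds the output of one stage into the next.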

Creation operators

Note

These operators do not have a pipe equivalent.

class aiostream.stream.iterate(it: AsyncIterable[T] | Iterable[T])

Generate values from a synchronous or asynchronous iterable.

class aiostream.stream.preserve(ait: AsyncIterable[T])

Generate values from an asynchronous iterable without explicitly closing the corresponding iterator.

class aiostream.stream.just(value: T)

Await if possible, and generate a single value.

class aiostream.stream.call(func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs)

Call the given function and generate a single value.

Await if the provided function is asynchronous.
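The "await if possible" behaviour of just and call can be modelled with plain async generators. This is a sketch under stated assumptions, not aiostream's code: just_sketch, call_sketch and fetch_answer are hypothetical names, and the checks rely on the standard inspect module.

```python
import asyncio
import inspect


async def just_sketch(value):
    """Generate a single value, awaiting it first when it is awaitable."""
    if inspect.isawaitable(value):
        value = await value
    yield value


async def call_sketch(func, *args, **kwargs):
    """Call func and generate its result, awaiting it when func is async."""
    result = func(*args, **kwargs)
    if inspect.iscoroutinefunction(func):
        result = await result
    yield result


async def fetch_answer():
    await asyncio.sleep(0)
    return 42


async def main():
    plain = [x async for x in just_sketch(3)]
    awaited = [x async for x in just_sketch(fetch_answer())]
    sync_call = [x async for x in call_sketch(pow, 2, 5)]
    async_call = [x async for x in call_sketch(fetch_answer)]
    return plain, awaited, sync_call, async_call


plain, awaited, sync_call, async_call = asyncio.run(main())
```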

class aiostream.stream.empty

Terminate without generating any value.

class aiostream.stream.throw(exc: Exception)

Throw an exception without generating any value.

class aiostream.stream.never

Hang forever without generating any value.

class aiostream.stream.repeat(value: T, times: int | None = None, *, interval: float = 0.0)

Generate the same value a given number of times.

If times is None, the value is repeated indefinitely. An optional interval can be given to space the values out.
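As a rough model of the times and interval arguments, the following plain-asyncio sketch generates a value a fixed number of times, or indefinitely when times is None. The names repeat_sketch and first_n are hypothetical, and the choice not to sleep before the first value is an assumption of this sketch.

```python
import asyncio
import itertools


async def repeat_sketch(value, times=None, *, interval=0.0):
    """Generate value `times` times, or forever when times is None."""
    rounds = itertools.count() if times is None else range(times)
    for index in rounds:
        if index and interval:
            # Space the values out, except before the very first one.
            await asyncio.sleep(interval)
        yield value


async def first_n(agen, n):
    """Collect the first n items (n >= 1) of an async generator, then close it."""
    items = []
    async for item in agen:
        items.append(item)
        if len(items) == n:
            break
    await agen.aclose()
    return items


async def main():
    bounded = [x async for x in repeat_sketch("tick", 3)]
    unbounded = await first_n(repeat_sketch("tock", interval=0.01), 4)
    return bounded, unbounded


bounded, unbounded = asyncio.run(main())
```

An unbounded sequence has to be cut off by the consumer, which is what first_n does here.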

class aiostream.stream.range(*args: int, interval: float = 0.0)

Generate a given range of numbers.

It supports the same arguments as the builtin function. An optional interval can be given to space the values out.

class aiostream.stream.count(start: int = 0, step: int = 1, *, interval: float = 0.0)

Generate consecutive numbers indefinitely.

Optional starting point and increment can be defined, respectively defaulting to 0 and 1.

An optional interval can be given to space the values out.

Transformation operators

class aiostream.stream.map(source: AsyncIterable[T], func: MapCallable[T, U], *more_sources: AsyncIterable[T], ordered: bool = True, task_limit: int | None = None)

Apply a given function to the elements of one or several asynchronous sequences.

Each element is used as a positional argument, using the same order as their respective sources. The generation continues until the shortest sequence is exhausted. The function can either be synchronous or asynchronous (coroutine function).

The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.

The coroutines run concurrently but their number can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.

If more than one sequence is provided, they’re also awaited concurrently, so that their waiting times don’t add up.
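The effect of ordered and task_limit can be modelled on a finite list with plain asyncio. This is only a sketch: map_sketch, slow_square and stats are hypothetical names, the input is a list rather than an asynchronous sequence, and a semaphore stands in for whatever mechanism the library actually uses.

```python
import asyncio


async def map_sketch(items, func, *, ordered=True, task_limit=None):
    """Apply the async function func to a finite list of items."""
    semaphore = asyncio.Semaphore(task_limit or len(items) or 1)

    async def run(item):
        async with semaphore:  # at most task_limit calls run at once
            return await func(item)

    tasks = [asyncio.ensure_future(run(item)) for item in items]
    if ordered:
        # Results follow the order of the source items.
        return list(await asyncio.gather(*tasks))
    # Results follow the order in which the calls complete.
    return [await done for done in asyncio.as_completed(tasks)]


stats = {"running": 0, "peak": 0}


async def slow_square(x):
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(x * 0.02)  # larger inputs take longer
    stats["running"] -= 1
    return x * x


async def main():
    in_order = await map_sketch([3, 1, 2], slow_square)
    by_completion = await map_sketch([3, 1, 2], slow_square, ordered=False)
    stats["peak"] = 0  # measure concurrency for the limited run only
    sequential = await map_sketch([3, 1, 2], slow_square, task_limit=1)
    return in_order, by_completion, sequential, stats["peak"]


in_order, by_completion, sequential, sequential_peak = asyncio.run(main())
```

With ordered=False the same values come back, but in completion order; with task_limit=1 no two calls overlap.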

It might happen that the provided function returns a coroutine but is not a coroutine function per se. In this case, one can wrap the function with aiostream.async_ in order to force map to await the resulting coroutine. The following example illustrates the use of async_ with a lambda function:

from aiostream import stream, async_
...
ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
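The distinction drawn above can be checked with the standard inspect module. In this short sketch, sleep_ms mirrors the lambda from the example and sleep_ms_async is a hypothetical coroutine-function counterpart; only standard-library calls are used.

```python
import asyncio
import inspect


def sleep_ms(ms):
    # A plain function (like the lambda above) that merely returns a coroutine.
    return asyncio.sleep(ms / 1000)


async def sleep_ms_async(ms):
    # A genuine coroutine function.
    await asyncio.sleep(ms / 1000)


plain_is_coroutine_function = inspect.iscoroutinefunction(sleep_ms)
async_is_coroutine_function = inspect.iscoroutinefunction(sleep_ms_async)

pending = sleep_ms(1)
plain_returns_coroutine = inspect.iscoroutine(pending)
pending.close()  # discard the coroutine without awaiting it
```

Both functions return a coroutine when called, but only the second one is recognised as a coroutine function, which is the situation async_ is meant to handle.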

Note

map is considered a combination operator if used with extra sources, and a transformation operator otherwise.

class aiostream.stream.enumerate(source: AsyncIterable[T], start: int = 0, step: int = 1)

Generate (index, value) tuples from an asynchronous sequence.

This index is computed using a starting point and an increment, respectively defaulting to 0 and 1.

class aiostream.stream.starmap(source: AsyncIterable[tuple[T, ...]], func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U], ordered: bool = True, task_limit: int | None = None)

Apply a given function to the unpacked elements of an asynchronous sequence.

Each element is unpacked before applying the function. The given function can either be synchronous or asynchronous.

The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.

The coroutines run concurrently but their number can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.

class aiostream.stream.cycle(source: AsyncIterable[T])

Iterate indefinitely over an asynchronous sequence.

Note: it does not perform any buffering, but re-iterates over the same given sequence instead. If the sequence is not re-iterable, the generator might end up looping indefinitely without yielding any item.
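The note about re-iterable sequences can be illustrated with a plain-asyncio model. Countdown, cycle_sketch and first_n are hypothetical names for this sketch; the second half shows a one-shot generator stalling after its first pass, detected here with a short time-out.

```python
import asyncio


class Countdown:
    """Re-iterable source: every __aiter__ call starts a fresh pass."""

    def __init__(self, start):
        self.start = start

    def __aiter__(self):
        return self._generate()

    async def _generate(self):
        for value in range(self.start, 0, -1):
            yield value


async def cycle_sketch(source):
    """Re-iterate over source forever, without buffering its items."""
    while True:
        async for item in source:
            yield item
        # Let the event loop run between passes, even if a pass was empty.
        await asyncio.sleep(0)


async def first_n(agen, n):
    """Collect the first n items (n >= 1) of an async generator, then close it."""
    items = []
    async for item in agen:
        items.append(item)
        if len(items) == n:
            break
    await agen.aclose()
    return items


async def main():
    looped = await first_n(cycle_sketch(Countdown(3)), 7)
    one_shot = Countdown(3).__aiter__()  # a generator: exhausted after one pass
    try:
        await asyncio.wait_for(first_n(cycle_sketch(one_shot), 4), timeout=0.1)
        stalled = False
    except asyncio.TimeoutError:
        stalled = True
    return looped, stalled


looped, stalled = asyncio.run(main())
```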

class aiostream.stream.chunks(source: AsyncIterable[T], n: int)

Generate chunks of size n from an asynchronous sequence.

The chunks are lists, and the last chunk might contain fewer than n elements.
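A minimal model of this grouping, written with plain async generators. The names from_iterable and chunks_sketch are hypothetical and the code is a sketch of the described behaviour, not the library's implementation.

```python
import asyncio


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def chunks_sketch(source, n):
    """Group the items of source into lists of at most n elements."""
    chunk = []
    async for item in source:
        chunk.append(item)
        if len(chunk) == n:
            yield chunk
            chunk = []
    if chunk:
        # The source ran out before the last chunk was full.
        yield chunk


async def main():
    return [chunk async for chunk in chunks_sketch(from_iterable(range(7)), 3)]


grouped = asyncio.run(main())
```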

Selection operators

class aiostream.stream.take(source: AsyncIterable[T], n: int)

Forward the first n elements from an asynchronous sequence.

If n is negative, it simply terminates before iterating the source.

class aiostream.stream.takelast(source: AsyncIterable[T], n: int)

Forward the last n elements from an asynchronous sequence.

If n is negative, it simply terminates after iterating the source.

Note: it is required to reach the end of the source before the first element is generated.

class aiostream.stream.skip(source: AsyncIterable[T], n: int)

Forward an asynchronous sequence, skipping the first n elements.

If n is negative, no elements are skipped.

class aiostream.stream.skiplast(source: AsyncIterable[T], n: int)

Forward an asynchronous sequence, skipping the last n elements.

If n is negative, no elements are skipped.

Note: it is required to reach the (n+1)th element of the source before the first element is generated.
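The buffering requirements of takelast and skiplast can be modelled with a deque. This is a sketch with hypothetical names (takelast_sketch, skiplast_sketch, from_iterable, collect); it only aims to reproduce the behaviour described above, including the handling of negative n.

```python
import asyncio
from collections import deque


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def takelast_sketch(source, n):
    """Forward the last n items; nothing is produced until source ends."""
    buffer = deque(maxlen=max(n, 0))  # keeps only the n most recent items
    async for item in source:
        buffer.append(item)
    for item in buffer:
        yield item


async def skiplast_sketch(source, n):
    """Forward all items but the last n, lagging n items behind source."""
    buffer = deque()
    async for item in source:
        buffer.append(item)
        if len(buffer) > max(n, 0):
            # One more item than n is buffered, so the oldest is not in the tail.
            yield buffer.popleft()


async def collect(agen):
    return [item async for item in agen]


async def main():
    return (
        await collect(takelast_sketch(from_iterable(range(6)), 2)),
        await collect(skiplast_sketch(from_iterable(range(6)), 2)),
        await collect(takelast_sketch(from_iterable(range(6)), -1)),
        await collect(skiplast_sketch(from_iterable(range(6)), -1)),
    )


last_two, all_but_last_two, last_negative, skip_negative = asyncio.run(main())
```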

class aiostream.stream.getitem(source: AsyncIterable[T], index: int | builtins.slice)

Forward one or several items from an asynchronous sequence.

The argument can either be a slice or an integer. See the slice and item operators for more information.

class aiostream.stream.filter(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]])

Filter an asynchronous sequence using an arbitrary function.

The function takes the item as an argument and returns True if it should be forwarded, False otherwise. The function can either be synchronous or asynchronous.

class aiostream.stream.until(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]])

Forward an asynchronous sequence until a condition is met.

Contrary to the takewhile operator, the last tested element is included in the sequence.

The given function takes the item as an argument and returns a boolean corresponding to the condition to meet. The function can either be synchronous or asynchronous.

class aiostream.stream.takewhile(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]])

Forward an asynchronous sequence while a condition is met.

Contrary to the until operator, the last tested element is not included in the sequence.

The given function takes the item as an argument and returns a boolean corresponding to the condition to meet. The function can either be synchronous or asynchronous.

class aiostream.stream.dropwhile(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]])

Discard the elements from an asynchronous sequence while a condition is met.

The given function takes the item as an argument and returns a boolean corresponding to the condition to meet. The function can either be synchronous or asynchronous.
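The difference between until, takewhile and dropwhile is easiest to see side by side. The following sketch models the three behaviours with plain async generators and synchronous predicates only; all names ending in _sketch, as well as from_iterable and collect, are hypothetical.

```python
import asyncio


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def until_sketch(source, func):
    async for item in source:
        yield item  # the tested element is forwarded...
        if func(item):
            return  # ...and the sequence stops once the condition is met


async def takewhile_sketch(source, func):
    async for item in source:
        if not func(item):
            return  # the failing element is not forwarded
        yield item


async def dropwhile_sketch(source, func):
    dropping = True
    async for item in source:
        if dropping and func(item):
            continue
        dropping = False  # once an element passes, nothing is dropped anymore
        yield item


async def collect(agen):
    return [item async for item in agen]


async def main():
    return (
        await collect(until_sketch(from_iterable(range(1, 7)), lambda x: x >= 3)),
        await collect(takewhile_sketch(from_iterable(range(1, 7)), lambda x: x < 3)),
        await collect(dropwhile_sketch(from_iterable(range(1, 7)), lambda x: x < 3)),
    )


until_result, takewhile_result, dropwhile_result = asyncio.run(main())
```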

Combination operators

class aiostream.stream.map(source: AsyncIterable[T], func: MapCallable[T, U], *more_sources: AsyncIterable[T], ordered: bool = True, task_limit: int | None = None)

Apply a given function to the elements of one or several asynchronous sequences.

Each element is used as a positional argument, using the same order as their respective sources. The generation continues until the shortest sequence is exhausted. The function can either be synchronous or asynchronous (coroutine function).

The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.

The coroutines run concurrently but their number can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.

If more than one sequence is provided, they’re also awaited concurrently, so that their waiting times don’t add up.

It might happen that the provided function returns a coroutine but is not a coroutine function per se. In this case, one can wrap the function with aiostream.async_ in order to force map to await the resulting coroutine. The following example illustrates the use of async_ with a lambda function:

from aiostream import stream, async_
...
ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))

Note

map is considered a combination operator if used with extra sources, and a transformation operator otherwise.

class aiostream.stream.zip(source: AsyncIterable[T], *more_sources: AsyncIterable[T])

Combine and forward the elements of several asynchronous sequences.

Each generated value is a tuple of elements, using the same order as their respective sources. The generation continues until the shortest sequence is exhausted.

Note: the different sequences are awaited in parallel, so that their waiting times don’t add up.

class aiostream.stream.merge(source: AsyncIterable[T], *more_sources: AsyncIterable[T])

Merge several asynchronous sequences together.

All the sequences are iterated simultaneously and their elements are forwarded as soon as they’re available. The generation continues until all the sequences are exhausted.

class aiostream.stream.chain(source: AsyncIterable[T], *more_sources: AsyncIterable[T])

Chain asynchronous sequences together, in the order they are given.

Note: the sequences are not iterated until it is required, so if the operation is interrupted, the remaining sequences will be left untouched.

class aiostream.stream.ziplatest(source: AsyncIterable[T], *more_sources: AsyncIterable[T], partial: bool = True, default: T | None = None)

Combine several asynchronous sequences together, producing a tuple with the latest element of each sequence whenever a new element is received.

The value to use when a sequence has not produced any element yet is given by the default argument (defaulting to None).

The producing of partial results can be disabled by setting the optional argument partial to False.

All the sequences are iterated simultaneously and their elements are forwarded as soon as they’re available. The generation continues until all the sequences are exhausted.
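The interplay of partial and default can be modelled with a queue fed by one task per source. This is a simplified sketch, not the library's implementation: ziplatest_sketch, timed and pump are hypothetical names, and error propagation from the sources is deliberately left out.

```python
import asyncio

_DONE = object()  # marks the end of one source


async def timed(schedule):
    """Hypothetical source: yield each item after sleeping for its delay."""
    for delay, item in schedule:
        await asyncio.sleep(delay)
        yield item


async def ziplatest_sketch(*sources, partial=True, default=None):
    queue = asyncio.Queue()

    async def pump(index, source):
        async for item in source:
            await queue.put((index, item))
        await queue.put((index, _DONE))

    tasks = [asyncio.ensure_future(pump(i, s)) for i, s in enumerate(sources)]
    latest = [default] * len(sources)
    started = set()  # indexes of the sources that have produced something
    active = len(sources)
    try:
        while active:
            index, item = await queue.get()
            if item is _DONE:
                active -= 1
                continue
            latest[index] = item
            started.add(index)
            if partial or len(started) == len(sources):
                yield tuple(latest)
    finally:
        for task in tasks:
            task.cancel()


async def run(partial):
    letters = timed([(0.01, "a1"), (0.03, "a2")])
    digits = timed([(0.02, "b1")])
    zipped = ziplatest_sketch(letters, digits, partial=partial)
    return [pair async for pair in zipped]


with_partial = asyncio.run(run(True))
without_partial = asyncio.run(run(False))
```

With partial=True every received element produces a tuple, padded with the default where a source has not produced anything yet; with partial=False tuples only appear once every source has produced at least one element.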

Aggregation operators

class aiostream.stream.accumulate(source: AsyncIterable[T], func: Callable[[T, T], Awaitable[T] | T] = <built-in function add>, initializer: T | None = None)

Generate a series of accumulated sums (or other binary function) from an asynchronous sequence.

If initializer is present, it is placed before the items of the sequence in the calculation, and serves as a default when the sequence is empty.

class aiostream.stream.reduce(source: AsyncIterable[T], func: Callable[[T, T], Awaitable[T] | T], initializer: T | None = None)

Apply a function of two arguments cumulatively to the items of an asynchronous sequence, reducing the sequence to a single value.

If initializer is present, it is placed before the items of the sequence in the calculation, and serves as a default when the sequence is empty.
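The role of the initializer in accumulate and reduce can be modelled as follows. This is a sketch with hypothetical names (accumulate_sketch, reduce_sketch, from_iterable, collect) and synchronous functions only; the error raised for an empty source without initializer is this sketch's own choice.

```python
import asyncio
import operator


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def accumulate_sketch(source, func=operator.add, initializer=None):
    iterator = source.__aiter__()
    if initializer is None:
        try:
            value = await iterator.__anext__()
        except StopAsyncIteration:
            return  # empty source, no initializer: nothing to generate
    else:
        value = initializer
    yield value
    async for item in iterator:
        value = func(value, item)
        yield value


async def reduce_sketch(source, func, initializer=None):
    steps = [step async for step in accumulate_sketch(source, func, initializer)]
    if not steps:
        # This sketch's own choice for an empty source without initializer.
        raise LookupError("nothing to reduce")
    return steps[-1]


async def collect(agen):
    return [item async for item in agen]


async def main():
    sums = await collect(accumulate_sketch(from_iterable([1, 2, 3, 4])))
    seeded = await collect(accumulate_sketch(from_iterable([1, 2, 3, 4]), initializer=10))
    only_default = await collect(accumulate_sketch(from_iterable([]), initializer=5))
    product = await reduce_sketch(from_iterable([1, 2, 3, 4]), operator.mul)
    fallback = await reduce_sketch(from_iterable([]), operator.mul, 7)
    return sums, seeded, only_default, product, fallback


sums, seeded, only_default, product, fallback = asyncio.run(main())
```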

class aiostream.stream.list(source: AsyncIterable[T])

Build a list from an asynchronous sequence.

All the intermediate steps are generated, starting from the empty list.

This operator can be used to easily convert a stream into a list:

lst = await stream.list(x)

Note: the same list object is produced at each step in order to avoid memory copies.
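The consequence of producing the same list object at each step is shown in the sketch below, which models the described behaviour with a plain async generator. The names list_sketch and from_iterable are hypothetical.

```python
import asyncio


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def list_sketch(source):
    result = []
    yield result  # first step: the empty list
    async for item in source:
        result.append(item)
        yield result  # same object every time, only its content grows


async def main():
    steps, snapshots = [], []
    async for step in list_sketch(from_iterable([1, 2, 3])):
        steps.append(step)  # keeps a reference to the shared list
        snapshots.append(list(step))  # keeps a copy of the current content
    return steps, snapshots


steps, snapshots = asyncio.run(main())
```

Code that needs the intermediate states therefore has to copy each step, as snapshots does here; stored references all end up showing the final content.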

Advanced operators

Note

The concat, flatten and switch operators all take a stream of streams as an argument (also called stream of higher order) and return a flattened stream using their own merging strategy.

class aiostream.stream.concat(source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None)

Given an asynchronous sequence of sequences, generate the elements of the sequences in order.

The sequences are awaited concurrently, although it’s possible to limit the number of running sequences using the task_limit argument.

Errors raised in the source or an element sequence are propagated.

class aiostream.stream.flatten(source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None)

Given an asynchronous sequence of sequences, generate the elements of the sequences as soon as they’re received.

The sequences are awaited concurrently, although it’s possible to limit the number of running sequences using the task_limit argument.

Errors raised in the source or an element sequence are propagated.

class aiostream.stream.switch(source: AsyncIterable[AsyncIterable[T]])

Given an asynchronous sequence of sequences, generate the elements of the most recent sequence.

Element sequences are generated eagerly, and closed once they are superseded by a more recent sequence. Once the main sequence is finished, the last subsequence will be exhausted completely.

Errors raised in the source or an element sequence (that was not already closed) are propagated.

Note

The concatmap, flatmap and switchmap operators provide simpler access to the three merging strategies listed above.

class aiostream.stream.concatmap(source: AsyncIterable[T], func: combine.SmapCallable[T, AsyncIterable[U]], *more_sources: AsyncIterable[T], task_limit: int | None = None)

Apply a given function that creates a sequence from the elements of one or several asynchronous sequences, and generate the elements of the created sequences in order.

The function is applied as described in map, and must return an asynchronous sequence. The returned sequences are awaited concurrently, although it’s possible to limit the number of running sequences using the task_limit argument.
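The "in order" strategy of concatmap can be modelled with two nested loops. This sketch is deliberately sequential and ignores the concurrent awaiting and task_limit mentioned above; concatmap_sketch, pair and from_iterable are hypothetical names.

```python
import asyncio


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def pair(x):
    """Hypothetical sequence factory: generate x, then x * 10."""
    yield x
    yield x * 10


async def concatmap_sketch(source, func):
    async for item in source:
        # Exhaust the sequence created for this item before moving on,
        # so the output keeps the order of the source.
        async for sub_item in func(item):
            yield sub_item


async def main():
    flattened = concatmap_sketch(from_iterable([1, 2, 3]), pair)
    return [item async for item in flattened]


in_source_order = asyncio.run(main())
```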

class aiostream.stream.flatmap(source: AsyncIterable[T], func: combine.SmapCallable[T, AsyncIterable[U]], *more_sources: AsyncIterable[T], task_limit: int | None = None)

Apply a given function that creates a sequence from the elements of one or several asynchronous sequences, and generate the elements of the created sequences as soon as they arrive.

The function is applied as described in map, and must return an asynchronous sequence. The returned sequences are awaited concurrently, although it’s possible to limit the number of running sequences using the task_limit argument.

Errors raised in a source or output sequence are propagated.

class aiostream.stream.switchmap(source: AsyncIterable[T], func: combine.SmapCallable[T, AsyncIterable[U]], *more_sources: AsyncIterable[T])

Apply a given function that creates a sequence from the elements of one or several asynchronous sequences and generate the elements of the most recently created sequence.

The function is applied as described in map, and must return an asynchronous sequence. Errors raised in a source or output sequence (that was not already closed) are propagated.

Timing operators

class aiostream.stream.spaceout(source: AsyncIterable[T], interval: float)

Make sure the elements of an asynchronous sequence are separated in time by the given interval.

class aiostream.stream.timeout(source: AsyncIterable[T], timeout: float)

Raise a time-out if an element of the asynchronous sequence takes too long to arrive.

Note: the timeout is not global but specific to each step of the iteration.
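The per-step nature of the time-out can be modelled by wrapping each retrieval in asyncio.wait_for. This is a sketch with hypothetical names (timeout_sketch, ticker); the first sequence takes at least 0.3 s overall without failing, because each individual step stays well below the limit.

```python
import asyncio


async def ticker(count, delay):
    """Hypothetical source: generate count numbers, one every `delay` seconds."""
    for index in range(count):
        await asyncio.sleep(delay)
        yield index


async def timeout_sketch(source, timeout):
    iterator = source.__aiter__()
    while True:
        try:
            # The limit applies to this single step, not to the whole iteration.
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return
        yield item


async def main():
    # At least 0.3 s overall, but each step is far shorter than 0.25 s.
    steady = [x async for x in timeout_sketch(ticker(30, 0.01), 0.25)]
    try:
        # A single step of 1 s exceeds the 0.05 s limit.
        [x async for x in timeout_sketch(ticker(1, 1.0), 0.05)]
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return steady, timed_out


steady, timed_out = asyncio.run(main())
```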

class aiostream.stream.delay(source: AsyncIterable[T], delay: float)

Delay the iteration of an asynchronous sequence.

Miscellaneous operators

class aiostream.stream.action(source: AsyncIterable[T], func: Callable[[T], Awaitable[Any] | Any], ordered: bool = True, task_limit: int | None = None)

Perform an action for each element of an asynchronous sequence without modifying it.

The given function can be synchronous or asynchronous.

The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.

The coroutines run concurrently but their number can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.

class aiostream.stream.print(source: AsyncIterable[T], template: str = '{}', sep: str = ' ', end: str = '\n', file: Any | None = None, flush: bool = False)

Print each element of an asynchronous sequence without modifying it.

An optional template can be provided to be formatted with the elements. All the keyword arguments are forwarded to the builtin function print.
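The relation between action and print can be sketched with plain async generators. The names action_sketch, print_sketch and from_iterable are hypothetical, only synchronous functions are handled, and applying the template with str.format is an assumption of this sketch.

```python
import asyncio
import io


async def from_iterable(iterable):
    """Hypothetical helper: expose a plain iterable as an async sequence."""
    for item in iterable:
        yield item


async def action_sketch(source, func):
    async for item in source:
        func(item)  # the return value is ignored
        yield item  # the element itself is forwarded unchanged


def print_sketch(source, template="{}", **print_kwargs):
    def show(item):
        # Assumption: the template is filled in with str.format.
        print(template.format(item), **print_kwargs)

    return action_sketch(source, show)


async def main():
    buffer = io.StringIO()
    printed = print_sketch(from_iterable([1, 2]), "value: {}", file=buffer)
    forwarded = [item async for item in printed]
    return forwarded, buffer.getvalue()


forwarded, output = asyncio.run(main())
```

The elements pass through untouched while the formatted text goes to the file object given as a keyword argument.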