Stream operators
The stream operators produce objects of the Stream class.
They are separated into eight categories:
- creation
- transformation
- selection
- combination
- aggregation
- advanced
- timing
- miscellaneous
They can be found in the aiostream.stream module.
Custom stream operators can be created using the @operator decorator.
Pipe-lining
Most of the operators have a pipe() method corresponding to their equivalent pipe operator.
They are also gathered and accessible through the aiostream.pipe module.
The pipe operators allow a 2-step instantiation.
For instance, the following stream:
ys = stream.map(xs, lambda x: x**2)
is strictly equivalent to:
ys = pipe.map(lambda x: x**2)(xs)
and can be written as:
ys = xs | pipe.map(lambda x: x**2)
This syntax comes in handy when several operators are chained:
ys = (xs
| pipe.operator1(*args1)
| pipe.operator2(*args2)
| pipe.operator3(*args3))
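The two-step instantiation can be illustrated with a small plain-asyncio model. This is only a sketch of the idea, not the library's implementation: the Stream, stream_map and pipe_map names below are hypothetical stand-ins written for this example.

```python
import asyncio


class Stream:
    """Hypothetical stand-in: wraps an async iterable and supports `|`."""

    def __init__(self, source):
        self.source = source

    def __aiter__(self):
        return self.source.__aiter__()

    def __or__(self, pipe_operator):
        # `xs | op` simply calls `op(xs)`.
        return pipe_operator(self)


async def _from_iterable(items):
    for item in items:
        yield item


async def _map(source, func):
    async for item in source:
        yield func(item)


def stream_map(source, func):
    # One-step form: the source comes first.
    return Stream(_map(source, func))


def pipe_map(func):
    # Two-step form: bind the arguments now, receive the source later.
    return lambda source: stream_map(source, func)


async def collect(source):
    return [item async for item in source]


async def main():
    square = lambda x: x**2
    a = await collect(stream_map(Stream(_from_iterable([1, 2, 3])), square))
    b = await collect(pipe_map(square)(Stream(_from_iterable([1, 2, 3]))))
    c = await collect(Stream(_from_iterable([1, 2, 3])) | pipe_map(square))
    return a, b, c


results = asyncio.run(main())
print(results)
```

All three spellings produce the same values in this model, which is the property the pipe syntax relies on.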
Creation operators
Note
These operators do not have a pipe equivalent.
- aiostream.stream.iterate(it: AsyncIterable[T] | Iterable[T]) Stream[T]
Generate values from a synchronous or asynchronous iterable.
- aiostream.stream.preserve(ait: AsyncIterable[T]) Stream[T]
Generate values from an asynchronous iterable without explicitly closing the corresponding iterator.
- aiostream.stream.call(func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs) Stream[T]
Call the given function and generate a single value.
Await if the provided function is asynchronous.
- aiostream.stream.throw(exc: Exception | Type[Exception]) Stream[Never]
Throw an exception without generating any value.
- aiostream.stream.repeat(value: T, times: int | None = None, *, interval: float = 0.0) Stream[T]
Generate the same value a given number of times.
If times is None, the value is repeated indefinitely. An optional interval can be given to space the values out.
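The behaviour described for repeat can be modelled with a plain async generator. The function below is a hypothetical re-implementation for illustration only, and it assumes the interval applies between consecutive values rather than before the first one.

```python
import asyncio
import itertools


async def repeat(value, times=None, *, interval=0.0):
    # Hypothetical sketch: an unbounded counter stands in for times=None.
    counter = itertools.count() if times is None else range(times)
    for index in counter:
        if index > 0 and interval > 0:
            await asyncio.sleep(interval)
        yield value


async def main():
    finite = [item async for item in repeat("x", 3)]
    # The infinite form has to be interrupted by the consumer.
    ys = repeat("y", interval=0.01)
    first_two = []
    async for item in ys:
        first_two.append(item)
        if len(first_two) == 2:
            break
    await ys.aclose()
    return finite, first_two


finite, first_two = asyncio.run(main())
print(finite, first_two)
```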
Transformation operators
- aiostream.stream.map(source: AsyncIterable[T], func: MapCallable[T, U], *more_sources: AsyncIterable[T], ordered: bool = True, task_limit: int | None = None) Stream[U]
Apply a given function to the elements of one or several asynchronous sequences.
Each element is used as a positional argument, using the same order as their respective sources. The generation continues until the shortest sequence is exhausted. The function can either be synchronous or asynchronous (coroutine function).
The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.
The coroutines run concurrently but their amount can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.
If more than one sequence is provided, they’re also awaited concurrently, so that their waiting times don’t add up.
It might happen that the provided function returns a coroutine but is not a coroutine function per se. In this case, one can wrap the function with aiostream.async_ in order to force map to await the resulting coroutine. The following example illustrates the use of async_ with a lambda function:
from aiostream import stream, async_
...
ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
Note
map is considered a combination operator if used with extra sources, and a transformation operator otherwise.
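The distinction between a coroutine function and a function that merely returns a coroutine can be checked with the standard library alone. The snippet below is a sketch that assumes the detection is done with something like inspect.iscoroutinefunction; the sleep_ms and sleep_ms_async names are made up for this example.

```python
import asyncio
import inspect

# A lambda that returns a coroutine is not itself a coroutine function.
sleep_ms = lambda ms: asyncio.sleep(ms / 1000)


async def sleep_ms_async(ms):
    await asyncio.sleep(ms / 1000)


lambda_is_corofunc = inspect.iscoroutinefunction(sleep_ms)
def_is_corofunc = inspect.iscoroutinefunction(sleep_ms_async)

# Calling the lambda still produces a coroutine object that must be awaited.
coro = sleep_ms(1)
returns_coroutine = inspect.iscoroutine(coro)
coro.close()  # avoid a "never awaited" warning in this demo

print(lambda_is_corofunc, def_is_corofunc, returns_coroutine)
```

A check of this kind would treat the lambda as synchronous, which is why an explicit wrapper is needed to have its result awaited.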
- aiostream.stream.enumerate(source: AsyncIterable[T], start: int = 0, step: int = 1) Stream[tuple[int, T]]
Generate (index, value) tuples from an asynchronous sequence.
This index is computed using a starting point and an increment, respectively defaulting to 0 and 1.
- aiostream.stream.starmap(source: AsyncIterable[tuple[T, ...]], func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U], ordered: bool = True, task_limit: int | None = None) Stream[U]
Apply a given function to the unpacked elements of an asynchronous sequence.
Each element is unpacked before applying the function. The given function can either be synchronous or asynchronous.
The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.
The coroutines run concurrently but their amount can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.
- aiostream.stream.cycle(source: AsyncIterable[T]) Stream[T]
Iterate indefinitely over an asynchronous sequence.
Note: it does not perform any buffering, but re-iterates over the same given sequence instead. If the sequence is not re-iterable, the generator might end up looping indefinitely without yielding any item.
Selection operators
- aiostream.stream.take(source: AsyncIterable[T], n: int) Stream[T]
Forward the first n elements from an asynchronous sequence.
If n is negative, it simply terminates before iterating the source.
- aiostream.stream.takelast(source: AsyncIterable[T], n: int) Stream[T]
Forward the last n elements from an asynchronous sequence.
If n is negative, it simply terminates after iterating the source.
Note: it is required to reach the end of the source before the first element is generated.
- aiostream.stream.skip(source: AsyncIterable[T], n: int) Stream[T]
Forward an asynchronous sequence, skipping the first n elements.
If n is negative, no elements are skipped.
- aiostream.stream.skiplast(source: AsyncIterable[T], n: int) Stream[T]
Forward an asynchronous sequence, skipping the last n elements.
If n is negative, no elements are skipped.
Note: it is required to reach the (n+1)th element of the source before the first element is generated.
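The buffering notes on takelast and skiplast follow from how such operators can be built. The sketch below is a plain-asyncio model with hypothetical implementations (a bounded deque for takelast, a hold-back queue for skiplast), not the library's own code.

```python
import asyncio
from collections import deque


async def from_iterable(items):
    for item in items:
        yield item


async def takelast(source, n):
    # Nothing can be emitted before the source ends: only then are the
    # last n items known.
    buffer = deque(maxlen=max(n, 0))
    async for item in source:
        buffer.append(item)
    for item in buffer:
        yield item


async def skiplast(source, n):
    # Hold back n items; once an (n+1)th arrives, the oldest is safe to emit.
    buffer = deque()
    async for item in source:
        buffer.append(item)
        if len(buffer) > max(n, 0):
            yield buffer.popleft()


async def collect(source):
    return [item async for item in source]


async def main():
    data = [1, 2, 3, 4, 5]
    return (
        await collect(takelast(from_iterable(data), 2)),
        await collect(skiplast(from_iterable(data), 2)),
        await collect(skiplast(from_iterable(data), -1)),
    )


last_two, without_last_two, negative_skip = asyncio.run(main())
print(last_two, without_last_two, negative_skip)
```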
- aiostream.stream.getitem(source: AsyncIterable[T], index: int | builtins.slice) Stream[T]
Forward one or several items from an asynchronous sequence.
The argument can either be a slice or an integer. See the slice and item operators for more information.
- aiostream.stream.filter(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]) Stream[T]
Filter an asynchronous sequence using an arbitrary function.
The function takes the item as an argument and returns True if it should be forwarded, False otherwise. The function can either be synchronous or asynchronous.
- aiostream.stream.until(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]) Stream[T]
Forward an asynchronous sequence until a condition is met.
Contrary to the takewhile operator, the last tested element is included in the sequence.
The given function takes the item as an argument and returns a boolean corresponding to the condition to meet. The function can either be synchronous or asynchronous.
- aiostream.stream.takewhile(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]) Stream[T]
Forward an asynchronous sequence while a condition is met.
Contrary to the until operator, the last tested element is not included in the sequence.
The given function takes the item as an argument and returns a boolean corresponding to the condition to meet. The function can either be synchronous or asynchronous.
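The difference between until and takewhile comes down to whether the item is forwarded before or after the test. The following sketch models both with plain async generators; the implementations are hypothetical and, for brevity, only accept synchronous predicates.

```python
import asyncio


async def from_iterable(items):
    for item in items:
        yield item


async def until(source, func):
    # Forward first, test afterwards: the matching item is included.
    async for item in source:
        yield item
        if func(item):
            return


async def takewhile(source, func):
    # Test first, forward afterwards: the failing item is dropped.
    async for item in source:
        if not func(item):
            return
        yield item


async def collect(source):
    return [item async for item in source]


async def main():
    data = [1, 2, 3, 4, 5]
    included = await collect(until(from_iterable(data), lambda x: x >= 3))
    excluded = await collect(takewhile(from_iterable(data), lambda x: x < 3))
    return included, excluded


included, excluded = asyncio.run(main())
print(included, excluded)
```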
- aiostream.stream.dropwhile(source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]) Stream[T]
Discard the elements from an asynchronous sequence while a condition is met.
The given function takes the item as an argument and returns a boolean corresponding to the condition to meet. The function can either be synchronous or asynchronous.
Combination operators
- aiostream.stream.map(source: AsyncIterable[T], func: MapCallable[T, U], *more_sources: AsyncIterable[T], ordered: bool = True, task_limit: int | None = None) Stream[U]
Apply a given function to the elements of one or several asynchronous sequences.
Each element is used as a positional argument, using the same order as their respective sources. The generation continues until the shortest sequence is exhausted. The function can either be synchronous or asynchronous (coroutine function).
The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.
The coroutines run concurrently but their amount can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.
If more than one sequence is provided, they’re also awaited concurrently, so that their waiting times don’t add up.
It might happen that the provided function returns a coroutine but is not a coroutine function per se. In this case, one can wrap the function with aiostream.async_ in order to force map to await the resulting coroutine. The following example illustrates the use of async_ with a lambda function:
from aiostream import stream, async_
...
ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
Note
map is considered a combination operator if used with extra sources, and a transformation operator otherwise.
- aiostream.stream.zip(*sources: AsyncIterable[T]) Stream[tuple[T, ...]]
Combine and forward the elements of several asynchronous sequences.
Each generated value is a tuple of elements, using the same order as their respective sources. The generation continues until the shortest sequence is exhausted.
Note: the different sequences are awaited in parallel, so that their waiting times don’t add up.
- aiostream.stream.merge(*sources: AsyncIterable[T]) Stream[T]
Merge several asynchronous sequences together.
All the sequences are iterated simultaneously and their elements are forwarded as soon as they’re available. The generation continues until all the sequences are exhausted.
- aiostream.stream.chain(*sources: AsyncIterable[T]) Stream[T]
Chain asynchronous sequences together, in the order they are given.
Note: the sequences are not iterated until it is required, so if the operation is interrupted, the remaining sequences will be left untouched.
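The laziness of chain can be observed with a small plain-asyncio model. The chain and tracked functions below are hypothetical sketches written for this example; tracked records when a source actually starts being iterated.

```python
import asyncio


async def tracked(name, items, started):
    started.append(name)  # executed only once iteration actually begins
    for item in items:
        yield item


async def chain(*sources):
    # Hypothetical sketch: each source is only touched when its turn comes.
    for source in sources:
        async for item in source:
            yield item


async def main():
    started = []
    first = tracked("first", [1, 2], started)
    second = tracked("second", [3, 4], started)
    chained = chain(first, second)
    received = []
    async for item in chained:
        received.append(item)
        if item == 2:
            break  # interrupt before the second source is needed
    # Close the generators explicitly since the loop was interrupted.
    await chained.aclose()
    await first.aclose()
    await second.aclose()
    return received, started


received, started = asyncio.run(main())
print(received, started)
```

In this model the second source is never started, since the consumer stops before the first one is exhausted.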
- aiostream.stream.ziplatest(*sources: AsyncIterable[T], partial: bool = True, default: T | None = None) Stream[tuple[T | None, ...]]
Combine several asynchronous sequences together, producing a tuple with the latest element of each sequence whenever a new element is received.
The value to use when a sequence has not produced any element yet is given by the default argument (defaulting to None).
The producing of partial results can be disabled by setting the optional argument partial to False.
All the sequences are iterated simultaneously and their elements are forwarded as soon as they’re available. The generation continues until all the sequences are exhausted.
Aggregation operators
- aiostream.stream.accumulate(source: AsyncIterable[T], func: Callable[[T, T], Awaitable[T] | T] = <built-in function add>, initializer: T | None = None) Stream[T]
Generate a series of accumulated sums (or other binary function) from an asynchronous sequence.
If initializer is present, it is placed before the items of the sequence in the calculation, and serves as a default when the sequence is empty.
- aiostream.stream.reduce(source: AsyncIterable[T], func: Callable[[T, T], Awaitable[T] | T], initializer: T | None = None) Stream[T]
Apply a function of two arguments cumulatively to the items of an asynchronous sequence, reducing the sequence to a single value.
If initializer is present, it is placed before the items of the sequence in the calculation, and serves as a default when the sequence is empty.
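The role of initializer in accumulate and reduce can be sketched in plain asyncio. The accumulate function below is a hypothetical model that assumes None means "no initializer" and only supports synchronous functions; under that model, reduce corresponds to keeping the last accumulated value.

```python
import asyncio
import operator


async def from_iterable(items):
    for item in items:
        yield item


async def accumulate(source, func=operator.add, initializer=None):
    iterator = source.__aiter__()
    if initializer is None:
        try:
            value = await iterator.__anext__()
        except StopAsyncIteration:
            return  # empty source and no initializer: nothing to produce
    else:
        value = initializer  # placed before the items of the sequence
    yield value
    async for item in iterator:
        value = func(value, item)
        yield value


async def collect(source):
    return [item async for item in source]


async def main():
    plain = await collect(accumulate(from_iterable([1, 2, 3, 4])))
    seeded = await collect(accumulate(from_iterable([1, 2, 3, 4]), initializer=10))
    empty_seeded = await collect(accumulate(from_iterable([]), initializer=10))
    empty_plain = await collect(accumulate(from_iterable([])))
    return plain, seeded, empty_seeded, empty_plain


plain, seeded, empty_seeded, empty_plain = asyncio.run(main())
print(plain, seeded, empty_seeded, empty_plain)
print(seeded[-1])  # the value a reduce-style operator would keep
```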
- aiostream.stream.list(source: AsyncIterable[T]) Stream[builtins.list[T]]
Build a list from an asynchronous sequence.
All the intermediate steps are generated, starting from the empty list.
This operator can be used to easily convert a stream into a list:
lst = await stream.list(x)
Note: the same list object is produced at each step in order to avoid memory copies.
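The practical consequence of producing the same list object at each step is that intermediate steps must be copied if they need to be kept. The sketch below models this with a hypothetical list_steps generator, not the library's implementation.

```python
import asyncio


async def from_iterable(items):
    for item in items:
        yield item


async def list_steps(source):
    # Hypothetical sketch: yield one growing list, starting from the empty list.
    result = []
    yield result
    async for item in source:
        result.append(item)
        yield result  # same object every time, no copy


async def main():
    # Keeping references: every step aliases the final list.
    aliased = [step async for step in list_steps(from_iterable([1, 2, 3]))]
    # Copying at each step preserves the intermediate states.
    snapshots = [list(step) async for step in list_steps(from_iterable([1, 2, 3]))]
    return aliased, snapshots


aliased, snapshots = asyncio.run(main())
print(aliased)
print(snapshots)
```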
Advanced operators
Note
The concat, flatten and switch operators
all take a stream of streams as an argument (also called stream of
higher order) and return a flattened stream using their own merging
strategy.
- aiostream.stream.concat(source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None) Stream[T]
Given an asynchronous sequence of sequences, generate the elements of the sequences in order.
The sequences are awaited concurrently, although it’s possible to limit the amount of running sequences using the task_limit argument.
Errors raised in the source or an element sequence are propagated.
- aiostream.stream.flatten(source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None) Stream[T]
Given an asynchronous sequence of sequences, generate the elements of the sequences as soon as they’re received.
The sequences are awaited concurrently, although it’s possible to limit the amount of running sequences using the task_limit argument.
Errors raised in the source or an element sequence are propagated.
- aiostream.stream.switch(source: AsyncIterable[AsyncIterable[T]]) Stream[T]
Given an asynchronous sequence of sequences, generate the elements of the most recent sequence.
Element sequences are generated eagerly, and closed once they are superseded by a more recent sequence. Once the main sequence is finished, the last subsequence will be exhausted completely.
Errors raised in the source or an element sequence (that was not already closed) are propagated.
Note
The concatmap, flatmap and switchmap operators
provide simpler access to the three merging strategies listed above.
- aiostream.stream.concatmap(source: AsyncIterable[T], func: combine.SmapCallable[T, AsyncIterable[U]], *more_sources: AsyncIterable[T], task_limit: int | None = None) Stream[U]
Apply a given function that creates a sequence from the elements of one or several asynchronous sequences, and generate the elements of the created sequences in order.
The function is applied as described in map, and must return an asynchronous sequence. The returned sequences are awaited concurrently, although it’s possible to limit the amount of running sequences using the task_limit argument.
- aiostream.stream.flatmap(source: AsyncIterable[T], func: combine.SmapCallable[T, AsyncIterable[U]], *more_sources: AsyncIterable[T], task_limit: int | None = None) Stream[U]
Apply a given function that creates a sequence from the elements of one or several asynchronous sequences, and generate the elements of the created sequences as soon as they arrive.
The function is applied as described in map, and must return an asynchronous sequence. The returned sequences are awaited concurrently, although it’s possible to limit the amount of running sequences using the task_limit argument.
Errors raised in a source or output sequence are propagated.
- aiostream.stream.switchmap(source: AsyncIterable[T], func: combine.SmapCallable[T, AsyncIterable[U]], *more_sources: AsyncIterable[T]) Stream[U]
Apply a given function that creates a sequence from the elements of one or several asynchronous sequences and generate the elements of the most recently created sequence.
The function is applied as described in map, and must return an asynchronous sequence. Errors raised in a source or output sequence (that was not already closed) are propagated.
Timing operators
- aiostream.stream.spaceout(source: AsyncIterable[T], interval: float) Stream[T]
Make sure the elements of an asynchronous sequence are separated in time by the given interval.
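One way to picture spaceout is as a generator that sleeps only for the time still missing since the previous element. The sketch below is a hypothetical plain-asyncio model based on time.monotonic, not the library's implementation.

```python
import asyncio
import time


async def from_iterable(items):
    for item in items:
        yield item


async def spaceout(source, interval):
    # Hypothetical sketch: wait only as long as needed so that consecutive
    # items are at least `interval` seconds apart.
    next_allowed = 0.0
    async for item in source:
        delay = next_allowed - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        yield item
        next_allowed = time.monotonic() + interval


async def main():
    stamps = []
    async for _ in spaceout(from_iterable("abc"), 0.05):
        stamps.append(time.monotonic())
    # Gaps between consecutive arrivals, in seconds.
    return [later - earlier for earlier, later in zip(stamps, stamps[1:])]


gaps = asyncio.run(main())
print(gaps)
```

A source that is already slower than the interval would not be delayed further in this model, since the computed delay is then zero or negative.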
Miscellaneous operators
- aiostream.stream.action(source: AsyncIterable[T], func: Callable[[T], Awaitable[Any] | Any], ordered: bool = True, task_limit: int | None = None) Stream[T]
Perform an action for each element of an asynchronous sequence without modifying it.
The given function can be synchronous or asynchronous.
The results can either be returned in or out of order, depending on the corresponding ordered argument. This argument is ignored if the provided function is synchronous.
The coroutines run concurrently but their amount can be limited using the task_limit argument. A value of 1 will cause the coroutines to run sequentially. This argument is ignored if the provided function is synchronous.
- aiostream.stream.print(source: AsyncIterable[T], template: str = '{}', sep: str = ' ', end: str = '\n', file: Any | None = None, flush: bool = False) Stream[T]
Print each element of an asynchronous sequence without modifying it.
An optional template can be provided to be formatted with the elements. All the keyword arguments are forwarded to the builtin function print.
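The interplay between the template and the forwarded keyword arguments can be sketched as follows. The print_each function is a hypothetical model for illustration; it assumes the template is applied with str.format before calling the builtin print.

```python
import asyncio
import io


async def from_iterable(items):
    for item in items:
        yield item


async def print_each(source, template="{}", **print_kwargs):
    # Hypothetical sketch: format each item, print it, then forward it unchanged.
    async for item in source:
        print(template.format(item), **print_kwargs)
        yield item


async def main():
    buffer = io.StringIO()  # stands in for any file-like target
    source = from_iterable([1, 2])
    forwarded = [x async for x in print_each(source, "got {}", end=";", file=buffer)]
    return forwarded, buffer.getvalue()


forwarded, printed = asyncio.run(main())
print(forwarded, repr(printed))
```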