Core objects

Stream base class
class aiostream.core.Stream(factory)

Enhanced asynchronous iterable.

It provides the following features:
- Operator pipe-lining - using the pipe symbol |
- Repeatability - every iteration creates a different iterator
- Safe iteration context - using async with and the stream method
- Simplified execution - get the last element from a stream using await
- Slicing and indexing - using square brackets []
- Concatenation - using the addition symbol +
It is not meant to be instantiated directly. Use the stream operators instead.
Example:
    xs = stream.count()     # xs is a stream object
    ys = xs | pipe.skip(5)  # pipe xs and skip the first 5 elements
    zs = ys[5:10:2]         # slice ys using start, stop and step

    async with zs.stream() as streamer:  # stream zs in a safe context
        async for z in streamer:         # iterate the zs streamer
            print(z)                     # Prints 10, 12, 14

    result = await zs  # await zs and return its last element
    print(result)      # Prints 14

    result = await zs  # zs can be used several times
    print(result)      # Prints 14
stream()

Return a streamer context for safe iteration.
Example:
    xs = stream.count()
    async with xs.stream() as streamer:
        async for item in streamer:
            <block>
Stream context manager
aiostream.core.streamcontext(aiterable)

Return a stream context manager from an asynchronous iterable.
The context manager ensures that the aclose asynchronous method of the corresponding iterator has run before it exits. It also issues warnings and raises RuntimeError when used incorrectly.
It is safe to use with any asynchronous iterable, and it prevents the asynchronous iterator context from being wrapped twice.
Correct usage:
    ait = some_asynchronous_iterable()
    async with streamcontext(ait) as streamer:
        async for item in streamer:
            <block>
For stream objects, it is possible to use the stream method instead:
    xs = stream.count()
    async with xs.stream() as streamer:
        async for item in streamer:
            <block>
Operator decorator
aiostream.core.operator(func=None, *, pipable=False)

Create a stream operator from an asynchronous generator (or any function returning an asynchronous iterable).
Decorator usage:
    import random as random_module  # imported under another name, since the decorator rebinds 'random'

    @operator
    async def random(offset=0., width=1.):
        while True:
            yield offset + width * random_module.random()
Decorator usage for pipable operators:
    @operator(pipable=True)
    async def multiply(source, factor):
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield factor * item
In the case of pipable operators, the first argument is expected to be the asynchronous iterable used for piping.
The return value is a dynamically created class. It has the same name, module and doc as the original function.
A new stream is created by simply instantiating the operator:

    xs = random()
    ys = multiply(xs, 2)
The original function is called at instantiation to check that the signature matches. In the case of pipable operators, the source is also checked to make sure it supports asynchronous iteration.
The operator also has a pipe class method that can be used along with the piping syntax:

    xs = random()
    ys = xs | multiply.pipe(2)
This is strictly equivalent to the previous example.
Other methods are available:
- original: the original function as a static method
- raw: same as original but adds extra checking
The raw method is useful to create new operators from existing ones:
    @operator(pipable=True)
    def double(source):
        return multiply.raw(source, 2)