Core objects

Stream base class

class aiostream.core.Stream(factory)

Enhanced asynchronous iterable.

It provides the following features:

  • Operator pipelining - using the pipe symbol |
  • Repeatability - every iteration creates a different iterator
  • Safe iteration context - using async with and the stream method
  • Simplified execution - get the last element from a stream using await
  • Slicing and indexing - using square brackets []
  • Concatenation - using the addition symbol +

It is not meant to be instantiated directly. Use the stream operators instead.

Example:

from aiostream import stream, pipe

xs = stream.count()    # xs is a stream object
ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements
zs = ys[5:10:2]        # slice ys using start, stop and step

async with zs.stream() as streamer:  # stream zs in a safe context
    async for z in streamer:         # iterate the zs streamer
        print(z)                     # Prints 10, 12, 14

result = await zs  # await zs and return its last element
print(result)      # Prints 14
result = await zs  # zs can be used several times
print(result)      # Prints 14

stream()

Return a streamer context for safe iteration.

Example:

xs = stream.count()
async with xs.stream() as streamer:
    async for item in streamer:
        <block>
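
As a rough mental model of two of the features listed above (repeatability and awaiting a stream), the following toy class wraps a factory and calls it again on every iteration. ToyStream and three_items are hypothetical names used only for illustration; this is a sketch using the standard library alone, not aiostream's actual implementation, and it omits the safe iteration context that the real Stream provides.

```python
import asyncio


class ToyStream:
    """Toy model only: a re-iterable, awaitable wrapper around a factory."""

    def __init__(self, factory):
        # factory: zero-argument callable returning a fresh async iterator
        self._factory = factory

    def __aiter__(self):
        # Repeatability: each iteration calls the factory again
        return self._factory()

    def __await__(self):
        # Simplified execution: run the stream and return its last element
        return self._last().__await__()

    async def _last(self):
        result = None
        async for item in self:
            result = item
        return result


async def three_items():
    for i in range(3):
        yield i


async def main():
    xs = ToyStream(three_items)
    first_pass = [x async for x in xs]
    second_pass = [x async for x in xs]  # a new iterator, same items
    last = await xs
    return first_pass, second_pass, last


first_pass, second_pass, last = asyncio.run(main())
print(first_pass, second_pass, last)  # [0, 1, 2] [0, 1, 2] 2
```

Because the wrapper stores a factory rather than an iterator, it can be iterated or awaited any number of times without being exhausted.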

Stream context manager

aiostream.core.streamcontext(aiterable)

Return a stream context manager from an asynchronous iterable.

The context manager makes sure that the aclose asynchronous method of the corresponding iterator has run before it exits. It also issues warnings and raises RuntimeError if it is used incorrectly.

It is safe to use with any asynchronous iterable and prevents an asynchronous iterator context from being wrapped twice.

Correct usage:

ait = some_asynchronous_iterable()
async with streamcontext(ait) as streamer:
    async for item in streamer:
        <block>

For stream objects, it is possible to use the stream method instead:

xs = stream.count()
async with xs.stream() as streamer:
    async for item in streamer:
        <block>
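
The guarantee described in this section, that aclose has run before the context exits, can be illustrated with a small stand-in written against the standard library only. ClosingContext and numbers are hypothetical names, and the sketch leaves out the warnings, the RuntimeError checks and the double-wrapping protection mentioned above; it only shows why closing matters when a loop is left early.

```python
import asyncio


class ClosingContext:
    """Toy stand-in: closes the async iterator when the block exits."""

    def __init__(self, aiterable):
        self._aiterator = aiterable.__aiter__()

    async def __aenter__(self):
        return self._aiterator

    async def __aexit__(self, exc_type, exc, tb):
        # Async generators expose aclose(); other iterators may not
        aclose = getattr(self._aiterator, "aclose", None)
        if aclose is not None:
            await aclose()
        return False  # never swallow exceptions


events = []


async def numbers():
    try:
        for i in range(10):
            yield i
    finally:
        events.append("cleanup")  # runs when the generator is closed


async def main():
    async with ClosingContext(numbers()) as streamer:
        async for item in streamer:
            events.append(item)
            if item == 1:
                break  # leave early; the generator is still suspended
    events.append("after context")


asyncio.run(main())
print(events)  # [0, 1, 'cleanup', 'after context']
```

The cleanup entry appears before the code that follows the async with block, which is the ordering the context manager is there to provide. Without it, the suspended generator would only be finalized at some later, unspecified point.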

Operator decorator

aiostream.core.operator(func=None, *, pipable=False)

Create a stream operator from an asynchronous generator (or any function returning an asynchronous iterable).

Decorator usage:

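import random as random_module  # aliased: the operator below is also named random
from aiostream.core import operator, streamcontext

@operator
async def random(offset=0., width=1.):
    while True:
        yield offset + width * random_module.random()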

Decorator usage for pipable operators:

@operator(pipable=True)
async def multiply(source, factor):
    async with streamcontext(source) as streamer:
        async for item in streamer:
            yield factor * item

In the case of pipable operators, the first argument is expected to be the asynchronous iterable used for piping.

The return value is a dynamically created class. It has the same name, module and doc as the original function.

A new stream is created by simply instantiating the operator:

xs = random()
ys = multiply(xs, 2)

The original function is called at instantiation to check that the signature matches. In the case of pipable operators, the source is also checked for asynchronous iteration.
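
To see why calling the original function is enough to validate arguments, note that calling an asynchronous generator function binds its arguments immediately, even though the body does not start running until the first item is requested. The sketch below uses plain asynchronous generators rather than aiostream operators. The helper is_async_iterable is a hypothetical illustration of a source check, and how the library performs that check internally is an assumption here.

```python
from collections.abc import AsyncIterable


async def multiply(source, factor):
    async for item in source:
        yield factor * item


async def count_to(n):
    for i in range(n):
        yield i


# A bad call fails at once, although no item has been requested yet
try:
    multiply(count_to(3))  # missing the factor argument
except TypeError as exc:
    error = exc
else:
    error = None


def is_async_iterable(obj):
    # Hypothetical source check: async iterables define __aiter__
    return isinstance(obj, AsyncIterable)


print(type(error).__name__)  # TypeError
print(is_async_iterable(count_to(3)), is_async_iterable([1, 2, 3]))  # True False
```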

The operator also has a pipe class method that can be used along with the piping syntax:

xs = random()
ys = xs | multiply.pipe(2)

This is strictly equivalent to the previous example.
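
One way to picture this equivalence is to treat the pipe method as fixing every argument except the source, and the | symbol as supplying that source afterwards. The following is a minimal sketch under that assumption, using only the standard library. Pipeable, multiply_pipe and count_to are hypothetical names, and the code does not reflect how aiostream implements piping.

```python
import asyncio


class Pipeable:
    """Toy wrapper adding `|` support to an async iterable."""

    def __init__(self, aiterable):
        self._aiterable = aiterable

    def __aiter__(self):
        return self._aiterable.__aiter__()

    def __or__(self, func):
        # xs | f is evaluated as f(xs)
        return Pipeable(func(self))


async def multiply(source, factor):
    async for item in source:
        yield factor * item


def multiply_pipe(factor):
    # Hypothetical counterpart of multiply.pipe: fixes every argument
    # except the source, which `|` supplies later
    return lambda source: multiply(source, factor)


async def count_to(n):
    for i in range(n):
        yield i


async def main():
    direct = [x async for x in multiply(count_to(4), 2)]
    piped = [x async for x in Pipeable(count_to(4)) | multiply_pipe(2)]
    return direct, piped


direct, piped = asyncio.run(main())
print(direct, piped)  # [0, 2, 4, 6] [0, 2, 4, 6]
```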

Other methods are available:

  • original: the original function as a static method
  • raw: same as original but adds extra checking

The raw method is useful to create new operators from existing ones:

@operator(pipable=True)
def double(source):
    return multiply.raw(source, 2)
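
Note that double above is a regular function, not an asynchronous generator: it only needs to return an asynchronous iterable. The same pattern can be tried without aiostream, as in the sketch below, where plain asynchronous generators stand in for operators and count_to is a hypothetical source.

```python
import asyncio


async def multiply(source, factor):
    async for item in source:
        yield factor * item


def double(source):
    # Not a generator itself: it just returns multiply's async iterable
    return multiply(source, 2)


async def count_to(n):
    for i in range(n):
        yield i


async def main():
    return [x async for x in double(count_to(3))]


doubled = asyncio.run(main())
print(doubled)  # [0, 2, 4]
```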