Examples

Demonstration

The following example demonstrates most of the capabilities of streams:

import asyncio
from aiostream import stream, pipe


async def main():
    """Walk through piping, slicing, iterating, awaiting and concatenating."""

    # Build a counting stream that waits 0.2 seconds between items
    counter = stream.count(interval=0.2)

    # The '|' operator pipes a stream into an operator
    squares = counter | pipe.map(lambda value: value ** 2)

    # Slicing a stream gives another stream
    odd_squares = squares[1:10:2]

    # A stream context takes care of resource management
    async with odd_squares.stream() as streamer:

        # Iterate asynchronously over the items
        async for square in streamer:

            # Print 1, 9, 25, 49 and 81
            print('->', square)

    # Awaiting a stream returns its last value
    last = await odd_squares
    print('9² = ', last)

    # The same stream can be run again
    last = await odd_squares
    print('9² = ', last)

    # The '+' operator concatenates streams
    merged = stream.just(1) + stream.range(2, 4)

    # Print [1, 2, 3]
    items = await stream.list(merged)
    print(items)


# Run main coroutine on a dedicated event loop. The loop is created
# explicitly because asyncio.get_event_loop() is deprecated when no loop is
# running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources even if main() raised
    loop.close()

Simple computation

This simple example computes 11² + 13² in 1.5 seconds:

import asyncio
from aiostream import stream, pipe


async def main():
    """Print 11² + 13², computed by a stream in about 1.5 seconds."""
    # Count from zero every 0.1 s
    counter = stream.count(interval=0.1)
    # Skip the first 10 numbers, then take the following 5
    window = counter | pipe.skip(10) | pipe.take(5)
    # Keep odd numbers
    odds = window | pipe.filter(lambda number: number % 2)
    # Square the results
    squares = odds | pipe.map(lambda number: number ** 2)
    # Add the numbers together
    total = squares | pipe.accumulate()
    print('11² + 13² = ', await total)


# Run main coroutine on a dedicated event loop. The loop is created
# explicitly because asyncio.get_event_loop() is deprecated when no loop is
# running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources even if main() raised
    loop.close()

Preserve a generator

This example shows how to keep an async generator from being closed by the iteration context:

import asyncio
from aiostream import stream, operator


async def main():
    """Compare iterate, preserve and operator on the same async generator."""

    async def agen():
        for number in (1, 2, 3):
            yield number

    # With stream.iterate, the generator is not preserved
    closing = stream.iterate(agen())
    head = await closing[0]
    print(head)  # Print 1
    rest = await stream.list(closing)
    print(rest)  # Print [] (2 and 3 have never yielded)

    # With stream.preserve, the generator is preserved
    preserved = stream.preserve(agen())
    head = await preserved[0]
    print(head)  # Print 1
    rest = await stream.list(preserved)
    print(rest)  # Print [2, 3]

    # Turn agen into a stream operator, which makes it reusable
    agen_stream = operator(agen)
    reusable = agen_stream()
    for _ in range(2):
        print(await stream.list(reusable))  # Print [1, 2, 3]


# Run main coroutine on a dedicated event loop. The loop is created
# explicitly because asyncio.get_event_loop() is deprecated when no loop is
# running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources even if main() raised
    loop.close()

Norm server

The next example runs a TCP server that computes the Euclidean norm of vectors for its clients.

Run the server:

$ python3.6 norm_server.py
Serving on ('127.0.0.1', 8888)

Test using a netcat client:

$ nc localhost 8888
--------------------------------------
Compute the Euclidean norm of a vector
--------------------------------------
[...]

Check the logs on the server side, and see how the computation is performed on the fly.

import asyncio
from aiostream import stream, pipe

# Constants

# Banner and first prompt written to the client before each computation
INSTRUCTIONS = (
    "--------------------------------------\n"
    "Compute the Euclidean norm of a vector\n"
    "--------------------------------------\n"
    "Enter each coordinate of the vector on a separate line, and add an empty\n"
    "line at the end to get the result. Anything else will result in an error.\n"
    "> "
)

# Reply written to the client when a request raises ValueError
ERROR = "-> Error ! Try again...\n"

# Reply template; the placeholder is filled with the computed result
RESULT = "-> Euclidean norm: {}\n"


# Client handler

async def euclidean_norm_handler(reader, writer):
    """Serve Euclidean norm computations to one connected client.

    Each request reads lines from the client until an empty line, converts
    them to floats and accumulates their squares; the square root of the
    sum is sent back. Requests are served until the reader reaches
    end-of-file, then the writer is closed.

    Args:
        reader: stream reader of the client connection.
        writer: stream writer of the client connection.
    """

    # Define the pipeline steps as named functions
    def strip(x):
        return x.decode().strip()

    def nonempty(x):
        return x != ''

    def square(x):
        return x ** 2

    def write_cursor(x):
        # The item itself is ignored, only a new prompt is written
        writer.write(b'> ')

    def square_root(x):
        return x ** 0.5

    # Create awaitable
    handle_request = (
        stream.iterate(reader)
        | pipe.print('string: {}')
        | pipe.map(strip)
        | pipe.takewhile(nonempty)
        | pipe.map(float)
        | pipe.map(square)
        | pipe.print('square: {:.2f}')
        | pipe.action(write_cursor)
        | pipe.accumulate(initializer=0)
        | pipe.map(square_root)
        | pipe.print('norm -> {:.2f}')
    )

    try:
        # Loop over norm computations
        while not reader.at_eof():
            writer.write(INSTRUCTIONS.encode())
            try:
                result = await handle_request
            except ValueError:
                writer.write(ERROR.encode())
            else:
                writer.write(RESULT.format(result).encode())
    finally:
        # The handler owns the connection: close it so it is not left
        # open after the client is done or after an unexpected error
        writer.close()


# Main function

def run_server(bind='127.0.0.1', port=8888):
    """Run the norm server until Ctrl+C is pressed.

    Args:
        bind: address the server listens on.
        port: TCP port the server listens on.
    """

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running and raises RuntimeError on recent Python
    loop = asyncio.new_event_loop()
    try:
        # Start the server
        coro = asyncio.start_server(euclidean_norm_handler, bind, port)
        server = loop.run_until_complete(coro)

        # Serve requests until Ctrl+C is pressed
        print('Serving on {}'.format(server.sockets[0].getsockname()))
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop the server
            pass
        finally:
            # Close the server, whatever stopped the loop
            server.close()
            loop.run_until_complete(server.wait_closed())
    finally:
        loop.close()


# Main execution

# Start the server with its default arguments when run as a script
if __name__ == '__main__':
    run_server()

Extra operators

This example shows how extra operators can be created and combined with others:

import asyncio
import random as random_module

from aiostream import operator, pipe, streamcontext


@operator
async def random(offset=0., width=1., interval=0.1):
    """Emit an endless stream of random numbers.

    Before each item the generator sleeps for interval seconds; the item is
    offset plus width times a sample from random_module.random().
    """
    while True:
        await asyncio.sleep(interval)
        sample = random_module.random()
        yield offset + width * sample


@operator(pipable=True)
async def power(source, exponent):
    """Yield each element of an asynchronous sequence raised to exponent."""
    async with streamcontext(source) as items:
        async for value in items:
            raised = value ** exponent
            yield raised


@operator(pipable=True)
def square(source):
    """Raise the elements of an asynchronous sequence to the power two."""
    # Delegate to the raw power operator with a fixed exponent
    squared = power.raw(source, 2)
    return squared


async def main():
    """Print the sum of five squared random numbers."""
    # Stream random numbers
    numbers = random()
    # Square the values
    squares = numbers | square.pipe()
    # Take the first five
    first_five = squares | pipe.take(5)
    # Sum the values
    total = first_five | pipe.accumulate()
    print(await total)


# Run main coroutine on a dedicated event loop. The loop is created
# explicitly because asyncio.get_event_loop() is deprecated when no loop is
# running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources even if main() raised
    loop.close()