Examples

Demonstration

The following example demonstrates most of the stream capabilities:

import asyncio

from aiostream import pipe, stream


def square(x: int, *_: int) -> int:
    """Return ``x`` raised to the power two.

    Any extra positional argument is accepted and ignored.
    """
    return pow(x, 2)


async def main() -> None:
    """Walk through piping, slicing, iterating, awaiting and concatenating streams."""
    # Count upwards, emitting one value every 0.2 seconds
    counter = stream.count(interval=0.2)

    # The '|' operator pipes a stream into an operator
    squares = counter | pipe.map(square)

    # Slicing a stream gives another stream
    odd_squares = squares[1:10:2]

    # A stream context takes care of proper resource management
    async with odd_squares.stream() as streamer:
        # Asynchronous iteration: prints 1, 9, 25, 49 and 81
        async for value in streamer:
            print("->", value)

    # Awaiting a stream returns its last value,
    # and the same stream can be run several times
    for _ in range(2):
        print("9² = ", await odd_squares)

    # The '+' operator concatenates streams
    one_two_three = stream.just(1) + stream.range(2, 4)

    # Prints [1, 2, 3]
    as_list = await stream.list(one_two_three)
    print(as_list)


# Run the main coroutine, but only when this file is executed as a script
# (importing the module must not start an event loop)
if __name__ == "__main__":
    asyncio.run(main())

Simple computation

This simple example computes 11² + 13² in 1.5 seconds:

import asyncio

from aiostream import pipe, stream


def is_odd(x: int) -> bool:
    """Tell whether ``x`` leaves a remainder of one when divided by two."""
    remainder = x % 2
    return remainder == 1


def square(x: int, *_: object) -> int:
    """Return ``x`` raised to the power two.

    Any extra positional argument is accepted and ignored.
    """
    return pow(x, 2)


async def main() -> None:
    """Build a stream computing 11² + 13² and print its final value."""
    # Count from zero, one value every 0.1 s
    numbers = stream.count(interval=0.1)
    # Skip the first 10 numbers, then take the following 5
    window = numbers | pipe.skip(10) | pipe.take(5)
    # Keep the odd numbers and square them
    odd_squares = window | pipe.filter(is_odd) | pipe.map(square)
    # Add the numbers together; awaiting the stream gives the last sum
    xs = odd_squares | pipe.accumulate()
    print("11² + 13² = ", await xs)


# Run the main coroutine, but only when this file is executed as a script
# (importing the module must not start an event loop)
if __name__ == "__main__":
    asyncio.run(main())

Preserve a generator

This example shows how to prevent an async generator from being closed by the iteration context:

import asyncio
from typing import AsyncIterator

from aiostream import operator, stream


async def main() -> None:
    """Compare three ways of turning an async generator into a stream."""

    async def agen() -> AsyncIterator[int]:
        for value in (1, 2, 3):
            yield value

    # stream.iterate does not preserve the generator
    xs = stream.iterate(agen())
    first = await xs[0]
    print(first)  # Print 1
    remaining = await stream.list(xs)
    print(remaining)  # Print [] (2 and 3 have never yielded)

    # stream.preserve does preserve the generator
    xs = stream.preserve(agen())
    first = await xs[0]
    print(first)  # Print 1
    remaining = await stream.list(xs)
    print(remaining)  # Print [2, 3]

    # Turning agen into a stream operator makes it reusable
    agen_stream = operator(agen)
    xs = agen_stream()
    for _ in range(2):
        print(await stream.list(xs))  # Print [1, 2, 3]
    print(await stream.list(xs))  # Print [1, 2, 3]


# Run the main coroutine, but only when this file is executed as a script
# (importing the module must not start an event loop)
if __name__ == "__main__":
    asyncio.run(main())

Norm server

The next example runs a TCP server that computes the Euclidean norm of vectors for its clients.

Run the server:

$ python3 norm_server.py
Serving on ('127.0.0.1', 8888)

Test using a netcat client:

$ nc localhost 8888
--------------------------------------
Compute the Euclidean norm of a vector
--------------------------------------
[...]

Check the logs on the server side, and see how the computation is performed on the fly.

import math
import asyncio

from aiostream import pipe, stream

# Constants

INSTRUCTIONS = """\
--------------------------------------
Compute the Euclidean norm of a vector
--------------------------------------
Enter each coordinate of the vector on a separate line, and add an empty
line at the end to get the result. Anything else will result in an error.
> """

ERROR = """\
-> Error ! Try again...
"""

RESULT = """\
-> Euclidean norm: {}
"""


# Client handler


async def euclidean_norm_handler(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Serve one client: read vectors line by line and reply with their norm.

    A request is a sequence of lines holding one coordinate each, terminated
    by an empty line. Requests are answered one after the other until the
    reader reaches EOF; the writer is then closed so the connection does not
    stay open after the handler is done.

    Args:
        reader: Stream the client lines are read from.
        writer: Stream the instructions, cursor and replies are written to.
    """

    # Define the pipeline steps
    def strip(x: bytes, *_: object) -> str:
        return x.decode().strip()

    def nonempty(x: str) -> bool:
        return x != ""

    def square(x: float, *_: object) -> float:
        return x**2

    def write_cursor(_: float) -> None:
        writer.write(b"> ")

    def square_root(x: float, *_: object) -> float:
        return math.sqrt(x)

    # Create awaitable: awaiting it runs the whole pipeline once and
    # evaluates to the last value produced, i.e. the norm
    handle_request = (
        stream.iterate(reader)
        | pipe.print("string: {}")
        | pipe.map(strip)
        | pipe.takewhile(nonempty)
        | pipe.map(float)
        | pipe.map(square)
        | pipe.print("square: {:.2f}")
        | pipe.action(write_cursor)
        | pipe.accumulate(initializer=0.0)
        | pipe.map(square_root)
        | pipe.print("norm -> {:.2f}")
    )

    try:
        # Loop over norm computations
        while not reader.at_eof():
            writer.write(INSTRUCTIONS.encode())
            try:
                result = await handle_request
            except ValueError:
                # A line could not be converted to a float
                writer.write(ERROR.encode())
            else:
                writer.write(RESULT.format(result).encode())
    finally:
        # Always release the client connection, even if the handler fails
        # or gets cancelled
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # The peer is already gone: nothing is left to flush
            pass


# Main function


async def main(bind: str = "127.0.0.1", port: int = 8888) -> None:
    """Start the norm server on the given address and serve clients forever.

    Args:
        bind: Host or address the server listens on.
        port: TCP port the server listens on.
    """
    server = await asyncio.start_server(euclidean_norm_handler, bind, port)

    # Report the address of the first listening socket
    address = server.sockets[0].getsockname()
    print("Serving on {}".format(address))

    # Serve requests until Ctrl+C is pressed
    async with server:
        await server.serve_forever()


# Main execution

# Start the server only when this file is executed as a script;
# asyncio.run() runs main() with its default bind address and port
if __name__ == "__main__":
    asyncio.run(main())

Extra operators

This example shows how extra operators can be created and combined with others:

import asyncio
import random as random_module
from typing import AsyncIterable, AsyncIterator

from aiostream import operator, pipable_operator, pipe, streamcontext


@operator
async def random(
    offset: float = 0.0, width: float = 1.0, interval: float = 0.1
) -> AsyncIterator[float]:
    """Generate an endless stream of random numbers.

    Each value is ``offset + width * r`` where ``r`` comes from
    ``random.random()``; the generator sleeps ``interval`` seconds before
    producing each value.
    """
    while True:
        await asyncio.sleep(interval)
        sample = random_module.random()
        yield offset + width * sample


@pipable_operator
async def power(
    source: AsyncIterable[float], exponent: float | int
) -> AsyncIterator[float]:
    """Yield every element of an asynchronous sequence raised to ``exponent``."""
    # Iterate the source within a stream context
    async with streamcontext(source) as items:
        async for value in items:
            yield pow(value, exponent)


@pipable_operator
def square(source: AsyncIterable[float]) -> AsyncIterator[float]:
    """Yield the square of every element of an asynchronous sequence."""
    # Delegate to the raw form of the power operator with an exponent of two
    squared = power.raw(source, 2)
    return squared


async def main() -> None:
    """Print the sum of the squares of five random numbers."""
    squares = random() | square.pipe()  # Stream random numbers and square them
    first_five = squares | pipe.take(5)  # Take the first five
    xs = first_five | pipe.accumulate()  # Sum the values
    print(await xs)


# Run the main coroutine, but only when this file is executed as a script
# (importing the module must not start an event loop)
if __name__ == "__main__":
    asyncio.run(main())