Examples
Demonstration
The following example demonstrates most of the stream capabilities:
import asyncio
from aiostream import pipe, stream
def square(x: int, *_: int) -> int:
    """Return ``x`` multiplied by itself; extra positional arguments are ignored."""
    return x * x
async def main() -> None:
    """Demonstrate piping, slicing, iterating, awaiting and concatenating streams."""
    # Counting stream producing 0, 1, 2, ... with a 0.2 seconds interval
    counter = stream.count(interval=0.2)

    # The '|' operator pipes a stream into an operator
    squares = counter | pipe.map(square)

    # Streams can be sliced (here: the items at index 1, 3, 5, 7 and 9)
    odd_squares = squares[1:10:2]

    # Use a stream context for proper resource management
    async with odd_squares.stream() as streamer:
        # Consume the items with asynchronous iteration
        async for value in streamer:
            # Print 1, 9, 25, 49 and 81
            print("->", value)

    # Awaiting a stream returns its last value
    print("9² = ", await odd_squares)

    # The same stream can be run several times
    print("9² = ", await odd_squares)

    # The '+' operator concatenates streams
    one_two_three = stream.just(1) + stream.range(2, 4)

    # Print [1, 2, 3]
    print(await stream.list(one_two_three))
# Run the main coroutine only when this file is executed as a script,
# so that importing the module does not start the demonstration
if __name__ == "__main__":
    asyncio.run(main())
Simple computation
This simple example computes 11² + 13² in 1.5 seconds:
import asyncio
from aiostream import pipe, stream
def is_odd(x: int) -> bool:
    """Tell whether ``x`` is an odd integer."""
    remainder = x % 2
    return remainder == 1
def square(x: int, *_: object) -> int:
    """Return ``x`` multiplied by itself; extra positional arguments are ignored."""
    return x * x
async def main() -> None:
    """Compute 11² + 13² with a pipeline of stream operators and print the result."""
    # Count from zero, producing a number every 0.1 s
    numbers = stream.count(interval=0.1)

    # Skip the first 10 numbers, then take the following 5
    window = numbers | pipe.skip(10) | pipe.take(5)

    # Keep the odd numbers and square them
    odd_squares = window | pipe.filter(is_odd) | pipe.map(square)

    # Add the squares together
    xs = odd_squares | pipe.accumulate()

    # Awaiting the stream returns its last value, i.e. the complete sum
    print("11² + 13² = ", await xs)
# Run the main coroutine only when this file is executed as a script,
# so that importing the module does not start the computation
if __name__ == "__main__":
    asyncio.run(main())
Preserve a generator
This example shows how to preserve an async generator from being closed by the iteration context:
import asyncio
from typing import AsyncIterator
from aiostream import operator, stream
async def main() -> None:
    """Compare iterating, preserving and wrapping an async generator."""

    async def agen() -> AsyncIterator[int]:
        """Yield 1, 2 and 3 in order."""
        for value in (1, 2, 3):
            yield value

    # A stream built with stream.iterate() does not preserve the generator
    consumed = stream.iterate(agen())
    first = await consumed[0]
    print(first)  # Print 1
    leftover = await stream.list(consumed)
    print(leftover)  # Print [] (2 and 3 have never yielded)

    # A stream built with stream.preserve() does preserve the generator
    preserved = stream.preserve(agen())
    first = await preserved[0]
    print(first)  # Print 1
    leftover = await stream.list(preserved)
    print(leftover)  # Print [2, 3]

    # Transform agen into a stream operator
    agen_stream = operator(agen)
    reusable = agen_stream()  # agen is now reusable
    for _ in range(2):
        print(await stream.list(reusable))  # Print [1, 2, 3] both times
# Run the main coroutine only when this file is executed as a script,
# so that importing the module does not start the example
if __name__ == "__main__":
    asyncio.run(main())
Norm server
The next example runs a TCP server that computes the Euclidean norm of vectors for its clients.
Run the server:
$ python3 norm_server.py
Serving on ('127.0.0.1', 8888)
Test using a netcat client:
$ nc localhost 8888
--------------------------------------
Compute the Euclidean norm of a vector
--------------------------------------
[...]
Check the logs on the server side, and see how the computation is performed on the fly.
import math
import asyncio
from aiostream import pipe, stream
# Constants
# Banner and prompt sent to the client before each computation
INSTRUCTIONS = (
    "--------------------------------------\n"
    "Compute the Euclidean norm of a vector\n"
    "--------------------------------------\n"
    "Enter each coordinate of the vector on a separate line, and add an empty\n"
    "line at the end to get the result. Anything else will result in an error.\n"
    "> "
)

# Reply sent when the computation raised a ValueError
ERROR = "-> Error ! Try again...\n"

# Reply template; the placeholder receives the computed norm
RESULT = "-> Euclidean norm: {}\n"
# Client handler
async def euclidean_norm_handler(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Serve one client, computing the Euclidean norm of each vector it sends.

    The client sends one coordinate per line and an empty line to end the
    vector. The squares are accumulated as the lines arrive, and the square
    root of the running sum is sent back once the vector is complete. A
    ``ValueError`` raised while processing a vector is reported to the client
    with the ``ERROR`` message, and the loop then continues.

    Args:
        reader: Stream the client's lines are read from.
        writer: Stream the instructions, prompts and results are written to.
            It is closed when the handler exits, whether normally or through
            an exception.
    """

    # Per-item helpers used by the pipeline below
    def strip(x: bytes, *_: object) -> str:
        return x.decode().strip()

    def nonempty(x: str) -> bool:
        return x != ""

    def square(x: float, *_: object) -> float:
        return x**2

    def write_cursor(_: float) -> None:
        # Prompt for the next coordinate once the current one is processed
        writer.write(b"> ")

    def square_root(x: float, *_: object) -> float:
        return math.sqrt(x)

    # Create the awaitable: awaiting it processes one vector and returns
    # the last value of the pipeline, i.e. the norm
    handle_request = (
        stream.iterate(reader)
        | pipe.print("string: {}")
        | pipe.map(strip)
        | pipe.takewhile(nonempty)
        | pipe.map(float)
        | pipe.map(square)
        | pipe.print("square: {:.2f}")
        | pipe.action(write_cursor)
        | pipe.accumulate(initializer=0.0)
        | pipe.map(square_root)
        | pipe.print("norm -> {:.2f}")
    )

    try:
        # Loop over norm computations until the client closes its side
        while not reader.at_eof():
            writer.write(INSTRUCTIONS.encode())
            try:
                result = await handle_request
            except ValueError:
                writer.write(ERROR.encode())
            else:
                writer.write(RESULT.format(result).encode())
    finally:
        # Always release the connection, even when the loop raised
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # The peer may already have dropped the connection; there is
            # nothing left to clean up in that case
            pass
# Main function
async def main(bind: str = "127.0.0.1", port: int = 8888) -> None:
    """Start the norm server on ``bind``/``port`` and serve clients forever."""
    # Start the server
    server = await asyncio.start_server(euclidean_norm_handler, bind, port)

    # Report the address of the first listening socket
    address = server.sockets[0].getsockname()
    print(f"Serving on {address}")

    # Serve requests until Ctrl+C is pressed
    async with server:
        await server.serve_forever()
# Main execution: start the server only when this file is run as a script
if __name__ == "__main__":
    asyncio.run(main())
Extra operators
This example shows how extra operators can be created and combined with others:
import asyncio
import random as random_module
from typing import AsyncIterable, AsyncIterator
from aiostream import operator, pipable_operator, pipe, streamcontext
@operator
async def random(
    offset: float = 0.0, width: float = 1.0, interval: float = 0.1
) -> AsyncIterator[float]:
    """Endlessly produce random numbers, sleeping ``interval`` seconds before each.

    Every item is ``offset + width * r``, where ``r`` is a fresh value
    returned by ``random.random()`` from the standard library.
    """
    while True:
        await asyncio.sleep(interval)
        sample = random_module.random()
        yield offset + width * sample
@pipable_operator
async def power(
    source: AsyncIterable[float], exponent: float | int
) -> AsyncIterator[float]:
    """Produce each element of an asynchronous sequence raised to ``exponent``."""
    context = streamcontext(source)
    async with context as streamer:
        async for base in streamer:
            yield pow(base, exponent)
@pipable_operator
def square(source: AsyncIterable[float]) -> AsyncIterator[float]:
    """Produce the square of every element of an asynchronous sequence."""
    # Reuse ``power`` through its ``raw`` attribute with a fixed exponent of 2
    squared = power.raw(source, 2)
    return squared
async def main() -> None:
    """Sum the squares of five random numbers and print the total."""
    # Stream random numbers
    numbers = random()

    # Square the values
    squares = numbers | square.pipe()

    # Take the first five
    first_five = squares | pipe.take(5)

    # Sum the values
    xs = first_five | pipe.accumulate()

    print(await xs)
# Run the main coroutine only when this file is executed as a script,
# so that importing the module does not start the example
if __name__ == "__main__":
    asyncio.run(main())