Examples¶
Demonstration¶
The following example demonstrates most of the capabilities of streams:
import asyncio
from aiostream import stream, pipe
async def main():
    """Walk through the main features of streams."""
    # A counting stream with a 0.2 seconds interval
    counter = stream.count(interval=0.2)

    # The '|' operator pipes a stream into an operator
    squares = counter | pipe.map(lambda x: x ** 2)

    # Slicing a stream produces another stream
    odd_squares = squares[1:10:2]

    # A stream context provides proper resource management
    async with odd_squares.stream() as streamer:
        # Iterate asynchronously over the produced values
        async for value in streamer:
            # Print 1, 9, 25, 49 and 81
            print("->", value)

    # Awaiting a stream returns its last value
    print("9² = ", await odd_squares)

    # The same stream can run several times
    print("9² = ", await odd_squares)

    # The '+' operator concatenates streams
    one_two_three = stream.just(1) + stream.range(2, 4)

    # Print [1, 2, 3]
    print(await stream.list(one_two_three))
# Run main coroutine
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even if main() raises
    loop.close()
Simple computation¶
This simple example computes 11² + 13² in 1.5 seconds:
import asyncio
from aiostream import stream, pipe
async def main():
    """Compute 11² + 13² in 1.5 seconds with a pipeline of operators."""
    # Count from zero every 0.1 s
    numbers = stream.count(interval=0.1)

    # Skip the first 10 numbers, then take the following 5
    window = numbers | pipe.skip(10) | pipe.take(5)

    # Keep odd numbers
    odd_numbers = window | pipe.filter(lambda x: x % 2)

    # Square the results
    odd_squares = odd_numbers | pipe.map(lambda x: x ** 2)

    # Add the numbers together
    xs = odd_squares | pipe.accumulate()

    print("11² + 13² = ", await xs)
# Run main coroutine
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even if main() raises
    loop.close()
Preserve a generator¶
This example shows how to preserve an async generator, i.e. prevent it from being closed by the iteration context:
import asyncio
from aiostream import stream, operator
async def main():
    """Compare iterate, preserve and operator on the same async generator."""

    async def agen():
        for number in (1, 2, 3):
            yield number

    # stream.iterate does not preserve the generator
    closing = stream.iterate(agen())
    print(await closing[0])  # Print 1
    print(await stream.list(closing))  # Print [] (2 and 3 have never yielded)

    # stream.preserve does preserve the generator
    preserved = stream.preserve(agen())
    print(await preserved[0])  # Print 1
    print(await stream.list(preserved))  # Print [2, 3]

    # Transforming agen into a stream operator makes it reusable
    agen_stream = operator(agen)
    reusable = agen_stream()
    print(await stream.list(reusable))  # Print [1, 2, 3]
    print(await stream.list(reusable))  # Print [1, 2, 3]
# Run main coroutine
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even if main() raises
    loop.close()
Norm server¶
The next example runs a TCP server that computes the Euclidean norm of vectors for its clients.
Run the server:
$ python3.6 norm_server.py
Serving on ('127.0.0.1', 8888)
Test using a netcat client:
$ nc localhost 8888
--------------------------------------
Compute the Euclidean norm of a vector
--------------------------------------
[...]
Check the logs on the server side, and see how the computation is performed on the fly.
import asyncio
from aiostream import stream, pipe
# Constants

# Banner and input prompt written to the client before each computation
INSTRUCTIONS = """\
--------------------------------------
Compute the Euclidean norm of a vector
--------------------------------------
Enter each coordinate of the vector on a separate line, and add an empty
line at the end to get the result. Anything else will result in an error.
> """

# Reply written to the client when a computation raises ValueError
ERROR = """\
-> Error ! Try again...
"""

# Reply template; the placeholder is filled with the computed norm
RESULT = """\
-> Euclidean norm: {}
"""
# Client handler


async def euclidean_norm_handler(reader, writer):
    """Serve Euclidean norm computations to one connected client.

    The handler is used as the connection callback of
    ``asyncio.start_server``. It repeatedly sends the instructions, reads
    one coordinate per line until an empty line is received, and replies
    with the norm of the vector. A ``ValueError`` raised while processing
    the input (e.g. a line that ``float`` cannot parse) produces the error
    message instead and the client may try again.

    Args:
        reader: stream reader for the client connection.
        writer: stream writer for the client connection; it is closed
            when the handler exits.
    """

    def strip(x):
        return x.decode().strip()

    def nonempty(x):
        return x != ""

    def square(x):
        return x ** 2

    def write_cursor(x):
        # Prompt the client for the next coordinate
        return writer.write(b"> ")

    def square_root(x):
        return x ** 0.5

    # Create awaitable; awaiting it runs the pipeline and returns the last
    # value, i.e. the square root of the accumulated sum of squares
    handle_request = (
        stream.iterate(reader)
        | pipe.print("string: {}")
        | pipe.map(strip)
        | pipe.takewhile(nonempty)
        | pipe.map(float)
        | pipe.map(square)
        | pipe.print("square: {:.2f}")
        | pipe.action(write_cursor)
        | pipe.accumulate(initializer=0)
        | pipe.map(square_root)
        | pipe.print("norm -> {:.2f}")
    )

    try:
        # Loop over norm computations
        while not reader.at_eof():
            writer.write(INSTRUCTIONS.encode())
            try:
                result = await handle_request
            except ValueError:
                writer.write(ERROR.encode())
            else:
                writer.write(RESULT.format(result).encode())
    finally:
        # Always release the client connection, even on unexpected errors
        writer.close()
# Main function


def run_server(bind="127.0.0.1", port=8888):
    """Run the norm server on ``bind:port`` until Ctrl+C is pressed."""
    # Start the server
    event_loop = asyncio.get_event_loop()
    server = event_loop.run_until_complete(
        asyncio.start_server(euclidean_norm_handler, bind, port)
    )

    # Serve requests until Ctrl+C is pressed
    address = server.sockets[0].getsockname()
    print("Serving on {}".format(address))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    event_loop.close()
# Main execution

if __name__ == "__main__":
    # Start the server only when this file is run as a script
    run_server()
Extra operators¶
This example shows how extra operators can be created and combined with others:
import asyncio
import random as random_module
from aiostream import operator, pipe, streamcontext
@operator
async def random(offset=0.0, width=1.0, interval=0.1):
    """Generate a stream of random numbers.

    After each sleep of ``interval`` seconds, yield ``offset`` plus
    ``width`` times a fresh ``random_module.random()`` sample.
    """
    while True:
        await asyncio.sleep(interval)
        sample = random_module.random()
        yield offset + width * sample
@operator(pipable=True)
async def power(source, exponent):
    """Raise the elements of an asynchronous sequence to the given power."""
    # Iterate the source within a stream context
    async with streamcontext(source) as values:
        async for value in values:
            yield value ** exponent
@operator(pipable=True)
def square(source):
    """Square the elements of an asynchronous sequence."""
    # Reuse the power operator with the exponent fixed to 2
    squared = power.raw(source, 2)
    return squared
async def main():
    """Print the sum of the squares of five random numbers."""
    numbers = random()  # Stream random numbers
    squares = numbers | square.pipe()  # Square the values
    first_five = squares | pipe.take(5)  # Take the first five
    xs = first_five | pipe.accumulate()  # Sum the values
    print(await xs)
# Run main coroutine
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even if main() raises
    loop.close()