asyncio Beispiel

Ab IPython≥7.0 könnt ihr asyncio direkt in Jupyter Notebooks verwenden; seht auch IPython 7.0, Async REPL.

Wenn ihr die Fehlermeldung RuntimeError: This event loop is already running erhaltet, hilft euch vielleicht [nest-asyncio] weiter.

Ihr könnt das Paket in eurer Jupyter- oder JupyterHub-Umgebung installieren mit

$ pipenv install nest-asyncio

Ihr könnt es dann in euer Notebook importieren und verwenden mit:

[1]:
import nest_asyncio

nest_asyncio.apply()

Einfaches Hello world-Beispiel

[2]:
import asyncio

async def hello():
  print('Hello')
  await asyncio.sleep(1)
  print('world')

await hello()
Hello
world

Ein bisschen näher an einem realen Beispiel

[3]:
import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming {}'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
asyncio.ensure_future(produce(queue, 10), loop=loop)
loop.run_until_complete(consume(queue))
producing 1/10
producing 2/10
consuming 1
producing 3/10
consuming 2
producing 4/10
consuming 3
producing 5/10
consuming 4
producing 6/10
consuming 5
producing 7/10
consuming 6
producing 8/10
consuming 7
producing 9/10
consuming 8
producing 10/10
consuming 9
consuming 10

Ausnahmebehandlung

[4]:
def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s)))
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()

Testen mit pytest

Beispiel:

[5]:
import pytest

@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue, message, create_mock_coro):
    mock_get.side_effect = [message, Exception("break while loop")]

    with pytest.raises(Exception, match="break while loop"):
        await consume(mock_queue)

Bibliotheken von Drittanbietern

  • pytest-asyncio hat hilfreiche Dinge wie Test-Fixtures für event_loop, unused_tcp_port, und unused_tcp_port_factory; und die Möglichkeit zum Erstellen eurer eigenen asynchronen Fixtures.

  • asynctest verfügt über hilfreiche Werkzeuge, einschließlich Coroutine-Mocks und exhaust_callbacks so dass wir await task nicht manuell erstellen müssen.

  • aiohttp hat ein paar wirklich nette eingebaute Test-Utilities.

Debugging

asyncio hat bereits einen debug mode in der Standardbibliothek. Ihr könnt ihn einfach mit der Umgebungsvariablen PYTHONASYNCIODEBUG oder im Code mit loop.set_debug(True) aktivieren.

Verwendet den Debug-Modus zum Identifizieren langsamer asynchroner Aufrufe

Der Debug-Modus von asyncio hat einen kleinen eingebauten Profiler. Wenn der Debug-Modus aktiviert ist, protokolliert asyncio alle asynchronen Aufrufe, die länger als 100 Millisekunden dauern.

Debugging im Produktivbetrieb mit aiodebug

aiodebug ist eine kleine Bibliothek zum Überwachen und Testen von Asyncio-Programmen.

Beispiel

[6]:
from aiodebug import log_slow_callbacks

def main():
    loop = asyncio.get_event_loop()
    log_slow_callbacks.enable(0.05)

Logging

aiologger ermöglicht eine nicht-blockierendes Logging.

Asynchrone Widgets

[7]:
def wait_for_change(widget, value):
    future = asyncio.Future()
    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        widget.unobserve(getvalue, value)
    widget.observe(getvalue, value)
    return future
[8]:
from ipywidgets import IntSlider
slider = IntSlider()

async def f():
    for i in range(10):
        print('did work %s'%i)
        x = await wait_for_change(slider, 'value')
        print('async function continued with value %s'%x)
asyncio.ensure_future(f())

slider
did work 0

Weiterlesen