asyncio Beispiel#

Ab IPython≥7.0 könnt ihr asyncio direkt in Jupyter Notebooks verwenden; seht auch IPython 7.0, Async REPL.

Wenn ihr die Fehlermeldung RuntimeError: This event loop is already running erhaltet, hilft euch vielleicht [nest-asyncio] weiter.

Ihr könnt das Paket in eurer Jupyter- oder JupyterHub-Umgebung installieren mit

$ pipenv install nest-asyncio

Ihr könnt es dann in euer Notebook importieren und verwenden mit:

[1]:
import nest_asyncio


nest_asyncio.apply()

Zum Weiterlesen

Einfaches Hello world-Beispiel#

[2]:
import asyncio


async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("world")


await hello()
Hello
world

Ein bisschen näher an einem realen Beispiel#

[3]:
import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print("producing {}/{}".format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print("consuming {}".format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


loop = asyncio.get_event_loop()
queue = asyncio.Queue()
asyncio.ensure_future(produce(queue, 10), loop=loop)
loop.run_until_complete(consume(queue))
producing 1/10
producing 2/10
consuming 1
producing 3/10
producing 4/10
producing 5/10
consuming 2
producing 6/10
consuming 3
producing 7/10
consuming 4
producing 8/10
consuming 5
producing 9/10
consuming 6
consuming 7
consuming 8
producing 10/10
consuming 9
consuming 10

Ausnahmebehandlung#

[4]:
def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s))
        )
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()

Testen mit pytest#

Beispiel:#

[5]:
import pytest


@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue, message, create_mock_coro):
    mock_get.side_effect = [message, Exception("break while loop")]

    with pytest.raises(Exception, match="break while loop"):
        await consume(mock_queue)

Bibliotheken von Drittanbietern#

  • pytest-asyncio hat hilfreiche Dinge wie Test-Fixtures für event_loop, unused_tcp_port, und unused_tcp_port_factory; und die Möglichkeit zum Erstellen eurer eigenen asynchronen Fixtures.

  • asynctest verfügt über hilfreiche Werkzeuge, einschließlich Coroutine-Mocks und exhaust_callbacks so dass wir await task nicht manuell erstellen müssen.

  • aiohttp hat ein paar wirklich nette eingebaute Test-Utilities.

Debugging#

asyncio hat bereits einen debug mode in der Standardbibliothek. Ihr könnt ihn einfach mit der Umgebungsvariablen PYTHONASYNCIODEBUG oder im Code mit loop.set_debug(True) aktivieren.

Verwendet den Debug-Modus zum Identifizieren langsamer asynchroner Aufrufe#

Der Debug-Modus von asyncio hat einen kleinen eingebauten Profiler. Wenn der Debug-Modus aktiviert ist, protokolliert asyncio alle asynchronen Aufrufe, die länger als 100 Millisekunden dauern.

Debugging im Produktivbetrieb mit aiodebug#

aiodebug ist eine kleine Bibliothek zum Überwachen und Testen von Asyncio-Programmen.

Beispiel#

[6]:
from aiodebug import log_slow_callbacks


def main():
    loop = asyncio.get_event_loop()
    log_slow_callbacks.enable(0.05)

Logging#

aiologger ermöglicht eine nicht-blockierendes Logging.

Asynchrone Widgets#

Siehe auch

[7]:
def wait_for_change(widget, value):
    future = asyncio.Future()

    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        widget.unobserve(getvalue, value)

    widget.observe(getvalue, value)
    return future
[8]:
from ipywidgets import IntSlider


slider = IntSlider()


async def f():
    for i in range(10):
        print("did work %s" % i)
        x = await wait_for_change(slider, "value")
        print("async function continued with value %s" % x)


asyncio.ensure_future(f())

slider
did work 0