Multi-Processing-Beispiel

Wir beginnen hier mit Code, der klar und einfach ist und von oben nach unten ausgeführt wird. Er ist einfach zu entwickeln und inkrementell zu testen:

[1]:
from multiprocessing.pool import ThreadPool as Pool

import requests


sites = [
    "https://github.com/veit/jupyter-tutorial/",
    "https://jupyter-tutorial.readthedocs.io/en/latest/",
    "https://github.com/veit/pyviz-tutorial/",
    "https://pyviz-tutorial.readthedocs.io/de/latest/",
    "https://cusy.io/en",
]


def sitesize(url):
    with requests.get(url) as u:
        return url, len(u.content)


pool = Pool(4)
for result in pool.imap_unordered(sitesize, sites):
    print(result)
('https://pyviz-tutorial.readthedocs.io/de/latest/', 32803)
('https://jupyter-tutorial.readthedocs.io/en/latest/', 40884)
('https://github.com/veit/jupyter-tutorial/', 237670)
('https://github.com/veit/pyviz-tutorial/', 213932)
('https://cusy.io/en', 33545)

Hinweis

Eine gute Entwicklungsstrategie ist die Verwendung von map, um den Code in einem einzelnen Prozess und einem einzelnen Thread zu testen, bevor zu Multi-Processing gewechselt wird.

Hinweis

Um besser einschätzen zu können, wann ThreadPool und wann Pool verwendet werden sollte, hier einige Faustregeln:

  • Für CPU-lastige Jobs sollte multiprocessing.pool.Pool verwendet werden. Üblicherweise beginnen wir hier mit der doppelten Anzahl von CPU-Kernen für die Pool-Größe, mindestens jedoch mit 4.

  • Für I/O-lastige Jobs sollte multiprocessing.pool.ThreadPool verwendet werden. Üblicherweise beginnen wir hier mit der fünffachen Anzahl von CPU-Kernen für die Pool-Größe.

  • Verwenden wir Python 3 und benötigen kein mit Pool identisches Interface, nutzen wir concurrent.future.Executor statt multiprocessing.pool.ThreadPool; er hat ein einfacheres Interface und wurde von Anfang an für Threads konzipiert. Da er Instanzen von concurrent.futures.Future zurückgibt, ist er kompatibel zu vielen anderen Bibliotheken, einschließlich asyncio.

  • Für CPU- und I/O-lastige Jobs bevorzugen wir multiprocessing.Pool, da hierdurch eine bessere Prozess-Isolierung erreicht wird.

[2]:
from multiprocessing.pool import ThreadPool as Pool

import requests


sites = [
    "https://github.com/veit/jupyter-tutorial/",
    "https://jupyter-tutorial.readthedocs.io/en/latest/",
    "https://github.com/veit/pyviz-tutorial/",
    "https://pyviz-tutorial.readthedocs.io/de/latest/",
    "https://cusy.io/en",
]


def sitesize(url):
    with requests.get(url) as u:
        return url, len(u.content)


for result in map(sitesize, sites):
    print(result)
('https://github.com/veit/jupyter-tutorial/', 237669)
('https://jupyter-tutorial.readthedocs.io/en/latest/', 40884)
('https://github.com/veit/pyviz-tutorial/', 213932)
('https://pyviz-tutorial.readthedocs.io/de/latest/', 32803)
('https://cusy.io/en', 33545)

Was ist parallelisierbar?

Amdahlsche Gesetz

Der Geschwindigkeitszuwachs vor allem durch den sequentiellen Anteil des Problems beschränkt, da sich dessen Ausführungszeit durch Parallelisierung nicht verringern lässt. Zudem entstehen durch Parallelisierung zusätzliche Kosten wie etwa für die Kommunikation und die Synchronisierung der Prozesse.

In unserem Beispiel können die folgenden Aufgaben nur seriell abgearbeitet werden:

  • UDP DNS request für die URL

  • UDP DNS response

  • Socket vom OS

  • TCP-Connection

  • Senden des HTTP Request für die Root-Ressource

  • Warten auf die TCP Response

  • Zählen der Zeichen auf der Website

[3]:
from multiprocessing.pool import ThreadPool as Pool

import requests


sites = [
    "https://github.com/veit/jupyter-tutorial/",
    "https://jupyter-tutorial.readthedocs.io/en/latest/",
    "https://github.com/veit/pyviz-tutorial/",
    "https://pyviz-tutorial.readthedocs.io/de/latest/",
    "https://cusy.io/en",
]


def sitesize(url):
    """Determine the size of a website"""
    with requests.get(url, stream=True) as u:
        return url, len(u.content)


pool = Pool(4)
for result in pool.imap_unordered(sitesize, sites):
    print(result)
('https://github.com/veit/pyviz-tutorial/', 213932)
('https://cusy.io/en', 33545)
('https://jupyter-tutorial.readthedocs.io/en/latest/', 40884)
('https://github.com/veit/jupyter-tutorial/', 237670)
('https://pyviz-tutorial.readthedocs.io/de/latest/', 32803)

Hinweis

imap_unordered wird verwendet, um die Reaktionsfähigkeit zu verbessern. Dies ist jedoch nur möglich, da die Funktion das Argument und das Ergebnis als Tuple zurückgibt.

Tipps

  • Macht nicht zu viele Trips hin und her

    Erhaltet ihr zu viele iterierbare Ergebnisse, ist dies ein guter Indikator für zu viele Trips, wie z.B. in

    >>>     def sitesize(url, start):
    ...         req = urllib.request.Request()
    ...         req.add_header('Range:%d-%d' % (start, start+1000))
    ...         u = urllib.request.urlopen(url, req)
    ...         block = u.read()
    ...         return url, len(block)
    
  • Macht auf jedem Trip relevante Fortschritte

    Sobald ihr den Prozess erhaltet, solltet ihr deutliche Fortschritte erzielen und euch nicht verzetteln. Das folgende Beispiel verdeutlicht zu kleine Zwischenschritte:

    >>> def sitesize(url, results):
    ...     with requests.get(url, stream=True) as u:
    ...         while True:
    ...             line = u.iter_lines()
    ...             results.put((url, len(line)))
    
  • Sendet und empfangt nicht zu viele Daten

    Das folgende Beispiel erhöht unnötig die Datenmenge:

    >>> def sitesize(url):
    ...     with requests.get(url) as u:
    ...         return url, u.content