AsyncResult-Objekt

apply() gibt im noblock-Modus ein AsyncResult-Objekt zurück. Dieses erlaubt zu einem späteren Zeitpunkt Anfragen mit der get()-Methode zu den Ergebnissen. Überdies werden in diesem Objekt auch bei der Ausführung anfallende Metadaten gesammelt.

Das AsyncResult-Objekt bietet eine Reihe praktischer Funktionen für die Parallelisierung, die über Pythons multiprocessing.pool.AsyncResult. hinausgehen:

get_dict

AsyncResult.get_dict()

[1]:
import os

import ipyparallel as ipp


rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
pids = ar.get_dict()
rc[:]["pid_map"] = pids

Metadaten

Client.metadata

Timing

Iterierbare Map-Ergebnisse

[2]:
from __future__ import print_function

import time

import ipyparallel as ipp


# create client & view
rc = ipp.Client()
dv = rc[:]
v = rc.load_balanced_view()

# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter("id", rc.ids, flatten=True)
print("Engine IDs: ", dv["id"])

# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference("id")
print("sleeping for `id` seconds on each engine")
tic = time.time()
ar = dv.apply(time.sleep, ref)
for i, r in enumerate(ar):
    print("%i: %.3f" % (i, time.time() - tic))


def sleep_here(t):
    import time

    time.sleep(t)
    return id, t


# one call per task
print("running with one call per task")
amr = v.map(sleep_here, [0.01 * t for t in range(100)])
tic = time.time()
for i, r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))

print("running with four calls per task")
# with chunksize, we can have four calls per task
amr = v.map(sleep_here, [0.01 * t for t in range(100)], chunksize=4)
tic = time.time()
for i, r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))

print("running with two calls per task, with unordered results")
# We can even iterate through faster results first, with ordered=False
amr = v.map(
    sleep_here,
    [0.01 * t for t in range(100, 0, -1)],
    ordered=False,
    chunksize=2,
)
tic = time.time()
for i, r in enumerate(amr):
    print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time() - tic))
Engine IDs:  [0, 1, 2]
sleeping for `id` seconds on each engine
0: 0.027
1: 1.022
2: 2.024
running with one call per task
task 0 on engine 2: 0.000
task 1 on engine 1: 0.001
task 2 on engine 0: 0.001
task 3 on engine 2: 0.001
task 4 on engine 1: 0.001
…
slept 0.12s on engine 2: 16.868
slept 0.11s on engine 2: 16.868
slept 0.14s on engine 0: 16.873
slept 0.13s on engine 0: 16.873
slept 0.16s on engine 1: 16.893
…
[4]:
from functools import reduce
from math import sqrt

import numpy as np


X = np.linspace(0, 100)
add = lambda a, b: a + b
sq = lambda x: x * x
sqrt(reduce(add, map(sq, X)) / len(X))
[4]:
58.028845747399714
  1. map(sq, X) berechnet das Quadrat jedes Elements in der Liste.

  2. reduce(add, sqX) / len(X) berechnet den Mittelwert indem die Liste von AsyncMapResult summiert und durch die Anzahl dividiert wird.

  3. Quadratwurzel der resultierenden Zahl.

Siehe auch: Wenn ihr die Ergebnisse von AsyncResult oder AsyncMapResult noch erweitern wollt, könnt ihr dies mit dem msg_ids-Attribut. Ein Beispiel hierfür findet ihr unter ipyparallel/docs/source/examples/customresults.py.