AsyncResult
-Objekt¶
apply()
gibt im noblock
-Modus ein AsyncResult
-Objekt zurück. Dieses erlaubt zu einem späteren Zeitpunkt Anfragen mit der get()
-Methode zu den Ergebnissen. Überdies werden in diesem Objekt auch bei der Ausführung anfallende Metadaten gesammelt.
Das AsyncResult-Objekt bietet eine Reihe praktischer Funktionen für die Parallelisierung, die über Pythons multiprocessing.pool.AsyncResult. hinausgehen:
get_dict
¶
AsyncResult.get_dict()
[1]:
import os
import ipyparallel as ipp
rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
pids = ar.get_dict()
rc[:]["pid_map"] = pids
Metadaten¶
Client.metadata
Timing¶
Iterierbare Map-Ergebnisse¶
[2]:
from __future__ import print_function
import time
import ipyparallel as ipp
# create client & view
rc = ipp.Client()
dv = rc[:]
v = rc.load_balanced_view()
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter("id", rc.ids, flatten=True)
print("Engine IDs: ", dv["id"])
# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference("id")
print("sleeping for `id` seconds on each engine")
tic = time.time()
ar = dv.apply(time.sleep, ref)
for i, r in enumerate(ar):
print("%i: %.3f" % (i, time.time() - tic))
def sleep_here(t):
import time
time.sleep(t)
return id, t
# one call per task
print("running with one call per task")
amr = v.map(sleep_here, [0.01 * t for t in range(100)])
tic = time.time()
for i, r in enumerate(amr):
print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))
print("running with four calls per task")
# with chunksize, we can have four calls per task
amr = v.map(sleep_here, [0.01 * t for t in range(100)], chunksize=4)
tic = time.time()
for i, r in enumerate(amr):
print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))
print("running with two calls per task, with unordered results")
# We can even iterate through faster results first, with ordered=False
amr = v.map(
sleep_here,
[0.01 * t for t in range(100, 0, -1)],
ordered=False,
chunksize=2,
)
tic = time.time()
for i, r in enumerate(amr):
print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time() - tic))
Engine IDs: [0, 1, 2]
sleeping for `id` seconds on each engine
0: 0.027
1: 1.022
2: 2.024
running with one call per task
task 0 on engine 2: 0.000
task 1 on engine 1: 0.001
task 2 on engine 0: 0.001
task 3 on engine 2: 0.001
task 4 on engine 1: 0.001
…
slept 0.12s on engine 2: 16.868
slept 0.11s on engine 2: 16.868
slept 0.14s on engine 0: 16.873
slept 0.13s on engine 0: 16.873
slept 0.16s on engine 1: 16.893
…
[4]:
from functools import reduce
from math import sqrt
import numpy as np
X = np.linspace(0, 100)
add = lambda a, b: a + b
sq = lambda x: x * x
sqrt(reduce(add, map(sq, X)) / len(X))
[4]:
58.028845747399714
map(sq, X)
berechnet das Quadrat jedes Elements in der Liste.reduce(add, sqX) / len(X)
berechnet den Mittelwert indem die Liste vonAsyncMapResult
summiert und durch die Anzahl dividiert wird.Quadratwurzel der resultierenden Zahl.
Siehe auch: Wenn ihr die Ergebnisse von
AsyncResult
oderAsyncMapResult
noch erweitern wollt, könnt ihr dies mit demmsg_ids
-Attribut. Ein Beispiel hierfür findet ihr unter ipyparallel/docs/source/examples/customresults.py.