AsyncResult-Objekt

apply() gibt im noblock-Modus ein AsyncResult-Objekt zurück. Dieses erlaubt zu einem späteren Zeitpunkt Anfragen mit der get()-Methode zu den Ergebnissen. Überdies werden in diesem Objekt auch bei der Ausführung anfallende Metadaten gesammelt.

Das AsyncResult-Objekt bietet eine Reihe praktischer Funktionen für die Parallelisierung, die über Pythons multiprocessing.pool.AsyncResult. hinausgehen:

get_dict

AsyncResult.get_dict()

[1]:
import os
import ipyparallel as ipp

rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
pids = ar.get_dict()
rc[:]['pid_map'] = pids

Metadaten

Client.metadata

Timing

Iterierbare Map-Ergebnisse

[2]:
from __future__ import print_function

import time

import ipyparallel as ipp

# create client & view
rc = ipp.Client()
dv = rc[:]
v = rc.load_balanced_view()

# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter('id', rc.ids, flatten=True)
print("Engine IDs: ", dv['id'])

# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference('id')
print("sleeping for `id` seconds on each engine")
tic = time.time()
ar = dv.apply(time.sleep, ref)
for i,r in enumerate(ar):
    print("%i: %.3f"%(i, time.time()-tic))

def sleep_here(t):
    import time
    time.sleep(t)
    return id,t

# one call per task
print("running with one call per task")
amr = v.map(sleep_here, [.01*t for t in range(100)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic))

print("running with four calls per task")
# with chunksize, we can have four calls per task
amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic))

print("running with two calls per task, with unordered results")
# We can even iterate through faster results first, with ordered=False
amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
tic = time.time()
for i,r in enumerate(amr):
    print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic))

Engine IDs:  [0, 1, 2]
sleeping for `id` seconds on each engine
0: 0.027
1: 1.022
2: 2.024
running with one call per task
task 0 on engine 2: 0.000
task 1 on engine 1: 0.001
task 2 on engine 0: 0.001
task 3 on engine 2: 0.001
task 4 on engine 1: 0.001
…
slept 0.12s on engine 2: 16.868
slept 0.11s on engine 2: 16.868
slept 0.14s on engine 0: 16.873
slept 0.13s on engine 0: 16.873
slept 0.16s on engine 1: 16.893
…
[4]:
from functools import reduce
from math import sqrt
import numpy as np

X = np.linspace(0,100)
add = lambda a,b: a+b
sq = lambda x: x*x
sqrt(reduce(add, map(sq, X)) / len(X))
[4]:
58.028845747399714
  1. map(sq, X) berechnet das Quadrat jedes Elements in der Liste.

  2. reduce(add, sqX) / len(X) berechnet den Mittelwert indem die Liste von AsyncMapResult summiert und durch die Anzahl dividiert wird.

  3. Quadratwurzel der resultierenden Zahl.

    Siehe auch: Wenn ihr die Ergebnisse von AsyncResult oder AsyncMapResult noch erweitern wollt, könnt ihr dies mit dem msg_ids-Attribut. Ein Beispiel hierfür findet ihr unter ipyparallel/examples/customresults.py.