AsyncResult object#

In non-blocking mode, apply() returns an AsyncResult object. You can retrieve the result later with its get() method. The object also collects metadata produced during execution.

The AsyncResult object provides a number of useful functions for parallelisation that go beyond Python’s multiprocessing.pool.AsyncResult, for example get_dict(), which returns the results as a dictionary keyed by engine ID:



import os

import ipyparallel as ipp

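rc = ipp.Client()
# run os.getpid on every engine without blocking
ar = rc[:].apply_async(os.getpid)
# collect the results as a dict keyed by engine ID
pids = ar.get_dict()
# push the mapping to every engine as the variable pid_map
rc[:]["pid_map"] = pids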
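The basic submit-now, get-later pattern can be tried without a running cluster. The following is a minimal sketch that uses the standard library's multiprocessing.pool.ThreadPool as a stand-in; it is not ipyparallel's API, and the helper slow_pid is a hypothetical name introduced only for illustration. It shows only the part of the interface that both AsyncResult classes share (get(), ready(), successful()).

```python
import os
import time
from multiprocessing.pool import ThreadPool


def slow_pid(delay):
    """Hypothetical helper: sleep briefly, then return the current process ID."""
    time.sleep(delay)
    return os.getpid()


with ThreadPool(processes=2) as pool:
    # apply_async returns immediately with an AsyncResult handle
    ar = pool.apply_async(slow_pid, (0.05,))
    # get() blocks until the result is available (or the timeout expires)
    pid = ar.get(timeout=5)
    # after get() has returned, the call is finished and did not raise
    finished = ar.ready() and ar.successful()

print("worker pid:", pid, "finished:", finished)
```

Because ThreadPool workers are threads of the calling process, the returned PID here is that of the caller; with ipyparallel engines each engine would report its own process ID.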




Iterable map results#

from __future__ import print_function

import time

import ipyparallel as ipp

# create client & view
rc = ipp.Client()
dv = rc[:]
v = rc.load_balanced_view()

# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter("id", rc.ids, flatten=True)
print("Engine IDs: ", dv["id"])

# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference("id")
print("sleeping for `id` seconds on each engine")
tic = time.time()
ar = dv.apply(time.sleep, ref)
for i, r in enumerate(ar):
    print("%i: %.3f" % (i, time.time() - tic))

def sleep_here(t):
    import time

    time.sleep(t)
    # `id` is the engine-side global scattered above
    return id, t

# one call per task
print("running with one call per task")
amr = v.map(sleep_here, [0.01 * t for t in range(100)])
tic = time.time()
for i, r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))

print("running with four calls per task")
# with chunksize, we can have four calls per task
amr = v.map(sleep_here, [0.01 * t for t in range(100)], chunksize=4)
tic = time.time()
for i, r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))

print("running with two calls per task, with unordered results")
# We can even iterate through faster results first, with ordered=False
amr = v.map(
    sleep_here,
    [0.01 * t for t in range(100, 0, -1)],
    ordered=False,
    chunksize=2,
)
tic = time.time()
for i, r in enumerate(amr):
    print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time() - tic))

Output (truncated):

Engine IDs:  [0, 1, 2, 3]
sleeping for `id` seconds on each engine
0: 0.005
1: 1.012
2: 2.011
3: 3.013
running with one call per task
task 0 on engine 0: 0.007
task 1 on engine 3: 0.016
task 2 on engine 2: 0.029
task 3 on engine 1: 0.042
task 4 on engine 0: 0.053
task 5 on engine 3: 0.073
task 6 on engine 2: 0.092
task 7 on engine 1: 0.118
task 8 on engine 0: 0.139
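
The difference between ordered and unordered iteration in the script above can also be explored without engines. This sketch assumes the standard library's ThreadPool.imap and ThreadPool.imap_unordered as stand-ins for iterating an AsyncMapResult with ordered=True or ordered=False; iterate_results is a hypothetical helper, and sleep_here is simplified to return only its argument.

```python
import time
from multiprocessing.pool import ThreadPool


def sleep_here(t):
    """Sleep for t seconds and return t (stand-in for the engine-side task)."""
    time.sleep(t)
    return t


def iterate_results(delays, ordered=True):
    """Collect results lazily, in submission order or in completion order."""
    with ThreadPool(processes=4) as pool:
        mapper = pool.imap if ordered else pool.imap_unordered
        # both return iterators, so results are consumed as they become available
        return [r for r in mapper(sleep_here, delays)]


delays = [0.04, 0.03, 0.02, 0.01]
in_order = iterate_results(delays, ordered=True)
as_completed = iterate_results(delays, ordered=False)
print("submission order:", in_order)
print("completion order:", as_completed)
```

Ordered iteration always yields results in submission order, so a slow first task delays everything behind it. Unordered iteration yields the same values, but in whatever order the tasks happen to finish.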
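
The following example computes the root mean square (RMS) of X with Python's built-in map and reduce:

from functools import reduce
from math import sqrt

import numpy as np

X = np.linspace(0, 100)
add = lambda a, b: a + b
sq = lambda x: x * x
sqrt(reduce(add, map(sq, X)) / len(X))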
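The expression breaks down as follows:

  1. map(sq, X) computes the square of each item in the list.

  2. reduce(add, sqX) / len(X), where sqX is the result of step 1, calculates the mean of the squares by summing the list (or the AsyncMapResult) and dividing by the number of items.

  3. sqrt(...) takes the square root of the resulting number.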
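
The three steps can be sketched with a pluggable map function, so the same code runs with the built-in map or with any map-like callable that yields results lazily. This is an illustrative sketch, not ipyparallel code: rms and its map_func parameter are hypothetical names, ThreadPool.imap stands in for an asynchronous view map, and X is built in pure Python as 50 evenly spaced points from 0 to 100, which is what np.linspace(0, 100) produces by default.

```python
from functools import reduce
from math import sqrt
from multiprocessing.pool import ThreadPool


def sq(x):
    return x * x


def add(a, b):
    return a + b


def rms(values, map_func=map):
    """Root mean square via map/reduce; map_func may be any map-like callable."""
    # step 1: square each item; step 2: sum and divide; step 3: square root
    return sqrt(reduce(add, map_func(sq, values)) / len(values))


# 50 evenly spaced points from 0 to 100
X = [100 * i / 49 for i in range(50)]

local_rms = rms(X)
with ThreadPool(processes=4) as pool:
    # pool.imap yields the squares in order as they are computed
    pooled_rms = rms(X, map_func=pool.imap)

print(local_rms, pooled_rms)
```

Both calls print a value of approximately 58.0288. Only the map step changes between the local and the pooled version; the reduce step consumes the results as they arrive.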

See also:

If you want to expand the results of AsyncResult or AsyncMapResult, you can do so with the msg_ids attribute. An example can be found at ipyparallel/docs/source/examples/