Dask

Dask erfüllt zwei verschiedene Aufgaben:

  1. die dynamische Aufgabenplanung wird optimiert, ähnlich wie bei Airflow, Luigi oder Celery

  2. Arrays, Dataframes und Lists werden parallel mit dynamischem Task Scheduling ausgeführt.

Skalierung von Laptops bis hin zu Clustern

Dask kann mit pipenv einfach auf einem Laptop installiert werden und erweitert die Größe der Datensätze von passt in den Arbeitsspeicher zu passt auf die Festplatte. Dask kann jedoch auch auf einen Cluster mit Hunderten von Rechnern skaliert werden. Dask ist robust, flexibel, Data Local und hat eine geringe Latenzzeit. Weitere Informationen findet ihr in der Dokumentation zum Distributed Scheduler. Dieser einfache Übergang zwischen einer einzelnen Maschine und einem Cluster ermöglicht einen einfachen Start und ein Wachstum nach Bedarf.

Dask installieren

Ihr könnt alles installieren, was für die meisten gängigen Anwendungen von Dask erforderlich ist (Arrays, Dataframes, …). Dies installiert sowohl Dask als auch Abhängigkeiten wie NumPy, Pandas, usw., die für verschiedene Arbeiten benötigt werden:

$ pipenv install "dask[complete]"

Es können aber auch nur einzelne Subsets installiert werden:

$ pipenv install "dask[array]"
$ pipenv install "dask[dataframe]"
$ pipenv install "dask[diagnostics]"
$ pipenv install "dask[distributed]"

Testen der Installation

[1]:
!pytest   ../../../spackenvs/python-374/.spack-env/view/lib/python3.7/site-packages/dask/tests  ../../../spackenvs/python-374/.spack-env/view/lib/python3.7/site-packages/dask/array/tests
============================= test session starts ==============================
platform darwin -- Python 3.7.4, pytest-6.2.5, py-1.10.0, pluggy-1.0.0
Users/veit
plugins: anyio-3.3.1
collected 4218 items / 16 skipped / 4202 selected                              =============================== warnings summary ===============================
…
-- Docs: https://docs.pytest.org/en/stable/warnings.html
==== 4013 passed, 207 skipped, 14 xfailed, 22 warnings in 357.82s (0:05:57) ====

Vertraute Bedienung

Dask DataFrame

… imitiert Pandas

[2]:
import pandas as pd
df = pd.read_csv('2021-09-01.csv')
df.groupby(df.user_id).value.mean()
[3]:
import dask.dataframe as dd
dd = pd.read_csv('2021-09-01.csv')
dd.groupby(dd.user_id).value.mean()

Dask Array

… imitiert NumPy

[4]:
import numpy as np
f = h5py.File('mydata.h5')
x = np.array(f['.'])
[5]:
import dask.array as da
f = h5py.File('mydata.h5')
x = da.array(f['.'])

Dask Bag

… imitiert iterators, Toolz und PySpark.

[6]:
import dask.bag as db
import json
b = db.read_text('2021-09-01.csv').map(json.loads)
b.pluck('user_id').frequencies().topk(10, lambda pair: pair[1]).compute()

Siehe auch:

Dask Delayed

… imitiert loops und umschließt benutzerdefinierten Code.

[7]:
from dask import delayed
L = []
for fn in '2021-*-*.csv':             # Schleife von Berechnungen
    data = delayed(load)(fn)          # Verzögern der auszuführenden Funktion
    L.append(delayed(process)(data))  # Verbindungen zwischen Variablen herstellen

result = delayed(summarize)(L)
result.compute()

Das concurrent.futures-Interface ermöglicht die Übermittlung von selbstdefinierten Aufgaben.

Anmerkung:

Für das folgende Beispiel muss Dask mit der distributed-Option installiert werden, z.B.

$ pipenv install "dask[distributed]"
[8]:
from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()