Dask
Dask is a Python library for parallel and distributed computing. It offers an array of tools such as Dask DataFrames, Dask Bags, and Dask Delayed functions, and has machine learning integrations. We will focus on Dask DataFrames.
Dask DataFrames process data in parallel across multiple cores or machines. A Dask DataFrame coordinates many pandas DataFrames/Series arranged along the index: it is partitioned row-wise, grouping rows by index value for efficiency, and the underlying pandas objects may live on disk or on other machines.
Internally, a Dask DataFrame is split into many partitions, where each partition is one pandas DataFrame. When the index is sorted and the division values between partitions are known, Dask can be clever and efficient with expensive algorithms (e.g. groupby's, joins, etc.).
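As a minimal sketch with made-up data (not the diamonds dataset used later), setting a sorted index makes the divisions known, so range lookups only touch the relevant partitions:

import pandas as pd
import dask.dataframe as dd

# Illustrative data only; column names are hypothetical
pdf = pd.DataFrame({"carat": [0.2, 0.4, 0.7, 1.1, 1.5, 2.0],
                    "price": [300, 600, 1500, 4000, 9000, 15000]})

small_ddf = dd.from_pandas(pdf, npartitions=3)
small_ddf = small_ddf.set_index("carat")   # index is now sorted, so divisions are known
small_ddf.divisions                        # boundary index values of each partition
small_ddf.loc[0.5:1.2].compute()           # only partitions overlapping this range do work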
Use Cases:
Dask DataFrame is used in situations where pandas is commonly needed, usually when pandas fails due to data size or speed of computation. Common use cases are:
- Manipulating large datasets, even when those datasets don't fit in memory
- Accelerating long computations by using many cores
- Distributed computing on large datasets with standard pandas operations like groupby, join, and time series computations (a sketch of a join follows below)
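For instance, a pandas-style join runs lazily across partitions; a minimal sketch with made-up data:

import pandas as pd
import dask.dataframe as dd

# Hypothetical tables, purely for illustration
left = dd.from_pandas(pd.DataFrame({"id": [1, 2, 3], "price": [10, 20, 30]}), npartitions=2)
right = dd.from_pandas(pd.DataFrame({"id": [1, 2, 3], "cut": ["Ideal", "Fair", "Good"]}), npartitions=2)

joined = left.merge(right, on="id", how="inner")   # same syntax as pandas merge, built lazily
joined.compute()                                   # triggers the distributed join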
Dask DataFrame may not be the best choice in the following situations:
- If your dataset fits comfortably into RAM on your laptop, then you may be better off just using pandas. There may be simpler ways to improve performance than through parallelism.
- If your dataset doesn't fit neatly into the pandas tabular model, then you might find more use in dask.bag or dask.array.
- If you need functions that are not implemented in Dask DataFrame, then you might want to look at dask.delayed, which offers more flexibility (see the sketch after this list).
- If you need a proper database with all of the features that databases offer, you might prefer something like Postgres or SQLite.
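As mentioned above for dask.delayed, here is a minimal sketch of wrapping arbitrary Python functions (the function names are made up for illustration):

import dask

@dask.delayed
def clean(value):
    # placeholder for a custom step not covered by Dask DataFrame
    return value * 2

@dask.delayed
def combine(parts):
    return sum(parts)

results = [clean(i) for i in range(10)]   # nothing runs yet; a task graph is built
total = combine(results)
total.compute()                           # executes the graph in parallel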
# Import libraries and datasets
import pandas as pd
import numpy as np
import scipy as sp
import seaborn as sns
import dask.datasets
import dask.dataframe as dd

df = sns.load_dataset('diamonds')  # a pandas dataframe
df.to_csv('diamonds.csv')          # can be saved to file
Transitioning to Dask DataFrames
Dask mimics most, but not all, of the well-known pandas API syntax.
# IO ----------------------------------------------------------------
# loading into a dask dataframe
ddf = dd.read_csv('diamonds.csv')        # read multiple files with patterns: dd.read_csv('d*.csv')
ddf = dd.from_pandas(df, npartitions=2)
ddf

# By default execution is lazy: computations are only triggered by compute() (or head())
ddf.compute()   # convert the Dask DataFrame to a pd.DataFrame
ddf.head(2)
# Inspecting ----------------------------------------------------------------
ddf.npartitions             # number of partitions
ddf.divisions               # divisions include the minimum value of every partition's index and the maximum value of the last partition's index
ddf.partitions[1]           # access a particular partition
ddf.partitions[1].index     # partitions have the familiar pd.DataFrame attributes
ddf.describe().compute()    # general stats
ddf.dtypes                  # access attributes

# Columns
ddf[["carat", "price"]]                          # subset columns
ddf['new_column'] = ddf['carat'] * ddf['depth']  # create a new column
# Filtering rows ----------------------------------------------------------------
ddf.query('price > 50')  # same as pd.DataFrame
ddf.loc[15:20]           # subsetting rows
# Group by Aggregations ----------------------------------------------------------------
# By default, groupby methods return an object with only 1 partition.
# This is to optimize performance, and assumes the groupby reduction returns an object that is small enough to fit into memory.
# If your returned object is larger than this, you can increase the number of output partitions using the split_out argument.
ddf.groupby('cut').price.mean()         # npartitions=1
# ddf.groupby('cut').mean(split_out=2)  # npartitions=2

# Dask's aggregate has more features than the pandas agg equivalent and supports multiple reductions on the same group.
ddf_aggs = (ddf.groupby('cut')
               .aggregate({"price": "mean", "carat": "sum"})).compute()
ddf_aggs
| cut       | price       | carat    |
|-----------|-------------|----------|
| Ideal     | 3457.541970 | 15146.84 |
| Premium   | 4584.257704 | 12300.95 |
| Good      | 3928.864452 | 4166.10  |
| Very Good | 3981.759891 | 9742.70  |
| Fair      | 4358.757764 | 1684.28  |
Controlling Laziness and Optimising with Dask
# Controlling when execution occurs is key to performance
lazy_manipulations = (ddf.query('price > 50')
                         .groupby('clarity')
                         .price.mean())

lazy_manipulations.compute()   # trigger computation, returning a pandas object

# Persist data into RAM (if it fits) to make future operations on it faster
ddf_aggs = ddf_aggs.repartition(npartitions=1).persist()
ddf_aggs
|    | carat_original | cut   | color | clarity | depth | table | price_original | x    | y    | z    | price_aggregated | carat_aggregated |
|----|----------------|-------|-------|---------|-------|-------|----------------|------|------|------|------------------|------------------|
| 0  | 0.23           | Ideal | E     | SI2     | 61.5  | 55.0  | 326            | 3.95 | 3.98 | 2.43 | 3457.54197       | 15146.84         |
| 11 | 0.23           | Ideal | J     | VS1     | 62.8  | 56.0  | 340            | 3.93 | 3.90 | 2.46 | 3457.54197       | 15146.84         |
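To inspect the deferred work before triggering it, Dask collections can also render their task graph; a minimal sketch, assuming the optional graphviz dependency is installed:

# Draw the task graph of the lazy result defined above without computing it (needs graphviz)
lazy_manipulations.visualize(filename='task_graph.png')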
Note that not all APIs from pandas are available in Dask. For example, ddf.filter(['carat','price']) is not available. For more details and a list of available options, see here.
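In that case, plain column subsetting, which Dask does support, returns the same columns; a small sketch using the ddf from above:

# ddf.filter(['carat', 'price'])    # not available in Dask DataFrame
ddf[['carat', 'price']].compute()   # supported way to select the same columns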
Challenge
- What is the price per carat over the entire dataset?
- Create a column called price_to_carat that calculates this for each row.
- Create a column called expensive that flags whether price is greater than price_to_carat.
- How many expensive diamonds are there?
Solution
- Average price per carat: ~$4928
- 15003 expensive diamonds in the whole dataset
price_per_carat = (ddf.price.sum() / ddf.carat.sum()).compute()

ddf = ddf.assign(price_to_carat=ddf.price / ddf.carat)

def greater_than_avg(price):
    if price > price_per_carat:
        return True
    else:
        return False

ddf = ddf.assign(expensive=ddf.price.apply(greater_than_avg))
ddf.sort_values('expensive', ascending=False).compute()
number_expensive = ddf.expensive.sum().compute()
Dask Best Practice Guide
- Use set_index() sparingly, to speed up computations on data naturally sorted on a single index: ddf.set_index('column')
- Persist intelligently: if you have the available RAM for your dataset then you can persist data in memory. On distributed systems, it is a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory: df = df.persist()
- Repartition to reduce overhead: as you reduce or increase the size of your pandas DataFrames by filtering or joining, it may be wise to reconsider how many partitions you need. Adjust partitions accordingly using repartition: df = df.repartition(npartitions=df.npartitions // 100)
- Consider storing large data in the Apache Parquet format (a binary, column-based format); see the sketch below.
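For the Parquet suggestion above, a minimal sketch (assuming pyarrow or fastparquet is installed; the directory name is arbitrary):

ddf.to_parquet('diamonds_parquet/')           # writes one file per partition in a binary, column-based format
ddf2 = dd.read_parquet('diamonds_parquet/')   # reads back efficiently and can load only selected columns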
Additional Info on Dask DataFrames
To learn more about how to use Dask DataFrames, feel free to go through the Reading Messy Data Binder at your own pace. This example demonstrates both Dask DataFrames and delayed functions. Otherwise here is more info including best practices and the API, and other tutorials on Dask are also available.
Dask Arrays
import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))
x

# numpy syntax as usual
y = x + x.T
z = y[::2, 5000:].mean(axis=1)  # axis 0 is the index (rows), axis 1 is columns
z

# Trigger compute and investigate the Client
z.compute()
Diagnostics - Profile resource efficiency in real time
The Dask Dashboard enables resource monitoring across RAM, CPU, workers, threads and tasks (functions) in real time as your code is running. It opens a user-friendly dashboard allowing you to inspect what processes are running and what work (or bottlenecks) exists across your cores.
See here for documentation and videos.
A few key definitions:
- Bytes Stored and Bytes per Worker: cluster memory and memory per worker.
- Task Processing / CPU Utilization / Occupancy: tasks being processed by each worker / CPU utilization per worker / expected runtime for all tasks currently on a worker.
- Progress: progress of a set of tasks.
- There are three different colours of workers in a task graph:
  - Blue: processing tasks.
  - Green: saturated, i.e. it has enough work to stay busy.
  - Red: idle, i.e. it does not have enough work to stay busy.
- Task Stream: individual tasks across threads.
  - White represents dead time.
# To load the diagnostic dashboard in a web browser locally. Won't work on Colab.
from dask.distributed import Client

client = Client()
client            # display client/cluster info
# client.shutdown() after use
[Output: Client connected to a LocalCluster (5 workers, 10 threads total, 32.00 GiB memory), with the dashboard at http://127.0.0.1:8787/status]
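The dashboard URL is also available programmatically, and a whole run can be captured to a static HTML report; a minimal sketch using the client created above (the filename is arbitrary):

from dask.distributed import performance_report

client.dashboard_link                                   # URL of the live dashboard

with performance_report(filename="dask-report.html"):   # save a static report of everything run inside the block
    ddf.groupby('cut').price.mean().compute()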
# Example of efficient resource utilisation
import dask.array as da

x = da.random.random(size=(10_000, 10_000, 10), chunks=(1000, 1000, 5))
y = da.random.random(size=(10_000, 10_000, 10), chunks=(1000, 1000, 5))
z = (da.arcsin(x) + da.arcsin(y)).sum(axis=(1, 2))
z.compute()
array([114088.23038208, 114412.76898299, 114062.70087907, ...,
114507.50869038, 114166.89260871, 113983.68356763])
# Inefficient resource utilisation - dask introduces too much overhead for small arrays that numpy handles well on its own
x = da.random.random(size=(10_000_000,), chunks=(1000,))
x.sum().compute()
/Users/kris/miniconda3/envs/parallel/lib/python3.9/site-packages/distributed/client.py:3357: UserWarning: Sending large graph of size 25.26 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
warnings.warn(
5000296.402465192
client.shutdown()   # shut down the client and cluster when finished
Key points
- The similarity-by-design of the Dask API with pandas makes the transition easy compared to alternatives, although not all functions are replicated.
- Scaling up to distributed systems, or down to simply running on your laptop, makes code easily transferable between different resources.
- Dask enables parallelism without low-level alterations to code.