Dask

Dask is a Python library for parallel and distributed computing. Dask offers an array of tools such as Dask DataFrames, Dask Bags, and Dask Delayed functions, and has machine learning integrations. We will focus on Dask DataFrames.

Dask dataframes process data in parallel across multiple cores or machines.

Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index.

Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These pandas objects may live on disk or on other machines.

Internally, a Dask DataFrame is split into many partitions, where each partition is one Pandas DataFrame. When our index is sorted and we know the values of the divisions of our partitions, then we can be clever and efficient with expensive algorithms (e.g. groupby’s, joins, etc…).
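As a minimal sketch of why this matters (using a small made-up DataFrame rather than the lesson's dataset): once the index is sorted and the divisions between partitions are known, Dask only touches the partitions that can contain the rows you ask for.

# Minimal sketch with made-up data: known divisions let Dask skip irrelevant partitions
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'value': range(100)}, index=range(100))
small_ddf = dd.from_pandas(pdf, npartitions=4)
print(small_ddf.divisions)  # (0, 25, 50, 75, 99): the index boundaries of each partition
small_ddf.loc[30:40]        # only the partition covering 25-50 needs to be read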

Use Cases:

Dask DataFrame is used in situations where pandas is commonly needed but fails due to data size or speed of computation: for example, manipulating tabular datasets that do not fit in memory, or accelerating long computations by spreading them over many cores or machines.

Dask DataFrame may not be the best choice when the data fits comfortably in RAM (plain pandas will be simpler and faster) or when you need pandas functionality that Dask does not implement.

#Import libraries and datasets
import pandas as pd
import numpy as np
import scipy as sp
import seaborn as sns
import dask.datasets
import dask.dataframe as dd

df = sns.load_dataset('diamonds') # a pandas dataframe
df.to_csv('diamonds.csv') # can be saved to file

Transitioning to Dask DataFrames

Dask mimics most, but not all, of the well-known pandas API.

# IO ---------------------------------------------------------------- 
  
# loading into a dask dataframe
ddf = dd.read_csv('diamonds.csv') # read multiple files with a glob pattern: dd.read_csv('d*.csv')
ddf = dd.from_pandas(df, npartitions=2)

# by default execution is lazy; computations are triggered by compute() (or head())
ddf.compute() # converts the Dask DataFrame to an in-memory pandas DataFrame
ddf.head(2)

# Inspecting ----------------------------------------------------------------
ddf.npartitions # number of partitions
ddf.divisions # Divisions includes the minimum value of every partition’s index and the maximum value of the last partition’s index
ddf.partitions[1] # access a particular partition
ddf.partitions[1].index # each partition has the familiar pd.DataFrame attributes
ddf.describe().compute() # general stats
ddf.dtypes  # access attributes

# Columns 
ddf[["carat","price"]] # subset columns
ddf['new_column'] = ddf['carat'] * ddf['depth'] # creating new column

# Filtering rows ----------------------------------------------------------------

ddf.query('price > 50') # same as pd.DataFrame

ddf.loc[15:20] # subsetting rows by index (requires known divisions)

# Group by Aggregations ----------------------------------------------------------------

# By default, groupby methods return an object with only 1 partition. 
# This is to optimize performance, and assumes the groupby reduction returns an object that is small enough to fit into memory. 
# If your returned object is larger than this, you can increase the number of output partitions using the split_out argument.
ddf.groupby('cut').price.mean() #npartitions=1
#ddf.groupby('cut').mean(split_out=2) #npartitions=2

# Dask supports pandas' aggregate syntax to run multiple reductions on the same groups
ddf_aggs = (ddf.groupby('cut')
    .aggregate({"price": "mean", "carat": "sum"}))  # kept lazy so it can be persisted later
ddf_aggs.compute()
                 price     carat
cut
Ideal      3457.541970  15146.84
Premium    4584.257704  12300.95
Good       3928.864452   4166.10
Very Good  3981.759891   9742.70
Fair       4358.757764   1684.28

Controlling Laziness and Optimising with Dask


# controlling when execution occurs is key to performance
lazy_manipulations = (ddf.query('price > 50')
    .groupby('clarity')
    .price.mean())
lazy_manipulations.compute() # trigger computation, returning an in-memory pandas object
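When several results depend on the same lazy DataFrame, passing them to a single dask.compute() call evaluates them together and lets Dask share the underlying work, rather than repeating it for every individual .compute(). A small sketch using the ddf defined above:

# Sketch: compute several lazy results in one call so shared work is done once
import dask

mean_price = ddf.price.mean()  # lazy
max_carat = ddf.carat.max()    # lazy
mean_price, max_carat = dask.compute(mean_price, max_carat)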


# If the result fits in RAM, persist it in memory so that future operations on it are faster
ddf_aggs = ddf_aggs.repartition(npartitions=1).persist()
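The output below shows the per-cut aggregates joined back onto the original rows. The merge step that produces it is not shown above, but it would look roughly like the following sketch (the suffixes chosen here simply match the column names visible in the output):

# Sketch, not the exact original code: join the per-cut aggregates back onto every row
ddf_merged = ddf.merge(ddf_aggs, left_on='cut', right_index=True,
                       suffixes=('_original', '_aggregated'))
ddf_merged.head(2)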
    carat_original    cut color clarity  depth  table  price_original     x     y     z  price_aggregated  carat_aggregated
0             0.23  Ideal     E     SI2   61.5   55.0             326  3.95  3.98  2.43        3457.54197          15146.84
11            0.23  Ideal     J     VS1   62.8   56.0             340  3.93  3.90  2.46        3457.54197          15146.84

Note that not all APIs from pandas are available in Dask. For example, ddf.filter(['carat','price']) is not available. For more details and a list of available options, see here.
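When a pandas method is missing, a common workaround is map_partitions, which runs an arbitrary function on each underlying pandas DataFrame; a minimal sketch:

# Sketch: run a pandas-only method on every partition (each partition is a plain pandas DataFrame)
ddf.map_partitions(lambda pdf: pdf.filter(['carat', 'price'])).head(2)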

Challenge

  1. What is the price per carat over the entire dataset?

  2. Create a column called price_to_carat that calculates this for each row

  3. Create a column called expensive that flags whether price is greater than the overall price per carat from step 1

  4. How many expensive diamonds are there?

Solution
  • Average price per carat: about $4928
  • 15003 diamonds are flagged as expensive
price_per_carat = (ddf.price.sum() / ddf.carat.sum()).compute()

ddf = ddf.assign(price_to_carat = ddf.price / ddf.carat)

def greater_than_avg(price):
    # flag prices above the overall price per carat
    if price > price_per_carat:
        return True
    else:
        return False

# meta tells Dask the output name/dtype, avoiding the metadata-inference warning
ddf = ddf.assign(expensive = ddf.price.apply(greater_than_avg, meta=('price', 'bool')))
ddf.sort_values('expensive', ascending=False).compute()
number_expensive = ddf.expensive.sum().compute()
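The row-wise apply() above works, but the same flag can be produced with a vectorised comparison, which avoids calling a Python function per element and needs no meta hint; a small sketch:

# Sketch: vectorised alternative to the apply() in the solution above
ddf = ddf.assign(expensive = ddf.price > price_per_carat)
number_expensive = ddf.expensive.sum().compute()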

Dask Best Practice Guide

  1. Use set_index() sparingly to speed up operations on data naturally sorted on a single index
    • Use ddf.set_index('column')
  2. Persist intelligently
    • If you have the available RAM for your dataset then you can persist data in memory. On distributed systems, it is a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory.
    • df = df.persist()
  3. Repartition to reduce overhead
    • As you reduce or increase the size of your pandas DataFrames by filtering or joining, it may be wise to reconsider how many partitions you need. Adjust partitions accordingly using repartition.
    • df = df.repartition(npartitions=df.npartitions // 100)
  4. Consider storing large data in Apache Parquet format (a binary, column-based format); see the sketch after this list.
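For point 4, a minimal sketch of a Parquet round-trip (the directory name is illustrative, and a Parquet engine such as pyarrow or fastparquet must be installed):

# Sketch: Parquet round-trip; 'diamonds_parquet' is an illustrative path
ddf.to_parquet('diamonds_parquet')                                    # writes one file per partition
ddf2 = dd.read_parquet('diamonds_parquet', columns=['cut', 'price'])  # reads only the columns you need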

Additional Info on Dask DataFrames

To learn more about how to use Dask DataFrames, feel free to go through the Reading Messy Data Binder at your own pace. This example demonstrates both Dask DataFrames and delayed functions. Otherwise, here is more info, including best practices and the API, and other tutorials on Dask are also available.

Dask Arrays

import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x
            Array                        Chunk
Bytes       762.94 MiB                   7.63 MiB
Shape       (10000, 10000)               (1000, 1000)
Dask graph  100 chunks in 1 graph layer
Data type   float64 numpy.ndarray
# numpy syntax as usual
y = x + x.T
z = y[::2, 5000:].mean(axis=1)  # axis 0 is rows, axis 1 is columns
z  # still lazy: just displays the summary below
# z.compute() triggers the computation; investigate the Client dashboard while it runs
        Array             Chunk
Bytes   39.06 kiB         3.91 kiB
Shape   (5000,)           (500,)
Count   7 graph layers    10 chunks
Type    float64 numpy.ndarray

Diagnostics - Profile resource efficiency in real time

The Dask Dashboard enables real-time monitoring of RAM, CPU, workers, threads and tasks (functions) as your code is running. It opens a user-friendly dashboard that lets you inspect which processes are running and where work (or bottlenecks) exists across your cores.

See here for documentation and videos.

A few key definitions:

  • Bytes Stored and Bytes per Worker: Cluster memory and memory per worker.

  • Task Processing/CPU Utilization/Occupancy: Tasks being processed by each worker / CPU utilization per worker / Expected runtime for all tasks currently on a worker.

  • Progress: Progress of a set of tasks.

  • Task Stream: Individual tasks across threads.

    • White represents dead time.

There are three different colors of workers in a task graph:

  • Blue: Processing tasks.

  • Green: Saturated: it has enough work to stay busy.

  • Red: Idle: it does not have enough work to stay busy.
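For quick local runs you do not need the full dashboard: Dask's dask.diagnostics module offers a simple text progress bar. Note that it applies to the default local scheduler, not the distributed Client started below; a minimal sketch:

# Sketch: text progress bar for the local (non-distributed) scheduler
from dask.diagnostics import ProgressBar

with ProgressBar():
    ddf.groupby('cut').price.mean().compute()  # prints a progress bar while the computation runs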
# To load the diagnostics dashboard in a web browser locally. Won't work on Colab.
from dask.distributed import Client
client = Client()
client # displays the client summary below; call client.shutdown() after use

Client: Client-282909c2-4e0d-11ef-8d95-d687d7367288
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

# Example of efficient resource utilisation
import dask.array as da
x = da.random.random(size = (10_000,10_000,10), chunks= (1000,1000,5))
y = da.random.random(size = (10_000,10_000,10), chunks= (1000,1000,5))
z = (da.arcsin(x) + da.arcsin(y)).sum(axis = (1,2))
z.compute()
array([114088.23038208, 114412.76898299, 114062.70087907, ...,
       114507.50869038, 114166.89260871, 113983.68356763])
# Inefficient resource utilisation - Dask introduces too much overhead for small problems that NumPy handles well (here: 10,000 tiny chunks)
x = da.random.random(size = (10_000_000),chunks = (1000,))
x.sum().compute()
/Users/kris/miniconda3/envs/parallel/lib/python3.9/site-packages/distributed/client.py:3357: UserWarning: Sending large graph of size 25.26 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
5000296.402465192
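The overhead above comes from splitting 10 million elements into 10,000 tiny chunks, which creates a huge task graph for a trivial reduction. Choosing fewer, larger chunks keeps Dask's overhead negligible; a sketch:

# Sketch: the same sum with 10 large chunks instead of 10,000 tiny ones
x = da.random.random(size=(10_000_000,), chunks=(1_000_000,))
x.sum().compute()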
client.shutdown()  # note the parentheses; bare client.shutdown only returns the bound method

Key points

  • The similarity-by-design of the Dask API with pandas makes the transition easy compared to alternatives - although not all functions are replicated.

  • Scaling up to distributed systems, or down to simply running on your laptop, makes code easily transferable between different resources.

  • Dask enables parallelism without low-level alterations to your code.