Dask教程:使用dask.delayed并行化代码

目录

本文翻译自 dask-tutorial 项目

在本节中,我们使用 Dask 和 dask.delayed 并行化简单的 for 循环样例代码。 通常,这是将函数转换为与 Dask 一起使用所需的唯一函数。

这是使用 dask 并行化现有代码库或构建复杂系统的一种简单方法。 这也将有助于我们对后面的部分进行理解。

相关文档

正如我们将在分布式调度器笔记本中看到的,Dask 有多种并行执行代码的方法。 我们将通过创建 dask.distributed.Client 来使用分布式调度器。 现在,这将为我们提供一些不错的诊断。 稍后我们将深入讨论调度器。

from dask.distributed import Client

client = Client(n_workers=4)

基础

首先让我们创建一些玩具函数,incadd,它们会休眠一段时间来模拟工作。 然后我们将正常运行这些函数。

在下一节中,我们将并行化此代码。

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

我们使用 %%time magic 指令来计时这段普通代码的执行时间,这是 Jupyter Notebook 的一个特殊功能。

%%time
# 这需要三秒钟才能运行,因为我们依次调用每个函数,一个接一个

x = inc(1)
y = inc(2)
z = add(x, y)
Wall time: 3.02 s

使用 dask.delayed 装饰器并行化

两个 inc 调用可以并行调用,因为它们完全相互独立。

我们将使用 dask.delayed 函数转换 incadd 函数。 当我们通过传递参数调用延迟版本时,与以前完全一样,原始函数实际上还没有被调用 —— 这就是单元执行很快完成的原因。 相反,会生成一个延迟对象,它会跟踪要调用的函数和要传递给它的参数。

from dask import delayed
%%time
# 这会立即运行,它所做的只是构建一个图

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
Wall time: 1e+03 µs

上述代码立即运行,因为还没有真正发生任何事情。

要获得结果,请调用 compute。 请注意,这比原始代码运行得更快。

%%time
# 实际上使用本地线程池运行我们的计算

z.compute()
Wall time: 2.05 s
5

刚才发生了什么?

z 对象是一个惰性 Delayed 对象。 这个对象包含我们计算最终结果所需的一切,包括对所有所需函数的引用,以及它们的输入和相互之间的关系。 我们可以使用上面的 .compute() 评估结果,或者我们可以使用 .visualize() 可视化此值的任务图。

z
Delayed('add-25aea027-2aa1-4253-9eb7-962a7d804914')

查看 z 的任务图

z.visualize()

请注意,这包括之前的函数名称,以及 inc 函数输出到 add 输入的逻辑流。

一些需要考虑的问题

为什么我们从 3s 变成了 2s? 为什么我们不能并行化到 1s?

如果 incadd 函数不包括 sleep(1) 会发生什么? Dask 还能加速这段代码吗?

不会加速

def inc_v2(x):
    return x + 1

def add_v2(x, y):
    return x + y
%%time

x = inc_v2(1)
y = inc_v2(2)
z = add_v2(x, y)
z
Wall time: 0 ns
5
x = delayed(inc_v2)(1)
y = delayed(inc_v2)(2)
z = delayed(add_v2)(x, y)
%%time

z.compute()
Wall time: 24 ms
5

如果我们有多个输出或者还想访问 xy 怎么办?

练习:并行化 for 循环

for 循环是我们想要并行化的最常见的事情之一。 在 incsum 上使用 dask.delayed 并行化以下计算。

串行代码

data = [1, 2, 3, 4, 5, 6, 7, 8]
%%time
# 串行代码

results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)
Wall time: 8.05 s
total
44

并行代码

%%time

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
print("Before computing:", total) # 查看 total 的类型
result = total.compute()
print("After computing:", result) # 计算之后
Before computing: Delayed('sum-492662c6-3934-408a-beea-763b4f421a40')
After computing: 88
Wall time: 1.04 s

与直接使用 sum 函数而不是延迟包装的版本相比,图形可视化与给定的解决方案相比如何? 你能解释一下后面的版本吗? 您可能会发现以下表达式的结果很有启发性

delayed(inc)(1) + delayed(inc)(2)
z = delayed(inc)(1) + delayed(inc)(2) + delayed(inc)(3)
z.visualize()

z = delayed(sum)(delayed(inc)(1), delayed(inc)(2), delayed(inc)(3))
z.visualize()

练习:并行化带有流程控制的 for 循环代码

通常我们只想延迟一些函数,立即运行其中的几个。 当这些函数速度很快时,这尤其有用,并帮助我们确定应该调用哪些其他较慢的函数。 这个决定,延迟还是不延迟,通常是我们在使用 dask.delayed 时需要深思熟虑的地方。

在下面的示例中,我们遍历输入列表。 如果输入是偶数,那么我们想调用 inc。 如果输入是奇数,那么我们要调用 double。 必须立即(而不是懒惰地)做出调用 incdoubleis_even 决定,以便我们的图形构建 Python 代码继续进行。

def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

串行代码

%%time
results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)
    
total = sum(results)
print(total)
90
Wall time: 10.1 s

并行版本

%%time

results = []
for x in data:
    if is_even(x):
        y = delayed(double)(x)
    else:
        y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print(total)
Delayed('sum-f5af7db2-ff32-4186-af6c-2106e51a7341')
Wall time: 999 µs
%time total.compute()
Wall time: 2.04 s
90
total.visualize()

一些需要考虑的问题

我们不能使用延迟的其他控制流示例是什么?

如果我们在上面的例子中延迟了 is_even(x) 的计算会发生什么?

你对延迟 sum() 有什么看法? 这个函数既是计算又运行快速。

创建数据

运行此代码以准备一些数据。

这将下载并提取 1990 年至 2000 年间从纽约出发的航班的一些历史航班数据。 数据最初来自此处

%run prep.py -d flights

查看数据

import pathlib
sorted(pathlib.Path("data", "nycflights").iterdir())
[WindowsPath('data/nycflights/1990.csv'),
 WindowsPath('data/nycflights/1991.csv'),
 WindowsPath('data/nycflights/1992.csv'),
 WindowsPath('data/nycflights/1993.csv'),
 WindowsPath('data/nycflights/1994.csv'),
 WindowsPath('data/nycflights/1995.csv'),
 WindowsPath('data/nycflights/1996.csv'),
 WindowsPath('data/nycflights/1997.csv'),
 WindowsPath('data/nycflights/1998.csv'),
 WindowsPath('data/nycflights/1999.csv')]

使用 pandas.read_csv 读取一个文件,并计算平均起飞延误

import pandas as pd
df = pd.read_csv(pathlib.Path("data", "nycflights", "1990.csv"))
df.head()

数据模式

df.dtypes
Year                   int64
Month                  int64
DayofMonth             int64
DayOfWeek              int64
DepTime              float64
CRSDepTime             int64
ArrTime              float64
CRSArrTime             int64
UniqueCarrier         object
FlightNum              int64
TailNum              float64
ActualElapsedTime    float64
CRSElapsedTime         int64
AirTime              float64
ArrDelay             float64
DepDelay             float64
Origin                object
Dest                  object
Distance             float64
TaxiIn               float64
TaxiOut              float64
Cancelled              int64
Diverted               int64
dtype: object

数据中有哪些始发机场

df.Origin.unique()
array(['EWR', 'LGA', 'JFK'], dtype=object)

每个机场平均起飞延误

df.groupby("Origin").DepDelay.mean()
Origin
EWR     9.168411
JFK    11.857274
LGA     8.560045
Name: DepDelay, dtype: float64

串行代码:每个机场平均起飞延误

上述单元格计算每个机场一年的平均起飞延误。 在这里,我们使用顺序 for 循环将其扩展到所有年份。

import pathlib
filenames = sorted(pathlib.Path("data", "nycflights").glob("*.csv"))
filenames
[WindowsPath('data/nycflights/1990.csv'),
 WindowsPath('data/nycflights/1991.csv'),
 WindowsPath('data/nycflights/1992.csv'),
 WindowsPath('data/nycflights/1993.csv'),
 WindowsPath('data/nycflights/1994.csv'),
 WindowsPath('data/nycflights/1995.csv'),
 WindowsPath('data/nycflights/1996.csv'),
 WindowsPath('data/nycflights/1997.csv'),
 WindowsPath('data/nycflights/1998.csv'),
 WindowsPath('data/nycflights/1999.csv')]
%%time

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = pd.read_csv(fn)
    
    # 按起飞机场分组
    by_origin = df.groupby("Origin")
    
    # 按起飞机场计算所有起飞延误和
    total = by_origin.DepDelay.sum()
    
    # 按机场汇总航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间结果
    sums.append(total)
    counts.append(count)
    
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
Wall time: 9.92 s
mean
Origin
EWR    10.295469
JFK    10.351299
LGA     7.431142
Name: DepDelay, dtype: float64

并行化

使用 dask.delayed 并行化上面的代码。 需要知道一些额外的事情。

  1. 延迟对象上的方法和属性访问会自动工作,因此如果您有一个延迟对象,您可以对其执行正常的算术、切片和方法调用,它将产生正确的延迟调用。
x = delayed(np.arange)(10)
y = (x + 1)[::2].sum()  # 所有计算都被延迟
  1. 当您只有一个输出时,调用 .compute() 方法效果很好。 当您有多个输出时,您可能需要使用 dask.compute 函数:
>>> from dask import compute
>>> x = delayed(np.arange)(10)
>>> y = x ** 2
>>> min_, max_ = compute(y.min(), y.max())
>>> min_, max_
(0, 81)

这样 Dask 就可以共享中间值 (比如 y = x**2)

因此,您的目标是使用 dask.delayed 并行化上面的代码 (已在下面复制)。 您可能还想对一些计算进行可视化,看看您是否正确地进行了计算。

from dask import compute
%%time

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = delayed(pd.read_csv)(fn)
    
    # 按起飞机场分组
    by_origin = df.groupby("Origin")
    
    # 按起飞机场计算所有起飞延误和
    total = by_origin.DepDelay.sum()
    
    # 按机场汇总航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间结果
    sums.append(total)
    counts.append(count)
    
# 组合中间结果得到全部 mean-delay-per-origin
sums, counts = compute(sums, counts)
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
Wall time: 2.55 s
mean
Origin
EWR    10.295469
JFK    10.351299
LGA     7.431142
Name: DepDelay, dtype: float64

一些需要考虑的问题

你得到了多少加速?这是您期望的加速程度吗?

尝试在何处调用 compute。当你在 sumcounts 上使用时会发生什么?如果你等待并在 mean 上调用会发生什么?

mean 上使用 compute

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = delayed(pd.read_csv)(fn)
    
    # 按起飞机场分组
    by_origin = df.groupby("Origin")
    
    # 按起飞机场计算所有起飞延误和
    total = by_origin.DepDelay.sum()
    
    # 按机场汇总航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间结果
    sums.append(total)
    counts.append(count)
    
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = delayed(sum)(sums)
n_flights = delayed(sum)(counts)
mean = delayed(lambda a, b: a/b)(total_delays, n_flights)
mean.visualize()

%%time
mean = mean.compute()
Wall time: 1.98 s

尝试延迟 sum 调用。如果 sum 延迟,图形会是什么样子? 如果不是,图表会是什么样子?

sum 上使用 compute

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = delayed(pd.read_csv)(fn)
    
    # 按起飞机场分组
    by_origin = df.groupby("Origin")
    
    # 按起飞机场计算所有起飞延误和
    total = by_origin.DepDelay.sum()
    
    # 按机场汇总航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间结果
    sums.append(total)
    counts.append(count)
    
# 组合中间结果得到全部 mean-delay-per-origin
total_delays = delayed(sum)(sums)
n_flights = delayed(sum)(counts)
from dask import visualize
visualize(total_delays, n_flights)

%%time
total_delays, n_flights = compute(total_delays, n_flights)
mean = total_delays / n_flights
Wall time: 2.12 s

原始版本

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = delayed(pd.read_csv)(fn)
    
    # 按起飞机场分组
    by_origin = df.groupby("Origin")
    
    # 按起飞机场计算所有起飞延误和
    total = by_origin.DepDelay.sum()
    
    # 按机场汇总航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间结果
    sums.append(total)
    counts.append(count)
visualize(sums, counts)

你能想出你想要以一种方式减少另一种方式的任何原因吗?

学习更多

访问 Delayed documentation。 特别是,(delayed screencast 将强化您在此处学到的概念,delayed best practices 文档收集了有关如何使用 dask.delayed 的建议。

关闭客户端

在继续下一个练习之前,请确保关闭您的客户端或停止此内核。

client.close()

参考

dask-tutorial

Dask 教程

相关文章

计算等压面要素场检验指标 - 并发

使用 Dask 并行抽取站点数据