使用Airflow运行诊断绘图任务
本文介绍如何使用Airflow构建工作流实现诊断绘图任务。
背景
我在2017年开始的青年基金项目中实现了一套分布式调度系统(perillaroc/ploto),用于执行批量绘图任务。 今年将该系统应用到地球模式系统诊断平台建设项目中,集成了dongli/esmdiag项目的部分诊断绘图功能。
项目将单个的诊断绘图任务看成由多个步骤(step)组成的任务,步骤分为多个类型,包括数据获取、数据处理、图形绘制等。 绘图服务(plot service)会依次调用相应的模块顺序执行每个步骤。 目前绘图服务只支持串行执行步骤,但诊断绘图任务中的部分步骤都可以并行执行,例如获取并处理多个要素场可以同时进行。 多个这样的步骤考虑相互依赖关系可以组成一个DAG图,也就是工作流的形式。
本文介绍如何使用Airflow构建DAG图,执行批量绘图任务。
诊断绘图任务
下面以esmdiag中climo/precip诊断为例说明诊断绘图任务的常见流程。
esmdiag诊断包使用cdo和NCL脚本将输入数据转换成绘图脚本需要的中间数据,然后使用NCL脚本绘图。 地球模式系统诊断平台中集成的esmdiag包增加对数据接口的支持,数据文件不再保存到本地而是通过数据平台的接口获取。 诊断任务的串行流程如下图所示。
图中蓝色步骤是调用数据平台接口检索数据,绿色步骤是使用cdo对数据进行操作,黄色步骤是调用NCL脚本绘图。
目前使用的数据接口获取的数据是逐月保存的,而绘图需要的中间文件是每个变量保存到一个文件中,所以需要使用cdo将变量提取到单个文件中。上面的提取变量步骤可以并行执行,两个数据获取任务也可以并行。
将上述流程组织成DAG图的形式如下图所示:
图中有两组数据获取、处理步骤,第一组中有四个并行的数据处理步骤。两组操作都完成后,执行绘图任务。
使用Airflow构建DAG
先介绍下如何使用Airflow构建上面的DAG图。下面的代码只展示基本代码框架,省略了大量细节,无法真正运行。
构建DAG对象
Airflow中每个工作流项目都是一个DAG对象,工作流中的每个任务都是一个Operator对象。
每个DAG有一个唯一的ID,通过dag_id
指定。
DAG可以设置一些默认参数,传递给DAG中的所有task。诊断平台中的诊断任务都是相互独立的,前一次任务出错不应该影响下一次任务,所以将depends_on_past
参数设置为false
。开始时间参数start_date
对我们的诊断任务没有意义,但每个DAG必须设置,所以可以设置任意的时间。
DAG通过schedule_interval
设置调度方式,诊断平台中的每个任务都是由客户端请求驱动的,不需要定时启动,所以直接设置为None。
import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
"owner": "ploto",
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
}
dag_id = "ploto_esmdiag_climo_precip"
dag = DAG(
dag_id=dag_id,
default_args=args,
schedule_interval=None,
)
创建工作目录
为每个诊断任务创建一个唯一的运行目录。
为了保持一致性,后续所有的任务都使用PythonOperator
。
create_dir_step = PythonOperator(
"create_work_dir",
dag=dag
)
获取并处理数据
本组操作包括两步:运行一个任务检索包含四个变量的数据,同时运行四个任务从文件中提取变量。
每个任务都有唯一的task_id,并通过set_upstream
等函数设置任务之间的依赖关系。
任务需要绑定到DAG中,同一个任务只能绑定到单一的DAG中,后面会介绍如何动态构建任务实现代码复用。
另外,我们将所有的数据操作步骤放到一个list中,方便设置图形绘制步骤的依赖关系。
# 获取数据
fields = [
'PRECT',
'PRECC',
'PRECL',
'PS'
]
fetch_data_step = PythonOperator(
"fetch_data",
dag=dag
# ...
)
fetch_data_step.set_upstream(create_dir_step)
# 处理数据
select_steps = []
for field in fields:
select_data_step = PyhtonOperator(
f"select_data_{field}",
dag=dag
# ...
)
select_data_step.set_upstream(fetch_data_step)
select_steps.append(select_data_step)
获取并处理gw数据
同样分为两步:检索数据,提取变量。
# 获取gw
gw_fetch_data_step = PythonOperator(
"fetch_gw",
dag=dag
# ...
)
gw_fetch_data_step.set_upstream(create_dir_step)
# 处理gw
gw_select_step = PythonOperator(
"select_gw",
dag=dag
# ...
)
select_steps.append(gw_select_step)
gw_select_step.set_upstream(gw_fetch_data_step)
绘制图像
将图形绘制步骤的前置任务设置为所有的数据处理步骤,完成整个DAG的构建。
# 绘制图像
plot_step = PythonOperator(
"plot",
dag=dag
# ...
)
plot_step.set_upstream(select_steps)
DAG图示
最终形成的DAG如下图所示:
上述步骤仅搭建一个DAG的框架,但无法真正应用。
参数传递
在消息队列版ploto的诊断绘图任务中,我们为了保持绘图任务的灵活性,任务消息中的每个步骤都单独配置参数,这些参数与用户所要执行的诊断任务相关。但在Airflow中,任务使用的参数是在任务创建时指定的,无法动态控制每个任务的参数。所以需要考虑如何实现参数传递。
如果将应用范围限制在操作流程相对固定的集成诊断工具包中,就会有折中的解决方案。消息队列版ploto提供一个web服务器,用户给定和诊断任务相关的参数(诸如模式信息、时间、诊断项等),web服务会根据该请求生成包含全部步骤参数的任务消息。
Airflow中也可以采用类似的方案。将一个通用的参数传递给DAG,每个任务根据该参数生成自己需要的实际步骤参数。
任务参数
下面给出一个通用参数的示例,表示该任务执行climo/precip诊断项目。
{
"figure_config": {
"metric": "climo",
"name": "precip"
},
}
命令行传递参数
在trigger_dag
命令中使用--conf
参数传递json字符串形式的参数。
airflow trigger_dag \
--conf "$(jq '.' ./common_example.json)" \
ploto_esmdiag_climo_precip
参数保存在common_example.json文件中,上述命令中使用jq
将json文件转为字符串。
Airflow会将--conf
设置的参数传递给所有的任务。
在任务中使用参数
Airflow会将通用参数传递给PythonOperator
的执行函数run_step
。
为了接受该参数,执行函数需要支持接收任意的关键词参数,例如使用**context
,然后就可以通过context["dag_run"].conf
访问该参数。
def run_step(**context):
drag_run_config = context["dag_run"].conf
figure_config = drag_run_config["figure_config"]
task_config = {
"step_type": "plotter",
"type": "esmdiag_plotter",
"metric": figure_config["metric"],
"figure": figure_config["name"],
}
esmdiag_plotter.run_plotter(
task=task_config
)
构建可重复的DAG图
上面章节以climo/precip为例介绍如何使用Airflow构建诊断绘图工作流。 esmdiag诊断包还有其他的诊断项目,不同诊断项目间的很多步骤是相同的,例如都有数据检索、数据处理等步骤。 但Airflow中每个Operator只能添加到一个DAG中,如果我们为每个诊断项目都编写所有的Operator,就会有大量重复代码。
ploto项目通过动态生成Operator,提高代码的复用率。
下面是生成图形绘制步骤Operator的通用函数。需要提供两个参数:
task_id:任务id,每个DAG唯一。
params_generator:函数对象,使用命令行提供的通用参数,生成
esmdiag_plotter.run_plotter
函数需要的参数。不同的诊断项目可能使用不同的函数。
from typing import Callable
from airflow.operators.python_operator import PythonOperator
from ploto.plotter import esmdiag_plotter
def generate_operator(
task_id: str,
params_generator: Callable[[dict], dict]
):
def run_step(**context):
drag_run_config = context["dag_run"].conf
esmdiag_plotter.run_plotter(
**params_generator(drag_run_config)
)
airflow_task = PythonOperator(
task_id=task_id,
provide_context=True,
python_callable=run_step,
)
return airflow_task
对于所有的climo诊断,绘图步骤使用的参数生成函数相同。
def generate_plotter_params(drag_run_config: dict) -> dict:
figure_config = drag_run_config["figure_config"]
task_config = {
"step_type": "plotter",
"type": "esmdiag_plotter",
"metric": figure_config["metric"],
"figure": figure_config["name"],
}
return {
"task": task_config,
}
在DAG中使用generate_operator函数创建绘图任务的operator,并将该operator的DAG设置为当前的dag。
plot_step = generate_operator(
"plot",
generate_plotter_params,
)
plot_step.dag = dag
个例测试
climo/precip
我还同时构建了climo/precip的串行版本Airflow工作流,对比串行和并行的运行效率。
并行版本某次运行的甘特图:
串行版本某次运行的甘特图:
可以看到并行版本相对于串行版本有一定的优势。
climo/zonal_mean
为了更好地体现工作流的效果,使用另一个流程更复杂的climo/zonal_mean诊断项目测试。 同样测试了并行和串行版本。
DAG图:
并行版本某次运行的甘特图:
串行版本某次运行的甘特图:
可以看到随着任务数量的增多,串行版本因为任务调度时间增大,会显著拉长整个任务的运行时间。
未来计划
目前ploto仅实现了两个诊断项目的Airflow工作流,未来会将所有已集成的esmdiag诊断方法都在Airflow中实现。
目前仅在单机实现了Airflow调度,未来会尝试使用Celery在集群中实现任务调度,并将Celery worker部署到Docker中。