使用Airflow运行诊断绘图任务

目录

本文介绍如何使用Airflow构建工作流实现诊断绘图任务。

背景

我在2017年开始的青年基金项目中实现了一套分布式调度系统(perillaroc/ploto),用于执行批量绘图任务。 今年将该系统应用到地球模式系统诊断平台建设项目中,集成了dongli/esmdiag项目的部分诊断绘图功能。

项目将单个的诊断绘图任务看成由多个步骤(step)组成的任务,步骤分为多个类型,包括数据获取、数据处理、图形绘制等。 绘图服务(plot service)会依次调用相应的模块顺序执行每个步骤。 目前绘图服务只支持串行执行步骤,但诊断绘图任务中的部分步骤都可以并行执行,例如获取并处理多个要素场可以同时进行。 多个这样的步骤考虑相互依赖关系可以组成一个DAG图,也就是工作流的形式。

本文介绍如何使用Airflow构建DAG图,执行批量绘图任务。

诊断绘图任务

下面以esmdiag中climo/precip诊断为例说明诊断绘图任务的常见流程。

esmdiag诊断包使用cdo和NCL脚本将输入数据转换成绘图脚本需要的中间数据,然后使用NCL脚本绘图。 地球模式系统诊断平台中集成的esmdiag包增加对数据接口的支持,数据文件不再保存到本地而是通过数据平台的接口获取。 诊断任务的串行流程如下图所示。

诊断绘图任务climo/precip串行流程

图中蓝色步骤是调用数据平台接口检索数据,绿色步骤是使用cdo对数据进行操作,黄色步骤是调用NCL脚本绘图。

目前使用的数据接口获取的数据是逐月保存的,而绘图需要的中间文件是每个变量保存到一个文件中,所以需要使用cdo将变量提取到单个文件中。上面的提取变量步骤可以并行执行,两个数据获取任务也可以并行。

将上述流程组织成DAG图的形式如下图所示:

诊断绘图任务climo/precip工作流流程

图中有两组数据获取、处理步骤,第一组中有四个并行的数据处理步骤。两组操作都完成后,执行绘图任务。

使用Airflow构建DAG

先介绍下如何使用Airflow构建上面的DAG图。下面的代码只展示基本代码框架,省略了大量细节,无法真正运行。

构建DAG对象

Airflow中每个工作流项目都是一个DAG对象,工作流中的每个任务都是一个Operator对象。

每个DAG有一个唯一的ID,通过dag_id指定。

DAG可以设置一些默认参数,传递给DAG中的所有task。诊断平台中的诊断任务都是相互独立的,前一次任务出错不应该影响下一次任务,所以将depends_on_past参数设置为false。开始时间参数start_date对我们的诊断任务没有意义,但每个DAG必须设置,所以可以设置任意的时间。

DAG通过schedule_interval设置调度方式,诊断平台中的每个任务都是由客户端请求驱动的,不需要定时启动,所以直接设置为None。

import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    "owner": "ploto",
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
}

dag_id = "ploto_esmdiag_climo_precip"

dag = DAG(
    dag_id=dag_id,
    default_args=args,
    schedule_interval=None,
)

创建工作目录

为每个诊断任务创建一个唯一的运行目录。 为了保持一致性,后续所有的任务都使用PythonOperator

create_dir_step = PythonOperator(
    "create_work_dir",
    dag=dag
)

获取并处理数据

本组操作包括两步:运行一个任务检索包含四个变量的数据,同时运行四个任务从文件中提取变量。

每个任务都有唯一的task_id,并通过set_upstream等函数设置任务之间的依赖关系。 任务需要绑定到DAG中,同一个任务只能绑定到单一的DAG中,后面会介绍如何动态构建任务实现代码复用。

另外,我们将所有的数据操作步骤放到一个list中,方便设置图形绘制步骤的依赖关系。

# 获取数据
fields = [
    'PRECT',
    'PRECC',
    'PRECL',
    'PS'
]
fetch_data_step = PythonOperator(
    "fetch_data",
    dag=dag
    # ...
)
fetch_data_step.set_upstream(create_dir_step)

# 处理数据
select_steps = []
for field in fields:
    select_data_step = PyhtonOperator(
        f"select_data_{field}",
        dag=dag
        # ...
    )
    select_data_step.set_upstream(fetch_data_step)
    select_steps.append(select_data_step)

获取并处理gw数据

同样分为两步:检索数据,提取变量。

# 获取gw
gw_fetch_data_step = PythonOperator(
    "fetch_gw",
    dag=dag
    # ...
)
gw_fetch_data_step.set_upstream(create_dir_step)

# 处理gw
gw_select_step = PythonOperator(
    "select_gw",
    dag=dag
    # ...
)
select_steps.append(gw_select_step)

gw_select_step.set_upstream(gw_fetch_data_step)

绘制图像

将图形绘制步骤的前置任务设置为所有的数据处理步骤,完成整个DAG的构建。

# 绘制图像
plot_step = PythonOperator(
    "plot",
    dag=dag
    # ...
)
plot_step.set_upstream(select_steps)

DAG图示

最终形成的DAG如下图所示:

Airflow版climo/precip

上述步骤仅搭建一个DAG的框架,但无法真正应用。

参数传递

在消息队列版ploto的诊断绘图任务中,我们为了保持绘图任务的灵活性,任务消息中的每个步骤都单独配置参数,这些参数与用户所要执行的诊断任务相关。但在Airflow中,任务使用的参数是在任务创建时指定的,无法动态控制每个任务的参数。所以需要考虑如何实现参数传递。

如果将应用范围限制在操作流程相对固定的集成诊断工具包中,就会有折中的解决方案。消息队列版ploto提供一个web服务器,用户给定和诊断任务相关的参数(诸如模式信息、时间、诊断项等),web服务会根据该请求生成包含全部步骤参数的任务消息。

Airflow中也可以采用类似的方案。将一个通用的参数传递给DAG,每个任务根据该参数生成自己需要的实际步骤参数。

任务参数

下面给出一个通用参数的示例,表示该任务执行climo/precip诊断项目。

{
    "figure_config": {
        "metric": "climo",
        "name": "precip"
    },
}

命令行传递参数

trigger_dag命令中使用--conf参数传递json字符串形式的参数。

airflow trigger_dag \
    --conf "$(jq '.' ./common_example.json)" \
    ploto_esmdiag_climo_precip

参数保存在common_example.json文件中,上述命令中使用jq将json文件转为字符串。

Airflow会将--conf设置的参数传递给所有的任务。

在任务中使用参数

Airflow会将通用参数传递给PythonOperator的执行函数run_step

为了接受该参数,执行函数需要支持接收任意的关键词参数,例如使用**context,然后就可以通过context["dag_run"].conf访问该参数。

def run_step(**context):
    drag_run_config = context["dag_run"].conf

    figure_config = drag_run_config["figure_config"]

    task_config = {
        "step_type": "plotter",
        "type": "esmdiag_plotter",
        "metric": figure_config["metric"],
        "figure": figure_config["name"],
    }

    esmdiag_plotter.run_plotter(
        task=task_config
    )

构建可重复的DAG图

上面章节以climo/precip为例介绍如何使用Airflow构建诊断绘图工作流。 esmdiag诊断包还有其他的诊断项目,不同诊断项目间的很多步骤是相同的,例如都有数据检索、数据处理等步骤。 但Airflow中每个Operator只能添加到一个DAG中,如果我们为每个诊断项目都编写所有的Operator,就会有大量重复代码。

ploto项目通过动态生成Operator,提高代码的复用率。

下面是生成图形绘制步骤Operator的通用函数。需要提供两个参数:

  • task_id:任务id,每个DAG唯一。

  • params_generator:函数对象,使用命令行提供的通用参数,生成esmdiag_plotter.run_plotter函数需要的参数。不同的诊断项目可能使用不同的函数。

from typing import Callable
from airflow.operators.python_operator import PythonOperator
from ploto.plotter import esmdiag_plotter


def generate_operator(
        task_id: str,
        params_generator: Callable[[dict], dict]
):
    def run_step(**context):
        drag_run_config = context["dag_run"].conf

        esmdiag_plotter.run_plotter(
            **params_generator(drag_run_config)
        )

    airflow_task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        python_callable=run_step,
    )

    return airflow_task

对于所有的climo诊断,绘图步骤使用的参数生成函数相同。

def generate_plotter_params(drag_run_config: dict) -> dict:
    figure_config = drag_run_config["figure_config"]

    task_config = {
        "step_type": "plotter",
        "type": "esmdiag_plotter",
        "metric": figure_config["metric"],
        "figure": figure_config["name"],
    }

    return {
        "task": task_config,
    }

在DAG中使用generate_operator函数创建绘图任务的operator,并将该operator的DAG设置为当前的dag。

plot_step = generate_operator(
    "plot",
    generate_plotter_params,
)
plot_step.dag = dag

个例测试

climo/precip

我还同时构建了climo/precip的串行版本Airflow工作流,对比串行和并行的运行效率。

并行版本某次运行的甘特图:

climo/precip 并行版本运行甘特图

串行版本某次运行的甘特图:

climo/precip 串行版本运行甘特图

可以看到并行版本相对于串行版本有一定的优势。

climo/zonal_mean

为了更好地体现工作流的效果,使用另一个流程更复杂的climo/zonal_mean诊断项目测试。 同样测试了并行和串行版本。

DAG图:

climo/zonal_mean DAG图

并行版本某次运行的甘特图:

climo/zonal_mean 并行版本运行甘特图

串行版本某次运行的甘特图:

climo/zonal_mean 串行版本运行甘特图

可以看到随着任务数量的增多,串行版本因为任务调度时间增大,会显著拉长整个任务的运行时间。

未来计划

目前ploto仅实现了两个诊断项目的Airflow工作流,未来会将所有已集成的esmdiag诊断方法都在Airflow中实现。

目前仅在单机实现了Airflow调度,未来会尝试使用Celery在集群中实现任务调度,并将Celery worker部署到Docker中。

参考

perillaroc/ploto

dongli/esmdiag