NWPC业务系统笔记:构建图片产品制作系统

目录

数值预报模式的后处理系统一般生成数据文件和图片文件两种类型的产品。

本文主要总结最近一段时间移植和构建图片产品制作系统的经验。

介绍

各个业务系统制作图片产品的流程基本相同,包括下面几个步骤:

  1. 等待需要的数据文件生成
  2. 绘制图片
  3. 分发图片

不同图片产品的区别在于需要的数据和制作的数量不同。

比如GRAPES MESO模式500hPa高度场图片从000到084的每个预报时效都对应一张图片产品; 但24小时2米最高温只制作024、048和072三张图片产品。

设计产品制作系统时需要考虑如何组织各种不同的图片产品,方便系统搭建、更新和维护。

下面首先介绍目前正在使用的两种组织绘图任务的方式。

两种系统结构

目前有两种组织图片制作任务的方式。

以数据时效分类

每个图片产品需要的数据总能找到最晚的一个时效,可以将每个绘图任务分配到最晚数据时效对应的容器节点中。 这样每生成一个时效的数据,绘图系统中对应时效的容器节点下所有的绘图任务都可以同时运行。

比如500hPa高度场图片003时效的图片只需要003时效数据,而24小时2米最高温024时效需要前24小时所有时效的数据。 所以上面两个任务可以分别放到003和024两个容器节点中。

GRAPES GFS后处理系统部分图片产品使用数据时效分类方式,003时效比000时效多制作一些产品。

以产品类型分类

如果产品类型较多,并且有不同的数据时效需求和制作数量,可以将每个产品按照类型放到相应的容器节点中。 每个容器节点对应一类产品,可以单独设置数据依赖关系和制作数量。

GRAPES MESO 3km 后处理系统使用产品类型分类方式,每个容器节点代表一种产品分类,分类中有多种图片产品。

以上两种方式对于运行维护来说没有显著的区别,因为绘图程序很少大规模出错。 所以选择哪种方式编写系统取决于ecFlow系统定义代码的编写难度,还需要考虑后期增加新产品是否方便。

下面着重介绍以产品分类方式的系统如何实现,以时效分类的实现方式类似。

实现

对于每类图片,都需要实现上一节中提到的三个部分:数据依赖、绘图任务和上传任务。

下面介绍首先介绍数据检查如何实现,然后介绍如何定义绘图任务,支持批量创建绘图和上传任务。

数据检查

绘制图片前需要检查依赖的数据是否生成。

直接使用触发器 vs 单独任务检测数据

目前有两种数据检查方式。

因为后处理系统一般都包括数据产品和图片产品制作,所以最常见的方式是直接使用数据转码任务作为触发器。 例如下图所示的3小时降水图片依赖3个时次的数据产品。

GRAPES MESO 后处理系统使用数据转码任务的事件作为触发器

该方式应用于除GRAPES MESO 3KM后处理系统外的全部业务系统。

GRAPES MESO 3KM后处理系统因为历史原因使用另一种方式,单独使用一个任务检测数据产品任务生成的数据。

GRAPES MESO 3KM后处理使用绘图部分使用单独的数据检测任务

使用单独的任务检测数据会增加系统复杂度,延迟绘图任务触发时间。 但却将绘图任务与数据任务解耦,使得两部分之间没有直接的依赖关系,数据任务的变更不会影响绘图任务。 即便需要修改,也只需要修改数据检测任务,不用修改绘图任务。

不过,直接使用触发器依旧是构建业务系统的主流方式,简洁明了,建议大家采用。

绘图任务

逐条构建产品任务会导致代码繁琐,不方便维护。 下面是GRAPES MESO后处理系统中部分代码,大家可以感受一下逐条编写的魅力。

with fm_rundir.add_family('cn_plot_10m_wind') as fm_plot:
    for a_forecast_hour in self.config['3km_forecast_hour_list']:
        with fm_plot.add_task(
                'cn_plot_10m_wind_sep_{hour}'.
                format(hour=a_forecast_hour)) as tk_plot:
            tk_plot.add_variable(
                "ECF_SCRIPT_CMD", 
                "cat {ecf_files}/cn_plot_10m_wind_sep.ecf".format(
                    ecf_files=self.config['meso_chartos']['attrs']['ECF_FILES']
            ))
            tk_plot.add_trigger(
                '/meso_post/{name}/meso_togrib2/grib2WORK/{hour}'
                '/after_data2grib2_{hour}:grib2OK_{hour}==set'.format(
                  name=self.name, hour=a_forecast_hour
            ))
            tk_plot.add_variable("FTHOUR", a_forecast_hour)
            self.add_plot_event_and_meter(tk_plot)
            self.add_late_attribute(tk_plot)
                
with fm_rundir.add_family('cn_grapes_meso_APWC') as fm_plot:
    for a_forecast_hour in self.config['forecast_hour_list']:
        with fm_plot.add_task(
                'cn_grapes_meso_APWC_sep_{hour}'.
                format(hour=a_forecast_hour)) as tk_plot:
            tk_plot.add_variable(
                "ECF_SCRIPT_CMD",
                "cat {ecf_files}/cn_grapes_meso_APWC_sep.ecf".format(
                  ecf_files=self.config['meso_chartos']['attrs']['ECF_FILES']
            ))
            tk_plot.add_trigger(
                '/meso_post/{name}/meso_togrib2/grib2WORK/{hour}'
                '/after_data2grib2_{hour}:grib2OK_{hour}==set'.format(
                  name=self.name, hour=a_forecast_hour))
            tk_plot.add_variable("FTHOUR", a_forecast_hour)
            self.add_plot_event_and_meter(tk_plot)
            self.add_late_attribute(tk_plot)
                
with fm_rundir.add_family('cn_plot_radar_reflectivity') as fm_plot:
    for a_forecast_hour in self.config['3km_forecast_hour_list']:
        with fm_plot.add_task(
                'cn_plot_radar_reflectivity_sep_{hour}'.
                format(hour=a_forecast_hour)) as tk_plot:
            tk_plot.add_variable(
                "ECF_SCRIPT_CMD", 
                "cat {ecf_files}/cn_plot_radar_reflectivity_sep.ecf".format(
                  ecf_files=self.config['meso_chartos']['attrs']['ECF_FILES']
            ))
            tk_plot.add_trigger(
                '/meso_post/{name}/meso_togrib2/grib2WORK/{hour}'
                '/after_data2grib2_{hour}:grib2OK_{hour}==set'.format(
                  name=self.name, hour=a_forecast_hour))
            tk_plot.add_variable("FTHOUR", a_forecast_hour)
            self.add_plot_event_and_meter(tk_plot)
            self.add_late_attribute(tk_plot)

上述代码中,每个产品都包括循环创建绘图任务,指定依赖关系,设定变量。

所以可以将需要设置的属性放到一个结构体中,批量构建绘图任务。

任务设置

目前GRAPES MESO 3KM后处理系统使用如下格式的图片产品设置。

{
    'category': 'meso_prep_24h_10mw',
    'tasks': [
        {
            'name': 'cn_prep_24h_10mw', 
            'triggers': [
                0,
                -24,
            ]
        },
    ],
    'forecast_hour_list': self.config['forecast_hour_list_24h_meso']
}

上面代码表示某个图片类型meso_prep_24h_10mw,使用forecast_hour_list_24h_meso的预报时效列表构建产品。 其中有一个图片产品cn_prep_24h_10mw,依赖当前时刻和前24小时的数据。

分类与名称

每个分类对应一个时效列表,比如下面代码中5个时效列表可分别用于1,3,6,12,24小时累计降水。

self.config['forecast_hour_list_1h'] = self.generate_forecast_hour_list(1, self.config["forecast_length"])
self.config['forecast_hour_list_3h'] = self.generate_forecast_hour_list(3, self.config["forecast_length"], 3)
self.config['forecast_hour_list_6h'] = self.generate_forecast_hour_list(6, self.config["forecast_length"], 3)
self.config['forecast_hour_list_12h'] = self.generate_forecast_hour_list(12, self.config["forecast_length"], 3)
self.config['forecast_hour_list_24h'] = self.generate_forecast_hour_list(24, self.config["forecast_length"], 3)

每个分类中可以包含多个绘图任务,绘图任务的名称(name)与绘图任务脚本对应,为了简化系统,最好保持全局唯一。

数据依赖

绘图任务对象中triggers字段表示依赖的数据时效,0表示当前时效,-24表示前24小时时效。

对于只依赖当前时效数据的绘图任务,只需要设置0即可。

GRAPES MESO 3km后处理系统1小时雷达组合反射率图片需要当前时刻的数据

对于依赖多个时效数据的绘图任务,则需要设置多个值。

GRAPES MESO 3km后处理系统24小时累积降水图片需要当前时刻和前24小时时刻的数据

创建绘图任务

根据上面的绘图任务列表,构建绘图任务。

构建绘图类型的容器节点fm_category

for a_plot_category in self.plot_category_list:
    category_name = a_plot_category['category']
    forecast_hour_list = a_plot_category['forecast_hour_list']
    tasks = a_plot_category['tasks']

    with fm_tograph.add_family(category_name) as fm_category:
        # ... skip ...

为绘图类型中的每个图片产品构建容器节点fm_plot_task

for a_task in tasks:
    task_name = a_task['name']
    triggers = a_task['triggers']
    with fm_category.add_family(task_name) as fm_plot_task:
        fm_plot_task.add_variable(
            "PLOT_TYPE", 
            "{category_name}/{task_name}".format(
                category_name=category_name, 
                task_name=task_name
            )
          )
        # ... skip ...

根据时效列表,为每个图片产品创建绘图任务节点tk_plot_task。 图片产品的绘图脚本使用统一的命名规范{task_name}_plot.ecf

for a_forecast_hour in forecast_hour_list:
    with fm_plot_task.add_task('plot_hour_{hour}'.format(
            task_name=task_name, hour=a_forecast_hour)) as tk_plot_task:
        tk_plot_task.add_variable(
          "ECF_SCRIPT_CMD", 
          "cat {ecf_files}/{task_name}_plot.ecf".format(
            ecf_files=self.config['tograph']['attrs']['ECF_FILES'],
            task_name=task_name
        ))
        tk_plot_task.add_variable("FTHOUR", a_forecast_hour)
        self.add_late_attribute(tk_plot_task)

最后为图片任务设置触发器。

for index, a_trigger_hour in enumerate(triggers):
    hour = int(a_forecast_hour) + a_trigger_hour
    trigger = "{root_node_path}/{name}/tograph/initial:orig_grib2_{hour:03}==set".format(
        root_node_path=self.root_node_path, name=self.name, hour=hour)
    if index == 0:
        tk_plot_task.add_part_trigger(trigger)
    else:
        tk_plot_task.add_part_trigger(trigger, True)

创建上传任务

上传任务与绘图任务类似,为每个图片产品的每个时效创建一个上传任务。 上传任务脚本使用统一的命名规范{task_name}_upload.ecf

with fm_tograph.add_family(category_name) as fm_category:
    for a_task in tasks:
        task_name = a_task['name']
        with fm_category.add_family(task_name) as fm_upload_task:
            fm_upload_task.add_variable("PLOT_TYPE", "{category_name}/{task_name}".format(
                category_name=category_name, task_name=task_name
            ))
            for a_forecast_hour in forecast_hour_list:
                with fm_upload_task.add_task('plot_hour_{hour}'.format(
                    task_name=task_name, hour=a_forecast_hour
                )) as tk_upload_task:
                    tk_upload_task.add_trigger(
                        "{root_node_path}/{name}/tograph/{category_name}"
                        "/{task_name}/plot_hour_{hour}==complete".format(
                            root_node_path=self.root_node_path, 
                            name=self.name, 
                            category_name=category_name, 
                            task_name=task_name, 
                            hour=a_forecast_hour
                    ))
                    tk_upload_task.add_variable("FTHOUR", a_forecast_hour)
                    tk_upload_task.add_variable(
                        "ECF_SCRIPT_CMD", 
                        "cat {ecf_files}/{task_name}_upload.ecf".format(
                          ecf_files=self.config['tograph']['attrs']['ECF_FILES'],
                          task_name=task_name
                    ))