ecFlow学习笔记03 - 构建数值预报业务系统

目录

三年多前开始研究 SMS 的升级版 ecFlow,翻译了 ecFlow 的官方教程。我还尝试使用 ecFlow 的 python api 接口创建 SMS 的 def 文件,应用在业务系统中。但因为与其他业务系统的构建方式不一致,给维护带来不小的麻烦。

去年单位的HPC由IBM AIX切换为曙光Linux,作业调度软件也由 SMS 升级成 ecFlow,我也正式开始使用 ecFlow 构建数值预报业务系统。

SMS与ecFlow的区别

ecFlow 是 SMS 的升级版,继承 SMS 的基本概念,业务系统可以延续 SMS 的构建方式。但需要对系统进行一定的修改,详细的迁移方案请参考ecFlow官方网站Migration from SMS to ecFlow

API

对于用户来说,ecFlow 相对于 SMS 最大的变化就是提供了 Python API 接口,用于构建系统流程和实现与服务器的交互。

ecFlow 和 SMS 使用相同语法的 def 文件来定义系统流程,详细语法规则请参考Definition file Grammar。直接编写 def 文件可以定义系统,但 def 语法不提供定义复杂系统需要的循环、判断等编程语言的基本要素。因此两者都内置工具生成 def 文件。

SMS 提供自定义的 CDP 接口创建 def 文件,语法规则类似 shell,因此也保留了 shell 中循环、判断语法规则不灵活的特点。

下面是一个简单的示例,对模式数据一天4个时次,每个时次0-24时效的输出文件进行处理。可以看到,CDP 的语法类似 shell,使用set设置变量,使用$访问变量,循环是 loop

suite downscl 
  clock hybrid    
  repeat day 1    
  
  family t639_downscale 
    limit job_num 14
    inlimit job_num

    loop HH ( 00 06 12 18 ) do
      set fam $HH
      family $HH
        edit HH $fam
        case fam
          in( 00 ) do time 05:15 ;endin
          in( 06 ) do time 12:45 ;endin
          in( 12 ) do time 17:15 ;endin
          in( 18 ) do time 00:15 ;endin
        endcase

        set list "000 003 006 009 012 015 018 021 024"
        family downscale
          loop h ( ${list} ) do
            task downscale_$h
              edit FTIMEH $h
          endloop
        endfamily #downscale
      endfamily #HH
    endloop   #HH
  endfamily  #t639_downscale

  family archive 
    time 00:40
    trigger ./t639_downscale==complete
    edit HH NONE
    edit ID NONE
    task archiving 
  endfamily #archive 

  family housekeep
    time 01:00
    trigger ./archive==complete
    edit HH NONE
    edit ID NONE
    task housekeeping
  endfamily #housekeep
endsuite

尽管SMS可以构造出复杂的数值预报业务系统,但shell本身语法要素的缺失导致系统定义脚本比较冗长。例如上面标亮的设置时效列表语句,在 python 中可以使用一行语句更灵活地设置,而无需将整个时效列表项目一一列出。

forecast_hour_list = ["{hour:03}".format(hour=hour) for hour in range(0, 25, 3)]

ecFlow提供Python定义系统的接口,让构建复杂系统变得更加容易。

当然可以使用任意工具生成 def 文件,例如同事就使用 shell 脚本逐行输出 def 文件,但为了维护方便,建议使用官方提供的API接口开发系统。

命令行通讯

SMS中的命令行通讯分为两种:

  • CDP交互式操作,主要用于操控 SMS
  • smsinit、smsabort 等多个命令,主要用于任务与SMS服务器通讯

ecFlow 将这两种通讯整合为同一个命令行工具ecflow_client,是操控 ecFlow 和任务脚本中与 ecFlow 通讯的方式统一,使用更加方便。

任务脚本

ecFlow 执行的任务脚本与 SMS 脚本没有显著区别,只需替换变量名,并修改服务器交互命令,就可以无缝迁移到 ecFlow。

通用系统定义模型

以一个产品后处理系统为例说明通用的系统定义模型。

GRAPES MESO 3km产品后处理系统

该系统每天循环运行,每天有四个时次,每个时次执行数据检查(initial)、数据转码(togrib2)、图片绘制(tograph)和上传(upload)等四个任务。因为不同时次都执行相似的任务,所以将单个时次抽象为一个类GrapesMeso3kmPost,其中build_one_cycle方法返回一个时次节点(family),该方法调用各个子方法生成四个子任务。整个系统抽象为一个类GrapesMeso3kmPostSystem,其中build_suite方法创建各个时次的GrapesMeso3kmPost对象,生成时次节点,并将所有的节点组成整个系统。模块方法create_suite会设定系统需要的各项参数,返回最终的def对象。最后,将def对象保存成本地文件,使用ecflow_client可以将系统加载到ecFlow服务中。

GrapesMeso3kmPost

GrapesMeso3kmPost类定义一个时次,示意代码如下所示,省略了大部分定义系统结构的详细代码,仅保留框架。

class GrapesMeso3kmPost(object):
    def __init__(self, name, root_node_path):
        pass

    def build_one_cycle(self):
        """
        Family: one start hour
        """
        with Family(self.name) as fm_one_cycle:
            fm_one_cycle.add_variable('HH', self.name)
            fm_one_cycle.add_variable(self.attrs)
            fm_one_cycle.add_variable(common.lljob_s("serial_op"))

            with self.initial() as tk_initial:
                fm_one_cycle.add_task(tk_initial)

            with self.togrib2() as fm_togrib2:
                fm_one_cycle.add_family(fm_togrib2)

            with self.tograph() as fm_tograph:
               fm_one_cycle.add_family(fm_tograph)

            with self.upload() as fm_upload:
                fm_one_cycle.add_family(fm_upload)

            with self.housekeep_cycle() as fm_housekeep_cycle:
                fm_one_cycle.add_family(fm_housekeep_cycle)

            return fm_one_cycle

    def initial(self):
        """
        Task: initial
        """
        with Task("initial") as tk_initial:
            for a_forecast_hour in self.config['forecast_hour_list']:
                tk_initial.add_event(
                    'grapes_grib2_{hour}'.format(hour=a_forecast_hour))
            return tk_initial

    def togrib2(self):
        """
        Family: togrib2
        """
        with Family("togrib2") as fm_togrib2:
            # ...skip...
            return fm_togrib2

    def tograph(self):
        """
        Family: tograph
        """
        with Family("tograph") as fm_tograph:
            # ...skip...
            return fm_tograph

    def upload(self):
        with Family('upload') as fm_upload:
            # ...skip...
            return fm_upload
    
    def housekeep_cycle(self):
        with Family("housekeep_cycle") as fm_housekeep_cycle:
            # ...skip...
            return fm_housekeep_cycle

GrapesMeso3kmPostSystem

GrapesMeso3kmPostSystem类定义整个系统,下面是示例代码,省略了部分代码。

class GrapesMeso3kmPostSystem(object):
    def __init__(self, suite_name, suite_attrs, start_hour_config, attrs, config):
        pass

    def build_suite(self):
        suite = Suite(self.suite_name)

        for HH_name in sorted(self.start_hour_config.keys()):
            gmf_grapes_gfs_post_one_time = GrapesMeso3kmPost(
                HH_name, "/{suite_name}".format(suite_name=self.suite_name))

            with gmf_grapes_gfs_post_one_time.build_one_cycle() as fm_HH:
                suite.add_family(fm_HH)

        with self.housekeep_final() as tk_housekeep_final:
            suite.add_task(tk_housekeep_final)
        
        return suite

    def housekeep_final(self):
        with Task('housekeep_final') as tk_housekeep_final:
            # ...skip...
            return tk_housekeep_final

create_suite

create_suite创建整个系统的 def 对象,示意代码如下所示,省略了大部分参数配置代码。

def create_suite():
    # start hour settings
    start_hour_config = {
        "00": {},
        "06": {},
        "12": {},
        "18": {}
    }

    # varaibles set in suite level
    suite_attrs = {}

    # variables set in cycle level
    common_attrs = {}

    # config for each component.
    config = {
        'initial': {},
        'togrib2': {},
        'tograph': {},
        'upload': {},
        'housekeep_final': {}
    }

    grapes_meso_3km_post_system = GrapesMeso3kmPostSystem(
        sys_name, suite_attrs, start_hour_config, common_attrs, config)
    st_sys = grapes_meso_3km_post_system.build_suite()

    return {
        'suite_name': sys_name,
        'suite': st_sys
    }

main

main函数将 def 对象保存到文件。

def main():
    suite_result = create_suite()

    defs=Defs()
    defs.add_suite(suite_result['suite'])

    defs_filename = suite_result['suite_name'] + ".def"
    defs_file = defs.save_as_defs(defs_filename)

细节

创建数值预报业务系统时,需要考虑诸多的细节,方便开发和维护。这里列出我目前总结的几点。

时间循环

数值预报业务系统是每天循环运行的,考虑到任务的执行时间,系统运行时的日期可能与HPC的当前日期不一致,所以运维人员需要经常查看ecFlow系统的日期。

SMS中使用SMSDATE表示系统日期,在XCDP中打开Generated Variables选项可以查看某个系统的运行日期。

ecFlow的新版UI节点ecflow_ui的节点树中默认不显示Generated Variable,如果用SMS的方式定义系统,则显示效果如下图所示

想查看系统日期变量ECF_DATE,需要在Info面板中查看,十分不方便。因此,我们使用 repeat 变量定义日期。

run_date=int(commands.getoutput("date +%Y%m%d"))
suite.add_repeat(RepeatDate("ECF_DATE", run_date, 20280101))

上述代码获取当前日期,为 suite 创建一个名为ECF_DATE的 RepeatDate 变量,从当前日志到2028年1月1日,显示效果如下图所示

ecFlow UI界面中可以明显看到系统的当前日期。

脚本公用

脚本公用在产品制作系统里更为常见,不同的任务可能仅有少量变量不一致,例如生成不同时效的同种产品,我们一般为这些任务编写一个统一的脚本。

SMS中我们使用文件链接将每个任务链接到同一个脚本中,如下图所示。

当任务数较多时,目录下的会有大量文件,上面的目录中有超过2000个文件。

ecFlow提供一个变量ECF_SCRIPT_CMD,设置获取脚本时需要执行的命令,将该变量与cat命令结合,可以手动设置任务使用的脚本,而无需再链接文件。

for a_forecast_hour in self.config['forecast_hour_list']:
    with fm_plot.add_task(
            'cn_grapes_meso_APWC_sep_{hour}'.
            format(hour=a_forecast_hour)) as tk_plot:
        tk_plot.add_variable("ECF_SCRIPT_CMD", "cat {ecf_files}/cn_grapes_meso_APWC_sep.ecf".format(
            ecf_files=self.config['meso_chartos']['attrs']['ECF_FILES']
        ))

上述代码中,为多个时效创建绘图任务,每个任务都使用ECF_SCRIPT_CMD指向同一个脚本。采用该方法,可以有效减少目录中的文件数据,文件数由2000多减少到100个以内。

测试环境

业务上线以后的更新需要格外的谨慎,调试线上系统引发的错误会给值班人员增加不少的麻烦。因此最好在另外的ecFlow服务上测试系统。

HPC上的ecFlow服务运行在登陆节点,我们利用HPC多登陆节点的架构,构建了一套测试流程。

我们使用相同的端口号在不同的节点上运行ecFlow服务,比如login_b01是业务服务,login_b06是测试服务。因为不同的登陆节点共享存储,所以同一个def可以同时加载到两个服务中,并实际运行。在测试服务上调试与实际业务环境一致,不会影响业务系统(同一个运行目录,还是可能会有影响,需要避开正在运行的任务)。测试通过后更新到业务ecFlow服务中,能避免干扰正常的值班工作。

小结

上面的介绍隐藏了系统的具体实现,后续会详细介绍系统的各个部分,包括通用的作业提交、参数设置等。

利用Python的编程能力,我还尝试使用模块化编写系统结构,但为保持与单位其他系统的风格一致,现在已不再使用。感兴趣的同学可以浏览nwpc-oper/nwpc-product-system-migrate项目,archive目录下的各个系统使用 ecFlow python API 接口创建 SMS def 文件,并采用模块化的风格整合多个子系统。