NWPC业务系统笔记:构建数值预报业务系统
三年多前开始研究 SMS 的升级版 ecFlow,翻译了 ecFlow 的官方教程。 我还尝试使用 ecFlow 的 python api 接口创建 SMS 的 def 文件,应用在业务系统中。 但因为与其他业务系统的构建方式不一致,给维护带来不小的麻烦。
去年单位的 HPC 由 IBM AIX 切换为曙光 Linux,作业调度软件也由 SMS 升级成 ecFlow,我也正式开始使用 ecFlow 构建数值预报业务系统。
SMS与ecFlow的区别
ecFlow 是 SMS 的升级版,继承 SMS 的基本概念,业务系统可以延续 SMS 的构建方式。 但需要对系统进行一定的修改,详细的迁移方案请参考ecFlow官方网站 Migration from SMS to ecFlow
API
对于用户来说,ecFlow 相对于 SMS 最大的变化就是提供了 Python API 接口,用于构建系统流程和实现与服务器的交互。
ecFlow 和 SMS 使用相同语法的 def 文件来定义系统流程,详细语法规则请参考 Definition file Grammar。 直接编写 def 文件可以定义系统,但 def 语法不提供定义复杂系统需要的循环、判断等编程语言的基本要素。 因此两者都内置工具生成 def 文件。
SMS 提供自定义的 CDP 接口创建 def 文件,语法规则类似 shell,因此也保留了 shell 中循环、判断语法规则不灵活的特点。
下面是一个简单的示例,对模式数据一天4个时次,每个时次0-24时效的输出文件进行处理。
可以看到,CDP 的语法类似 shell,使用set
设置变量,使用$
访问变量,循环是 loop
。
suite downscl
clock hybrid
repeat day 1
family t639_downscale
limit job_num 14
inlimit job_num
loop HH ( 00 06 12 18 ) do
set fam $HH
family $HH
edit HH $fam
case fam
in( 00 ) do time 05:15 ;endin
in( 06 ) do time 12:45 ;endin
in( 12 ) do time 17:15 ;endin
in( 18 ) do time 00:15 ;endin
endcase
set list "000 003 006 009 012 015 018 021 024"
family downscale
loop h ( ${list} ) do
task downscale_$h
edit FTIMEH $h
endloop
endfamily #downscale
endfamily #HH
endloop #HH
endfamily #t639_downscale
family archive
time 00:40
trigger ./t639_downscale==complete
edit HH NONE
edit ID NONE
task archiving
endfamily #archive
family housekeep
time 01:00
trigger ./archive==complete
edit HH NONE
edit ID NONE
task housekeeping
endfamily #housekeep
endsuite
尽管 SMS 可以构造出复杂的数值预报业务系统,但 shell 本身语法要素的缺失导致系统定义脚本比较冗长。 例如上面标亮的设置时效列表语句,在 python 中可以使用一行语句更灵活地设置,而无需将整个时效列表项目一一列出。
forecast_hour_list = ["{hour:03}".format(hour=hour) for hour in range(0, 25, 3)]
ecFlow 提供 Python 定义系统的接口,让构建复杂系统变得更加容易。
当然可以使用任意工具生成 def 文件,例如同事就使用 shell 脚本逐行输出 def 文件,但为了维护方便,建议使用官方提供的API接口开发系统。
命令行通讯
SMS 中的命令行通讯分为两种:
- CDP 交互式操作,主要用于操控 SMS
smsinit
、smsabort
等多个命令,主要用于任务与 SMS 服务器通讯
ecFlow 将这两种通讯整合为同一个命令行工具 ecflow_client
,是操控 ecFlow 和任务脚本中与 ecFlow 通讯的方式统一,使用更加方便。
任务脚本
ecFlow 执行的任务脚本与 SMS 脚本没有显著区别,只需替换变量名,并修改服务器交互命令,就可以无缝迁移到 ecFlow。
通用系统定义模型
以一个产品后处理系统为例说明通用的系统定义模型。
该系统每天循环运行,每天有四个时次,每个时次执行数据检查(initial)、数据转码(togrib2)、图片绘制(tograph)和上传(upload)等四个任务。
因为不同时次都执行相似的任务,所以将单个时次抽象为一个类 GrapesMeso3kmPost
,其中 build_one_cycle
方法返回一个时次节点(family),该方法调用各个子方法生成四个子任务。
整个系统抽象为一个类 GrapesMeso3kmPostSystem
,其中 build_suite
方法创建各个时次的 GrapesMeso3kmPost
对象,生成时次节点,并将所有的节点组成整个系统。
模块方法 create_suite
会设定系统需要的各项参数,返回最终的def对象。
最后,将def对象保存成本地文件,使用 ecflow_client
可以将系统加载到 ecFlow 服务中。
GrapesMeso3kmPost
GrapesMeso3kmPost
类定义一个时次,示意代码如下所示,省略了大部分定义系统结构的详细代码,仅保留框架。
class GrapesMeso3kmPost(object):
def __init__(self, name, root_node_path):
pass
def build_one_cycle(self):
"""
Family: one start hour
"""
with Family(self.name) as fm_one_cycle:
fm_one_cycle.add_variable('HH', self.name)
fm_one_cycle.add_variable(self.attrs)
fm_one_cycle.add_variable(common.lljob_s("serial_op"))
with self.initial() as tk_initial:
fm_one_cycle.add_task(tk_initial)
with self.togrib2() as fm_togrib2:
fm_one_cycle.add_family(fm_togrib2)
with self.tograph() as fm_tograph:
fm_one_cycle.add_family(fm_tograph)
with self.upload() as fm_upload:
fm_one_cycle.add_family(fm_upload)
with self.housekeep_cycle() as fm_housekeep_cycle:
fm_one_cycle.add_family(fm_housekeep_cycle)
return fm_one_cycle
def initial(self):
"""
Task: initial
"""
with Task("initial") as tk_initial:
for a_forecast_hour in self.config['forecast_hour_list']:
tk_initial.add_event(
'grapes_grib2_{hour}'.format(hour=a_forecast_hour))
return tk_initial
def togrib2(self):
"""
Family: togrib2
"""
with Family("togrib2") as fm_togrib2:
# ...skip...
return fm_togrib2
def tograph(self):
"""
Family: tograph
"""
with Family("tograph") as fm_tograph:
# ...skip...
return fm_tograph
def upload(self):
with Family('upload') as fm_upload:
# ...skip...
return fm_upload
def housekeep_cycle(self):
with Family("housekeep_cycle") as fm_housekeep_cycle:
# ...skip...
return fm_housekeep_cycle
GrapesMeso3kmPostSystem
GrapesMeso3kmPostSystem
类定义整个系统,下面是示例代码,省略了部分代码。
class GrapesMeso3kmPostSystem(object):
def __init__(self, suite_name, suite_attrs, start_hour_config, attrs, config):
pass
def build_suite(self):
suite = Suite(self.suite_name)
for HH_name in sorted(self.start_hour_config.keys()):
gmf_grapes_gfs_post_one_time = GrapesMeso3kmPost(
HH_name, "/{suite_name}".format(suite_name=self.suite_name))
with gmf_grapes_gfs_post_one_time.build_one_cycle() as fm_HH:
suite.add_family(fm_HH)
with self.housekeep_final() as tk_housekeep_final:
suite.add_task(tk_housekeep_final)
return suite
def housekeep_final(self):
with Task('housekeep_final') as tk_housekeep_final:
# ...skip...
return tk_housekeep_final
create_suite
create_suite
创建整个系统的 def 对象,示意代码如下所示,省略了大部分参数配置代码。
def create_suite():
# start hour settings
start_hour_config = {
"00": {},
"06": {},
"12": {},
"18": {}
}
# varaibles set in suite level
suite_attrs = {}
# variables set in cycle level
common_attrs = {}
# config for each component.
config = {
'initial': {},
'togrib2': {},
'tograph': {},
'upload': {},
'housekeep_final': {}
}
grapes_meso_3km_post_system = GrapesMeso3kmPostSystem(
sys_name, suite_attrs, start_hour_config, common_attrs, config)
st_sys = grapes_meso_3km_post_system.build_suite()
return {
'suite_name': sys_name,
'suite': st_sys
}
main
main
函数将 def 对象保存到文件。
def main():
suite_result = create_suite()
defs=Defs()
defs.add_suite(suite_result['suite'])
defs_filename = suite_result['suite_name'] + ".def"
defs_file = defs.save_as_defs(defs_filename)
细节
创建数值预报业务系统时,需要考虑诸多的细节,方便开发和维护。 这里列出我目前总结的几点。
时间循环
数值预报业务系统是每天循环运行的,考虑到任务的执行时间,系统运行时的日期可能与 HPC 的当前日期不一致,所以运维人员需要经常查看 ecFlow 系统的日期。
SMS 中使用 SMSDATE
表示系统日期,在 XCDP 中打开 Generated Variables 选项可以查看某个系统的运行日期。
ecFlow 的新版 UI 界面 ecflow_ui 的节点树中默认不显示 Generated Variable,如果用 SMS 的方式定义系统,则显示效果如下图所示
想查看系统日期变量 ECF_DATE
,需要在 Info 面板中查看,十分不方便。
因此,我们使用 repeat
变量定义日期。
run_date=int(commands.getoutput("date +%Y%m%d"))
suite.add_repeat(RepeatDate("ECF_DATE", run_date, 20280101))
上述代码获取当前日期,为 suite 创建一个名为 ECF_DATE
的 RepeatDate 变量,从当前日期到 2028 年 1 月 1 日,显示效果如下图所示
ecFlow UI 界面中可以明显看到系统的当前日期。
脚本公用
脚本公用在产品制作系统里更为常见,不同的任务可能仅有少量变量不一致,例如生成不同时效的同种产品,我们一般为这些任务编写一个统一的脚本。
SMS 中我们使用文件链接将每个任务链接到同一个脚本中,如下图所示。
当任务数较多时,目录下的会有大量文件,上面的目录中有超过 2000 个文件。
ecFlow 提供一个变量 ECF_SCRIPT_CMD
,设置获取脚本时需要执行的命令,将该变量与 cat
命令结合,可以手动设置任务使用的脚本,而无需再链接文件。
for a_forecast_hour in self.config['forecast_hour_list']:
with fm_plot.add_task(
'cn_grapes_meso_APWC_sep_{hour}'.
format(hour=a_forecast_hour)) as tk_plot:
tk_plot.add_variable("ECF_SCRIPT_CMD", "cat {ecf_files}/cn_grapes_meso_APWC_sep.ecf".format(
ecf_files=self.config['meso_chartos']['attrs']['ECF_FILES']
))
上述代码中,为多个时效创建绘图任务,每个任务都使用 ECF_SCRIPT_CMD
指向同一个脚本。
采用该方法,可以有效减少目录中的文件数据,文件数由 2000 多减少到 100 个以内。
测试环境
业务上线以后的更新需要格外的谨慎,调试线上系统引发的错误会给值班人员增加不少的麻烦。因此最好在另外的 ecFlow 服务上测试系统。
HPC 上的 ecFlow 服务运行在登陆节点,我们利用 HPC 多登陆节点的架构,构建了一套测试流程。
我们使用相同的端口号在不同的节点上运行 ecFlow 服务,比如 login_b01 是业务服务,login_b06 是测试服务。 因为不同的登陆节点共享存储,所以同一个 def 可以同时加载到两个服务中,并实际运行。 在测试服务上调试与实际业务环境一致,不会影响业务系统(同一个运行目录,还是可能会有影响,需要避开正在运行的任务)。 测试通过后更新到业务 ecFlow 服务中,能避免干扰正常的值班工作。
小结
上面的介绍隐藏了系统的具体实现,后续会详细介绍系统的各个部分,包括通用的作业提交、参数设置等。
利用 Python 的编程能力,我还尝试使用模块化编写系统结构,但为保持与单位其他系统的风格一致,现在已不再使用。
感兴趣的同学可以浏览 nwpc-oper/nwpc-product-system-migrate 项目,archive
目录下的各个系统使用 ecFlow python API 接口创建 SMS def 文件,并采用模块化的风格整合多个子系统。