NWPC消息平台:验证 ecFlow 命令消息

目录

本文属于介绍 NWPC 消息平台 系列文章。

NWPC 消息平台引入 ecFlow 命令消息 (Ecflow Client Message) 用于代替无法实时获取更新的 ecFlow 日志。

在提供实时数据之前,首先要确认 NWPC 消息平台记录的 ecFlow 命令事件是否与 ecFlow 日志相吻合。

import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

数据

本文数据来自目前已启用 ecFlow 命令消息的 globalchartos 系统,分别从 ecFlow 日志和 NWPC 消息存储中获取事件记录。

globalchartos 系统为 WMC-BJ 网站生成 GRAPES GFS 模式的图形产品

为了尽可能简化处理过程,仅选用 globalchartos 系统 00 时次。

ecFlow 日志

使用 nwpc-oper/nwpc-workflow-log-model 项目从 ecFlow 日志中提取任务节点执行 ecflow_client 子命令的日志条目。

关于日志解析的详细信息,请参看以下文章:

准备数据

为了加快日志解析,使用 grep 命令提取 00 时次的记录。

grep " /globalchartos/00" \
    /g1/u/nwp_pd/ecfworks/ecflow/login_b01.31071.ecf.log > "nwpc_pd.log"

解析条目

加载日志解析库

from nwpc_workflow_log_model.log_record.ecflow import (
    EcflowLogParser, 
    EventType,
)

只解析子命令条目,类型为 EventType.Child

total_lines = 0
with open("nwpc_pd.log") as f:
    for line in f:
        total_lines += 1
        
progress = tqdm(total=total_lines)

records = []
with open("nwpc_pd.log") as f:
    for line in f:
        progress.update(1)
        parser = EcflowLogParser(
            options={
                EventType.Status: {
                    "debug": False,
                    "enable": False,
                },
                EventType.Client: {
                    "debug": False,
                    "enable": False,
                },
                EventType.Child: {
                    "debug": False,
                    "enable": True,
                },
                EventType.Server: {
                    "debug": False,
                    "enable": False,
                },
                EventType.Unknown: {
                    "debug": False,
                    "enable": False,
                }
            }
        )
        record = parser.parse(line)
        if record is None:
            continue
        records.append(record)
len(records)
663013

数据处理

将日志条目列表转换成 pd.DataFrame 对象,提取以下字段:

  • date:日期
  • time:时间
  • event:事件,即 child 子命令
  • node_path:节点路径
log_times = [record.time for record in records]
log_dates = [record.date for record in records]
log_events = [record.event for record in records]
node_paths = [record.node_path for record in records]
log_df = pd.DataFrame({
    "date": log_dates,
    "time": log_times,
    "event": log_events,
    "node_path": node_paths,
})
log_df.head()

消息平台

NWPC 消息平台将消息数据保存在 ElasticSearch 中。

本节从 ElasticSearch 库中获取单个节点某个日期的消息,用于后续对比。

节点路径和日期

node_path = "/globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066"
query_date = pd.to_datetime("2020-10-10")

检索

创建 ElasticSearch 客户端对象

from elasticsearch import Elasticsearch

elastic_server = "localhost:9200"
client = Elasticsearch(hosts=elastic_server)

从多个索引中检索消息

results = client.search(
    body={
        "query": {
            "bool": {
                "filter": {
                    "term": {
                      "data.ecf_name.keyword": node_path
                    }
                }
            }
        }
    },
    index="ecflow-client-2020-10-08,ecflow-client-2020-10-09,ecflow-client-2020-10-10,ecflow-client-2020-10-11",
    size=30
)

处理

将消息转换成 pd.DataFrame 对象,提取以下字段:

  • data.ecf_date:日期
  • time:时间
  • data.command:child 子命令
data_list = []
for message in results["hits"]["hits"]:
    message_time = pd.to_datetime(message["_source"]["time"])
    data_list.append({
        "date": pd.to_datetime(message["_source"]["data"]["ecf_date"]),
        "time": message_time.round("s").time(),
        "event": message["_source"]["data"]["command"],
    })
message_df = pd.DataFrame(data_list)
message_df.head()

方法

事件记录包含时间和执行的命令,需要比较记录的命令是否相同,记录的时间是否保持一致。

下面针对单个时次的单个节点,对比 ecFlow 日志和 ecFlow 命令消息的事件记录

从 ecFlow 日志中提取

node_path_df = log_df[log_df.node_path==node_path]
day_df = node_path_df[node_path_df.date==query_date]
day_df[["date", "time", "event"]]

从 ecFlow 消息中提取

message_df[message_df.date==query_date]

两者的命令顺序完全一致,时间基本相同,最多相差 1s,在可以接受范围内。

实验

下面进行批量对比:

  • 比较每个任务节点的命令执行顺序是否相同
  • 计算每个任务节点所有命令执行时间的均方根误差

任务节点

首先使用 nwpc-oper/nwpc-workflow-model 提取所有的任务节点。

from nwpc_workflow_model.ecflow import Bunch
from nwpc_workflow_model.visitor import (
    NodeVisitor,
    pre_order_travel,
)

从节点路径列表中创建节点树 Bunch 对象

node_paths = log_df.node_path.unique()
node_paths[:10]
array(['/globalchartos/00/initial',
       '/globalchartos/00/deterministic/base/000/wmc_div_250_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_wind_windbarb_850_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_vor_925_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_wind_windbarb_700_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_vor_700_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_temp_2m_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_vor_250_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_humidity_850_sep_000',
       '/globalchartos/00/deterministic/base/000/wmc_hgt_250_sep_000'],
      dtype=object)
bunch = Bunch()
for n in node_paths:
    bunch.add_node(n)

创建 TaskVisitor 类,并提取任务节点。

class TaskVisitor(NodeVisitor):
    def __init__(self):
        self.tasks = []
        
    def visit(self, node):
        if node.is_leaf():
            self.tasks.append(node)
visitor = TaskVisitor()
pre_order_travel(bunch, visitor)
len(visitor.tasks)
486

筛选函数

创建提取某节点某天事件记录的函数

日志

get_from_log() 函数从 log_df 中提取某节点某天的条目

def get_from_log(
    log_df,
    node_path,
    date: pd.Timestamp,
):
    node_path_df = log_df[log_df.node_path==node_path]
    day_df = node_path_df[node_path_df.date==date]
    return day_df[["date", "time", "event"]]

消息

使用 nwpc-oper/nwpc-message-tool 项目集成的检索接口获取消息记录。

import nwpc_message_tool.source.ecflow_client
from nwpc_message_tool.storage import EsMessageStorage

创建客户端 EsMessageStorage

client = EsMessageStorage(
    hosts=[elastic_server],
    debug=False,
    show_progress=False
)

get_from_message() 用于从 ElasticSearch 中检索消息并返回 pd.DataFrame 对象

def get_from_message(
    node_path,
    date: pd.Timestamp,
):
    results = client.get_ecflow_client_messages(
        engine=nwpc_message_tool.source.ecflow_client,
        node_name=node_path,
        ecflow_host="XXXXX",
        ecflow_port="XXXXX",
        ecf_date=date,
        size=20,
    )
    data_list = []
    for message in results:
        message_time = message.time
        data_list.append({
            "date": message.ecf_date,
            "time": message_time.round("s").time(),
            "event": message.command,
        })
    message_df = pd.DataFrame(data_list)
    return message_df

测试单个任务

节点路径和日期

node_path = "/globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066"
query_date = pd.to_datetime("2020-10-07")

ecFlow 日志结果

node_log_df = get_from_log(log_df, node_path, query_date)
node_log_df

消息平台结果

node_message_df = get_from_message(node_path, query_date)
node_message_df

对比两个结果的事件序列是否相同

np.array_equal(
    node_log_df.event.values, 
    node_message_df.event.values
)
True

验证事件顺序

验证所有任务节点的事件顺序是否相同

下面的代码会输出顺序不一样的节点路径

for node in tqdm(visitor.tasks):
    node_path = node.get_node_path()
    node_log_df = get_from_log_record(log_df, node_path, query_date)
    node_message_df = get_from_message(node_path, query_date)
    if not np.array_equal(node_log_df.event.values, node_message_df.event.values):
        print(node_path)

没有输出节点路径,说明所有节点的事件顺序均相同

对比事件时间

既然事件顺序相同,就可以为每个节点的所有事件时间计算均方根误差 (RMSE)

rmses = []
for node in tqdm(visitor.tasks):
    node_path = node.get_node_path()
    node_log_df = get_from_log_record(log_df, node_path, query_date)
    node_message_df = get_from_message(node_path, query_date)
    rmse = np.sqrt(
        np.mean(
            np.power(
                (
                    node_message_df.time.apply(lambda x: pd.to_timedelta(x.isoformat())) 
                    - node_log_df.time.apply(lambda x: pd.to_timedelta(x.isoformat())).reset_index(drop=True)
                ) / pd.Timedelta(seconds=1), 
                2
            )
        )
    )
    rmses.append(rmse)

RMSE 的最大值

np.max(rmses)
1.4907119849998598

最大值为 1.5 秒,在可以接受范围内。

结论

以上测试说明,ecFlow 命令消息可以代替 ecFlow 日志中的 child 类型条目。

参考

nwpc-oper/nwpc-message-client 项目

https://github.com/nwpc-oper/nwpc-message-client

nwpc-oper/nwpc-message-tool 项目

https://github.com/nwpc-oper/nwpc-message-tool

NWPC消息平台 系列文章:

NWPC消息平台:产品事件消息

NWPC消息平台:ecFlow 命令消息