NWPC消息平台:验证 ecFlow 命令消息
本文属于介绍 NWPC 消息平台 系列文章。
NWPC 消息平台引入 ecFlow 命令消息 (Ecflow Client Message) 用于代替无法实时获取更新的 ecFlow 日志。
在提供实时数据之前,首先要确认 NWPC 消息平台记录的 ecFlow 命令事件是否与 ecFlow 日志相吻合。
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
数据
本文数据来自目前已启用 ecFlow 命令消息的 globalchartos 系统,分别从 ecFlow 日志和 NWPC 消息存储中获取事件记录。
globalchartos 系统为 WMC-BJ 网站生成 GRAPES GFS 模式的图形产品
为了尽可能简化处理过程,仅选用 globalchartos 系统 00 时次。
ecFlow 日志
使用 nwpc-oper/nwpc-workflow-log-model 项目从 ecFlow 日志中提取任务节点执行 ecflow_client
子命令的日志条目。
关于日志解析的详细信息,请参看以下文章:
准备数据
为了加快日志解析,使用 grep
命令提取 00 时次的记录。
grep " /globalchartos/00" \
/g1/u/nwp_pd/ecfworks/ecflow/login_b01.31071.ecf.log > "nwpc_pd.log"
解析条目
加载日志解析库
from nwpc_workflow_log_model.log_record.ecflow import (
EcflowLogParser,
EventType,
)
只解析子命令条目,类型为 EventType.Child
total_lines = 0
with open("nwpc_pd.log") as f:
for line in f:
total_lines += 1
progress = tqdm(total=total_lines)
records = []
with open("nwpc_pd.log") as f:
for line in f:
progress.update(1)
parser = EcflowLogParser(
options={
EventType.Status: {
"debug": False,
"enable": False,
},
EventType.Client: {
"debug": False,
"enable": False,
},
EventType.Child: {
"debug": False,
"enable": True,
},
EventType.Server: {
"debug": False,
"enable": False,
},
EventType.Unknown: {
"debug": False,
"enable": False,
}
}
)
record = parser.parse(line)
if record is None:
continue
records.append(record)
len(records)
663013
数据处理
将日志条目列表转换成 pd.DataFrame
对象,提取以下字段:
date
:日期time
:时间event
:事件,即 child 子命令node_path
:节点路径
log_times = [record.time for record in records]
log_dates = [record.date for record in records]
log_events = [record.event for record in records]
node_paths = [record.node_path for record in records]
log_df = pd.DataFrame({
"date": log_dates,
"time": log_times,
"event": log_events,
"node_path": node_paths,
})
log_df.head()
消息平台
NWPC 消息平台将消息数据保存在 ElasticSearch 中。
本节从 ElasticSearch 库中获取单个节点某个日期的消息,用于后续对比。
节点路径和日期
node_path = "/globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066"
query_date = pd.to_datetime("2020-10-10")
检索
创建 ElasticSearch 客户端对象
from elasticsearch import Elasticsearch
elastic_server = "localhost:9200"
client = Elasticsearch(hosts=elastic_server)
从多个索引中检索消息
results = client.search(
body={
"query": {
"bool": {
"filter": {
"term": {
"data.ecf_name.keyword": node_path
}
}
}
}
},
index="ecflow-client-2020-10-08,ecflow-client-2020-10-09,ecflow-client-2020-10-10,ecflow-client-2020-10-11",
size=30
)
处理
将消息转换成 pd.DataFrame
对象,提取以下字段:
data.ecf_date
:日期time
:时间data.command
:child 子命令
data_list = []
for message in results["hits"]["hits"]:
message_time = pd.to_datetime(message["_source"]["time"])
data_list.append({
"date": pd.to_datetime(message["_source"]["data"]["ecf_date"]),
"time": message_time.round("s").time(),
"event": message["_source"]["data"]["command"],
})
message_df = pd.DataFrame(data_list)
message_df.head()
方法
事件记录包含时间和执行的命令,需要比较记录的命令是否相同,记录的时间是否保持一致。
下面针对单个时次的单个节点,对比 ecFlow 日志和 ecFlow 命令消息的事件记录
从 ecFlow 日志中提取
node_path_df = log_df[log_df.node_path==node_path]
day_df = node_path_df[node_path_df.date==query_date]
day_df[["date", "time", "event"]]
从 ecFlow 消息中提取
message_df[message_df.date==query_date]
两者的命令顺序完全一致,时间基本相同,最多相差 1s,在可以接受范围内。
实验
下面进行批量对比:
- 比较每个任务节点的命令执行顺序是否相同
- 计算每个任务节点所有命令执行时间的均方根误差
任务节点
首先使用 nwpc-oper/nwpc-workflow-model 提取所有的任务节点。
from nwpc_workflow_model.ecflow import Bunch
from nwpc_workflow_model.visitor import (
NodeVisitor,
pre_order_travel,
)
从节点路径列表中创建节点树 Bunch
对象
node_paths = log_df.node_path.unique()
node_paths[:10]
array(['/globalchartos/00/initial',
'/globalchartos/00/deterministic/base/000/wmc_div_250_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_wind_windbarb_850_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_vor_925_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_wind_windbarb_700_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_vor_700_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_temp_2m_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_vor_250_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_humidity_850_sep_000',
'/globalchartos/00/deterministic/base/000/wmc_hgt_250_sep_000'],
dtype=object)
bunch = Bunch()
for n in node_paths:
bunch.add_node(n)
创建 TaskVisitor
类,并提取任务节点。
class TaskVisitor(NodeVisitor):
def __init__(self):
self.tasks = []
def visit(self, node):
if node.is_leaf():
self.tasks.append(node)
visitor = TaskVisitor()
pre_order_travel(bunch, visitor)
len(visitor.tasks)
486
筛选函数
创建提取某节点某天事件记录的函数
日志
get_from_log()
函数从 log_df
中提取某节点某天的条目
def get_from_log(
log_df,
node_path,
date: pd.Timestamp,
):
node_path_df = log_df[log_df.node_path==node_path]
day_df = node_path_df[node_path_df.date==date]
return day_df[["date", "time", "event"]]
消息
使用 nwpc-oper/nwpc-message-tool 项目集成的检索接口获取消息记录。
import nwpc_message_tool.source.ecflow_client
from nwpc_message_tool.storage import EsMessageStorage
创建客户端 EsMessageStorage
client = EsMessageStorage(
hosts=[elastic_server],
debug=False,
show_progress=False
)
get_from_message()
用于从 ElasticSearch 中检索消息并返回 pd.DataFrame
对象
def get_from_message(
node_path,
date: pd.Timestamp,
):
results = client.get_ecflow_client_messages(
engine=nwpc_message_tool.source.ecflow_client,
node_name=node_path,
ecflow_host="XXXXX",
ecflow_port="XXXXX",
ecf_date=date,
size=20,
)
data_list = []
for message in results:
message_time = message.time
data_list.append({
"date": message.ecf_date,
"time": message_time.round("s").time(),
"event": message.command,
})
message_df = pd.DataFrame(data_list)
return message_df
测试单个任务
节点路径和日期
node_path = "/globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066"
query_date = pd.to_datetime("2020-10-07")
ecFlow 日志结果
node_log_df = get_from_log(log_df, node_path, query_date)
node_log_df
消息平台结果
node_message_df = get_from_message(node_path, query_date)
node_message_df
对比两个结果的事件序列是否相同
np.array_equal(
node_log_df.event.values,
node_message_df.event.values
)
True
验证事件顺序
验证所有任务节点的事件顺序是否相同
下面的代码会输出顺序不一样的节点路径
for node in tqdm(visitor.tasks):
node_path = node.get_node_path()
node_log_df = get_from_log_record(log_df, node_path, query_date)
node_message_df = get_from_message(node_path, query_date)
if not np.array_equal(node_log_df.event.values, node_message_df.event.values):
print(node_path)
没有输出节点路径,说明所有节点的事件顺序均相同
对比事件时间
既然事件顺序相同,就可以为每个节点的所有事件时间计算均方根误差 (RMSE)
rmses = []
for node in tqdm(visitor.tasks):
node_path = node.get_node_path()
node_log_df = get_from_log_record(log_df, node_path, query_date)
node_message_df = get_from_message(node_path, query_date)
rmse = np.sqrt(
np.mean(
np.power(
(
node_message_df.time.apply(lambda x: pd.to_timedelta(x.isoformat()))
- node_log_df.time.apply(lambda x: pd.to_timedelta(x.isoformat())).reset_index(drop=True)
) / pd.Timedelta(seconds=1),
2
)
)
)
rmses.append(rmse)
RMSE 的最大值
np.max(rmses)
1.4907119849998598
最大值为 1.5 秒,在可以接受范围内。
结论
以上测试说明,ecFlow 命令消息可以代替 ecFlow 日志中的 child 类型条目。
参考
nwpc-oper/nwpc-message-client 项目
https://github.com/nwpc-oper/nwpc-message-client
nwpc-oper/nwpc-message-tool 项目
https://github.com/nwpc-oper/nwpc-message-tool
NWPC消息平台 系列文章: