NWPC消息平台:运行状态分析算法

目录

本文属于介绍 NWPC 消息平台 系列文章。

简介

NWPC 的数值预报业务系统使用工作流软件 ecFlow 搭建,每个业务系统由大量有依赖关系的任务组成。 NWPC 消息平台为 ecFlow 系统设计运行状态变化消息,记录业务系统中每个任务的运行状态变化信息。 每天滚动循环中,每个任务会产生多条状态变化消息记录,形成一个状态变化序列。 值班人员更关心某个时次某个任务的运行情况,比如运行的起止时间,运行持续时间等具体信息。 虽然可以从状态变化序列中人工计算得到,ecFlow UI 中也提供了类似的统计功能,但这两种方式仅适合对有限任务进行少量分析。 为了实现对长时间序列的批量分析,需要设计一种自动分析运行状态的方法。

本文设计并实现一种基于确定有限状态自动机的业务系统运行状态分析算法,可以计算任意任务的运行情况。

数据

本文使用 NWPC 消息平台生成的运行状态变化状态消息作为输入数据。

消息示例如下:

{
  "app" : "nwpc-message-client",
  "type" : "ecflow-client",
  "time" : "2021-01-01T04:51:15.825482366Z",
  "data" : {
    "args" : [
      "27949969"
    ],
    "command" : "init",
    "ecf_date" : "20210101",
    "ecf_host" : "login_b01",
    "ecf_name" : "/globalchartos/00/deterministic/base/024/wmc_humidity_700_sep_024",
    "ecf_port" : "31071",
    "ecf_rid" : "27949969",
    "ecf_tryno" : "1",
    "envs" : null
}

其中

  • time 表示事件发生的时间
  • data.command 表示任务的状态变化
  • data.ecf_name 表示 ecFlow 中任务节点路径
  • data.ecf_date 表示任务运行的日期

一个运行正常的任务的状态变化序列如下所示。 第 1 个字段是当前系统日期,第 2-3 字段是事件发生的时间,最后一个字段是状态变化类型。

其中类型如下:

  • submit:任务提交
  • init:任务开始运行
  • submit:任务运行结束
  • abort:任务运行出错
20201226 2020-12-26 05:08:23 submit
20201226 2020-12-26 05:08:23 init
20201226 2020-12-26 05:12:09 complete

下面是一个运行出错的任务示例。

globalchartos 系统在 2020 年 11 月 22 日 00 时次的 /globalchartos/12/deterministic/base/144/wmc_div_700_sep_144 任务。

从输出可以看到运行前两次出错,第三次正常结束。

20201122 2020-11-22 16:58:04 submit
20201122 2020-11-22 16:58:05 init
20201122 2020-11-22 17:12:34 abort

20201122 2020-11-22 17:12:35 submit
20201122 2020-11-22 17:12:35 init
20201122 2020-11-22 17:12:36 abort

20201122 2020-11-22 17:46:00 submit
20201122 2020-11-22 17:46:04 init
20201122 2020-11-22 18:00:46 complete

算法

确定有限状态自动机确定有限自动机 (deterministic finite automaton, DFA) 是一个能实现状态转移的自动机。 对于一个给定的属于该自动机的状态和一个属于该自动机字母表 Sigma 的字符,它都能根据事先给定的转移函数转移到下一个状态。[1]

本文基于确定优先状态自动机设计统计业务系统任务运行情况的算法,用于计算正常情况下任务的运行起止时间以及运行时长。 算法示意图如下所示。

计算任务运行状态的DFA示意图,包括 6 种状态和 4 类事件

DFA 包括 6 个状态:

  • initial:初始状态
  • submit:任务已提交
  • activate:任务已运行
  • complete:任务已完成
  • error:任务出错
  • unknown:未知状态

其中 completeerrorunknown 是终止状态。 算法仅在终止状态是 complete 时计算运行起止时间和时长。

DFA 包含 4 个事件,对应任务运行状态变化数据:

  • submit:任务提交
  • init:任务运行
  • complete:任务完成
  • abort:任务结束

正常情况下,DFA 起始状态是 initial,接收 submit 事件变为 submit 状态,接收 init 事件变为 activate 状态,接收 complete 事件转为 complete 状态。 算法根据上面三个事件发生的时间计算任务运行的起止时刻和运行时长。

结果分析

任务运行情况计算结果

下面是 globalchartos 系统 2020 年 12 年 26 日 00 时次 /globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066 任务的运行情况。 可以看到该任务于 05:08:24 UTC 开始运行,到 05:12:10 UTC 运行结束,共运行 3 分 42 秒。

{
  "situation": "complete",
  "time_periods": [
    {
      "start_time": "2020-12-26 05:08:24+00:00",
      "end_time": "2020-12-26 05:12:10+00:00",
      "period_type": "in_all"
    },
    {
      "start_time": "2020-12-26 05:08:24+00:00",
      "end_time": "2020-12-26 05:08:24+00:00",
      "period_type": "in_submitted"
    },
    {
      "start_time": "2020-12-26 05:08:24+00:00",
      "end_time": "2020-12-26 05:12:10+00:00",
      "period_type": "in_activate"
    }
  ],
  "time_points": [
    {
      "status": "submit",
      "time": "2020-12-26 05:08:24+00:00" 
    },
    {
      "status": "init",
      "time": "2020-12-26 05:08:24+00:00" 
    },
    {
      "status": "complete",
      "time": "2020-12-26 05:12:10+00:00" 
    }
  ]
}

长时间序列分析

下面是 globalchartos 系统 /globalchartos/00/initial 任务在 2020 年 12 月的运行情况。 该任务定时检测 GRAPES GFS 系统的 GRIB 2 产品是否生成。 第一张图是任务运行持续时间,第二张图是任务起止时间段。 可以看到 12 月 28 日 00 时的 GRIB 2 产品生成较晚。

globalchartos 系统数据检查任务运行情况

实现

使用 transitions 库实现确定有限状态自动机算法。 以下代码摘自 nwpc-oper/nwpc-message-tool 项目

数据

设计 DFA 需要确定状态和与状态转移相关的事件

DFA 使用的 6 种状态:

class TaskSituationType(Enum):
    Initial = "initial"
    Submit = "submit"
    Active = "active"
    Complete = "complete"
    Error = "error"
    Unknown = "unknown"

DFA 事件由一个抽象数据表示

from abc import ABC, abstractmethod

# 事件类型
class StatusChangeType(Enum):
    Unknown = "unknown"
    Submit = "submit"
    Initial = "init"
    Complete = "complete"
    Abort = "abort"

# 事件数据
class NodeStatusChangeData(ABC):
    @property
    @abstractmethod
    def date_time(self) -> datetime.datetime:
        pass

    @property
    @abstractmethod
    def status(self) -> StatusChangeType:
        pass

需要使用一个接口将运行状态变化消息数据 EcflowClientMessage 转为 DFA 事件数据 NodeStatusChangeData。 本文通过组合方式实现,构造一个符合 NodeStatusChangeData 接口规范的新对象 StatusChangeEntry,内部保存 EcflowClientMessage 对象。

@NodeStatusChangeData.register
class StatusChangeEntry(object):
    def __init__(self, record: EcflowClientMessage):
        self._record = record

    @property
    def status(self) -> StatusChangeType:
        return convert_command_toStatus_change_type(self._record.command)

    @property
    def date_time(self) -> datetime.datetime:
        return self._record.time.ceil("S").to_pydatetime()

自动机

实现自动机需要设置自动机的状态,并为状态转换设置触发条件。 transitions 还支持在状态发生转移的前后执行自定义的回调函数。

计算正常运行任务运行情况的流程示意图,每次状态变化前后会调用回调函数

创建 DFA 对象

self.machine = Machine(
    model=self,
    states=TaskSituationType,
    initial=TaskSituationType.Initial,
    after_state_change=self.change_node_situation_type,
)

为初始状态 initial 设置状态转移和回调函数:

  • submit 事件,状态转为 submit。 状态变化后,执行 enter_new_cycle 函数初始化状态数据, 执行 set_cycle_time_point 函数设置 submit 时间点。
  • 其它事件,状态转为 unknown。
source = TaskSituationType.Initial

self.machine.add_transition(
    trigger=StatusChangeType.Submit.value,
    source=source,
    dest=TaskSituationType.Submit,
    before=self.add_node_data,
    after=[self.enter_new_cycle, self.set_cycle_time_point],
)

for t in (e.value for e in [
    StatusChangeType.Initial,
    StatusChangeType.Complete,
    StatusChangeType.Abort,
]):
    self.machine.add_transition(
        trigger=t,
        source=source,
        dest=TaskSituationType.Unknown,
    )

为提交状态 initial 设置状态转移和回调函数:

  • active 事件,状态转为 active。 状态改变后,执行 set_cycle_time_point() 函数设置 active 事件时间点
  • abort 事件,状态转为 abort。
  • 其它事件,状态转为 unknown。
source = TaskSituationType.Submit

self.machine.add_transition(
    trigger=StatusChangeType.Initial.value,
    source=source,
    dest=TaskSituationType.Active,
    before=self.add_node_data,
    after=self.set_cycle_time_point,
)

self.machine.add_transition(
    trigger=StatusChangeType.Abort.value,
    source=source,
    dest=TaskSituationType.Error,
    before=self.add_node_data,
    after=self.set_cycle_time_point,
)

for s in (StatusChangeType.Complete, StatusChangeType.Submit):
    self.machine.add_transition(
        trigger=s.value,
        source=source,
        dest=TaskSituationType.Unknown,

为运行状态 active 设置状态转移和回调函数:

  • complete 事件,状态转为 complete。 状态改变后,执行 set_cycle_time_point() 设置 complete 时间点,执行 calculate_time_period() 计算任务运行时间段。
  • abort 事件,状态转为 abort。
  • 其它事件,状态转为 unknown。
source = TaskSituationType.Active

self.machine.add_transition(
    trigger=StatusChangeType.Complete.value,
    source=source,
    dest=TaskSituationType.Complete,
    before=self.add_node_data,
    after=[
        self.set_cycle_time_point,
        self.calculate_time_period,
    ],
)

self.machine.add_transition(
    trigger=StatusChangeType.Abort.value,
    source=source,
    dest=TaskSituationType.Error,
    before=self.add_node_data,
    after=self.set_cycle_time_point,
)

for s in (StatusChangeType.Submit, StatusChangeType.Initial):
    self.machine.add_transition(
        trigger=s.value,
        source=source,
        dest=TaskSituationType.Unknown,
    )

示例

批量计算节点运行状态。

下面代码已 globalchartos 系统的 /globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066 任务节点为例进行说明。

载入需要使用到的库

import pandas as pd

import nwpc_message_tool.source.ecflow_client
from nwpc_message_tool.storage import EsMessageStorage

from nwpc_message_tool.analytics.calculator import SituationCalculator
from nwpc_message_tool.analytics.task_status_change_dfa import TaskStatusChangeDFA, TaskSituationType

获取状态变化消息数据

node_name = "/globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066"
start_date = pd.to_datetime("2021-01-01")
end_date = pd.to_datetime("2021-01-02")

elastic_server = "some host"

engine = nwpc_message_tool.source.ecflow_client
client = EsMessageStorage(
    hosts=[elastic_server],
)

results = client.get_ecflow_client_messages(
    engine=engine,
    node_name=node_name,
    ecflow_host="host",
    ecflow_port="port",
    ecf_date=(start_date, end_date)
)
results = list(results)
for record in results:
    print(f"{record.ecf_date.strftime('%Y%m%d')} {record.time.strftime('%Y-%m-%d %H:%M:%S')} {record.command}")
20210101 2021-01-01 04:59:44 submit
20210101 2021-01-01 04:59:44 init
20210101 2021-01-01 04:59:44 meter
20210101 2021-01-01 04:59:44 event
20210101 2021-01-01 05:03:34 meter
20210101 2021-01-01 05:03:34 meter
20210101 2021-01-01 05:03:34 event
20210101 2021-01-01 05:03:34 meter
20210101 2021-01-01 05:03:34 event
20210101 2021-01-01 05:03:34 complete
20210102 2021-01-02 04:59:08 submit
20210102 2021-01-02 04:59:08 init
20210102 2021-01-02 04:59:08 meter
20210102 2021-01-02 04:59:09 event
20210102 2021-01-02 05:03:15 meter
20210102 2021-01-02 05:03:15 meter
20210102 2021-01-02 05:03:15 event
20210102 2021-01-02 05:03:15 meter
20210102 2021-01-02 05:03:15 event
20210102 2021-01-02 05:03:15 complete

计算运行状态

calculator = SituationCalculator(
    dfa_engine=TaskStatusChangeDFA,
    stop_states=(
        TaskSituationType.Error,
        TaskSituationType.Complete,
        TaskSituationType.Unknown,
    )
)

situations = calculator.get_situations(
    records=results,
    node_path=node_name,
    start_date=start_date,
    end_date=end_date
)
for situation in situations:
    logger.info(f'{situation.date.strftime("%Y-%m-%d")}: {situation.node_situation.situation.value}')
2021-01-07 11:58:56.966 | INFO     | nwpc_message_tool.analytics.calculator:get_situations:75 - Finding StatusLogRecord for /globalchartos/00/deterministic/base/066/wmc_vor_250_sep_066
2021-01-07 11:58:56.966 | INFO     | nwpc_message_tool.analytics.calculator:get_situations:81 - Calculating node status change using DFA...
2021-01-07 11:58:56.968 | INFO     | nwpc_message_tool.analytics.calculator:get_situations:108 - Calculating node status change using DFA...Done
2021-01-07 11:58:56.968 | INFO     | __main__:main:52 - 2021-01-01: complete

参考

参考文献

[1] https://zh.wikipedia.org/wiki/确定有限状态自动机

运行状态变化数据

NWPC消息平台:ecFlow 命令消息

NWPC消息平台:验证 ecFlow 命令消息

NWPC消息平台:发送ecFlow命令消息

NWPC 消息平台项目

nwpc-oper/nwpc-message-client

nwpc-oper/nwpc-message-tool