NWPC消息平台:产品事件消息
本文属于介绍 NWPC 消息平台 系列文章。
NWPC 消息平台使用的产品事件消息 (Production Event Message) 用于记录与产品相关的事件,比如
- 产品已生成
- 产品生成出错
- 产品已完成上传
- 产品上传出错
本文首先介绍通用的事件消息结构,然后介绍目前已在 NWPC 消息平台中使用的产品消息结构。
事件消息
NWPC 消息平台使用事件消息 (Event Message) 表示需要记录的某个事件。
消息结构
通用事件消息的结构如下图所示:
其中
app
:发出事件的应用名,例如nwpc-message-client
type
:事件类型,例如production
表示产品消息time
:事件发出时的时间戳,使用 RFC 3339 格式的字符串表示data
:数据字段,键值对,与事件相关的数据
为了保持消息的一致性,每种类型的消息都需要预先定义数据字段。 但正如下面即将介绍的产品消息,每个事件消息不一定包含该类型消息所有的数据字段,可以从备选字段中选择需要的字段。 换一种说法就是,每类消息还可能有子类型。
事件状态
事件消息的一个重要组成部分就是事件的状态。
NWPC 消息平台借鉴 ecFlow 的节点运行状态,使用统一的事件状态 (EventStatus
) 定义,包含以下几种状态:
unknown
:未知状态,默认值complete
:完成queued
:排队aborted
:出错submitted
:提交active
:运行suspended
:挂起
产品消息
产品消息用于记录与产品制作、分发等步骤相关的事件。
目前只记录产品完成上传(分发)的事件。
产品事件消息由三部分构成:
- 产品信息:定义事件描述的是何种产品
- 产品属性:描述在某产品系列中某个特定产品(单个文件)的属性
- 事件状态:事件类型和事件状态
产品定义
首先要定义产品的类型。 产品定义包含以下四个必选字段:
system
:系统名称stream
:产品流type
:产品类型name
:产品名称
系统名称
NWPC 同时运行多套业务数值预报系统,包括 2 个全球模式系统和 4 个区域模式系统:
grapes_gfs_gmf
:GRAPES 全球预报系统grapes_gfs_gda
:GRAPES 全球同化系统grapes_meso_10km
:GRAPES 区域 10km 预报系统grapes_meso_3km
:GRAPES 区域 3km 预报系统grapes_tym
:GRAPES 区域台风预报系统grapes_geps
:GRAPES 全球集合预报系统grapes_reps
:GRAPES 区域集合预报系统
产品流
对于同一种类的产品,不同模式可能需要使用不同的属性来描述单个文件。 例如,确定性模式的 GRIB 2 数据文件产品需要使用起报时次 (start time) 和预报时效 (forecast time),而集合预报模式的 GRIB 2 数据文件产品还需要加上预报成员 (member)。
为了实现对多种属性字段的支持,NWPC 消息平台借用 ECMWF 的 MARS 系统关键字 stream
表示产品流,区分不同的属性集合,与 type
字段相结合确定产品属性。
目前仅支持两种产品流:
oper
:表示确定性预报eps
:表示集合预报
产品类型
type
字段用于表示不同的产品类型。例如:
obs
:观测资料fcst
:模式输出,一般为 GRADS 二进制文件grib2
:GRIB 2 格式数据产品graph
:图片产品
目前 NWPC 消息平台仅使用 grib2
产品类型。
产品名称
同一产品类型可能有不同的产品,比如模式系统会根据用户需求制作不同区域范围、不同分辨率、包含不同要素场的数据产品。
NWPC 消息平台使用 name
字段表示产品名称。
目前仅支持一种产品:
orig
:GRIB 2 类型 (type="grib2"
),原始分辨率 GRIB 2 产品
消息结构
产品事件消息的 data
数据字段由三部分构成:
ProductionInfo
:产品信息ProductionEvent
:产品事件ProductionPreperties
:产品属性
对于所有的产品事件消息,前两个部分字段完全相同,产品属性部分的字段由产品信息中的 stream
,type
等字段决定。
消息结构如下图所示:
产品信息
产品信息 (Production Info) 部分确定消息描述何种产品。
由前面“产品定义”章节介绍的四个字段构成:
system
:系统名称stream
:产品流type
:产品种类name
:产品名称
事件状态
事件状态 (Production Event) 部分描述事件的种类和状态,包括两个字段:
event
:产品事件种类 (ProductionEvent
)status
:事件状态 (EventStatus
)
产品事件种类
目前仅有一种产品事件:
storage
:产品上传二级存储
事件状态
产品事件的状态使用通用的事件状态 (EventStatus
)。
产品属性
产品信息 (Production Info) 部分仅确定产品的类别,而产品属性 (ProductionProperties) 部分包含确定单一产品实体(通常为单个文件)的具体属性。 不同类别的产品可能包含不同的产品属性。
目前仅支持两种面向预报产品的属性集:
OperationProductionProperties
:确定性预报模式预报产品EpsProductionProperties
:集合预报模式预报产品
确定性模式
确定性模式 (OperationProductionProperties
) 属性包含以下字段:
start_time
:起报时次,ISO 格式时间字符串forecast_time
:预报时效,时间段字符串,例如120h
表示 120 小时预报
下面是一个由业务系统实时生成的产品事件消息,表示 GRAPES GFS GMF 系统 2020 年 10 月 3 日 06 时次 120h 的原始分辨率 GRIB 2 数据产品完成上传二级存储。
{
"app": "nwpc-message-client",
"type": "production",
"time": "2020-10-03T10:47:20.395949667Z",
"data": {
"event": "storage",
"forecast_time": "120h",
"name": "orig",
"start_time": "2020-10-03T06:00:00Z",
"status": 1,
"stream": "oper",
"system": "grapes_gfs_gmf",
"type": "grib2"
}
}
集合预报模式
集合预报模式 (EpsProductionProperties
) 属性包含以下字段:
start_time
:起报时次,ISO 格式时间字符串forecast_time
:预报时效,时间段字符串,例如120h
表示 120 小时预报number
:集合成员,整型数字。其中 0 表示控制预报,1 及以上数字表示集合成员。
下面是一个用于测试的事件消息,表示 GRAPES GEPS 系统 2020 年 3 月 5 日 00 时次原始分辨率 GRIB 2 产品完成上传二级存储。
{
"app": "nwpc-message-client",
"type": "production",
"time": "2020-10-07T20:12:01.9177305+08:00",
"data": {
"system": "grapes_geps",
"stream": "eps",
"type": "grib2",
"name": "orig",
"start_time": "2020-03-05T00:00:00Z",
"forecast_time": "0h",
"number": 1,
"event": "storage",
"status": 0
}
}
实现
下面介绍 nwpc-oper/nwpc-message-client 项目中对产品事件消息的实现。
事件消息
EventMessage
是通用的事件消息结构体。其中,Data
成员使用 interface{}
接口,支持任意对象
type EventMessage struct {
App string `json:"app"` // app name
Type string `json:"type"` // type
Time time.Time `json:"time"` // time string, RFC 3339
Data interface{} `json:"data"` // data structure
}
产品消息
产品消息使用通用事件消息结构体 EventMessage
,并为每类消息创建单独的数据 Data
结构体。
OperationProductionData
表示确定性模式预报产品事件数据EpsProductionData
表示集合预报模式预报产品事件数据
以上两个事件数据均由相同的产品信息、事件状态和各自不同的产品属性构成。
type OperationProductionData struct {
ProductionInfo
OperationProductionProperties
ProductionEventStatus
}
type EpsProductionData struct {
ProductionInfo
EpsProductionProperties
ProductionEventStatus
}
产品信息
产品信息 (ProductionInfo
) 包含四个字段:
type ProductionInfo struct {
System string `json:"system"` // system name: grapes_gfs_gmf, grapes_gfs_gda
Stream ProductionStream `json:"stream"` // stream: oper, eps, ...
Type ProductionType `json:"type"` // production type: grib2
Name ProductionName `json:"name"` // production name, orig, ...
}
每个字段实际上都是字符串类型,同时还预定义了部分字段的可选值。
type ProductionStream string
const (
ProductionStreamOperation ProductionStream = "oper"
ProductionStreamEPS ProductionStream = "eps"
)
type ProductionType string
const (
ProductionTypeObs ProductionType = "obs"
ProductionTypeForecast ProductionType = "fcst"
ProductionTypeGrib2 ProductionType = "grib2"
ProductionTypeGraph ProductionType = "graph"
)
type ProductionName string
const (
ProductionNameGrib2Orig ProductionName = "orig"
)
事件状态
产品事件状态 (ProductionEventStatus
) 由两个字段构成:
type ProductionEventStatus struct {
Event ProductionEvent `json:"event"` // production event, storage
Status EventStatus `json:"status"` // unknown, complete, queued, aborted, submitted, active, suspended
}
ProductionEvent
表示产品事件种类:
type ProductionEvent string
const (
ProductionEventStorage ProductionEvent = "storage"
)
EventStatus
是通用的事件类型,表示为整型格式。
type EventStatus int
const (
Unknown EventStatus = 0
Complete EventStatus = 1
Queued EventStatus = 2
Aborted EventStatus = 3
Submitted EventStatus = 4
Active EventStatus = 5
Suspended EventStatus = 6
)
产品属性
每类产品都有独立的产品属性。
OperationProductionProperties
是确定性模式预报产品的属性
type OperationProductionProperties struct {
StartTime time.Time `json:"start_time"` // start time, YYYYMMDDHH
ForecastTime string `json:"forecast_time"` // time duration, such as 3h, 12h, 120h
}
其中
StartTime
是起报时次ForecastTime
是预报时效,时间段字符串
EpsProductionProperties
是集合预报模式预报产品的属性
type EpsProductionProperties struct {
StartTime time.Time `json:"start_time"` // start time, YYYYMMDDHH
ForecastTime string `json:"forecast_time"` // time duration, such as 3h, 12h, 120h
Number int `json:"number"`
}
比 OperationProductionProperties
多一个字段:
Number
表示集合预报成员序号
参考
nwpc-oper/nwpc-message-client 项目
https://github.com/nwpc-oper/nwpc-message-client
nwpc-oper/nmc-messasge-client 项目