Cloud-Ws 2021:事件通知工具Aviso

目录

本文正文部分来自 ECMWF 于 2021 年 2 月召开的 Virtual workshop: Weather and climate in the cloud (Cloud-WS 2021) 中的如下演讲:

Enabling machine learning for weather and climate data in the cloud

Enabling notifications for meteorological workflows across HPC and Cloud data centres

by C. Iacopino, J. Hawkes, T. Quintino, B. Raoult

at Cloud-WS 2021

Aviso:为 HPC 和 Cloud 数据中心的气象工作流启用通知

正文

简介

背景

Cloud Data-as-a-Service

ECMWF 每天生成大量数据,原始数据输出和分发产品的大小在持续增长:

  • 5km 集合预报,将在 1 小时内生成 1 PB 数据
  • 将数据从中心转移出去变得令人望而却步
  • 云数据模型:将计算向数据靠拢

图片来自幻灯片

启用跨数据中心的多域工作流 (HPC + Cloud)

对于强时效性的任务,通知数据的可用性至关重要。

目标

Aviso 是 ECMWF 开发的软件,目标是:

  • 通知数据可用性 (data availability)
  • 面向自动化自动化系统 (automated systems)
  • 支持 When … Do …

图片来自幻灯片

角色

领域特定的通知系统

  • 使用气象语言 (MARS DSL)
    • 理解我们的元数据
    • 用户经验丰富
  • 不限定协议
    • 我们开发多种接口:MQTT,AMQP,RabbitMQ,Kafka,CloudEvents 等

持久化状态

  • 一致性
  • 事务性
  • 可靠性 (通知历史)

利用现代云技术

  • 高可用,高吞吐量
  • 可扩展,分布式
  • 轻量级客户端

使用案例

注:尚未在 ECMWF 正式使用

  • 实时模式输出
  • ECPDS 产品分发
  • 来自 ecFlow 套件发送的事件

图片来自幻灯片,为原始模式输出和产品发送事件

通用工作流

服务端:基于键值存储的微服务架构

客户端:命令行应用或 Python API

用户:

  • 终端用户
  • 通知提供者

工作流

  1. 终端用户订阅某个事件,编辑触发器
  2. Aviso 客户端获取更新
  3. 某个通知提供者提交一个通知
  4. 通知订阅者有一个新事件
  5. 事件触发用户的工作流

图片来自幻灯片

Key: 产品元数据

Value:产品位置

服务端

etcd 集群

  • 高度一致的分布式键值存储
  • 基于 Raft 共识算法的领导人选举
  • 容忍机器故障和网络分区
  • 提供 gRPC 和 RESTful 接口
  • 以每秒 10,000 次写入为基准

3 集群的基准数据

N. keysN. clientsWrite QPSLatency x req.
100,000100050,10020 ms

客户端

命令行

编写触发器 (my_lisnter.yaml),每个 listener 包括:

  • event:事件
  • request:元数据
  • triggers:触发器
listeners:
  - event: dissemination
    request: 
      destination: FOO 
      stream: enfo 
      step: [1,2,3]
    triggers:
      - type: command 
        command: my_script.sh 
        environment:
          STEP: ${request.step}

监听事件

aviso listen my_listener.yaml

监听过去特定时间段的事件

aviso listen \
  --from 2020-10-20T00:00:00.0Z \
  --to 2020-10-21T00:00:00.0Z

使用用户定义的通知进行测试

aviso notify \
  event=mars,class=od,date=20190810,domain=g,expver=1,step=1,stream=enfo,time=0 \
  --test

Python API

Aviso 客户端支持 Python API。示例:

from pyaviso import NotificationManager
from ecmwfapi import ECMWFService

定义被调用的函数

def do_something(notification):
    mars_server = ECMWFService("mars") 
    request = notification["request"] 
    request.update({
        "type": "fc",
        "levtype": "sfc",
        "param": 167.128
    })
    mars_server.execute(request, "my_data.grib")

定义触发器

# define the trigger
trigger = {"type": "function", "function": do_something}

创建使用触发器的事件监听请求

request = {
    "stream": "enfo", 
    "date":	20190810, 
    "time": 0
}	
listeners = {
    "listeners": [
        {
            "event":	"mars", 
            "request": request,	
            "triggers": [trigger]
        }
    ]
}

运行

aviso = NotificationManager()
aviso.listen(listeners=listeners)

CloudEvents

CloudDevent 是一种旨在以通用方式描述事件数据的规范

  • 一致性:为不同事件源重用事件处理逻辑
  • 可访问性:提供 Go、JavaScript、Java、C#、Ruby 和 Python 的 SDK,用于构建用于跨环境传递事件数据的通用库、工具和基础设施
  • 便携性:不同云平台之间的互操作性,由大多数云供应商实现
{
  "specversion": "1.0",
  "type": "aviso", 
  "source": "https://...",
  "id": "0c02fdc5-148c-43b5-b2fa-cb1f590369ff", 
  "datacontenttype": "application/json", 
  "data": {
    "event": "mars",
    "request": {...},
    "location": "…"
  }
}

数据包含在 data 字段

Aviso 中的 CloudEvents,图片来自幻灯片

CloudEvents 作为 Aviso 通知

CloudEvents 应用在如下两个方面:

  • 通知发送者和服务之间的协议
  • 服务和客户端之间的协议

图片来自幻灯片

图片来自幻灯片

图片来自幻灯片

通知提供者向 HTTP API 发送事件

API:POST https://aviso.ecmwf.int/api/v1/notification

{
  "type": "aviso",
  "specversion": "1.0",
  "source": "/ecpds/emos",
  "id": "0c02fdc5-148c-43b5-b2fa-cb1f590369ff",
  "datacontenttype": "application/json",
  "data": {
    "event": "dissemination",
    "request": {
      "destination": "FOO",
      "target": "E1",
      "class": "od",
      "date": "20190810",
      "domain": "g",
      "expver": "1",
      "step": "1",
      "stream": "enfo",
      "time": "0"
    },
    "location": "s3://storage.ecmwf.europeanweather.cloud/ecpds/E3S1"
  }
}

Aviso 服务端接收事件,保存键值对

Ke

ec/diss/foo/E1/od/g/1/enfo/20190810/0/1

Value

s3://storage.ecmwf.europeanweather.cloud/ecpds/E3S1

客户端监听程序设置

listeners:
  - event: dissemination
    request:
      destination: FOO
    triggers:
      -	type: post
        protocol:
        type: cloudevents
        url: https://my.endpoint.com/

上述客户端监听事件前缀

ec/diss/foo

Aviso 在接收上面事件后,将其发送给 Aviso 客户端

POST https://my.endpoint.com/

{
  "type": "aviso",
  "specversion": "1.0",
  "source": "https://aviso.ecmwf.int",
  "id": "6e364719-4880-49e9-adaf-c2f3334f771d",
  "datacontenttype": "application/json",
  "data": {
    "event": "dissemination",
    "request": {
      "destination": "FOO",
      "target": "E1",
      "class": "od",
      "date": "20190810",
      "domain": "g",
      "expver": "1",
      "step": "1",
      "stream": "enfo",
      "time": "0"
    },
    "location": "s3://storage.ecmwf.europeanweather.cloud/e1c4pds/E3S1"
  }
}

注:两个消息的 source 和 id 字段不同,但 data 字段内容完全相同。

总结

Aviso 是专为高吞吐量设计的可扩展通知系统

自动触发用户定义的工作流程

实时预报数据或派生产品的 ECMWF 数据可用性

CloudEvent 兼容通知,用于跨云互操作

当前状态

Aviso 服务目前已投入准业务运行,对 European Weather Cloud 的用户开放

每天发送大于 30 万条实时数据和产品通知

第一批用户:Met Norway 和 FMI

Beta 版本:

https://github.com/ecmwf/aviso

当前项目

ECMWF 内部开发:

  • Aviso:关于 ECMWF 数据可用性的可扩展通知系统
  • Polytype:用于数据访问和操作的分布式 API

European 项目:

  • Lexis:复杂天气和气候工作流,基于 3 个 HPC 和 2 个云平台系统
  • HiDALGO:多 HPC 站点面向 Global Challenges 应用的按需数据访问

讨论

以下部分仅代表作者在文章发布时的个人观点,不排除未来会有所改变。

事件消息

从当前趋势来看,NWP 业务系统中的产品制作部分将会从高性能计算机 (HPC) 逐步转移到气象大数据云平台 (DaaS) 中。 这就需要一种有效任务触发的机制,能将 HPC 和 DaaS 中的工作流整合。

当前 NWPC 的产品后处理系统使用两种触发机制:

  • 与模式积分整合的后处理系统使用 ecFlow 触发器
  • 独立的后处理系统使用基于数据文件的驱动

详细介绍请参考《NWPC笔记:检查输出数据

当数据保存到 DaaS 后,就无法使用上述两种方式,基于 事件消息 的触发机制成为一种最直接的选择。 Avsio 库实现了事件消息触发机制所需要的核心功能,并提供丰富的功能。

基于 事件消息 触发的工作流一直是我非常感兴趣的领域。 之前申请的两个项目都或多或少与其相关,可以看成对该领域前置技术的研究。 两个项目都是在单位申请的小项目 (个人),分别是:

  • 《基于分布式调度的绘图技术研究》(2017-2019),已结题。 使用消息队列将批量绘图任务分发给多个工作负载节点,实现批量绘图。 类似 Aviso 中的客户端监听机制和精简版的加工流水线,但缺乏工作流机制。

  • 《基于消息通讯的数值预报业务系统运行监视和分析技术研发》(2020-2021),在研。 发送产品生成和任务状态变化消息,用于实时监视产品生成情况,并支撑历史运行情况分析。 类似 Aviso 中发送的产品事件消息,但缺乏消息驱动任务机制和消息持久化保存。

关于第二个项目的更多介绍可以浏览本博客的 nwpc-message 标签。

整合方案

今年单位将继续开展后处理系统移植加工流水线的相关工作,数据事件消息 将成为驱动加工流水线任务的关键机制。 我对在 HPC 和 DaaS 上整合后的 NWPC 业务系统有两套不成熟的监控方案。

HPC/ecFlow 与 DaaS/web

使用两套监控系统:

  • HPC 依然使用 ecFlow
  • DaaS 开发新的监控系统 (基于 Web)

优点:

  • 两套系统之间互不干扰
  • 产品制作系统可以利用 DaaS 提供的各项基础设施进行全新设计,增加系统的灵活性

缺点:

  • 运维人员从监控单一界面变为同时监控至少两个界面
  • 新开发的监控系统需要一段时间调试才能达到最基本的实时业务运维要求

HPC 与 DaaS 使用两套监控系统

HPC+DaaS/ecFlow

使用一套监控系统:

  • HPC 沿用 ecFlow
  • DaaS 任务与 ecFlow 整合,使 ecFlow 可以控制、查询加工流水线上的任务
  • 或者开发新的监控平台整合 ecFlow 和加工流水线 (太复杂,暂不考虑)

优点:

  • 运维人员仅需监控单一界面 (ecFlow UI)
  • 后处理系统流程保持不变,方便在 HPC 和 DaaS 之间切换

缺点:

  • 需要研究技术可行性,对 DaaS 接口有要求
  • 放弃加工流水线自带的自动任务触发机制
  • 依然要符合 ecFlow 的树形结构,当前产品制作系统灵活性不足的问题依然存在

HPC 与 DaaS 均使用 ecFlow 监控

以上方案尚未经过仔细思考,后续会继续研究。

正如去年我在《气象大数据云平台培训感想》中提到的一样,目前我依然对研究气象大数据云平台相关技术持谨慎态度。

参考

Virtual workshop: Weather and climate in the cloud

Aviso: Enabling notifications for meteorological workflows across HPC and Cloud data centres

幻灯片:https://events.ecmwf.int/event/211/contributions/1862/attachments/979/1718/Cloud-WS_Iacopino.pdf

视频:https://vimeo.com/510345510