GRIB笔记:获取并加载GRIB 2消息字节流

目录

之前的文章《GRIB笔记:从GRIB 2文件中加载单个要素场》介绍如何从本地文件中获取单个要素场。

如果 GRIB 2 文件保存在远程服务器,想要在本地服务器加载一个要素场,可以将该文件下载到本地,再按照上文介绍的方法读取一个要素场。 这种方式不用修改现有代码,但数据传输量比较大。

如果在远程服务器从文件中提取要素场,保存为一个临时文件后,本地服务器再下载,就会显著减少数据传输量。 这也是目前单位正在搭建的数据平台使用的方法。

下面首先介绍如何获取 GRIB 2 消息的原始字节流。

获取 GRIB 2 消息字节流

如果已经从文件中加载 GRIB 2 消息,可以使用 eccodes-python 库提供 eccodes.codes_get_message 函数返回 GRIB 2 消息的原始字节数据。

raw_bytes = eccodes.codes_get_message(message_id)

这种方式适合手动查找要素场的场景,需要遍历 GRIB 2 文件,找到符合条件的 GRIB 2 消息,每次运行都需要对文件进行解码。

如果之前已对文件进行一次解码,可以保留每个消息对应的起始字节的偏移量和消息长度,直接从文件加载 GRIB 2 消息。 这也是单位数据平台的核心功能。

下面的函数仅利用偏移量信息offset,从文件file_path中直接加载并返回对应的 GRIB 2 消息。

@attr.s
class GribMessageIndex(object):
    offset = attr.ib()
    length = attr.ib()
    file_path = attr.ib()


def load_bytes_from_index(index: GribMessageIndex) -> bytes or None:
    with open(index.file_path, "rb") as f:
        f.seek(index.offset)
        message = eccodes.codes_grib_new_from_file(f)
        if message is None:
            return None
        raw_bytes = eccodes.codes_get_message(message)
        return raw_bytes

如果同时使用偏移量和消息长度length,则可以直接读取文件返回字节流,无需使用 eccodes-python 。

从字节流中加载 GRIB 2 消息

获取到 GRIB 2 消息的字节流后,可以使用 eccodes.codes_new_from_message 函数从字节流 bytes 中加载 GRIB 2 消息。

message = eccodes.codes_new_from_message(raw_message)

message 与从文件中创建的 GRIB 2 handler 类型一样,支持同样的操作。

远程传输 GRIB 2 消息内容

下载抽取后的单个要素场文件不一定是最佳的选择。 如果只需要在程序中读取要素场内容,不需要保存成本地文件,那么下载单个要素场文件会在服务器和客户端产生两次额外的磁盘IO。

远程传输 GRIB 2 消息的字节流可以省去服务端写入和客户端读取的磁盘IO操作。

笔者使用 Flask 搭建一个简单的 Web 服务,根据请求返回对应的 GRIB 2 消息字节流。 请求和响应都使用 JSON 格式,但JSON无法直接使用字节流数据。

笔者使用 protobuf 库将字节流序列化为 JSON 字符串,可以直接传输。

下面是一个简单的proto文件,消息RawField中只有一个成员,即 GRIB 2 消息的原始字节流 raw_bytes

syntax = "proto2";

package transport;

message RawField {
    required bytes raw_bytes = 1;
}

使用 google.protobuf.json_format 模块实现 protobuf 对象和 JSON 字符串之间的相互转换。

from google.protobuf import json_format
from .transport_pb2 import RawField

def convert_to_json(raw_field):
    return json_format.MessageToJson(raw_field)

def load_from_json(json_string, cls):
    return json_format.Parse(json_string, cls())

服务器端使用 convert_to_json 将字节流转换为字符串,客户端使用 load_from_json 解析出原始的字节流。

详细实现请参考 nwpc-oper/nwpc-gdata 项目。

参考

nwpc-oper/nwpc-data

nwpc-oper/nwpc-gdata