GRIB笔记:获取并加载GRIB 2消息字节流
之前的文章《GRIB笔记:从GRIB 2文件中加载单个要素场》介绍如何从本地文件中获取单个要素场。
如果 GRIB 2 文件保存在远程服务器,想要在本地服务器加载一个要素场,可以将该文件下载到本地,再按照上文介绍的方法读取一个要素场。 这种方式不用修改现有代码,但数据传输量比较大。
如果在远程服务器从文件中提取要素场,保存为一个临时文件后,本地服务器再下载,就会显著减少数据传输量。 这也是目前单位正在搭建的数据平台使用的方法。
下面首先介绍如何获取 GRIB 2 消息的原始字节流。
获取 GRIB 2 消息字节流
如果已经从文件中加载 GRIB 2 消息,可以使用 eccodes-python 库提供 eccodes.codes_get_message
函数返回 GRIB 2 消息的原始字节数据。
raw_bytes = eccodes.codes_get_message(message_id)
这种方式适合手动查找要素场的场景,需要遍历 GRIB 2 文件,找到符合条件的 GRIB 2 消息,每次运行都需要对文件进行解码。
如果之前已对文件进行一次解码,可以保留每个消息对应的起始字节的偏移量和消息长度,直接从文件加载 GRIB 2 消息。 这也是单位数据平台的核心功能。
下面的函数仅利用偏移量信息offset
,从文件file_path
中直接加载并返回对应的 GRIB 2 消息。
@attr.s
class GribMessageIndex(object):
offset = attr.ib()
length = attr.ib()
file_path = attr.ib()
def load_bytes_from_index(index: GribMessageIndex) -> bytes or None:
with open(index.file_path, "rb") as f:
f.seek(index.offset)
message = eccodes.codes_grib_new_from_file(f)
if message is None:
return None
raw_bytes = eccodes.codes_get_message(message)
return raw_bytes
如果同时使用偏移量和消息长度length
,则可以直接读取文件返回字节流,无需使用 eccodes-python 。
从字节流中加载 GRIB 2 消息
获取到 GRIB 2 消息的字节流后,可以使用 eccodes.codes_new_from_message
函数从字节流 bytes
中加载 GRIB 2 消息。
message = eccodes.codes_new_from_message(raw_message)
message
与从文件中创建的 GRIB 2 handler 类型一样,支持同样的操作。
远程传输 GRIB 2 消息内容
下载抽取后的单个要素场文件不一定是最佳的选择。 如果只需要在程序中读取要素场内容,不需要保存成本地文件,那么下载单个要素场文件会在服务器和客户端产生两次额外的磁盘IO。
远程传输 GRIB 2 消息的字节流可以省去服务端写入和客户端读取的磁盘IO操作。
笔者使用 Flask 搭建一个简单的 Web 服务,根据请求返回对应的 GRIB 2 消息字节流。 请求和响应都使用 JSON 格式,但JSON无法直接使用字节流数据。
笔者使用 protobuf 库将字节流序列化为 JSON 字符串,可以直接传输。
下面是一个简单的proto文件,消息RawField
中只有一个成员,即 GRIB 2 消息的原始字节流 raw_bytes
。
syntax = "proto2";
package transport;
message RawField {
required bytes raw_bytes = 1;
}
使用 google.protobuf.json_format
模块实现 protobuf 对象和 JSON 字符串之间的相互转换。
from google.protobuf import json_format
from .transport_pb2 import RawField
def convert_to_json(raw_field):
return json_format.MessageToJson(raw_field)
def load_from_json(json_string, cls):
return json_format.Parse(json_string, cls())
服务器端使用 convert_to_json
将字节流转换为字符串,客户端使用 load_from_json
解析出原始的字节流。
详细实现请参考 nwpc-oper/nwpc-gdata 项目。