ecflow学习笔记:节点状态监控工具V2
本文介绍使用 ecFlow C++ API 构建的节点状态监控工具。
背景
之前的文章《ecflow学习笔记:节点状态监控工具》介绍使用 ecflow 的 Go 客户端开发节点状态监控工具。
经过一个多月的连续运行,发现该程序有严重的内存泄漏问题。虽然经过修改,解决了部分问题,但内存泄漏依然存在。
该程序使用 SWIG 连接 ecflow c++ 接口,从 c++ 接口返回字符串给 goroutine,将信息组织成 json 字符串,再发送给 redis 数据库。
内存泄漏是因为字符串没有被删除,包括 SWIG 接口返回的字符串,和构造的JSON字符串。虽然 inuse_space 很小,但 alloc_space 的内存似乎没有被释放。 pprof 的图示如下图所示:
ecflow_watchman 内存占用图示
可以看到,两个部分占用内存基本相等,可能由同一个问题导致。查看内存占用的代码段:
ecflow_watchman 内存泄漏代码段
可以看到,从 ecflow 获取的状态字符串是内存消耗的主要原因。但程序实际使用的内存较少,不知道为什么 GC 没有回收掉不用的内存。后面会进一步分析原因。
既然使用 SWIG 的程序有这样的问题,还不如直接用 C++ 实现。 下面介绍基于 nwpc-oper/ecflow-client-cpp 构建的节点状态监控工具V2。
程序结构
新版的ecflow-watchman与老版流程一致。从命令行参数指定的配置文件中读取任务列表,启动多个线程(v1 版为 goroutine)同时获取多个 ecflow 服务的状态,并保存到 redis 数据库中。 程序结构示意图如下图所示。
ecflow-watchman程序结构
实现
下面介绍几个关键部分的实现。
多线程
v1 版本中使用 GO 语言提供的 goroutine 以协程的方式实现同时监听多个 ecflow 服务,但 C++ 没有内置的协程库(C++20 标准据说有协程库),所以考虑使用多线程来替代。
v2 版本使用 c++ 的异步函数 std::async
运行子线程函数。子线程中使用无限循环,定时获取 ecflow 服务的状态。
主线程执行 std::async
函数的返回对象std::future
的 get() 方法达到阻塞主线程的效果。
std::vector<std::future< void >> threads;
for (const auto &task_config: tasks) {
threads.push_back(std::async(std::launch::async, [&storer, &task_config](){
// ...skip some codes...
}));
}
for(auto &future: threads) {
future.get();
}
读取YAML配置文件
ecflow-watchman 的 watch-all 命令从 YAML 格式的配置文件中读取任务信息。
使用 yaml-cpp 解析 YAML。
从下面的 YAML 文件读取配置信息。
sink_config:
type: redis
url: 127.0.0.1:6379
使用下面的代码获取 Redis 的 URL。
YAML::Node config = YAML::LoadFile(options.config_file_path);
const auto redis_config = config["sink_config"];
const auto redis_url = redis_config["url"].as<std::string>();
生成JSON字符串
保存到 Redis 数据库的是 JSON 字符串,本项目使用 nlohmann/json.hpp 生成 JSON 字符串。
nlohmann::json value_json;
value_json["status_records"] = nlohmann::json::array();
for (const auto &record: records) {
nlohmann::json record_json;
record_json["path"] = record.path_;
record_json["status"] = record.status_;
value_json["status_records"].push_back(record_json);
}
保存到Reids
使用 sewenew/redis-plus-plus 将系统状态保存到 Redis 数据库。
该库支持多线程,所以只需要创建一个 client,所有子线程都可以使用。
ConnectionOptions connection_options;
connection_options.host = redis_host_;
connection_options.port = redis_port_;
connection_options.password = password_;
connection_options.db = db_;
connection_options.socket_timeout = 1s;
client_ = std::make_unique<Redis>(connection_options);
client_->set(key, value);