ecflow学习笔记:节点状态监控工具V2

目录

本文介绍使用 ecFlow C++ API 构建的节点状态监控工具。

背景

之前的文章《ecflow学习笔记:节点状态监控工具》介绍使用ecflow的Go客户端开发节点状态监控工具。

经过一个多月的连续运行,发现该程序有严重的内存泄漏问题。虽然经过修改,解决了部分问题,但内存泄漏依然存在。

该程序使用 SWIG 连接 ecflow c++ 接口,从 c++ 接口返回字符串给 goroutine,将信息组织成 json 字符串,再发送给 redis 数据库。

内存泄漏是因为字符串没有被删除,包括 SWIG 接口返回的字符串,和构造的JSON字符串。虽然inuse_space很小,但alloc_space的内存似乎没有被释放。pprof的图示如下图所示:

ecflow_watchman 内存占用图示

可以看到,两个部分占用内存基本相等,可能由同一个问题导致。查看内存占用的代码段:

可以看到,从ecflow获取的状态字符串是内存消耗的主要原因。但程序实际使用的内存较少,不知道为什么GC没有回收掉不用的内存。后面会进一步分析原因。

既然使用 SWIG 的程序有这样的问题,还不如直接用 C++ 实现。 下面介绍基于 nwpc-oper/ecflow-client-cpp 构建的节点状态监控工具V2。

程序结构

新版的ecflow-watchman与老板流程一致。从命令行参数指定的配置文件中读取任务列表,启动多个线程(v1版为goroutine)同时获取多个ecflow服务的状态,并保存到redis数据库中。 程序结构示意图如下图所示。

ecflow-watchman程序结构

实现

下面介绍几个关键部分的实现。

多线程

v1版本中使用GO语言提供的goroutine以协程的方式实现同时监听多个ecflow服务,但 C++ 没有内置的协程库(C++20标准据说有协程库),所以考虑使用多线程来替代。

v2版本使用 c++ 的异步函数 std::async 运行子线程函数。子线程中使用无限循环,定时获取ecflow服务的状态。 主线程执行 std::async 函数的返回对象std::future的 get() 方法达到阻塞主线程的效果。

std::vector<std::future< void >> threads;

for (const auto &task_config: tasks) {
    threads.push_back(std::async(std::launch::async, [&storer, &task_config](){
        // ...skip some codes...
    }));
}

for(auto &future: threads) {
    future.get();
}

读取YAML配置文件

ecflow-watchman 的 watch-all 命令从YAML格式的配置文件中读取任务信息。

使用yaml-cpp解析YAML。

从下面的YAML文件读取配置信息。

sink_config:
  type: redis
  url: 127.0.0.1:6379

使用下面的代码获取Redis的URL。

YAML::Node config = YAML::LoadFile(options.config_file_path);

const auto redis_config = config["sink_config"];
const auto redis_url = redis_config["url"].as<std::string>();

生成JSON字符串

保存到Redis数据库的是JSON字符串,本项目使用nlohmann/json.hpp生成JSON字符串。

nlohmann::json value_json;

value_json["status_records"] = nlohmann::json::array();
for (const auto &record: records) {
    nlohmann::json record_json;
    record_json["path"] = record.path_;
    record_json["status"] = record.status_;
    value_json["status_records"].push_back(record_json);
}

保存到Reids

使用sewenew/redis-plus-plus将系统状态保存到Redis数据库。

该库支持多线程,所以只需要创建一个client,所有子线程都可以使用。

ConnectionOptions connection_options;
connection_options.host = redis_host_;
connection_options.port = redis_port_;
connection_options.password = password_;
connection_options.db = db_;

connection_options.socket_timeout = 1s;

client_ = std::make_unique<Redis>(connection_options);
client_->set(key, value);

参考

nwpc-oper/ecflow-client-cpp