ZeroMQ使用小记

目录

前言

最近在开发一个离线日志收集系统,考虑如何高效收集信息。
当前,我读取其它系统生成的日志文件,插入到mysql数据库中。这样性能太差,只适合获取离线日志,无法构建在线日志系统。实时日志系统需要考虑并发性、高效率,不能每添加一条日志都建立数据库连接,执行一条insert语句,当大量任务同时写日志时,数据库就会成为性能瓶颈。虽然我们目前并发量不大,直接连接数据库也没有性能问题,但我们无法回避另一个问题:大数据量时的mysql查询效率。
当前我将所有原始日志保存在mysql的数据表中,访问统计数据时用SQL语句查询。我使用个人工作电脑搭建整个开发环境,日志条目也就在百万级,就遇到严重的性能问题:查询结果返回过慢,有时干脆连接超时,交互体验很差。网上查过一些资料,MySQL完全可以胜任百万级别的数据量,但目前我开发环境没法显著提升,除非进行过期清除,否则很难将所有原始日志保存在数据库而不影响性能。我一直在考虑自动生成并记录统计信息,而不是目前这样每次访问都在线生成,这样会大幅度提高页面响应速度。不过,我有点儿跑偏,这点与本文关系不大,本文主要关注数据收集,而不是之后的数据应用。
想到使用消息队列来做数据收集的中介。见下图

采用消息队列可以支持多个后端应用,比如日志分析和系统监控。
之前看到有个项目使用ZMQ作消息队列,也听说过ZeroMQ是一种简单、高速的消息队列,同时发现ZeroMQ可以在AIX下编译通过,就尝试使用ZeroMQ改进目前我的日志收集系统,不过并不成功。下面就是我使用ZeroMQ的小记。

准备阶段

编译ZeroMQ
安装ZeroMQ的Python接口
以上两项均不需要特殊修改就可以在AIX上编译通过。

参考教程

官网的《ZeroMQ – The Guide
指南项目及示例代码:https://github.com/imatix/zguide
安居客一位工程师翻译版《ZeroMQ指南

第一个示例

我从读ZeroMQ指南中文版开始接触ZeroMQ,改写了一个最简单的示例:接收消息返回World。
服务器端

import time
import json
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
    #  Wait for next request from client
    message = socket.recv()
    request_body = json.loads(message)
    print("Received request: %s" % message)
    #  Do some 'work'
    time.sleep(0.001)
    #  Send reply back to client
    socket.send(b"World")

客户端

import zmq
import json
context = zmq.Context()
#  Socket to talk to server
print("Connecting to hello world server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
#  Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s ..." % request)
    request_body = {
        'command': 'list',
        'type': 'command'
    }
    socket.send(json.dumps(request_body))
    #  Get the reply.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))

先运行服务器端,再运行客户端,server的运行结果:

$ python server.py
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}
Received request: {"type": "command", "command": "list"}

client的运行结果:

$ python client.py
Connecting to hello world server...
Sending request 0 ...
Received reply 0 [ World ]
Sending request 1 ...
Received reply 1 [ World ]
Sending request 2 ...
Received reply 2 [ World ]
Sending request 3 ...
Received reply 3 [ World ]
Sending request 4 ...
Received reply 4 [ World ]
Sending request 5 ...
Received reply 5 [ World ]
Sending request 6 ...
Received reply 6 [ World ]
Sending request 7 ...
Received reply 7 [ World ]
Sending request 8 ...
Received reply 8 [ World ]
Sending request 9 ...
Received reply 9 [ World ]

这是最简单的socket通信。
之后尝试了几个指南第一章提到的例子,粗略看了下第二章,后面的三至五章没看。

改写日志收集数据流

以目前掌握的知识写一个基于ZeroMQ的数据收集系统,但不太成功。
上面配图中给出我的需求,多个app产生日志信息,并有多个订阅者接收日志信息。采用下图的结构

app和collector之间用REQ-REP连接,collector和customer之间用PUB-SUB连接。app将产生的日志发送给collector,collector将日志不加改动地转发到PUB端口中,发送给各个customer,customer接收到日志消息后,进一步处理。
这样设计结构主要针对慢速订阅者(即将日志保存进mysql),但ZeroMQ没有消息持久化,无法确保消息到达,测试时也发现慢速订阅者有消息丢失现象,使该结构成为鸡肋,没达到预期效果。ZeroMQ或许有机制来处理慢速订阅者,有待后续研究。
PS. 具体代码以后再贴,睡觉去了。