使用Flask SocketIO实现WebSocket

目录

使用 HTML 5 的 WebSocket 实现实时交互通信功能,替代 Ajax 轮训等方法,个人觉得比较适合实时监控类网站。我就尝试使用 Flask-SocketIO 实现支持 WebSocket 的服务器端,使用 socket.io 库实现客户端。

一个简单的 Flask-SocketIO 应用

仅完成就简单的通讯功能。

服务器端

from flask import Flask, render_template
from flask.ext.socketio import SocketIO, emit
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'windroc-nwpc-project'
socketio = SocketIO(app)
@app.route('/')
def get_index_page():
    return render_template('index.html')
@socketio.on('connect', namespace='/test')
def test_connect():
    emit('my response', {'data': 'Connected', 'count': 0})
@socketio.on('my event', namespace='/test')
def test_message(message):
    emit('my response', {'data': message['data'], 'count': 2})
if __name__ == "__main__":
    socketio.run(app, host='0.0.0.0', port=5101)

创建 Flask-SocketIO 应用的方法与 Flask 应用相似

app = Flask(__name__)
socketio = SocketIO(app)

开发时简单运行的方法也与不同的 Flask 应用相同。

if __name__ == "__main__":
    socketio.run(app, host='0.0.0.0', port=5101)

SocketIO 使用 event 表示客户端和服务端接受到的消息。后面可以看到,客户端使用 Javascript 回调函数处理。服务器端使用类似视图的路由注册函数来处理事件。

@socketio.on('connect', namespace='/test')
def test_connect():
    emit('my response', {'data': 'Connected', 'count': 0})
@socketio.on('my event', namespace='/test')
def test_message(message):
    emit('my response', {'data': message['data'], 'count': 2})

客户端

使用 socket.io 的客户端 js 库

监听 socket 事件的 js 代码:

$(document).ready(function () {
    var socket = io.connect('//127.0.0.1:5101/test');
    socket.on('connect', function() {
        socket.emit('my event', {data: 'I\'m connected!'});
    });
});

当 socket 连接成功时,向服务器发送 my event 事件,上一节的服务端使用 test_message 函数处理 my event 事件,向客户端发送 my response 事件。
下面看一个稍微复杂一些的应用,实现一个有实际价值的功能。

一个简单的实时监控应用

单位服务器使用 LoadLeveler 管理任务调度,绝大部分作业脚本都会提交到 LoadLeveler 上运行,很多时候需要运维使用 llq 命令查看 LoadLeveler 队列中的任务状态。或许可以开发一种可以实时获取信息的工具,以更直观的方式展示数据。
从最简单入手,实时获取 LoadLeveler 队列中任务数量和每种状态的任务数量。

数据生成

一个典型的 llq 输出如下:

Id                       Owner      Submitted   ST PRI Class        Running On
------------------------ ---------- ----------- -- --- ------------ -----------
cma20n01.1737149.0       nwp_qu      5/11 03:20 R  100 serial_op    cma18n05
cma20n02.2425650.0       nwp_qu      5/11 03:20 R  100 serial_op    cma18n07
...
cma20n01.1738085.0       bcccsmxp    5/11 06:41 I  50  special2
...
149 job step(s) in queue, 57 waiting, 0 pending, 92 running, 0 held, 0 preempted

其中…表示省略的条目。上述要求其实就是解析 llq 输出的最后一行,使用正则表达式解析。

total_pattern = "^(\d+) job step\(s\) in queue, (\d+) waiting, (\d+) pending, (\d+) running, (\d+) held, (\d+) preempted"
total_prog = re.compile(total_pattern)
total_prog_result = total_prog.match(result_lines[-2])
llq_summary = dict()
llq_summary['in_queue'] = total_prog_result.group(1)
llq_summary['waiting'] = total_prog_result.group(2)
llq_summary['pending'] = total_prog_result.group(3)
llq_summary['running'] = total_prog_result.group(4)
llq_summary['held'] = total_prog_result.group(6)
llq_summary['preempted'] = total_prog_result.group(5)

另外,该程序不在单位的服务器上运行,需要用 ssh 连接到服务器。我使用 paramiko 库执行远程命令。

bin_path = 'llq'
bin_param = ''
try:
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname, port, username, password)
    ssh_command = bin_path + ' ' + bin_param
    ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(ssh_command)
except paramiko.SSHException, e:
    print e
    return "{'error':'error'}"
result_lines = ssh_stdout.read().split("\n")
ssh.close()

如此可以得到数据。

应用结构

socket 服务器接受外界传来的 llq 队列消息,并将该消息发送给连接到 socket 服务器的所有客户端。因为对 socket 不够了解,我还是使用传统的 HTTP 方式传送消息到服务器,再使用 Flask-SocketIO 发送消息到客户端。

服务器端

添加一个 http 路由函数,Flask-SocketIO 对象可以使用 emit 函数直接向客户端发送消息。

@app.route('/api/v1/hpc/llq/info', methods=['POST'])
def get_hpc_llq_info():
    r = request
    hpc_llq_info_message = json.loads(request.form['message'])
    print "Receive llq info:", hpc_llq_info_message
    socketio.emit('send_llq_info', hpc_llq_info_message, namespace='/hpc')
    result = {
        'status': 'ok'
    }
    return jsonify(result)

数据收集

每隔一段时间收集一次数据:

while True:
    quota_result = hpcloadleveler.get_llq(hostname, port, username, password)
    logging.info(quota_result['total'])
    post_data = {
        'message': json.dumps(quota_result['total'])
    }
    requests.post(post_url, data=post_data)
    time.sleep(2)

客户端

接受数据后,更新界面显示

var socket = io.connect('//127.0.0.1:5101/hpc');
socket.on('connect', function() {
    console.log('I\'m connected!');
});
socket.on('send_llq_info', function(msg){
    /*console.log(msg);*/
    var total = parseInt(msg.in_queue);
    var waiting = parseInt(msg.waiting);
    var held = parseInt(msg.held);
    var running = parseInt(msg.running);
    var pending = parseInt(msg.pending);
    var preempted = parseInt(msg.preempted);
    var llq_info = [
        {name: 'total', value: total},
        {name: 'waiting', value: waiting},
        {name: 'pending', value: pending},
        {name: 'running', value: running},
        {name: 'held', value: held},
        {name: 'preempted', value: preempted}
    ];
    update_llq_type_chart(llq_info);
    $('#total_llq_job_number').html(total);
    $('#waiting_llq_job_number').html(waiting);
    $('#held_llq_job_number').html(held);
    $('#running_llq_job_number').html(running);
    $('#pending_llq_job_number').html(pending);
    $('#preempted_llq_job_number').html(preempted);
});

截图