使用Flask SocketIO实现WebSocket
使用 HTML 5 的 WebSocket 实现实时交互通信功能,替代 Ajax 轮训等方法,个人觉得比较适合实时监控类网站。 我尝试使用 Flask-SocketIO 实现支持 WebSocket 的服务器端,使用 socket.io 库实现客户端。
一个简单的 Flask-SocketIO 应用
仅完成就简单的通讯功能。
服务器端
from flask import Flask, render_template
from flask.ext.socketio import SocketIO, emit
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'windroc-nwpc-project'
socketio = SocketIO(app)
@app.route('/')
def get_index_page():
return render_template('index.html')
@socketio.on('connect', namespace='/test')
def test_connect():
emit('my response', {'data': 'Connected', 'count': 0})
@socketio.on('my event', namespace='/test')
def test_message(message):
emit('my response', {'data': message['data'], 'count': 2})
if __name__ == "__main__":
socketio.run(app, host='0.0.0.0', port=5101)
创建 Flask-SocketIO 应用的方法与普通的 Flask 应用相似。
app = Flask(__name__)
socketio = SocketIO(app)
开发时简单运行的方法也与 Flask 应用相同。
if __name__ == "__main__":
socketio.run(app, host='0.0.0.0', port=5101)
SocketIO 使用 event
表示客户端和服务端接受到的消息。后面可以看到,客户端使用 Javascript 回调函数处理。服务器端使用类似视图的路由注册函数来处理事件。
@socketio.on('connect', namespace='/test')
def test_connect():
emit('my response', {'data': 'Connected', 'count': 0})
@socketio.on('my event', namespace='/test')
def test_message(message):
emit('my response', {'data': message['data'], 'count': 2})
客户端
使用 socket.io 的客户端 js 库。
监听 socket 事件的 js 代码:
$(document).ready(function () {
var socket = io.connect('//127.0.0.1:5101/test');
socket.on('connect', function() {
socket.emit('my event', {data: 'I\'m connected!'});
});
});
当 socket 连接成功时,向服务器发送 my event 事件,上一节的服务端使用 test_message 函数处理 my event 事件,向客户端发送 my response 事件。
下面看一个稍微复杂一些的应用,实现一个有实际价值的功能。
一个简单的实时监控应用
单位服务器使用 LoadLeveler 管理任务调度,绝大部分作业脚本都会提交到 LoadLeveler 上运行,很多时候需要运维使用 llq
命令查看 LoadLeveler 队列中的任务状态。或许可以开发一种可以实时获取信息的工具,以更直观的方式展示数据。
从最简单入手,实时获取 LoadLeveler 队列中任务数量和每种状态的任务数量。
数据生成
一个典型的 llq 输出如下:
Id Owner Submitted ST PRI Class Running On
------------------------ ---------- ----------- -- --- ------------ -----------
cma20n01.1737149.0 nwp_qu 5/11 03:20 R 100 serial_op cma18n05
cma20n02.2425650.0 nwp_qu 5/11 03:20 R 100 serial_op cma18n07
...
cma20n01.1738085.0 bcccsmxp 5/11 06:41 I 50 special2
...
149 job step(s) in queue, 57 waiting, 0 pending, 92 running, 0 held, 0 preempted
其中...
表示省略的条目。上述要求其实就是解析 llq 输出的最后一行,可以使用正则表达式解析。
total_pattern = "^(\d+) job step\(s\) in queue, (\d+) waiting, (\d+) pending, (\d+) running, (\d+) held, (\d+) preempted"
total_prog = re.compile(total_pattern)
total_prog_result = total_prog.match(result_lines[-2])
llq_summary = dict()
llq_summary['in_queue'] = total_prog_result.group(1)
llq_summary['waiting'] = total_prog_result.group(2)
llq_summary['pending'] = total_prog_result.group(3)
llq_summary['running'] = total_prog_result.group(4)
llq_summary['held'] = total_prog_result.group(6)
llq_summary['preempted'] = total_prog_result.group(5)
另外,该程序不在单位的服务器上运行,需要用 ssh 连接到服务器。我使用 paramiko 库执行远程命令。
bin_path = 'llq'
bin_param = ''
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, port, username, password)
ssh_command = bin_path + ' ' + bin_param
ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(ssh_command)
result_lines = ssh_stdout.read().split("\n")
ssh.close()
except paramiko.SSHException, e:
print e
如此可以得到数据。
应用结构
socket 服务器接受外界传来的 llq 队列消息,并将该消息发送给连接到 socket 服务器的所有客户端。
因为对 socket 不够了解,我还是使用传统的 HTTP 方式传送消息到服务器,再使用 Flask-SocketIO 发送消息到客户端。
服务器端
添加一个 http 路由函数,Flask-SocketIO 对象可以使用 emit 函数直接向客户端发送消息。
@app.route('/api/v1/hpc/llq/info', methods=['POST'])
def get_hpc_llq_info():
r = request
hpc_llq_info_message = json.loads(request.form['message'])
print "Receive llq info:", hpc_llq_info_message
socketio.emit('send_llq_info', hpc_llq_info_message, namespace='/hpc')
result = {
'status': 'ok'
}
return jsonify(result)
数据收集
每隔一段时间收集一次数据:
while True:
quota_result = hpcloadleveler.get_llq(hostname, port, username, password)
logging.info(quota_result['total'])
post_data = {
'message': json.dumps(quota_result['total'])
}
requests.post(post_url, data=post_data)
time.sleep(2)
客户端
接受数据后,更新界面显示
var socket = io.connect('//127.0.0.1:5101/hpc');
socket.on('connect', function() {
console.log('I\'m connected!');
});
socket.on('send_llq_info', function(msg){
/*console.log(msg);*/
var total = parseInt(msg.in_queue);
var waiting = parseInt(msg.waiting);
var held = parseInt(msg.held);
var running = parseInt(msg.running);
var pending = parseInt(msg.pending);
var preempted = parseInt(msg.preempted);
var llq_info = [
{name: 'total', value: total},
{name: 'waiting', value: waiting},
{name: 'pending', value: pending},
{name: 'running', value: running},
{name: 'held', value: held},
{name: 'preempted', value: preempted}
];
update_llq_type_chart(llq_info);
$('#total_llq_job_number').html(total);
$('#waiting_llq_job_number').html(waiting);
$('#held_llq_job_number').html(held);
$('#running_llq_job_number').html(running);
$('#pending_llq_job_number').html(pending);
$('#preempted_llq_job_number').html(preempted);
});
网页如下所示