Kafka in Practice: Receiving Messages in Batches and Saving Them to MySQL
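The project needs to receive messages from Kafka and save them to a MySQL database.
The first version of the program saved each message to the database as soon as it arrived.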
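    for message in consumer:
        content = json.loads(message.value)
        # Build the ORM object from `content` (field assignment not shown).
        a_record = record_table()
        session.add(a_record)
        session.commit()

Every message requests a database connection and runs its own transaction. With a large volume of messages, this is too slow and inefficient.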
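One workable solution is to use a stream processing system to add data to the database in batches. In such a system, the processing routine is triggered at a fixed time interval or whenever a certain amount of data has arrived. I am not familiar with stream processing systems, though, so I borrowed the idea and wrote a simple batching program that imitates it.
Messages are first read into a list. Once the list holds at least a threshold number of records, they are all added to the database in one batch, as shown below.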
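The trigger rule described above (process after a certain amount of data or after a certain time) can be sketched in plain Python. This is only an illustration of the idea, not how any particular stream processing system implements it; the class name `BatchBuffer` and its parameters `max_items`, `max_seconds` and `clock` are hypothetical.

```python
import time


class BatchBuffer:
    """Collect items and report when a batch is due, by size or by age."""

    def __init__(self, max_items=500, max_seconds=5.0, clock=time.monotonic):
        self.max_items = max_items
        self.max_seconds = max_seconds
        self.clock = clock        # injectable so the timing can be tested
        self.items = []
        self.started_at = None    # arrival time of the first item in this batch

    def add(self, item):
        if not self.items:
            self.started_at = self.clock()
        self.items.append(item)

    def is_due(self):
        """True when the batch is full or its oldest item has waited too long."""
        if not self.items:
            return False
        too_many = len(self.items) >= self.max_items
        too_old = self.clock() - self.started_at >= self.max_seconds
        return too_many or too_old

    def drain(self):
        """Return the buffered items and start a new, empty batch."""
        batch, self.items = self.items, []
        self.started_at = None
        return batch
```

Note that `is_due()` is only evaluated when the caller asks. Inside a blocking `for message in consumer` loop, the time limit would therefore only take effect when the next message arrives.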
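    MAX_COMMIT_TO_DB_COUNT = 500
    record_object_list = []
    for message in consumer:
        content = json.loads(message.value)
        a_record = record_table()
        record_object_list.append(a_record)
        if len(record_object_list) >= MAX_COMMIT_TO_DB_COUNT:
            # Add the whole batch to the session, then commit once.
            for a_record in record_object_list:
                session.add(a_record)
            session.commit()
            record_object_list = []

However, some special cases have to be considered, such as inserting duplicate data and handling records that have not been saved yet when the program exits. Exception handling needs to be added.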
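As a complement to exception handling, duplicates inside a single batch can be removed before the batch is committed. The sketch below assumes that the unique key consists of the `repo_id`, `version_id` and `line_no` attributes, which is inferred from the attributes used in this article's code. The helper name `drop_duplicates` is hypothetical.

```python
def drop_duplicates(records, key_fields=("repo_id", "version_id", "line_no")):
    """Keep the first record for each unique key, preserving the original order."""
    seen = set()
    unique = []
    for record in records:
        # Build the key from the attributes that form the (assumed) unique index.
        key = tuple(getattr(record, name) for name in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique
```

This only removes duplicates within one batch. A row that already exists in the table still raises error 1062 on commit, so the exception handling is still required.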
    import json

    from sqlalchemy.exc import IntegrityError, OperationalError

    # consumer, session, record_table, create_mysql_engine and
    # MAX_COMMIT_TO_DB_COUNT are defined elsewhere.
    record_object_list = []
    print("Receiving messages from Kafka...")
    try:
        for message in consumer:
            content = json.loads(message.value)
            a_record = record_table()
            record_object_list.append(a_record)
            if len(record_object_list) >= MAX_COMMIT_TO_DB_COUNT:
                print("commit records...", end=" ")
                for a_record in record_object_list:
                    session.add(a_record)
                try:
                    session.commit()
                    record_object_list = []
                    # Commit Kafka offsets only after the batch is stored.
                    consumer.commit()
                except IntegrityError as e:
                    print(e)
                    orig = e.orig
                    if orig.args[0] == 1062:
                        # IntegrityError(1062, "Duplicate entry '7-1-6644519' for key 'repo_version_line_index'")
                        params = e.params
                        local_repo_id = params[0]
                        local_version_id = params[1]
                        local_line_no = params[2]
                        # Find the duplicate record and drop it from the batch.
                        local_record_index = -1
                        for local_index, local_record in enumerate(record_object_list):
                            if local_record.repo_id == local_repo_id \
                                    and local_record.version_id == local_version_id \
                                    and local_record.line_no == local_line_no:
                                local_record_index = local_index
                                break
                        if local_record_index == -1:
                            raise
                        print("delete line no:", record_object_list[local_record_index].line_no)
                        del record_object_list[local_record_index]
                    else:
                        session.rollback()
                        raise
                    # The remaining records stay in the list and are retried
                    # when the next message arrives.
                    session.rollback()
                except OperationalError as e:
                    print(e)
                    orig = e.orig
                    if orig.args[0] == 2006:
                        # Connection lost:
                        # OperationalError (2006, 'MySQL server has gone away')
                        session.rollback()
                        print("reconnecting database...")
                        create_mysql_engine()
                        print("add records in next term")
                        continue
                    else:
                        raise
                finally:
                    print("Done")
    except KeyboardInterrupt:
        print("Catch Keyboard interrupt")
        print("Start warm shutdown...")
        # Discard any half-finished transaction; the records are still in the list.
        session.rollback()
        for a_record in record_object_list:
            try:
                session.add(a_record)
                session.commit()
            except IntegrityError as e:
                print(e)
                orig = e.orig
                # The session must be rolled back after a failed commit
                # before it can be used again.
                session.rollback()
                if orig.args[0] == 1062:
                    # IntegrityError(1062, "Duplicate entry '7-1-6644519' for key 'repo_version_line_index'")
                    continue
                else:
                    raise
            except OperationalError as e:
                print(e)
                orig = e.orig
                if orig.args[0] == 2006:
                    # Connection lost:
                    # OperationalError (2006, 'MySQL server has gone away')
                    session.rollback()
                    print("reconnecting database...")
                    create_mysql_engine()
                    print("add records in next term")
                    continue
                else:
                    raise
        print("Warm shutdown...Done")
        consumer.close()

Of course, the program above still has many problems and cannot compare with a real stream processing system. I plan to replace it with a stream processing system later.
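One of those problems is that the loop only flushes when a new message arrives, so a partly filled batch can wait indefinitely. A possible intermediate step, short of a full stream processing system, is to fetch messages in batches with a timeout. The sketch below assumes the consumer is a kafka-python `KafkaConsumer` (the program above does not say which client it uses), whose `poll(timeout_ms=..., max_records=...)` returns a dict mapping partitions to lists of records. The function `consume_in_batches`, the `save_batch` callback and the `max_polls` parameter are hypothetical names introduced for illustration.

```python
import json


def consume_in_batches(consumer, save_batch, max_records=500, timeout_ms=1000,
                       max_polls=None):
    """Poll the consumer repeatedly and pass each non-empty batch to save_batch.

    consumer   : object with kafka-python style poll() and commit() methods
    save_batch : hypothetical callback that stores a list of decoded messages
    max_polls  : stop after this many polls (None means loop until interrupted)
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        # poll() waits up to timeout_ms and returns {partition: [records]};
        # the dict is empty when nothing arrived in time.
        by_partition = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
        batch = [json.loads(record.value)
                 for records in by_partition.values()
                 for record in records]
        if not batch:
            continue
        save_batch(batch)    # intended as one database transaction per batch
        consumer.commit()    # commit offsets only after the batch is stored
```

With this shape, a batch is handed over either when `max_records` messages are available or when `timeout_ms` elapses, whichever comes first. Error handling for duplicates and lost connections would still have to live inside `save_batch`.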
