Kafka in practice: receiving messages in batches and saving them to MySQL

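The project needs to receive messages from Kafka and save them to a MySQL database.
The first version of the program saved each message to the database as soon as it was received.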

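import json

for message in consumer:
    content = json.loads(message.value)
    # Build an ORM object from the decoded message (field mapping omitted here).
    a_record = record_table()
    session.add(a_record)
    # One transaction per message: this is the bottleneck.
    session.commit()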

Every message requests a database connection and runs its own transaction. With a large volume of messages, this is too slow and inefficient.
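One workable solution is to use a stream processing system to add data to the database in batches. In such a system, the processing step is triggered at a fixed interval or whenever a certain amount of data has been received. I am not familiar with stream processing systems, though, so I borrowed the idea and wrote a simple batching program that imitates it.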
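
The "every so often or every N items" trigger described above can be sketched in a few lines of Python. This is only an illustration of the idea, not the program used in this post: the `MicroBatcher` class, its parameters, and the injectable `clock` are all names made up for the example.

```python
import time


class MicroBatcher:
    """Buffer items and flush when a size or age threshold is reached."""

    def __init__(self, flush, max_items=500, max_age_seconds=5.0,
                 clock=time.monotonic):
        self._flush = flush            # callable that receives a list of items
        self._max_items = max_items
        self._max_age = max_age_seconds
        self._clock = clock            # injectable so the timing can be tested
        self._items = []
        self._first_added_at = None

    def add(self, item):
        if not self._items:
            # Remember when the oldest buffered item arrived.
            self._first_added_at = self._clock()
        self._items.append(item)
        self.flush_if_due()

    def flush_if_due(self):
        """Flush when the buffer is full or its oldest item is too old."""
        if not self._items:
            return
        too_many = len(self._items) >= self._max_items
        too_old = self._clock() - self._first_added_at >= self._max_age
        if too_many or too_old:
            self.flush_now()

    def flush_now(self):
        """Flush whatever is buffered; keep the items if flush raises."""
        if self._items:
            self._flush(list(self._items))
            self._items = []
```

The age check only runs when `add` or `flush_if_due` is called, so a caller that can sit idle would need to call `flush_if_due` periodically, and `flush_now` once more before exiting.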
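Messages are first read into a list. Once the number of items in the list reaches a threshold, they are added to the database in one batch, as shown below: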

MAX_COMMIT_TO_DB_COUNT = 500
record_object_list = []
for message in consumer:
    content = json.loads(message.value)
    a_record = record_table()
    record_object_list.append(a_record)
    if len(record_object_list) >= MAX_COMMIT_TO_DB_COUNT:
        # Add the whole buffer and commit it as a single transaction.
        session.add_all(record_object_list)
        session.commit()
        record_object_list = []
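
The loop above only commits when a new message pushes the buffer over the threshold, so a partial buffer can sit unsaved while the topic is quiet. If the consumer is kafka-python's `KafkaConsumer` (an assumption; the post does not say which client is used), its `poll()` method already returns messages in batches and gives up after a timeout, which covers both triggers. A rough sketch, where `save_next_batch` and `save_batch` are hypothetical names:

```python
def save_next_batch(consumer, save_batch, max_records=500, timeout_ms=1000):
    """Poll once and hand any received messages to save_batch.

    Assumes `consumer` behaves like kafka-python's KafkaConsumer:
    poll() returns {TopicPartition: [records]} and commit() commits offsets.
    `save_batch` is a hypothetical callable that writes one batch to the
    database in a single transaction and raises on failure.
    Returns the number of messages handled.
    """
    polled = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    # Flatten the per-partition lists into one batch.
    batch = [record for records in polled.values() for record in records]
    if batch:
        save_batch(batch)
        # Commit offsets only after the database write has succeeded.
        consumer.commit()
    return len(batch)
```

Calling `save_next_batch(consumer, save_batch)` inside a `while True:` loop would then replace the manual counter; a batch may be smaller than `max_records` when the timeout expires first.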

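The batching loop also has to cope with some special cases, such as inserting duplicate data and handling records that have not been saved yet when the program exits. This calls for exception handling.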

from sqlalchemy.exc import IntegrityError, OperationalError

record_object_list = []
print("Receiving messages from Kafka...")
try:
    for message in consumer:
        content = json.loads(message.value)
        a_record = record_table()
        record_object_list.append(a_record)
        if len(record_object_list) >= MAX_COMMIT_TO_DB_COUNT:
            print("commit records...")
            session.add_all(record_object_list)
            try:
                session.commit()
                record_object_list = []
                consumer.commit()
            except IntegrityError as e:
                print(e)
                orig = e.orig
                if orig.args[0] == 1062:
                    # IntegrityError(1062, "Duplicate entry '7-1-6644519' for key 'repo_version_line_index'")
                    # Assumes the first three bound parameters of the failed
                    # INSERT are repo_id, version_id and line_no, in that order.
                    params = e.params
                    local_repo_id = params[0]
                    local_version_id = params[1]
                    local_line_no = params[2]
                    # Find the buffered record that caused the duplicate and drop it.
                    local_record_index = -1
                    for local_index, local_record in enumerate(record_object_list):
                        if local_record.repo_id == local_repo_id \
                            and local_record.version_id == local_version_id \
                            and local_record.line_no == local_line_no:
                            local_record_index = local_index
                            break
                    if local_record_index == -1:
                        session.rollback()
                        raise
                    print("delete line no: %s" % record_object_list[local_record_index].line_no)
                    del record_object_list[local_record_index]
                else:
                    session.rollback()
                    raise
                # Roll back so the remaining records can be re-added on the next pass.
                session.rollback()
            except OperationalError as e:
                print(e)
                orig = e.orig
                if orig.args[0] == 2006:
                    # connection lost
                    # OperationalError (2006, 'MySQL server has gone away')
                    session.rollback()
                    print("reconnecting database...")
                    create_mysql_engine()
                    # The buffer is kept, so the batch is retried on the next message.
                    print("add records in next term")
                    continue
                else:
                    raise
            finally:
                # Runs after every commit attempt, successful or not.
                print("Done")
except KeyboardInterrupt:
    print("Catch Keyboard interrupt")
    print("Start warm shutdown...")
    # The interrupt may have landed between add and commit, so discard any
    # half-finished transaction before saving the buffered records one by one.
    session.rollback()
    for a_record in record_object_list:
        try:
            session.add(a_record)
            session.commit()
        except IntegrityError as e:
            print(e)
            orig = e.orig
            # A session must be rolled back after a failed flush before it is reused.
            session.rollback()
            if orig.args[0] == 1062:
                # IntegrityError(1062, "Duplicate entry '7-1-6644519' for key 'repo_version_line_index'")
                continue
            else:
                raise
        except OperationalError as e:
            print(e)
            orig = e.orig
            if orig.args[0] == 2006:
                # connection lost
                # OperationalError (2006, 'MySQL server has gone away')
                session.rollback()
                print("reconnecting database...")
                create_mysql_engine()
                # Note: the current record is skipped here, not retried.
                print("add records in next term")
                continue
            else:
                raise
    print("Warm shutdown...Done")
consumer.close()
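
The duplicate-key search above could be avoided by letting the database skip colliding rows itself. MySQL supports `INSERT IGNORE` for this. The sketch below is not the code used in this post: it uses Python's built-in `sqlite3` as a stand-in so that it runs without a server, and SQLite spells the same idea `INSERT OR IGNORE`. The `record` table name and the `insert_ignoring_duplicates` helper are assumptions; the three columns are the ones that appear to make up the unique index in the error message above.

```python
import sqlite3


def insert_ignoring_duplicates(connection, rows):
    """Insert rows in one transaction, skipping unique-key collisions.

    SQLite syntax is INSERT OR IGNORE; the MySQL equivalent is INSERT IGNORE.
    Returns the number of rows actually inserted.
    """
    before = connection.total_changes
    with connection:  # commits on success, rolls back on exception
        connection.executemany(
            "INSERT OR IGNORE INTO record (repo_id, version_id, line_no) "
            "VALUES (?, ?, ?)",
            rows,
        )
    return connection.total_changes - before


connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE record ("
    " repo_id INTEGER, version_id INTEGER, line_no INTEGER,"
    " UNIQUE (repo_id, version_id, line_no))"
)
# The third row repeats the first one and is skipped silently.
inserted = insert_ignoring_duplicates(
    connection, [(7, 1, 6644519), (7, 1, 6644520), (7, 1, 6644519)]
)
```

Here `inserted` ends up as 2. In MySQL, `INSERT IGNORE` also downgrades some other errors to warnings, so it is a trade-off and not a drop-in replacement for catching error 1062 explicitly.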

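Of course, the program above still has many problems and cannot compete with a real stream processing system. Later on I will consider replacing it with one.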