rabbitmq执行长时间任务

目录

使用 pika 连接 rabbitmq 在执行长时间任务时,会出现下面的错误:

pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")

原因是 rabbitmq 会关闭长时间没有通信的 pika 连接。
网上说可以通过将 heartbeat 设为 0,关闭 rabbitmq 的心跳检测,但我试了下没有效果,看来还是应该手动执行 heartbeat。
看文档发现 pika 的 connection 有 process_data_events 方法,类似 heartbeat 操作,可以保持与 rabbitmq 的通信。在执行长时间任务时,定时调用 process_data_events 方法,就不会丢失连接。

实现

在 consumer 程序的子线程中执行长时间的任务,在主线程中定时检测子线程是否结束,如果没结束,则调用 process_data_events 方法。代码如下:

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=host,
        port=port
    )
)

channel = connection.channel()
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="topic"
)
channel.queue_declare(
    queue=queue_name
)
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)

def consume_message(ch, method, properties, body):
    logger.info('receive new message')
    message_string = body.decode('utf-8')
    message = json.loads(message_string)
    message_thread = threading.Thread(target=process_message, args=(message, config))
    message_thread.start()
    while message_thread.is_alive():
        time.sleep(10)
        connection.process_data_events()
        # logger.info("waiting for message thread...")
    logger.info("message thread done")

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    consume_message,
    queue=queue_name,
    no_ack=True
)

try:
    logger.info("starting receiving message...")
    channel.start_consuming()
except KeyboardInterrupt as e:
    logger.info(e)
finally:
    logger.info("Warm shutdown...")
    connection.close()
    logger.info("Warm shutdown...Done")