rabbitmq执行长时间任务
目录
使用 pika 连接 rabbitmq 在执行长时间任务时,会出现下面的错误:
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
原因是 rabbitmq 会关闭长时间没有通信的 pika 连接。
网上说可以通过将 heartbeat 设为 0,关闭 rabbitmq 的心跳检测,但我试了下没有效果,看来还是应该手动执行 heartbeat。
看文档发现 pika 的 connection 有 process_data_events 方法,类似 heartbeat 操作,可以保持与 rabbitmq 的通信。在执行长时间任务时,定时调用 process_data_events 方法,就不会丢失连接。
实现
在 consumer 程序的子线程中执行长时间的任务,在主线程中定时检测子线程是否结束,如果没结束,则调用 process_data_events 方法。代码如下:
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=host,
port=port
)
)
channel = connection.channel()
channel.exchange_declare(
exchange=exchange_name,
exchange_type="topic"
)
channel.queue_declare(
queue=queue_name
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key=routing_key
)
def consume_message(ch, method, properties, body):
logger.info('receive new message')
message_string = body.decode('utf-8')
message = json.loads(message_string)
message_thread = threading.Thread(target=process_message, args=(message, config))
message_thread.start()
while message_thread.is_alive():
time.sleep(10)
connection.process_data_events()
# logger.info("waiting for message thread...")
logger.info("message thread done")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
consume_message,
queue=queue_name,
no_ack=True
)
try:
logger.info("starting receiving message...")
channel.start_consuming()
except KeyboardInterrupt as e:
logger.info(e)
finally:
logger.info("Warm shutdown...")
connection.close()
logger.info("Warm shutdown...Done")