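Kafka in Python: Using the kafka-python Library
Introduction
Project: dpkp/kafka-python
kafka-python is designed for newer Kafka versions (0.10 or 0.9) and also supports older ones (such as 0.8.0). In practice, after I upgraded kafka-python, my existing code stopped working against Kafka 0.8.2, so it is best to upgrade Kafka to the latest version as well.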
Sending messages
Use KafkaProducer to send messages.
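from kafka import KafkaProducer

kafka_host = '127.0.0.1'  # broker host
kafka_port = 9092  # broker port
kafka_topic = 'collector'  # topic name (assumed here; use your own)

producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
    kafka_host=kafka_host,
    kafka_port=kafka_port
)])
message_string = 'some message'
# send() is asynchronous and expects bytes, so encode the string first.
response = producer.send(kafka_topic, message_string.encode('utf-8'))
Receiving messages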
Use KafkaConsumer to receive messages.
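import json

from kafka import KafkaConsumer

KAFKA_HOST = '127.0.0.1'  # broker host
KAFKA_PORT = 9092  # broker port

consumer = KafkaConsumer(
    'collector',
    group_id='my-group',
    bootstrap_servers=['{kafka_host}:{kafka_port}'.format(kafka_host=KAFKA_HOST, kafka_port=KAFKA_PORT)]
)
for message in consumer:
    # message.value holds the raw bytes of the payload.
    content = json.loads(message.value)
    # ...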
The consumer is iterable. When there are no messages in the queue, the loop above keeps waiting. Press Control+C to break out of it.
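If waiting forever is not what you want, KafkaConsumer also accepts a consumer_timeout_ms option that ends the iteration after a period with no new messages, and a value_deserializer that decodes each payload before it reaches the loop. Below is a minimal sketch, assuming the payloads are UTF-8 encoded JSON. The names decode_json and build_consumer and the 10-second timeout are my own illustrative choices, not part of the library.

```python
import json


def decode_json(raw_bytes):
    """Deserialize a UTF-8 encoded JSON payload into a Python object."""
    return json.loads(raw_bytes.decode('utf-8'))


def build_consumer(topic, servers, group_id, idle_timeout_ms=10000):
    """Create a consumer whose loop ends after idle_timeout_ms without messages.

    Hypothetical helper for illustration; requires kafka-python and a broker.
    """
    # Imported lazily so decode_json stays usable without kafka-python installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(
        topic,
        group_id=group_id,
        bootstrap_servers=servers,
        # Each message.value is passed through decode_json before iteration.
        value_deserializer=decode_json,
        # Stop iterating after this many milliseconds with no new message.
        consumer_timeout_ms=idle_timeout_ms,
    )
```

With such a consumer, message.value is already a dict inside the loop, and the for loop simply ends once the topic has been idle for the configured time.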
Handling loop exit
Control+C raises a KeyboardInterrupt exception. Catch it in your code to perform any necessary cleanup before exiting.
try:
    for message in consumer:
        content = json.loads(message.value)
        # ...
except KeyboardInterrupt:
    print("Caught keyboard interrupt")
    # ...
Pitfalls
Deprecated Simple APIs
Newer versions of kafka-python have deprecated the Simple APIs, including KafkaClient, SimpleConsumer and SimpleProducer.
# Legacy code based on the deprecated Simple API.
kafka = KafkaClient("{kafka_host}:{kafka_port}".format(kafka_host=KAFKA_HOST, kafka_port=KAFKA_PORT))
consumer = SimpleConsumer(kafka, kafka_group, kafka_topic)
for message in consumer:
    # continue
    content = json.loads(message.message.value)
    timestamp = content['timestamp']
    t = datetime.datetime.fromtimestamp(timestamp)
    print("{} {} {}".format(t.strftime("%Y-%m-%d %H:%M:%S"), content['app'], content['data']['message']))
Problems caused by creating KafkaProducer repeatedly
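I have a Flask application that exposes an API endpoint which sends a message to Kafka. Originally I used the Simple API and created a SimpleProducer inside each controller function. After switching to KafkaProducer, I kept creating a new KafkaProducer in each controller, as shown below: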
try:
    producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
        kafka_host=app.config['KAFKA_HOST'],
        kafka_port=app.config['KAFKA_PORT']
    )])
    message_string = json.dumps(message)
    response = producer.send(kafka_topic, message_string.encode('utf-8'))
    producer.close()
except Exception:
    # Error handling goes here.
    raise
But the following error then occurred:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1836, in __call__
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1820, in wsgi_app
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1403, in handle_exception
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request
  File "/vagrant/nwpc_log_data_pipeline/producer_agent/agent/agent_controller.py", line 49, in get_collector_log
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 272, in __init__
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 129, in __init__
  File "/usr/lib/python2.7/site-packages/kafka/selectors34.py", line 422, in __init__
IOError: [Errno 24] Too many open files
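The cause is that every new KafkaProducer takes up a file descriptor, which was not released when the controller finished, so eventually no further KafkaProducer could be created.
The solution is to create one global KafkaProducer and share it among all controllers.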
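One way to get such a shared instance is a lazily created module-level producer. The sketch below is only an illustration of the idea, not the code from my application: get_producer, create_producer and the default host and port are hypothetical names and values. The lock guards against two request threads creating the producer at the same time.

```python
import threading

_producer = None
_producer_lock = threading.Lock()


def create_producer(host='127.0.0.1', port=9092):
    """Build a real KafkaProducer (assumed defaults; needs a reachable broker)."""
    # Imported lazily so this module can be loaded without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=['{}:{}'.format(host, port)])


def get_producer(factory=create_producer):
    """Return the single process-wide producer, creating it on first use."""
    global _producer
    if _producer is None:
        with _producer_lock:
            # Re-check inside the lock: another thread may have created it already.
            if _producer is None:
                _producer = factory()
    return _producer
```

A controller would then call get_producer().send(kafka_topic, message_string.encode('utf-8')) and would not call close() at the end of each request, so the number of open file descriptors stays constant however many requests are served.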
Be careful with FutureRecordMetadata.get()
The official examples include the following code:
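import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('Produce request failed')
KafkaProducer.send returns a future (a FutureRecordMetadata object). Calling get on it blocks until the broker responds and then returns a RecordMetadata object describing the record. When sending a large number of messages, calling get after every send can cause noticeable latency. So if we do not care whether a message was delivered successfully, there is no need to call get.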
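If you still want to know about failures without blocking, the future returned by send also offers add_callback and add_errback, which the official examples use as well. A minimal sketch, assuming logging is an acceptable way to report the outcome; send_nowait and the two handler names are my own and not part of kafka-python:

```python
import logging

log = logging.getLogger(__name__)


def on_send_success(record_metadata):
    # Called with the RecordMetadata once the broker has acknowledged the record.
    log.debug('sent to %s[%s] at offset %s', record_metadata.topic,
              record_metadata.partition, record_metadata.offset)


def on_send_error(exc):
    # Called with the exception if the produce request failed.
    log.error('produce request failed: %r', exc)


def send_nowait(producer, topic, payload):
    """Queue payload for sending and return immediately, without calling get()."""
    future = producer.send(topic, payload)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    return future
```

The callbacks run later, when the result arrives, so the calling code never waits on the broker. Any work done inside them should therefore be short.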
