Python中的Kafka:使用kafka-python库

目录

介绍

项目:dpkp/kafka-python
kafka-python试用新版的kafka(0.10 或 0.9),也支持旧的版本(比如0.8.0)。实际使用中当我更新kafka-python后,原来的代码使用kafka 0.8.2会出错,所以最好将kafka升级到最新版本。

发送消息

使用KafkaProducer发送消息。

接收消息

使用KafkaComsuer接收消息。

consumer可迭代,当队列中没有消息时,上面代码会一直等待。使用Control+C可以退出循环。

退出循环的处理方法

Control+C 会产生 KeyboardInterrupt 异常,在代码中捕获该异常,就可以执行必要的退出操作。

踩过的坑

已经废弃的Simple APIs

新版kafka-python已经废弃的Simple APIs,包括KafkaClient和SimpleConsumer、SimpleProducer。

多次创建KafkaProducer产生的问题

我在一个Flask应用中提供一个API接口,该接口向Kafka发送一条消息。原来使用Simple API,在每个controller函数中创建一个SimpleProducer。切换到KafkaProducer后,依然在每个controller中创建新的KafkaProducer。如下所示:

但随后发生如下错误:

原因是每次创建KafkaProducer都会占用一个文件符号,controller结束时,没有释放,导致后面无法继续创建新的KafkaProducer。
解决方法是创建全局KafkaProducer,供所有controller使用。

慎用RecordMetadata.get()

官方例子中有如下的代码

KafkaProducer.send 返回 RecordMetadata 对象,RecordMetadata.get 可以获取 record 的信息。但在发送大量消息时,get方法可能会造成明显的延时。所以当我们不关心消息是否发送成功时,就不要调用get方法了。