使用kafka-go连接Kafka
目录
本文属于介绍 NWPC 消息平台 系列文章。
本文介绍如何使用 kafka-go 库提供的高层 API 接口连接 Kafka。
准备
导入库
import "github.com/segmentio/kafka-go"
发送消息
使用 kafka.Writer
向 Kafka 服务器发送消息。
创建 Writer
w := kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
WriteTimeout: writeTimeout,
}
其中:
brokers
是 Kafka 服务地址列表topic
是主题writeTimeout
是超时时间
发送消息,如果发送失败,err
不为空
err := w.WriteMessages(
context.Background(),
kafka.Message{
Value: message,
},
)
其中 message
是消息字节流,类型是 []byte
。
消息发送完毕后,手动关闭 Writer
w.Close()
接收消息
使用 kafka.Reader
从 Kafka 服务器接收消息。
创建 Reader
r = kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupId,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
其中 groupId
是 consumer group 名称,每条消息只会向同一个 group 中某个 consumer 发送一次。
因为接收消息的程序要持续运行,所以使用 defer
语句执行关闭操作
defer r.Close()
使用无限 for
循环读取并打印每条消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
log.Printf("%s", string(m.Value))
}
应用
nwpc-oper/nmc-message-client 项目已对 NMC 统一业务监控平台中的数值预报产品消息开发消息发送和接收程序。
运行流程如下图所示:
- 每个产品分发任务会通过
Writer
向 Kafka 服务发送一条消息 - 使用
Reader
持续从 kafka 服务读取消息并处理
目前 NMC 统一业务监控平台的 Kafka 服务由三个节点构成。
日志消息发送到 monitor
主题,描述信息如下:
Topic: monitor PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: monitor Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: monitor Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: monitor Partition: 2 Leader: 0 Replicas: 0 Isr: 0
nmc-message-client 项目从 Kafka 中读取消息,将数值预报产品消息保存到 ElasticSearch 中。 详情请参考如下文章:
《使用Bulk API向ElasticSearch发送数据》