使用kafka-go连接Kafka

目录

本文属于介绍 NWPC 消息平台 系列文章。

本文介绍如何使用 kafka-go 库提供的高层 API 接口连接 Kafka。

准备

导入库

import "github.com/segmentio/kafka-go"

发送消息

使用 kafka.Writer 向 Kafka 服务器发送消息。

创建 Writer

w := kafka.Writer{
	Addr:         kafka.TCP(brokers...),
	Topic:        topic,
	Balancer:     &kafka.LeastBytes{},
	WriteTimeout: writeTimeout,
}

其中:

  • brokers 是 Kafka 服务地址列表
  • topic 是主题
  • writeTimeout 是超时时间

发送消息,如果发送失败,err 不为空

err := w.WriteMessages(
    context.Background(),
	kafka.Message{
		Value: message,
	},
)

其中 message 是消息字节流,类型是 []byte

消息发送完毕后,手动关闭 Writer

w.Close()

接收消息

使用 kafka.Reader 从 Kafka 服务器接收消息。

创建 Reader

r = kafka.NewReader(kafka.ReaderConfig{
	Brokers:   brokers,
	Topic:     topic,
	GroupID:   groupId,
	MinBytes:  10e3, // 10KB
	MaxBytes:  10e6, // 10MB
})

其中 groupId 是 consumer group 名称,每条消息只会向同一个 group 中某个 consumer 发送一次。

因为接收消息的程序要持续运行,所以使用 defer 语句执行关闭操作

defer r.Close()

使用无限 for 循环读取并打印每条消息

for {
	m, err := r.ReadMessage(context.Background())
	if err != nil {
		break
	}
    log.Printf("%s", string(m.Value))
}

应用

nwpc-oper/nmc-message-client 项目已对 NMC 统一业务监控平台中的数值预报产品消息开发消息发送和接收程序。

运行流程如下图所示:

  • 每个产品分发任务会通过 Writer 向 Kafka 服务发送一条消息
  • 使用 Reader 持续从 kafka 服务读取消息并处理

使用 kafka-go 连接 NMC 监控平台的 Kafka 服务

目前 NMC 统一业务监控平台的 Kafka 服务由三个节点构成。 日志消息发送到 monitor 主题,描述信息如下:

Topic: monitor  PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: monitor  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: monitor  Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: monitor  Partition: 2    Leader: 0       Replicas: 0     Isr: 0

nmc-message-client 项目从 Kafka 中读取消息,将数值预报产品消息保存到 ElasticSearch 中。 详情请参考如下文章:

使用Bulk API向ElasticSearch发送数据

参考

nwpc-oper/nmc-message-client

nwpc-oper/nwpc-message-client