1. 背景
在重 Kafka 的项目中,中间件提供的 Kafka 告警过于单一、无法多元配置化。
比如,无法根据不同的 topic 单独设置消息堆积告警阈值。而实际业务中,有些 topic 在某些时刻(如整点)堆积几十万条消息都属于正常的,但是某些 topic 在任何时刻堆积一条消息都是极其不正常的。
因此,需要根据自身的业务对不同的 topic 制定不同的告警策略和阈值。此时,需要在应用中实现 Consumer 的阻塞监听。
2. 使用
Spring 在整合 Kafka 之后,提供了对应的事件机制,只需要实现对应的接口即可。
消费者 Demo 代码示例:
1 | "applicationName", topics = KafkaConstant.TOPIC) (groupId = |
每次消费时线程休眠 3s,模拟业务逻辑异常耗时。
Kafka 消息阻塞事件监听器:
1 |
|
消费日志如下,中间有部分消费正常的日志省略。
2021-12-30 10:26:14.586 WARN 1825 —- [TaskScheduler-1] c.x.k.demo.event.ConsumerBlockListener : 消费者堵塞,堵塞时间:29928毫秒,partition:[xiaofa-0],source:KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-0, clientIndex=-0, topicPartitions=[xiaofa-0]]
2021-12-30 10:26:14.678 INFO 1825 —- [ntainer#0-0-C-1] c.x.kafka.demo.consumer.SpringConsumer : 消费消息,value:13,offset:423,record:ConsumerRecord(topic = xiaofa, partition = 0, leaderEpoch = 0, offset = 423, CreateTime = 1640831007825, serialized key size = 2, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = 13, value = 13)
……
2021-12-30 10:26:41.693 INFO 1825 —- [ntainer#0-0-C-1] c.x.kafka.demo.consumer.SpringConsumer : 消费消息,value:22,offset:432,record:ConsumerRecord(topic = xiaofa, partition = 0, leaderEpoch = 0, offset = 432, CreateTime = 1640831008744, serialized key size = 2, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = 22, value = 22)
2021-12-30 10:26:44.585 WARN 1825 —- [TaskScheduler-1] c.x.k.demo.event.ConsumerBlockListener : 消费者堵塞,堵塞时间:59928毫秒,partition:[xiaofa-0],source:KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-0, clientIndex=-0, topicPartitions=[xiaofa-0]]
如日志所示,每隔 30s 会打印一次消费堵塞。在实际项目中,可以解析出 topic、partition,针对不同的 topic,制定告警策略。
3. 原理
在 Kafka Consumer 容器启动的时候,会注册一系列的监听器,代码参见 org.springframework.kafka.listener.KafkaMessageListenerContainer#doStart 中
1 | this.listenerConsumer = new ListenerConsumer(this.listener, listenerType); |
其中一个监听任务为 checkConsumer,很明显是监听和 Consumer 相关的。
1 | this.monitorTask = this.taskScheduler.scheduleAtFixedRate(() -> checkConsumer(), |
该定时任务会每隔 30s 执行一次,因为 monitorInterval 的值默认 30s,正好和实际效果一致。
1 | /** |
实际的 checkConsumer 方法如下:
1 | protected void checkConsumer() { |
当 poll 之间间隔达到阈值时,就会发送 NonResponsiveConsumerEvent 事件。