Fork me on GitHub

Kafka消费阻塞事件

1. 背景

在重 Kafka 的项目中,中间件提供的 Kafka 告警过于单一、无法多元配置化。

比如,无法根据不同的 topic 单独设置消息堆积告警阈值。而实际业务中,有些 topic 在某些时刻(如整点)堆积几十万条消息都属于正常的,但是某些 topic 在任何时刻堆积一条消息都是极其不正常的。

因此,需要根据自身的业务对不同的 topic 制定不同的告警策略和阈值。此时,需要在应用中实现 Consumer 的阻塞监听。

2. 使用

Spring 在整合 Kafka 之后,提供了对应的事件机制,只需要实现对应的接口即可。

消费者 Demo 代码示例:

1
2
3
4
5
6
@KafkaListener(groupId = "applicationName", topics = KafkaConstant.TOPIC)
public void consumer(ConsumerRecord<String, String> record) throws InterruptedException {
logger.info("消费消息,value:{},offset:{},record:{}", record.value(), record.offset(), record.toString());
// 模拟消费堵塞
Thread.sleep(3 * 1000L);
}

每次消费时线程休眠 3s,模拟业务逻辑异常耗时。

Kafka 消息阻塞事件监听器:

1
2
3
4
5
6
7
8
9
10
11
@Component
public class ConsumerBlockListener implements ApplicationListener<NonResponsiveConsumerEvent> {

private Logger logger = LoggerFactory.getLogger(ConsumerBlockListener.class);

@Override
public void onApplicationEvent(NonResponsiveConsumerEvent event) {
logger.warn("消费者堵塞,堵塞时间:{}毫秒,partition:{},source:{}",
event.getTimeSinceLastPoll(), event.getTopicPartitions(), event.getSource().toString());
}
}

消费日志如下,中间有部分消费正常的日志省略。

2021-12-30 10:26:14.586 WARN 1825 —- [TaskScheduler-1] c.x.k.demo.event.ConsumerBlockListener : 消费者堵塞,堵塞时间:29928毫秒,partition:[xiaofa-0],source:KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-0, clientIndex=-0, topicPartitions=[xiaofa-0]]
2021-12-30 10:26:14.678 INFO 1825 —- [ntainer#0-0-C-1] c.x.kafka.demo.consumer.SpringConsumer : 消费消息,value:13,offset:423,record:ConsumerRecord(topic = xiaofa, partition = 0, leaderEpoch = 0, offset = 423, CreateTime = 1640831007825, serialized key size = 2, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = 13, value = 13)
……
2021-12-30 10:26:41.693 INFO 1825 —- [ntainer#0-0-C-1] c.x.kafka.demo.consumer.SpringConsumer : 消费消息,value:22,offset:432,record:ConsumerRecord(topic = xiaofa, partition = 0, leaderEpoch = 0, offset = 432, CreateTime = 1640831008744, serialized key size = 2, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = 22, value = 22)
2021-12-30 10:26:44.585 WARN 1825 —- [TaskScheduler-1] c.x.k.demo.event.ConsumerBlockListener : 消费者堵塞,堵塞时间:59928毫秒,partition:[xiaofa-0],source:KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-0, clientIndex=-0, topicPartitions=[xiaofa-0]]

如日志所示,每隔 30s 会打印一次消费堵塞。在实际项目中,可以解析出 topic、partition,针对不同的 topic,制定告警策略。

3. 原理

在 Kafka Consumer 容器启动的时候,会注册一系列的监听器,代码参见 org.springframework.kafka.listener.KafkaMessageListenerContainer#doStart 中

1
this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);

其中一个监听任务为 checkConsumer,很明显是监听和 Consumer 相关的。

1
2
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(() -> checkConsumer(),
this.containerProperties.getMonitorInterval() * 1000);

该定时任务会每隔 30s 执行一次,因为 monitorInterval 的值默认 30s,正好和实际效果一致。

1
2
3
4
/**
* The default {@link #setMonitorInterval(int) monitorInterval} (s).
*/
public static final int DEFAULT_MONITOR_INTERVAL = 30;

实际的 checkConsumer 方法如下:

1
2
3
4
5
6
7
protected void checkConsumer() {
long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll;
if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout()
> this.containerProperties.getNoPollThreshold()) {
publishNonResponsiveConsumerEvent(timeSinceLastPoll, this.consumer);
}
}

当 poll 之间间隔达到阈值时,就会发送 NonResponsiveConsumerEvent 事件。

本文标题:Kafka消费阻塞事件

原始链接:https://zhaoxiaofa.com/2021/10/27/Kafka消费阻塞事件/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。