Fork me on GitHub

Kafka业务监控

1. 前言

本文只探讨 Kafka 业务监控,基础监控(CPU、内存、JVM等)以及服务端 Broker 不在本文考虑范围内,这些一般属于中间件团队。

2. 业务指标

2.1 Producer

生产者主要关注的是发送速率和延迟,具体指标如下:

  • 发送消息数 / {time}:在 time 时间段内一共发送的消息数
  • batch-size-avg:Sender 线程实际发送消息时一个批次(ProducerBatch)的平均大小
  • batch-size-max:Sender 线程时间发送消息时一个批次的最大大小
  • 平均发送延时: 执行了 send 函数到返回结果的平均耗时
  • 最高发送延时:这段时间内 send 执行后返回结果最长的一次耗时

如果 batch-size-avg 值远小于 batch.size,说明吞吐量不够,可以适当调大 linger.ms

如果平均发送延时较高,比如超过 10ms,说明网络或者 Kafka 服务端负载过高,及时联系中间件和运维团队。

如果经常出现生产者发送消息时报 TimeOutException:Failed to allocate memory within the configured max blocking time,检查 buffer.memory 是否设置过小或者消息过大。

2.2 Consumer

消费者主要关注消费等待时间、业务逻辑耗时、拉取数据量、Rebalance、私信队列等指标,具体如下:

  • 平均消费等待时间:从消息发送到 Broker 到被开始被 Consumer 拉取的间隔时间
  • 最大消费等待时间:语义同上,某一个时间段内的最大等待时间
  • 平均业务逻辑耗时
  • 最大业务逻辑耗时
  • 指定时间内消费数
  • 消费失败次数、消费重试次数
  • 死信队列消息数
  • Rebalance 次数
  • 平均每次拉取消息数量
  • 平均每次拉取消息大小

平均等待时间如果过长,需要考虑优化延时,一般 100ms 以下比较正常,需要结合自身业务情况分析。

业务逻辑耗时尽量不要超过 200ms,如果最大业务逻辑耗时有特别大的,要定位到原因,避免出现极端情况下导致 Rebalance。

死信队列大于 0,一般来说是业务代码问题,需要分析相关日志。

Rebalance 一般只会在重启、发布、下线等操作才会发生,如果日常会发生 Rebalance,说明消费不正常,需要调节参数或优化代码解决。

3. 监控工具

常见监控工具有 JMX、Kafka Manager、Prometheus + Grafana,一般来讲,中间件团队都会提供基础的监控,在 2.2 中,除了 平均每次拉取消息数量平均每次拉取消息大小 外,其余指标,我司中间件团队的监控中均有采集。

3.1 自定义 Kafka 指标采集

Kafka 的 metrics 监控提供了一个接口 org.apache.kafka.common.metrics.MetricsReporter,只要实现该接口,可以获取到监控数据 org.apache.kafka.common.metrics.KafkaMetric。

然后定期将 KafkaMetric 中的数据解析出来,收集到 Prometheus + Grafana 的监控平台中。

以下是 Demo,生产环境解析收集会复杂的多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ConsumerMetricMonitor implements MetricsReporter {
private Logger logger = LoggerFactory.getLogger(ConsumerMetricMonitor.class);

@Override
public void init(List<KafkaMetric> list) {
if (CollectionUtils.isNotEmpty(list)) {
for (KafkaMetric kafkaMetric : list) {
logSingleMetric(kafkaMetric, "init");
}
}
}

@Override
public void metricChange(KafkaMetric kafkaMetric) {
logSingleMetric(kafkaMetric, "change");
}

@Override
public void metricRemoval(KafkaMetric kafkaMetric) {

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}

private void logSingleMetric(KafkaMetric kafkaMetric, String type) {
String name = kafkaMetric.metricName().name();
String clientId = kafkaMetric.metricName().tags().get("client-id");
String topic = kafkaMetric.metricName().tags().get("topic");
logger.info("监控kafka数据,type:{},topic:{}, name:{},clientId:{},metric:{}",
type, topic, name, clientId, kafkaMetric.metricName().tags());
if (null == topic || null == clientId) {
return;
}
ConsumerMetricLog.registerMetric(topic, name, clientId, kafkaMetric);
}

}

按照 topic、监控指标名称、consumer 存储 KafkaMetric 数据,定期统计到监控台。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ConsumerMetricLog {
private static Logger logger = LoggerFactory.getLogger(ConsumerMetricLog.class);

private static Map<String/*topic*/, Map<String/*name*/, Map<String/*consumerId*/, KafkaMetric>>> metricMap = new ConcurrentHashMap<>();

/**
* 定时线程池
*/
private static final ScheduledThreadPoolExecutor THREAD_POOL_EXECUTOR =
new ScheduledThreadPoolExecutor(1, new DefaultManagedAwareThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

static {
THREAD_POOL_EXECUTOR.scheduleAtFixedRate(() -> {
doLog();
}, 3, 10, TimeUnit.SECONDS);
}

private static void doLog() {
metricMap.forEach((topic, nameMap) -> {
if (MapUtils.isEmpty(nameMap)) {
return;
}
nameMap.forEach((name, consumerMap) -> {
if (MapUtils.isEmpty(consumerMap)) {
return;
}
double avg = consumerMap.values().stream()
.mapToDouble(mm -> (Double) mm.metricValue())
.filter(d -> !Double.isNaN(d))
.average()
.orElse(-1D);
if (Double.isNaN(avg)) {
avg = -1D;
}
if (avg >= 0) {
logger.info("kafka monitor,topic={},name={},value={}", topic, name, avg);
}
});
});
}

/**
* 注册
* @param topic
* @param name
* @param consumerId
* @param kafkaMetric
*/
public static void registerMetric(String topic, String name, String consumerId, KafkaMetric kafkaMetric) {
if (null == metricMap) {
metricMap = new ConcurrentHashMap<>();
}
Map<String, Map<String, KafkaMetric>> nameMap = metricMap.get(topic);
if (null == nameMap) {
nameMap = new ConcurrentHashMap<>();
}
Map<String, KafkaMetric> consumerMap = nameMap.get(consumerId);
if (null == consumerMap) {
consumerMap = new ConcurrentHashMap<>();
}
consumerMap.put(consumerId, kafkaMetric);
nameMap.put(name, consumerMap);
metricMap.put(topic, nameMap);
}
}

Consumer 的代码中只需要添加 metric.reporters 的配置即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "333");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put("fetch.min.bytes", 1000);
props.setProperty("metric.reporters", ConsumerMetricMonitor.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(3));
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
Thread.sleep(100);
}
}

}

运行后,部分日志输出如下:

2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=bytes-consumed-rate,value=2.363349896982184
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lag,value=0.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lag-avg,value=0.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=fetch-size-max,value=78.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-consumed-total,value=6.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lead-min,value=205223.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lead-avg,value=205223.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lead,value=205223.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=bytes-consumed-total,value=78.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lag-max,value=0.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-consumed-rate,value=0.18179614592170648
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-per-request-avg,value=6.0
2022-01-04 11:53:45.276 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=fetch-size-avg,value=78.0

其中,records-per-request-avg 是”平均每次拉取消息数量“,fetch-size-avg 是 ”平均每次拉取消息大小“。

这里只是给了一个简单的示例,详细的 Kafka 统计指标可参见 org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry。可根据自身业务需要统计对应指标。

参考

关于 Kafka 的系统监控,可以参考腾讯云的文章:https://cloud.tencent.com/developer/article/1554002

《Apache Kafka 实战》

本文标题:Kafka业务监控

原始链接:https://zhaoxiaofa.com/2021/11/25/Kafka业务监控/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。