1. 前言
本文只探讨 Kafka 业务监控,基础监控(CPU、内存、JVM等)以及服务端 Broker 不在本文考虑范围内,这些一般属于中间件团队。
2. 业务指标
2.1 Producer
生产者主要关注的是发送速率和延迟,具体指标如下:
- 发送消息数 / {time}:在 time 时间段内一共发送的消息数
- batch-size-avg:Sender 线程实际发送消息时一个批次(ProducerBatch)的平均大小
- batch-size-max:Sender 线程时间发送消息时一个批次的最大大小
- 平均发送延时: 执行了 send 函数到返回结果的平均耗时
- 最高发送延时:这段时间内 send 执行后返回结果最长的一次耗时
如果 batch-size-avg
值远小于 batch.size
,说明吞吐量不够,可以适当调大 linger.ms
。
如果平均发送延时较高,比如超过 10ms,说明网络或者 Kafka 服务端负载过高,及时联系中间件和运维团队。
如果经常出现生产者发送消息时报 TimeOutException:Failed to allocate memory within the configured max blocking time,检查 buffer.memory
是否设置过小或者消息过大。
2.2 Consumer
消费者主要关注消费等待时间、业务逻辑耗时、拉取数据量、Rebalance、私信队列等指标,具体如下:
- 平均消费等待时间:从消息发送到 Broker 到被开始被 Consumer 拉取的间隔时间
- 最大消费等待时间:语义同上,某一个时间段内的最大等待时间
- 平均业务逻辑耗时
- 最大业务逻辑耗时
- 指定时间内消费数
- 消费失败次数、消费重试次数
- 死信队列消息数
- Rebalance 次数
- 平均每次拉取消息数量
- 平均每次拉取消息大小
平均等待时间如果过长,需要考虑优化延时,一般 100ms 以下比较正常,需要结合自身业务情况分析。
业务逻辑耗时尽量不要超过 200ms,如果最大业务逻辑耗时有特别大的,要定位到原因,避免出现极端情况下导致 Rebalance。
死信队列大于 0,一般来说是业务代码问题,需要分析相关日志。
Rebalance 一般只会在重启、发布、下线等操作才会发生,如果日常会发生 Rebalance,说明消费不正常,需要调节参数或优化代码解决。
3. 监控工具
常见监控工具有 JMX、Kafka Manager、Prometheus + Grafana,一般来讲,中间件团队都会提供基础的监控,在 2.2 中,除了 平均每次拉取消息数量 和 平均每次拉取消息大小 外,其余指标,我司中间件团队的监控中均有采集。
3.1 自定义 Kafka 指标采集
Kafka 的 metrics 监控提供了一个接口 org.apache.kafka.common.metrics.MetricsReporter,只要实现该接口,可以获取到监控数据 org.apache.kafka.common.metrics.KafkaMetric。
然后定期将 KafkaMetric 中的数据解析出来,收集到 Prometheus + Grafana 的监控平台中。
以下是 Demo,生产环境解析收集会复杂的多。
1 | public class ConsumerMetricMonitor implements MetricsReporter { |
按照 topic、监控指标名称、consumer 存储 KafkaMetric 数据,定期统计到监控台。
1 | public class ConsumerMetricLog { |
Consumer 的代码中只需要添加 metric.reporters
的配置即可。
1 | public static void main(String[] args) throws InterruptedException { |
运行后,部分日志输出如下:
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=bytes-consumed-rate,value=2.363349896982184
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lag,value=0.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lag-avg,value=0.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=fetch-size-max,value=78.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-consumed-total,value=6.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lead-min,value=205223.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lead-avg,value=205223.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lead,value=205223.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=bytes-consumed-total,value=78.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-lag-max,value=0.0
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-consumed-rate,value=0.18179614592170648
2022-01-04 11:53:45.275 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=records-per-request-avg,value=6.0
2022-01-04 11:53:45.276 [DefaultManagedAwareThreadFactory-1] INFO c.x.kafka.demo.consumer.metric.ConsumerMetricLog - kafka monitor,topic=xiaofa,name=fetch-size-avg,value=78.0
其中,records-per-request-avg
是”平均每次拉取消息数量“,fetch-size-avg
是 ”平均每次拉取消息大小“。
这里只是给了一个简单的示例,详细的 Kafka 统计指标可参见 org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry。可根据自身业务需要统计对应指标。
参考
关于 Kafka 的系统监控,可以参考腾讯云的文章:https://cloud.tencent.com/developer/article/1554002
《Apache Kafka 实战》