Spring Boot + Open Telemetry 实现 Kafka 追踪
本文将带你了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置追踪功能。我们会使用 Micrometer 库发送追踪信息,并使用Jaeger来存储和可视化这些数据。Spring Kafka内置了与 Micrometer 的集成,用于 KafkaTemplate
和监听容器。本文还会介绍何配置 Spring Kafka observability (可观察性),以在追踪中添加自定义标签(Tag)。
源码
你可以在 GitHub 上找到完整的源码。Clone 项目后,进入 kafka
目录,按照说明进行操作即可。
依赖
添加如下依赖,其中 Spring Boot Starter 和 Spring Kafka 用于发送或接收消息,Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥接器用于自动生成与每条消息相关的追踪,最后是 opentelemetry-exporter-otlp
,用于将追踪导出到应用外。
对于本文中的两个示例 Spring Boot 应用,依赖都是相同的。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
</dependencies>
生产者启用 Spring Boot Kafka 追踪
应用逻辑很简单,只是发送和接收消息。消息类 Info
如下:
public class Info {
private Long id;
private String source;
private String space;
private String cluster;
private String message;
public Info(Long id, String source, String space, String cluster,
String message) {
this.id = id;
this.source = source;
this.space = space;
this.cluster = cluster;
this.message = message;
}
// get /set 方法
从 producer
应用开始。它每秒生成一条消息,并使用 KafkaTemplate
Bean 发送。
SenderService
Bean 如下:
@Service
public class SenderService {
private static final Logger LOG = LoggerFactory
.getLogger(SenderService.class);
AtomicLong id = new AtomicLong();
@Autowired
KafkaTemplate<Long, Info> template;
@Value("${POD:kafka-producer}")
private String pod;
@Value("${NAMESPACE:empty}")
private String namespace;
@Value("${CLUSTER:localhost}")
private String cluster;
@Value("${TOPIC:info}")
private String topic;
@Scheduled(fixedRate = 1000)
public void send() {
Info info = new Info(id.incrementAndGet(),
pod,
namespace,
cluster,
"HELLO");
CompletableFuture<SendResult<Long, Info>> result = template
.send(topic, info.getId(), info);
result.whenComplete((sr, ex) ->
LOG.info("Sent({}): {}", sr.getProducerRecord().key(),
sr.getProducerRecord().value()));
}
}
Spring Boot 提供了一个自动配置的 KafkaTemplate
实例。然而,要在 Spring Boot 中启用 Kafka 追踪,需要自定义该实例。以下是在 producer
应用的 main class 中实现的 KafkaTemplate
Bean。
@SpringBootApplication
@EnableScheduling
public class KafkaProducer {
private static final Logger LOG = LoggerFactory
.getLogger(KafkaProducer.class);
public static void main(String[] args) {
SpringApplication.run(KafkaProducer.class, args);
}
@Bean
public NewTopic infoTopic() {
return TopicBuilder.name("info")
.partitions(1)
.replicas(1)
.build();
}
@Bean
public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
t.setObservationEnabled(true);
t.setObservationConvention(new KafkaTemplateObservationConvention() {
@Override
public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
return KeyValues.of("topic", context.getDestination(),
"id", String.valueOf(context.getRecord().key()));
}
});
return t;
}
}
调用 setObservationEnabled
方法启用追踪。默认情况下,Micrometer
模块会生成一些通用的标签。我们希望至少添加目标 Topic 的名称和 Kafka Message Key。因此,创建 KafkaTemplateObservationConvention
接口的自定义实现。它使用 KafkaRecordSenderContext
从 ProducerRecord
对象中获取 Topic 名称和 Message Key。
还需要设置 Jaeger 实例的地址,并决定导出 span
的百分比。下面是包含所需属性的 application.yml
文件:
producer/application.yml
spring:
application.name: kafka-producer
kafka:
bootstrap-servers: ${KAFKA_URL:localhost}:9092
producer:
key-serializer: org.apache.kafka.common.serialization.LongSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
management:
tracing:
enabled: true
sampling:
probability: 1.0
otlp:
tracing:
endpoint: http://jaeger:4318/v1/traces
消费者启用 Spring Boot Kafka 追踪
在 consumer
应用中,它只接收并打印来自 Kafka Topic 的消息。
下面是 Listener @Service
的实现。除了整个消息内容,它还会打印 Message Key 和 Topic 分区号。
@Service
public class ListenerService {
private static final Logger LOG = LoggerFactory
.getLogger(ListenerService.class);
@KafkaListener(id = "info", topics = "${app.in.topic}")
public void onMessage(@Payload Info info,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
LOG.info("Received(key={}, partition={}): {}", key, partition, info);
}
}
为了在消费者端生成和导出追踪,需要覆盖 ConcurrentKafkaListenerContainerFactory
Bean。
对于容器监听器工厂,应该获取 ContainerProperties
实例,然后调用 setObservationEnabled
方法。与之前一样,可以创建 KafkaTemplateObservationConvention
接口的自定义实现,以包含附加 Tag(可选)。
@SpringBootApplication
@EnableKafka
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory
.getLogger(KafkaConsumer.class);
public static void main(String[] args) {
SpringApplication.run(KafkaConsumer.class, args);
}
@Value("${app.in.topic}")
private String topic;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setObservationEnabled(true);
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public NewTopic infoTopic() {
return TopicBuilder.name(topic)
.partitions(10)
.replicas(3)
.build();
}
}
当然,还需要在 application.yml
文件中设置 Jaeger 地址:
consumer/application.yml
spring:
application.name: kafka-consumer
kafka:
bootstrap-servers: ${KAFKA_URL:localhost}:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
app.in.topic: ${TOPIC:info}
management:
tracing:
enabled: true
sampling:
probability: 1.0
otlp:
tracing:
endpoint: http://jaeger:4318/v1/traces
使用 Docker 进行测试
使用 Docker 容器运行 Kakfa 和 Jaeger。
首先,为 producer
和 consumer
应用构建容器镜像。Spring Boot 为此提供了内置工具。因此,只需执行以下命令即可:
$ mvn clean package spring-boot:build-image
然后,定义包含容器列表的 docker-compose.yml
文件。可以使用基于环境变量的方式来动态覆盖 Spring Boot 属性。因此,可以轻松更改容器的 Kafka 和 Jaeger 的地址。
docker-compose.yml
文件示例如下:
docker-compose.yml
version: "3.8"
services:
broker:
image: moeenz/docker-kafka-kraft:latest
restart: always
ports:
- "9092:9092"
environment:
- KRAFT_CONTAINER_HOST_NAME=broker
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "4317:4317"
- "4318:4318"
producer:
image: library/producer:1.0-SNAPSHOT
links:
- broker
- jaeger
environment:
MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
consumer:
image: library/consumer:1.0-SNAPSHOT
links:
- broker
- jaeger
environment:
MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
用以下命令运行所有已定义的容器:
$ docker compose up
应用启动,通过日志可以看到正在生产和消费的消息。
Jaeger 仪表板在 16686 端口提供服务。你可以看到几条带有 kafka-producer
和 kafka-consumer
span 的追踪记录。
可以查看每个条目的细节。producer
应用生成的追踪总是与 consumer
应用为每一条消息生成的追踪相关联。此外,还有两个自定义标签(id
和 topic
),其值由 KafkaTemplate
Bean 添加。
在 Kubernetes 上运行
现在可以把示例应用部署到 Kubernetes 上了。你可以使用 Skaffold CLI 轻松完成部署。在此之前,需要在 Kubernetes 上安装 Kafka 和 Jaeger。
安装 Jaeger。第一步,需要添加以下 Helm 仓库:
$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
在默认情况下,Jaeger Helm chart 不公开 OTLP 端点。为了启用它们,需要覆盖一些默认设置。
YAML manifest 如下:
jaeger-values.yaml
collector:
service:
otlp:
grpc:
name: otlp-grpc
port: 4317
http:
name: otlp-http
port: 4318
使用 jaeger-values.yaml
在 jaeger
命名空间中安装 Jaeger:
$ helm install jaeger jaegertracing/jaeger -n jaeger \
--create-namespace \
-f jaeger-values.yaml
安装 Jaeger 后,就可以验证 Kubernetes 服务列表。
使用 jaeger-collector
服务为应用发送追踪,并通过 jaeger-query
服务访问 UI 仪表板。
$ kubectl get svc -n jaeger
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
jaeger-agent ClusterIP 10.96.147.104 <none> 5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP 14m
jaeger-cassandra ClusterIP None <none> 7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP 14m
jaeger-collector ClusterIP 10.96.111.236 <none> 14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP 14m
jaeger-query ClusterIP 10.96.88.64 <none> 80/TCP,16685/TCP,16687/TCP 14m
最后,运行连接到 Kafka 和 Jaeger 的示例 Spring Boot 应用。
下面是 producer
应用的 Deployment 对象。它通过定义 KAFKA_URL
和 MANAGEMENT_OTLP_TRACING_ENDPOINT
环境变量来覆盖默认的 Kafka 和 Jaeger 地址。
apiVersion: apps/v1
kind: Deployment
metadata:
name: producer
spec:
selector:
matchLabels:
app: producer
template:
metadata:
labels:
app: producer
spec:
containers:
- name: producer
image: piomin/producer
resources:
requests:
memory: 200Mi
cpu: 100m
ports:
- containerPort: 8080
env:
- name: MANAGEMENT_OTLP_TRACING_ENDPOINT
value: http://jaeger-collector.jaeger:4318/v1/traces
- name: KAFKA_URL
value: my-cluster-kafka-bootstrap
- name: CLUSTER
value: c1
- name: TOPIC
value: test-1
- name: POD
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
下面是 consumer
应用的类似 Deployment 对象:
apiVersion: apps/v1
kind: Deployment
metadata:
name: consumer-1
spec:
selector:
matchLabels:
app: consumer-1
template:
metadata:
labels:
app: consumer-1
spec:
containers:
- name: consumer
image: piomin/consumer
resources:
requests:
memory: 200Mi
cpu: 100m
ports:
- containerPort: 8080
env:
- name: SPRING_APPLICATION_NAME
value: kafka-consumer-1
- name: TOPIC
value: test-1
- name: KAFKA_URL
value: my-cluster-kafka-bootstrap
- name: MANAGEMENT_OTLP_TRACING_ENDPOINT
value: http://jaeger-collector.jaeger:4318/v1/traces
假设你在 Git 仓库的 kafka
目录中,只需运行以下命令即可部署两个应用。顺便说一下,这为消费者应用创建了两个 Deployment(consumer-1
和 consumer-2
),以便 Jaeger 进行可视化。
$ skaffold run -n strimzi --tail
运行应用后,就可以进入 Jaeger 面板,验证追踪列表。为了访问控制面板,可以为 jaeger-query
服务启用端口转发。
$ kubectl port-forward svc/jaeger-query 80:80
最后
Spring Kafka 与 Micrometer Tracing 的集成是自 3.0 版本以来的一个相对较新的功能,可能很快就会有新的改进。无论如何,目前它提供了一种从 Kafka 生产者和消费者生成和发送追踪的简单方式。
Ref:https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry