Spring Boot + Open Telemetry 实现 Kafka 追踪

本文将带你了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置追踪功能。我们会使用 Micrometer 库发送追踪信息,并使用Jaeger来存储和可视化这些数据。Spring Kafka内置了与 Micrometer 的集成,用于 KafkaTemplate 和监听容器。本文还会介绍何配置 Spring Kafka observability (可观察性),以在追踪中添加自定义标签(Tag)。

源码

你可以在 GitHub 上找到完整的源码。Clone 项目后,进入 kafka 目录,按照说明进行操作即可。

依赖

添加如下依赖,其中 Spring Boot Starter 和 Spring Kafka 用于发送或接收消息,Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥接器用于自动生成与每条消息相关的追踪,最后是 opentelemetry-exporter-otlp,用于将追踪导出到应用外。

对于本文中的两个示例 Spring Boot 应用,依赖都是相同的。

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
  </dependency>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
  </dependency>
</dependencies>

生产者启用 Spring Boot Kafka 追踪

应用逻辑很简单,只是发送和接收消息。消息类 Info 如下:

public class Info {

    private Long id;
    private String source;
    private String space;
    private String cluster;
    private String message;

    public Info(Long id, String source, String space, String cluster, 
                String message) {
       this.id = id;
       this.source = source;
       this.space = space;
       this.cluster = cluster;
       this.message = message;
    }

   // get /set 方法

producer 应用开始。它每秒生成一条消息,并使用 KafkaTemplate Bean 发送。

SenderService Bean 如下:

@Service
public class SenderService {

   private static final Logger LOG = LoggerFactory
      .getLogger(SenderService.class);

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:info}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, 
                           namespace, 
                           cluster, 
                           "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
                LOG.info("Sent({}): {}", sr.getProducerRecord().key(), 
                         sr.getProducerRecord().value()));
    }

}

Spring Boot 提供了一个自动配置的 KafkaTemplate 实例。然而,要在 Spring Boot 中启用 Kafka 追踪,需要自定义该实例。以下是在 producer 应用的 main class 中实现的 KafkaTemplate Bean。

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   @Bean
   public NewTopic infoTopic() {
      return TopicBuilder.name("info")
             .partitions(1)
             .replicas(1)
             .build();
   }

   @Bean
   public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
      KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
      t.setObservationEnabled(true);
      t.setObservationConvention(new KafkaTemplateObservationConvention() {
         @Override
         public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
            return KeyValues.of("topic", context.getDestination(),
                    "id", String.valueOf(context.getRecord().key()));
         }
      });
      return t;
   }

}

调用 setObservationEnabled 方法启用追踪。默认情况下,Micrometer 模块会生成一些通用的标签。我们希望至少添加目标 Topic 的名称和 Kafka Message Key。因此,创建 KafkaTemplateObservationConvention 接口的自定义实现。它使用 KafkaRecordSenderContextProducerRecord 对象中获取 Topic 名称和 Message Key。

还需要设置 Jaeger 实例的地址,并决定导出 span 的百分比。下面是包含所需属性的 application.yml 文件:

producer/application.yml

spring:
  application.name: kafka-producer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

消费者启用 Spring Boot Kafka 追踪

consumer 应用中,它只接收并打印来自 Kafka Topic 的消息。

下面是 Listener @Service 的实现。除了整个消息内容,它还会打印 Message Key 和 Topic 分区号。

@Service
public class ListenerService {

   private static final Logger LOG = LoggerFactory
      .getLogger(ListenerService.class);

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
                         @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }

}

为了在消费者端生成和导出追踪,需要覆盖 ConcurrentKafkaListenerContainerFactory Bean。

对于容器监听器工厂,应该获取 ContainerProperties 实例,然后调用 setObservationEnabled 方法。与之前一样,可以创建 KafkaTemplateObservationConvention 接口的自定义实现,以包含附加 Tag(可选)。

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .build();
    }

}

当然,还需要在 application.yml 文件中设置 Jaeger 地址:

consumer/application.yml

spring:
  application.name: kafka-consumer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

app.in.topic: ${TOPIC:info}

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

使用 Docker 进行测试

使用 Docker 容器运行 Kakfa 和 Jaeger。

首先,为 producerconsumer 应用构建容器镜像。Spring Boot 为此提供了内置工具。因此,只需执行以下命令即可:

$ mvn clean package spring-boot:build-image

然后,定义包含容器列表的 docker-compose.yml 文件。可以使用基于环境变量的方式来动态覆盖 Spring Boot 属性。因此,可以轻松更改容器的 Kafka 和 Jaeger 的地址。

docker-compose.yml 文件示例如下:

docker-compose.yml

version: "3.8"
services:
  broker:
    image: moeenz/docker-kafka-kraft:latest
    restart: always
    ports:
      - "9092:9092"
    environment:
      - KRAFT_CONTAINER_HOST_NAME=broker
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4317:4317"
      - "4318:4318"
  producer:
    image: library/producer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
  consumer:
    image: library/consumer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092

用以下命令运行所有已定义的容器:

$ docker compose up

应用启动,通过日志可以看到正在生产和消费的消息。

Spring Boot 运行日志

Jaeger 仪表板在 16686 端口提供服务。你可以看到几条带有 kafka-producerkafka-consumer span 的追踪记录。

Jaeger 仪表板

可以查看每个条目的细节。producer 应用生成的追踪总是与 consumer 应用为每一条消息生成的追踪相关联。此外,还有两个自定义标签(idtopic),其值由 KafkaTemplate Bean 添加。

追踪 条目的细节

在 Kubernetes 上运行

现在可以把示例应用部署到 Kubernetes 上了。你可以使用 Skaffold CLI 轻松完成部署。在此之前,需要在 Kubernetes 上安装 Kafka 和 Jaeger。

安装 Jaeger。第一步,需要添加以下 Helm 仓库:

$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts

在默认情况下,Jaeger Helm chart 不公开 OTLP 端点。为了启用它们,需要覆盖一些默认设置。

YAML manifest 如下:

jaeger-values.yaml

collector:
  service:
    otlp:
      grpc:
        name: otlp-grpc
        port: 4317
      http:
        name: otlp-http
        port: 4318

使用 jaeger-values.yamljaeger 命名空间中安装 Jaeger:

$ helm install jaeger jaegertracing/jaeger -n jaeger \
    --create-namespace \
    -f jaeger-values.yaml

安装 Jaeger 后,就可以验证 Kubernetes 服务列表。

使用 jaeger-collector 服务为应用发送追踪,并通过 jaeger-query 服务访问 UI 仪表板。

$ kubectl get svc -n jaeger
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                           AGE
jaeger-agent       ClusterIP   10.96.147.104   <none>        5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP     14m
jaeger-cassandra   ClusterIP   None            <none>        7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP      14m
jaeger-collector   ClusterIP   10.96.111.236   <none>        14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP   14m
jaeger-query       ClusterIP   10.96.88.64     <none>        80/TCP,16685/TCP,16687/TCP                        14m

最后,运行连接到 Kafka 和 Jaeger 的示例 Spring Boot 应用。

下面是 producer 应用的 Deployment 对象。它通过定义 KAFKA_URLMANAGEMENT_OTLP_TRACING_ENDPOINT 环境变量来覆盖默认的 Kafka 和 Jaeger 地址。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

下面是 consumer 应用的类似 Deployment 对象:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-1
spec:
  selector:
    matchLabels:
      app: consumer-1
  template:
    metadata:
      labels:
        app: consumer-1
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: SPRING_APPLICATION_NAME
            value: kafka-consumer-1
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces

假设你在 Git 仓库的 kafka 目录中,只需运行以下命令即可部署两个应用。顺便说一下,这为消费者应用创建了两个 Deployment(consumer-1consumer-2),以便 Jaeger 进行可视化。

$ skaffold run -n strimzi --tail

运行应用后,就可以进入 Jaeger 面板,验证追踪列表。为了访问控制面板,可以为 jaeger-query 服务启用端口转发。

$ kubectl port-forward svc/jaeger-query 80:80

最后

Spring Kafka 与 Micrometer Tracing 的集成是自 3.0 版本以来的一个相对较新的功能,可能很快就会有新的改进。无论如何,目前它提供了一种从 Kafka 生产者和消费者生成和发送追踪的简单方式。


Ref:https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry