Spring Boot 配置 Kafka SSL 双向认证

1、简介

本文将带你了解在 Spring Boot 中如何配置 SSL 认证以连接到 Apache Kafka Broker。

安全套接字层(SSL)实际上已被弃用,自 2015 年起被传输层安全(TLS)所取代。不过,由于历史原因,Kafka(和 Java)仍然使用 “SSL”。

2、SSL 概览

默认情况下,Apache Kafka 以明文形式发送所有数据,且不进行任何身份认证。

首先,可以为 Broker 和客户端之间的加密配置 SSL。默认情况下,这需要使用公钥加密进行单向身份认证,由客户端验证服务器证书。

此外,服务器还可以使用单独的机制(如 SSL 或 SASL)对客户端进行身份认证,从而实现双向身份认证或相互 TLS(mTLS)。基本上,双向 SSL 认证确保客户端和服务器都使用 SSL 证书来认证对方的身份,并在双向上相互信任。

在本文中,Broker 使用 SSL 对客户端进行身份验证,使用 Keystore 和 Truststore 保存证书和密钥。

每个 Broker 都需要自己的 Keystore,其中包含私钥和公共证书。客户端使用其 Truststore 来验证该证书并信任服务器。同样,每个客户端也需要自己的 Keystore,其中包含私钥和公共证书。服务器使用其 Truststore 来验证和信任客户端的证书,并建立安全连接。

Truststore 可以包含一个可以签署证书的证书颁发机构(CA)。在这种情况下,Broker 或客户端会信任由 Truststore 中的 CA 签发的任何证书。这就简化了证书验证,因为添加新客户端或 Broker 无需更改 Truststore。

3、依赖和设置

创建一个简单的 Spring Boot 示例应用,在 pom.xml 中添加 spring-kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

使用 Docker Compose 文件来配置和测试 Kafka 服务器设置。

首先,不配置任何 SSL:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

启动容器:

docker-compose up

这会以默认配置拉起 Broker。

4、Broker 配置

来看看建立安全连接所需的最低配置。

4.1、单机 Broker

本例使用单机 Broker 来介绍启用 SSL 身份认证所需的最低配置。

首先,需要在 server.properties 中配置 Broker 通过 9093 端口监听 SSL 连接:

listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

接着,使用 keystoretruststore 相关属性配置证书和凭证。

ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password

最后,配置 Broker 对客户端进行身份认证,从而实现双向身份认证:

ssl.client.auth=required

4.2、Docker Compose

由于我们使用 Docker Compose 来管理 Broker,现将上述所有属性添加到 docker-compose.yml 文件的 environment 中:

kafka:
  image: confluentinc/cp-kafka:6.2.0
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
    - 9093:9093
  environment:
    ...
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
    KAFKA_SSL_CLIENT_AUTH: 'required'
    KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
    KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
    KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
    KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
    KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
  volumes:
    - ./certs/:/etc/kafka/secrets/certs

如上,在 ports 配置公开了 SSL 端口(9093)。此外,还在 volumes 配置挂载了 certs 项目文件夹。其中包含所需的证书和相关凭证。

现在,使用 Compose 重新启动应用。可以在 Broker 日志中看到 SSL 相关的信息:

...
kafka_1      | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1      | ===> Configuring ...
<strong>kafka_1      | SSL is enabled.</strong>
....
kafka_1      | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1      |  advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1      |  ssl.client.auth = required</strong>
<strong>kafka_1      |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]</strong>
kafka_1      |  ssl.endpoint.identification.algorithm = https
kafka_1      |  ssl.key.password = [hidden]
kafka_1      |  ssl.keymanager.algorithm = SunX509
<strong>kafka_1      |  ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks</strong>
kafka_1      |  ssl.keystore.password = [hidden]
kafka_1      |  ssl.keystore.type = JKS
kafka_1      |  ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1      |  ssl.protocol = TLSv1.3</strong>
kafka_1      |  ssl.trustmanager.algorithm = PKIX
kafka_1      |  ssl.truststore.certificates = null
<strong>kafka_1      |  ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks</strong>
kafka_1      |  ssl.truststore.password = [hidden]
kafka_1      |  ssl.truststore.type = JKS
....

5、Spring Boot 客户端

服务器设置完成后,现在来创建和 Broker 进行 SSL 双向认证所需的 Spring Boot 组件。

5.1、Producer

首先,使用 KafkaTemplate 向指定的 Topic 发送一条消息:

public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) {
        log.info("Producing message: {}", message);
        kafkaTemplate.send(topic, "key", message)
          .addCallback(
            result -> log.info("Message sent to topic: {}", message),
            ex -> log.error("Failed to send message", ex)
          );
    }
}

send 方法是一个异步操作。因此,添加了一个简单的回调,一旦 Broker 收到消息,它就会输出日志。

5.2、Consumer

接着,使用 @KafkaListener 创建一个简单的消费者。它会连接到 Broker,并从和生产者相同的 Topic 中消费消息:

public class KafkaConsumer {

    public static final String TOPIC = "test-topic";

    public final List<String> messages = new ArrayList<>();

    @KafkaListener(topics = TOPIC)
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received payload: '{}'", consumerRecord.toString());
        messages.add(consumerRecord.value());
    }
}

在本例中,为了简单,消费者只需将消息存储在 List 中。在实际系统中,消费者会接收消息,并根据应用的业务逻辑对其进行处理。

5.3、配置

最后,在 application.yml 中添加必要的配置:

spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: localhost:9093
    ssl:
      trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
      trust-store-password: <password>
      key-store-location:  classpath:/client-certs/kafka.client.keystore.jks
      key-store-password: <password>
    
    #  producer/consumer 的额外配置

如上,通过 Spring Boot 提供的属性配置了生产者和消费者。由于这两个组件都连接到同一个 Broker,可以将所有必要的属性声明在 spring.kafka 下。然而,如果生产者和消费者连接到不同的 Broker,则需要分别在 spring.kafka.producerspring.kafka.consumer 下指定这些属性。

在配置的 ssl 部分,指定了 JKS Truststore,以验证 Kafka Broker。其中包含签署了 Broker 证书的 CA 的证书。此外,还提供了 Spring 客户端 Keystore 的路径,其中包含由 CA 签发的证书,该证书应存在于 Broker 端的 Truststore 中。

5.4、测试

由于使用的是 Compose,因此可以使用 Testcontainers 框架为生产者和消费者创建一个端到端的测试:

@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {

    private static final String KAFKA_SERVICE = "kafka";
    private static final int SSL_PORT = 9093;  

    @Container
    public DockerComposeContainer<?> container =
      new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
        .withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());

    @Autowired
    private KafkaProducer kafkaProducer;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Test
    void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
        String message = generateSampleMessage();
        kafkaProducer.sendMessage(message, TOPIC);

        await().atMost(Duration.ofMinutes(2))
          .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
    }

    private static String generateSampleMessage() {
        return UUID.randomUUID().toString();
    }
}

运行测试时,Testcontainers 会使用 Compose 文件(包括 SSL 配置)启动 Kafka Broker。应用也会启动并使用其 SSL 配置,通过加密和身份认证的连接与 Broker 建立连接。由于这是一连串异步事件,所以使用 await 轮询消费者消息存储中的预期消息。以此来验证所有配置以及 Broker 和客户端之间的双向身份认证是否成功。

6、总结

本文介绍了 Kafka Broker 如何配置 SSL 双向认证,以及如何在 Spring Boot 中配置 SSL 双向认证连接到 Kafka Borker 进行生产和消费。


Ref:https://www.baeldung.com/spring-boot-kafka-ssl