Spring Kafka 的 “Trusted Packages” 特性

1、概览

本文将带你了解 Spring Kafka 中的 “Trusted Packages” 功能,了解其背后的动机以及用法。

2、先决条件

一般来说,Spring Kafka 模块允许我们指定一些关于发送的 POJO 的元数据。它通常采用 Kafka Message Header 的形式。

例如,可以这样配置 ProducerFactory

@Bean
public ProducerFactory<Object, SomeData> producerFactory() {
    JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>();
    jsonSerializer.setAddTypeInfo(true);
    return new DefaultKafkaProducerFactory<>(
      producerFactoryConfig(),
      new StringOrBytesSerializer(),
      jsonSerializer
    );
}

@Data
@AllArgsConstructor
static class SomeData {

    private String id;
    private String type;
    private String status;
    private Instant timestamp;
}

然后,使用上面用 producerFactory 配置的 KafkaTemplate 在一个 Topic 中生产一条新消息:

public void sendDataIntoKafka() {
    SomeData someData = new SomeData("1", "active", "sent", Instant.now());
    kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData));
}

此时,我们会在 Kafka 消费者的控制台中收到以下信息:

CreateTime:1701021806470 __TypeId__:com.baeldung.example.SomeData null {"id":"1","type":"active","status":"sent","timestamp":1701021806.153965150}

可以看到,消息中 POJO 的类型信息就在 Header 信息中。当然,这是 Spring Kafka 独有的特性。也就是说,从 Kafka 或其他框架的角度来看,这些 Header 只是元数据。因此,我们可以假设消费者和生产者都使用 Spring 来处理 Kafka 消息。

3、Trusted Packages 特性

在某些情况下,这是一个非常有用的功能。当 Topic 中的消息具有不同的 Payload 类型时,为消费者提示 Payload 的类型将非常有用。

Kafka 消息流程

不过,一般来说,我们知道 Topic 中会出现哪些类型的消息。因此,限制消费者可能接受的 Payload 类型可能是个好主意。这就是 Spring Kafka “Trusted Packages” 的意义所在。

4、用法

“Trusted Packages” 是 Spring Kafka 中的一个功能,它在反序列化器(deserializer)级别进行配置。如果配置了 Trusted Packages,Spring 将查找传入消息的 type Header。然后,它会检查消息中提供的所有类型是否都是受信任的 — 包括 Key 和 Value。

这主要是指在 相应 Header 中指定的 Key 和 Value 的 Java 类必须位于受信任的包内。如果一切正常,Spring 就会将消息传递,进一步反序列化。如果 Header 不存在,Spring 将直接反序列化对象,而不会检查 “Trusted Packages”:

@Bean
public ConsumerFactory<String, SomeData> someDataConsumerFactory() {
    JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
    payloadJsonDeserializer.addTrustedPackages("com.baeldung.example");
    return new DefaultKafkaConsumerFactory<>(
      consumerConfigs(),
      new StringDeserializer(),
      payloadJsonDeserializer
    );
}

如果用星号(*) 代替具体的 package,Spring 会信任所有包:

JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
payloadJsonDeserializer.trustedPackages("*");

然而,在这种情况下,使用 “Trusted packages” 并没有任何作用,只会产生额外的开销。

5.1、第一个动机: 一致性

这个功能之所以很棒,有两个主要原因。首先,如果集群中出现问题,我们可以快速失败。想象一下,某个生产者意外地向一个他不应该发布消息的 Topic 发布了消息。这可能会造成很多问题,特别是如果我们成功地对传入的消息进行反序列化。在这种情况下,整个系统的行为可能是不确定的。

因此,如果生产者发布的消息中包含类型信息,并且消费者知道它信任的类型,那么所有这些问题都可以避免。当然,这假设生产者的消息类型与消费者期望的类型不同。但是这个假设是相当合理的,因为这个生产者本来就不应该在这个主题中发布消息。

5.2、第二个动机: 安全

但最重要的还是安全问题。在前面的例子中,我们强调生产者无意中向 Topic 发布了信息。但这也可能是一种有意攻击。恶意生产者可能故意将信息发布到特定 Topic 中,以利用反序列化漏洞。因此,通过防止对不需要的消息进行反序列化,Spring 提供了额外的安全措施来降低安全风险。

在这里非常重要的一点是,“Trusted Packages” 功能并不能解决 “Header 伪造” 攻击。在这种情况下,攻击者篡改消息的 Header,使接收者误以为该消息是合法的并且来自受信任的源头。因此,通过提供正确的类型 Header,攻击者仍然可能欺骗 Spring,后者将继续进行消息的反序列化。但这个问题非常复杂,不是本文讨论的主题。总的来说,Spring 只是提供了额外的安全措施,以最大程度减少黑客得逞的风险。

6、总结

本文介绍了Spring Kafka 的 “Trusted Packages” 特性,该功能为分布式消息系统提供了额外的一致性和安全性。


Ref:https://www.baeldung.com/spring-kafka-trusted-packages-feature