自定义 Apache Kafka Serializer(序列化器)

1、简介

在 Apache Kafka 中传输消息时,客户端和服务器会就使用共同的语法格式达成协议。Apache Kafka 提供了默认的转换器(Converter),如 StringLong。同时也支持针对特定用例的自定义序列化器 (Serializer)。

2、Apache Kafka 中的 Serializer

序列化是将对象转换为字节的过程。反序列化则是将字节流转换为对象的逆过程。简而言之,它将内容转换为可读和可解释的信息。

如上所述,Apache Kafka 为几种基本类型提供了默认序列化器,并允许我们实现自定义序列化器:

通过网络向 Kafka Topic 发送消息的过程

上图显示了通过网络向 Kafka Topic 发送消息的过程。在此过程中,生产者将消息发送到 Topic 之前,自定义序列化器会将对象转换成字节。同样,它也显示了反序列化器如何将字节转换回对象,以便消费者正确处理。

2.1、自定义 Serializer

Apache Kafka 为几种基本类型提供了预置的序列化器和反序列化器:

它也提供了实现自定义序列化器/反序列化器的功能。为了序列化自己的对象,需要实现 Serializer 接口。同样,要创建自定义的反序列化器,需要实现 Deserializer 接口。

这两个接口都有可覆写的方法:

  • configure:用于实现配置细节
  • serialize / deserialize:这些方法包括自定义序列化和反序列化的实际实现
  • close:使用该方法关闭 Kafka Session

3、实现自定义 Serializer

Kafka 提供了自定义序列化器的功能。可以为消息的 Key 和 Value 实现特定的转换器(Converter)。

3.1、依赖

pom.xml 中添加 Kafka Consumer API 依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

3.2、自定义 Serializer

首先,使用 Lombok 来定义要通过 Kafka 发送的自定义对象:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
    private String message;
    private String version;
}

接下来,实现 Kafka 提供的 Serializer 接口,以便生产者发送消息:

public class CustomSerializer implements Serializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null){
                System.out.println("Null received at serializing");
                return null;
            }
            System.out.println("Serializing...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}

覆写接口的 serialize 方法。使用 Jackson ObjectMapper 转换自定义对象。然后,返回字节流,以便将信息正确发送到网络

3.3、自定义 Deserializer

同样,在消费者实现 Deserializer 接口:

@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null){
                System.out.println("Null received at deserializing");
                return null;
            }
            System.out.println("Deserializing...");
            return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to MessageDto");
        }
    }

    @Override
    public void close() {
    }
}

与上一节一样,覆写接口的 deserialize 方法,使用相同的 Jackson ObjectMapper 将字节流转换为自定义对象。

3.4、示例

一个使用自定义序列化器和反序列化器发送和接收消息的示例。

首先,创建并配置 Kafka Producer

private static KafkaProducer<String, MessageDto> createKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");

    return new KafkaProducer(props);
}

通过 VALUE_SERIALIZER_CLASS_CONFIG 属性配置 Value 的序列化器,使用默认的 StringSerializer 配置 Key 序列化器。

其次,创建 Kafka Consumer

private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");

    return new KafkaConsumer<>(props);
}

除了使用自定义类的 Key 和 Value 反序列化器外,还必须包含 GROUP_ID。除此之外,还将自动偏移重置配置设为 earliest(最早),以确保生产者在消费者启动前发送所有消息。

创建了生产者和消费者客户端后,就可以发送示例信息了:

MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();

KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();

可以通过订阅主题,在消费者端接收消息。

AtomicReference<MessageDto> msgCons = new AtomicReference<>();

KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));

ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
    msgCons.set(record.value());
    System.out.println("Message received " + record.value());
});

consumer.close();

控制台显示的结果如下:

Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)

4、总结

本文介绍了 Apache Kafka 中消息的序列化、反序列化机制,以及如何自定义 Serializer 和 Seserializer 实现。


参考:https://www.baeldung.com/kafka-custom-serializer