通过 CLI 命令行发送 Key/Value 消息到 Kafka

1、概览

本文将带你了解从 Kafka 命令行(CLI)发送 Key/Value 消息的两种方法。

在处理金融交易、预订、在线购物等实时事件驱动系统中,确保特定 Topic 上消息的有序性是一个常见需求。在这种情况下,我们应该为发送到这些 Topic 的事件使用 Kafka Message Key

2、先决条件

首先,需要一个正在运行的 Kafka 实例。如果没有,可以使用 Kafka Docker 或根据 Kafka 快速入门指南 建立一个环境。下面的章节将假定我们已经有一个运行中的 Kafka 实例,且可以通过 kafka-server:9092 访问。

接下来,假设我们开发的是一个 “支付系统”,而且需要从命令行发送消息。

以下是对应的 Model 类:

// 支付事件
public class PaymentEvent {
    private String reference;
    private BigDecimal amount;
    private Currency currency;

    // 标准的 Getter / Setter 方法
}

另一个前提条件是要有访问 Kafka CLI 工具的权限,这很简单。首先,下载 Kafka 发行版,然后解压下载的文件,并进入解压后的文件夹中。在 bin 文件夹下就可以找到 Kafka CLI 工具。

本文后面内容中的所有 CLI 命令,都假设是在 Kafka 解码文件夹下执行的。

你可以考虑把 bin 目录添加到 PATH 环境变量,从而可以在任意目录下都可以访问 Kafka CLI。

接下来,创建 payments Topic,如下:

bin/kafka-topics.sh --create --topic payments --bootstrap-server kafka-server:9092

控制台返回信息如下,表明 Topic 已成功创建:

Created topic payments.

最后,在 payments Topic 上创建一个 Kafka Consumer,以测试消息是否正确发送:

bin/kafka-console-consumer.sh --topic payments --bootstrap-server kafka-server:9092 --property "print.key=true" --property "key.separator=="

注意,上一条命令末尾指定了 print.key 属性。如果不明确将该属性设置为 true,消费者就不会打印 message key。我们还覆盖了 key.separator 属性的默认值(\t Tab 制表符),以便与后续章节中生成消息的方式保持一致。

现在,我们可以开始从命令行发送 Key/Value 消息了。

3、从命令行发送 Key/Value 消息

使用 Kafka Console Producer 从命令行发送 Key/Value 信息:

bin/kafka-console-producer.sh --topic payments --bootstrap-server kafka-server:9092 --property "parse.key=true" --property "key.separator=="

在命令行(CLI)中想要与 Message Payload 一起提供 Message Key 时,需要使用上一条命令结尾的 parse.keykey.separator 属性。

运行前一条命令后,会出现一个提示,我们可以在其中提供 Message Key 和 Message Payload:

>KEY1={"reference":"P000000001", "amount": "37.75", "currency":"EUR"}
>KEY2={"reference":"P000000002", "amount": "2", "currency":"EUR"}

从消费者输出中看到,Message Key 和 Message Payload 都成功地从命令行发送:

KEY1={"reference":"P000000001", "amount": "37.75", "currency":"EUR"}
KEY2={"reference":"P000000002", "amount": "2", "currency":"EUR"}

4、从文件发送 Key/Value 消息

从命令行发送 Key/Value 消息的另一种方法是使用文件。

首先,创建 payment-events.txt 文件,其内容如下:

KEY3={"reference":"P000000003", "amount": "80", "currency":"SEK"}
KEY4={"reference":"P000000004", "amount": "77.8", "currency":"GBP"}

现在,启动 Console Producer,并使用 payment-events.txt 文件作为输入:

bin/kafka-console-producer.sh --topic payments --bootstrap-server kafka-server:9092 --property "parse.key=true" --property "key.separator==" < payment-events.txt

查看消费者输出,可以看到这次 Message Key 和 Message Payload 也都正确发送:

KEY3={"reference":"P000000003", "amount": "80", "currency":"SEK"}
KEY4={"reference":"P000000004", "amount": "77.8", "currency":"GBP"}

5、总结

本文介绍了如何在 Kafka 中通过命令行(CLI)发送 Key/Value 消息,以及如何从文件发送 Key/Value 消息。


Ref:https://www.baeldung.com/kafka-emit-key-value-message-cli