通过 CLI 命令行发送 Key/Value 消息到 Kafka
1、概览
本文将带你了解从 Kafka 命令行(CLI)发送 Key/Value 消息的两种方法。
在处理金融交易、预订、在线购物等实时事件驱动系统中,确保特定 Topic 上消息的有序性是一个常见需求。在这种情况下,我们应该为发送到这些 Topic 的事件使用 Kafka Message Key。
2、先决条件
首先,需要一个正在运行的 Kafka 实例。如果没有,可以使用 Kafka Docker 或根据 Kafka 快速入门指南 建立一个环境。下面的章节将假定我们已经有一个运行中的 Kafka 实例,且可以通过 kafka-server:9092 访问。
接下来,假设我们开发的是一个 “支付系统”,而且需要从命令行发送消息。
以下是对应的 Model 类:
// 支付事件
public class PaymentEvent {
private String reference;
private BigDecimal amount;
private Currency currency;
// 标准的 Getter / Setter 方法
}
另一个前提条件是要有访问 Kafka CLI 工具的权限,这很简单。首先,下载 Kafka 发行版,然后解压下载的文件,并进入解压后的文件夹中。在 bin
文件夹下就可以找到 Kafka CLI 工具。
本文后面内容中的所有 CLI 命令,都假设是在 Kafka 解码文件夹下执行的。
你可以考虑把
bin
目录添加到PATH
环境变量,从而可以在任意目录下都可以访问 Kafka CLI。
接下来,创建 payments Topic,如下:
bin/kafka-topics.sh --create --topic payments --bootstrap-server kafka-server:9092
控制台返回信息如下,表明 Topic 已成功创建:
Created topic payments.
最后,在 payments Topic 上创建一个 Kafka Consumer,以测试消息是否正确发送:
bin/kafka-console-consumer.sh --topic payments --bootstrap-server kafka-server:9092 --property "print.key=true" --property "key.separator=="
注意,上一条命令末尾指定了 print.key
属性。如果不明确将该属性设置为 true
,消费者就不会打印 message key。我们还覆盖了 key.separator
属性的默认值(\t Tab 制表符),以便与后续章节中生成消息的方式保持一致。
现在,我们可以开始从命令行发送 Key/Value 消息了。
3、从命令行发送 Key/Value 消息
使用 Kafka Console Producer 从命令行发送 Key/Value 信息:
bin/kafka-console-producer.sh --topic payments --bootstrap-server kafka-server:9092 --property "parse.key=true" --property "key.separator=="
在命令行(CLI)中想要与 Message Payload 一起提供 Message Key 时,需要使用上一条命令结尾的 parse.key
和 key.separator
属性。
运行前一条命令后,会出现一个提示,我们可以在其中提供 Message Key 和 Message Payload:
>KEY1={"reference":"P000000001", "amount": "37.75", "currency":"EUR"}
>KEY2={"reference":"P000000002", "amount": "2", "currency":"EUR"}
从消费者输出中看到,Message Key 和 Message Payload 都成功地从命令行发送:
KEY1={"reference":"P000000001", "amount": "37.75", "currency":"EUR"}
KEY2={"reference":"P000000002", "amount": "2", "currency":"EUR"}
4、从文件发送 Key/Value 消息
从命令行发送 Key/Value 消息的另一种方法是使用文件。
首先,创建 payment-events.txt
文件,其内容如下:
KEY3={"reference":"P000000003", "amount": "80", "currency":"SEK"}
KEY4={"reference":"P000000004", "amount": "77.8", "currency":"GBP"}
现在,启动 Console Producer,并使用 payment-events.txt
文件作为输入:
bin/kafka-console-producer.sh --topic payments --bootstrap-server kafka-server:9092 --property "parse.key=true" --property "key.separator==" < payment-events.txt
查看消费者输出,可以看到这次 Message Key 和 Message Payload 也都正确发送:
KEY3={"reference":"P000000003", "amount": "80", "currency":"SEK"}
KEY4={"reference":"P000000004", "amount": "77.8", "currency":"GBP"}
5、总结
本文介绍了如何在 Kafka 中通过命令行(CLI)发送 Key/Value 消息,以及如何从文件发送 Key/Value 消息。
Ref:https://www.baeldung.com/kafka-emit-key-value-message-cli