本站(springdoc.cn)中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

Spring框架为集成消息系统提供了广泛的支持,从使用 JmsTemplate 简化JMS API的使用到异步接收消息的完整基础设施。 Spring AMQP为高级消息队列协议提供了一个类似的功能集。 Spring Boot还为 RabbitTemplate 和RabbitMQ提供了自动配置选项。 Spring WebSocket原生包括对STOMP消息传递的支持,Spring Boot通过starter和少量的自动配置对其进行支持。 Spring Boot还支持Apache Kafka。

1. JMS

jakarta.jms.ConnectionFactory 接口提供了一个创建 jakarta.jms.Connection 的标准方法,用于与JMS broker进行交互。 尽管Spring需要一个 ConnectionFactory 来与JMS一起工作,但你一般不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。 (详见Spring框架参考文档的 相关部分 。) Spring Boot还自动配置了必要的基础设施来发送和接收消息。

1.1. 支持 ActiveMQ

ActiveMQ 在classpath 上可用时,Spring Boot 可以配置一个 ConnectionFactory

如果你使用 spring-boot-starter-activemq,就会提供连接到ActiveMQ实例的必要依赖,以及与JMS集成的Spring基础设施。

ActiveMQ的配置由 spring.activemq.* 的外部配置属性控制。默认情况下,ActiveMQ被自动配置为使用 TCP transport,默认连接到 tcp://localhost:61616。下面的例子显示了如何改变默认的 broker URL:

Properties
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
Yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

默认情况下,CachingConnectionFactory 用合理的设置包装了原生 ConnectionFactory,你可以通过 spring.jms.* 中的外部配置属性来控制:

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果你想使用原生连接池,你可以通过添加 org.messaginghub:pooled-jms 的依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下例所示:

Properties
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
参见 ActiveMQProperties 以了解更多支持的选项。你也可以注册任意数量的实现 ActiveMQConnectionFactoryCustomizer 的 Bean 来进行更高级的定制。

默认情况下,如果 destination 还不存在,ActiveMQ会创建一个 destination,以便根据其提供的名称来解析 destination。

1.2. 支持ActiveMQ Artemis

当Spring Boot检测到Classpath上有 ActiveMQ Artemis 时,它可以自动配置一个 ConnectionFactory。如果存在broker,就会自动启动和配置嵌入式broker(除非明确设置了mode属性)。支持的mode有 embedded(明确表示需要嵌入式broker,如果broker在classpath上不可用,就会出现错误)和 native(使用 netty 传输协议连接到broker)。当配置了后者时,Spring Boot会配置一个 ConnectionFactory,以默认设置连接到本地机器上运行的broker。

如果你使用 spring-boot-starter-artemis,将提供连接到现有ActiveMQ Artemis实例的必要依赖,以及与JMS集成的Spring基础架构。 添加 org.apache.activemq:artemis-jakarta-server 到你的应用程序中,你可以使用嵌入式模式。

ActiveMQ Artemis的配置是由 spring.artemis.* 中的外部配置属性控制。 例如,你可以在 application.properties 中声明以下部分。

Properties
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
Yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

在嵌入代理时,你可以选择是否要启用持久性,并列出应该被提供的目的地。 这些可以指定为逗号分隔的列表,用默认选项创建,或者你可以定义 org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration 类型的bean,分别用于高级队列和主题配置。

默认情况下,CachingConnectionFactory 用合理的设置包装了本地的 ConnectionFactory,你可以通过 spring.jms.* 的外部配置属性来控制。

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果你想使用native pool,你可以通过添加对 org.messaginghub:pooled-jms 的依赖,并相应配置 JmsPoolConnectionFactory 来实现,如下面的例子所示。

Properties
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
Yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

参见 ArtemisProperties 了解更多支持的选项。

不涉及JNDI查询,目的地是根据它们的名字来解决的,使用Artemis配置中的 name 属性或通过配置提供的名字。

1.3. 使用JNDI ConnectionFactory

如果你在应用服务器中运行你的应用程序,Spring Boot会尝试使用JNDI来定位JMS ConnectionFactory。 默认情况下,会检查 java:/JmsXAjava:/XAConnectionFactory 位置。 如果你需要指定另一个位置,可以使用 spring.jms.jndi-name 属性,如下例所示。

Properties
spring.jms.jndi-name=java:/MyConnectionFactory
Yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

1.4. 发送消息

Spring的 JmsTemplate 是自动配置的,你可以将其直接自动连接到你自己的Bean中,如下例所示。

Java
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

    // ...

    fun someMethod() {
        jmsTemplate.convertAndSend("hello")
    }

}
JmsMessagingTemplate 可以以类似的方式注入。 如果定义了 DestinationResolverMessageConverter Bean,它就会自动与自动配置的 JmsTemplate 相关联。

1.5. 收取消息

当JMS基础设施存在时,任何Bean都可以用 @JmsListener 来注解,以创建一个监听器端点。 如果没有定义 JmsListenerContainerFactory,会自动配置一个默认的。 如果定义了 DestinationResolverMessageConverterjakarta.jms.ExceptionListener Bean,它们将自动与默认工厂相关联。

默认情况下,默认工厂是事务性的。 如果你在一个存在 JtaTransactionManager 的基础设施中运行,它就会默认与监听器容器关联。 如果没有,sessionTransacted 标志将被启用。 在后一种情况下,你可以通过在你的监听器方法(或其委托)上添加 @Transactional,将你的本地数据存储事务与传入消息的处理联系起来。 这可以确保在本地事务完成后,传入的消息被确认。 这也包括发送在同一JMS会话中执行的响应消息。

下面的组件在 someQueue 目标上创建一个监听器端点。

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue")
    fun processMessage(content: String?) {
        // ...
    }

}
更多细节请参见 @EnableJms 的Javadoc。

如果你需要创建更多的 JmsListenerContainerFactory 实例,或者你想覆盖默认值,Spring Boot提供了一个 DefaultJmsListenerContainerFactoryConfigurer,你可以用它来初始化一个 DefaultJmsListenerContainerFactory,设置与自动配置的那个相同。

例如,下面的例子暴露了另一个使用特定 MessageConverter 的工厂。

Java
import jakarta.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

    @Bean
    fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
        val factory = DefaultJmsListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后你可以在任何 @JmsListener 注解的方法中使用该工厂,如下所示。

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

2. AMQP

高级消息队列协议(AMQP)是一个平台中立、面向消息的中间件的线级协议。 Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。 Spring Boot为通过RabbitMQ与AMQP合作提供了一些便利,包括 spring-boot-starter-amqp “Starter”。

2.1. 支持 RabbitMQ

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展和可移植的消息代理。Spring 使用 RabbitMQ 来通过 AMQP 协议进行通信。

RabbitMQ 配置由 spring.rabbitmq.* 中的外部配置属性控制。 例如,你可以在 application.properties 中声明以下部分。

Properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
Yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

另外,你也可以使用 addresses 属性配置相同的连接。

Properties
spring.rabbitmq.addresses=amqp://admin:secret@localhost
Yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
当以这种方式指定地址时,hostport 属性会被忽略。 如果地址使用 amqps 协议,SSL支持将自动启用。

请参阅 RabbitProperties 以了解更多支持的基于属性的配置选项。 要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的低级细节,请定义一个 ConnectionFactoryCustomizer bean。

如果上下文中存在一个 ConnectionNameStrategy bean,它将被自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

要对 RabbitTemplate 进行全应用程序的、附加的定制,请使用 RabbitTemplateCustomizer Bean。

请参阅 了解 AMQP(RabbitMQ 使用的协议)以了解更多细节。

2.2. 发送消息

Spring的 AmqpTemplateAmqpAdmin 是自动配置的,你可以将它们直接自动连接到你自己的Bean中,如下例所示。

Java
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

    // ...

    fun someMethod() {
        amqpAdmin.getQueueInfo("someQueue")
    }

    fun someOtherMethod() {
        amqpTemplate.convertAndSend("hello")
    }

}
RabbitMessagingTemplate 可以以类似的方式注入。 如果定义了一个 MessageConverter bean,它将自动与自动配置的 AmqpTemplate 相关联。

如果有必要,任何定义为 Bean 的 org.springframework.amqp.core.Queue 都会自动用于在 RabbitMQ 实例上声明一个相应的队列。

为了重试操作,你可以在 AmqpTemplate 上启用重试(例如,在broker连接丢失的情况下)。

Properties
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
Yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下,重试是禁用的。 你也可以通过声明一个 RabbitRetryTemplateCustomizer bean,以编程方式定制 RetryTemplate

如果你需要创建更多的 RabbitTemplate 实例,或者你想覆盖默认值,Spring Boot提供了一个 RabbitTemplateConfigurer bean,你可以用它来初始化 RabbitTemplate,设置与自动配置使用的工厂相同。

2.3. 发送消息到一个流(Stream)

要向一个特定的流发送消息,请指定该流的名称,如下例所示。

Properties
spring.rabbitmq.stream.name=my-stream
Yaml
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 MessageConverterStreamMessageConverterProducerCustomizer Bean,它将自动与自动配置的 RabbitStreamTemplate 相关联。

如果你需要创建更多的 RabbitStreamTemplate 实例,或者你想覆盖默认值,Spring Boot提供了一个 RabbitStreamTemplateConfigurer bean,你可以用它来初始化 RabbitStreamTemplate,设置与自动配置使用的工厂相同。

2.4. 接收消息

当Rabbit基础设施存在时,任何Bean都可以用 @RabbitListener 来注解以创建一个监听器端点。 如果没有定义 RabbitListenerContainerFactory,就会自动配置一个默认的 SimpleRabbitListenerContainerFactory,你可以使用 spring.rabbitmq.listener.type 属性切换到直接容器。 如果定义了一个 MessageConverterMessageRecoverer Bean,它将自动与默认工厂相关联。

下面的示例组件在 someQueue 队列上创建一个监听器端点。

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"])
    fun processMessage(content: String?) {
        // ...
    }

}
更多细节见 @EnableRabbit 的Javadoc

如果你需要创建更多的 RabbitListenerContainerFactory 实例,或者你想覆盖默认值,Spring Boot提供了一个 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,你可以用它来初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory,设置与自动配置使用的工厂一样。

你选择哪种容器类型并不重要。 这两个Bean是由自动配置暴露的。

例如,下面的配置类暴露了另一个使用特定 MessageConverter 的工厂。

Java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

    @Bean
    fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后你可以在任何 @RabbitListener 注解的方法中使用该工厂,如下所示

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

你可以启用重试以处理你的监听器抛出异常的情况。 默认情况下,使用 RejectAndDontRequeueRecoverer,但你可以自己定义一个 MessageRecoverer。 当重试用尽时,消息会被拒绝,或者被丢弃,或者被路由到一个死信exchange,如果broker被配置为这样做的话。 默认情况下,重试被禁用。 你也可以通过声明一个 RabbitRetryTemplateCustomizer bean,以编程方式定制 RetryTemplate

默认情况下,如果重试被禁用,并且监听器抛出了一个异常,那么将无限期地重试交付。 你可以通过两种方式修改这种行为。将 defaultRequeueRejected 属性设置为 false,以便尝试零次重发,或者抛出一个 AmqpRejectAndDontRequeueException,以示该消息应被拒绝。 后者是在启用重试和达到最大发送尝试次数时使用的机制。

3. 支持 Apache Kafka

Apache Kafka是通过提供 spring-kafka 项目的自动配置来支持的。

Kafka配置由 spring.kafka.* 的外部配置属性控制。例如,你可以在 application.properties 中声明以下部分。

Properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在启动时创建一个主题,需要添加一个 NewTopic 类型的bean。 如果该主题已经存在,该Bean将被忽略。

更多支持的选项见 KafkaProperties

3.1. 发送消息

Spring的 KafkaTemplate 是自动配置的,你可以在自己的Bean中直接自动连接它,如下例所示。

Java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

    // ...

    fun someMethod() {
        kafkaTemplate.send("someTopic", "Hello")
    }

}
如果定义了 spring.kafka.producer.transaction-id-prefix 这个属性,就会自动配置一个 KafkaTransactionManager。 另外,如果定义了 RecordMessageConverter bean,它将自动与自动配置的 KafkaTemplate 相关联。

3.2. 接收消息

当Apache Kafka基础设施存在时,任何Bean都可以用 @KafkaListener 来注解,以创建一个监听器端点。 如果没有定义 KafkaListenerContainerFactory,就会用 spring.kafka.listener.* 中定义的key自动配置一个默认的。

下面的组件在 someTopic 主题上创建了一个监听器端点。

Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @KafkaListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

如果定义了 KafkaTransactionManager Bean,它将自动与容器工厂相关联。 同样,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener bean,它将自动与默认工厂关联。

根据监听器的类型,一个 RecordMessageConverterBatchMessageConverter bean被关联到默认工厂。 如果批处理监听器只有一个 RecordMessageConverter Bean,它将被包裹在一个 BatchMessageConverter 中。

自定义的 ChainedKafkaTransactionManager 必须被标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager bean。

3.3. Kafka Streams

Spring for Apache Kafka提供了一个工厂bean来创建 StreamsBuilder 对象并管理其流的生命周期。 只要 kafka-streams 在classpath上并且Kafka Streams被 @EnableKafkaStreams 注解启用,Spring Boot就会自动配置所需的 KafkaStreamsConfiguration bean。

启用Kafka流意味着必须设置应用ID和bootstrap server。 前者可以使用 spring.kafka.streams.application-id 进行配置,如果没有设置,则默认为 spring.application.name。 后者可以全局设置,也可以只对流进行特别重写。

几个额外的属性可以使用专用属性;其他任意的Kafka属性可以使用 spring.kafka.streams.properties 命名空间来设置。 更多信息请参见额外的Kafka属性

要使用工厂Bean,请将 StreamsBuilder 接入你的 @Bean,如下例所示。

Java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
Kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

    @Bean
    fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
        val stream = streamsBuilder.stream<Int, String>("ks1In")
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
        return stream
    }

    private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
        return KeyValue(key, value.uppercase())
    }

}

默认情况下,由 StreamBuilder 对象管理的 Stream 会自动启动。 你可以使用 spring.kafka.streams.auto-startup 属性来定制这种行为。

3.4. 额外的Kafka属性

自动配置支持的属性显示在附录的 “集成 Properties” 部分。 注意,在大多数情况下,这些属性(连字符或驼峰)直接映射到Apache Kafka的点状属性。 详情见Apache Kafka文档。

这些属性中的前几个适用于所有组件(生产者、消费者、管理员和流),但如果你想使用不同的值,可以在组件级别上指定。 Apache Kafka将属性的重要性指定为高、中、低。 Spring Boot的自动配置支持所有高重要性属性、一些选定的中度和低度属性,以及没有默认值的任何属性。

只有Kafka支持的一部分属性可以通过 KafkaProperties 类直接使用。 如果你想用不直接支持的额外属性来配置生产者或消费者,请使用以下属性。

Properties
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
Yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这将普通的 prop.one Kafka属性设置为 first(适用于生产者、消费者和管理员),prop.two 管理员属性为 secondprop.three 消费者属性为 thirdprop.four 生产者属性为 fourthprop.five streams 属性为 fifth

你也可以按以下方式配置Spring Kafka的 JsonDeserializer

Properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
Yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样地,你可以禁用 JsonSerializer 默认行为,即在header中发送类型信息。

Properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以这种方式设置的Properties将覆盖Spring Boot明确支持的任何configuration项目。

3.5. 使用嵌入式Kafka进行测试

Spring for Apache Kafka提供了一种方便的方式来测试带有嵌入式Apache Kafka代理的项目。 要使用这个功能,请用 spring-kafka-test 模块中的 @EmbeddedKafka 注解一个测试类。 更多信息,请参见Spring for Apache Kafka 参考手册

为了让Spring Boot自动配置与上述嵌入式Apache Kafka代理一起工作,你需要将嵌入式代理地址的系统属性(由 EmbeddedKafkaBroker 填充)重新映射为Apache Kafka的Spring Boot配置属性。 有几种方法可以做到这一点。

  • 提供一个系统属性,将嵌入式代理地址映射到测试类中的 spring.kafka.bootstrap-servers

Java
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Kotlin
init {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
  • @EmbeddedKafka 注解上配置一个属性名称。

Java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
Kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
  • 在配置属性中使用一个占位符。

Properties
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

4. RSocket

RSocket 是一个用于字节流传输的二进制协议。它通过单个连接上的异步消息传递实现对称的交互模型。

Spring框架的 spring-messaging 模块在客户端和服务器端都提供了对RSocket请求者和响应者的支持。 参见Spring框架参考资料中的 RSocket部分,以了解更多细节,包括RSocket协议的概述。

4.1. RSocket策略的自动配置

Spring Boot自动配置了一个 RSocketStrategies bean,它提供了编码和解码RSocket有效载荷所需的所有基础设施。 默认情况下,自动配置将尝试配置以下内容(按顺序)。

  1. 使用Jackson的 CBOR 编解码器

  2. 使用Jackson的JSON编解码器

spring-boot-starter-rsocket 启动器提供了这两种依赖。 请参阅Jackson支持部分,了解更多关于定制的可能性。

开发人员可以通过创建实现 RSocketStrategiesCustomizer 接口的bean来定制 RSocketStrategies 组件。 请注意,它们的 @Order 很重要,因为它决定了编解码器的顺序。

4.2. RSocket服务器的自动配置

Spring Boot提供RSocket服务器的自动配置。 所需的依赖由 spring-boot-starter-rsocket 提供。

Spring Boot允许从WebFlux服务器通过WebSocket暴露RSocket,或建立一个独立的RSocket服务器。 这取决于应用程序的类型及其配置。

对于WebFlux应用程序(即类型为 WebApplicationType.REACTIVE ),只有在以下属性相匹配时,RSocket服务器才会被插入(以插件形式)Web服务器。

Properties
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
将RSocket插入web服务器只支持Reactor Netty,因为RSocket本身就是用该库构建的。

另外,RSocket TCP或websocket服务器作为一个独立的、嵌入式的服务器被启动。 除了依赖要求,唯一需要的配置是为该服务器定义一个端口。

Properties
spring.rsocket.server.port=9898
Yaml
spring:
  rsocket:
    server:
      port: 9898

4.3. 支持Spring Messaging RSocket

Spring Boot将为RSocket自动配置Spring Messaging基础设施。

这意味着Spring Boot将创建一个 RSocketMessageHandler bean,以处理对你的应用程序的RSocket请求。

4.4. 用RSocketRequester调用RSocket服务

一旦服务器和客户端之间建立了 RSocket channel,任何一方都可以向对方发送或接收请求。

作为一个服务器,你可以在RSocket @Controller 的任何处理方法上被注入一个 RSocketRequester 实例。 作为客户端,你需要首先配置并建立一个RSocket连接。 Spring Boot为这种情况自动配置一个 RSocketRequester.Builder,并应用任何 RSocketConnectorConfigurer Bean。

RSocketRequester.Builder 实例是一个原型Bean,这意味着每个注入点将为你提供一个新的实例。 这样做是有目的的,因为这个构建器是有状态的,你不应该用同一个实例创建具有不同设置的Requester。

下面的代码显示了一个典型的例子。

Java
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}
Kotlin
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {

    private val rsocketRequester: RSocketRequester

    init {
        rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
    }

    fun someRSocketCall(name: String): Mono<User> {
        return rsocketRequester.route("user").data(name).retrieveMono(
            User::class.java
        )
    }

}

5. Spring Integration

Spring Boot为与 Spring Integration 合作提供了一些便利,包括 spring-boot-starter-integration “Starter”。 Spring Integration提供了对消息传递的抽象,也提供了其他传输方式,如HTTP、TCP等。 如果Spring Integration在你的classpath上可用,它将通过 @EnableIntegration 注解被初始化。

Spring Integration的轮询逻辑依赖于自动配置的 TaskScheduler。 默认的 PollerMetadata(每秒轮询无限制数量的消息)可以通过 spring.integration.poller.* 配置属性来定制。

Spring Boot还配置了一些由额外的Spring Integration模块的存在所触发的功能。 如果 spring-integration-jmx 也在classpath上,消息处理的统计数据会通过JMX发布。 如果 spring-integration-jdbc 可用,可以在启动时创建默认的数据库模式,如下面一行所示。

Properties
spring.integration.jdbc.initialize-schema=always
Yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

如果 spring-integration-rsocket 可用,开发者可以使用 "spring.rsocket.server.*" 属性配置RSocket服务器,让它使用 IntegrationRSocketEndpointRSocketOutboundGateway 组件来处理传入的RSocket消息。 这个基础设施可以处理Spring Integration RSocket channel 适配器和 @MessageMapping 处理程序(鉴于 "spring.integration.rsocket.server.message-mapping-enabled" 已被配置)。

Spring Boot还可以使用配置属性自动配置 ClientRSocketConnector

Properties
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
Yaml
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
Properties
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
Yaml
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

更多细节请参见 IntegrationAutoConfigurationIntegrationProperties 类。

6. WebSockets

Spring Boot为嵌入式Tomcat、Jetty和Undertow提供WebSockets自动配置。 如果你将war文件部署到独立的容器中,Spring Boot会假定容器负责配置其WebSocket支持。

Spring框架为MVC Web应用程序提供了 丰富的WebSocket支持,可以通过 spring-boot-starter-websocket 模块轻松访问。

WebSocket支持也可用于 响应式web应用程序,需要在 spring-boot-starter-webflux 中包含WebSocket API。

<dependency>
    <groupId>jakarta.websocket</groupId>
    <artifactId>jakarta.websocket-api</artifactId>
</dependency>

7. 接下来要读什么

下一节描述了如何在你的应用程序中启用IO功能。你可以在这一节中读到关于 缓存邮件验证rest 客户端等内容。