本站(springdoc.cn)中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

Spring Cloud Bus将分布式系统的节点与一个轻量级的 message broker 联系起来。然后,这个 broker 可以用来广播状态变化(如配置变化)或其他管理指令。一个关键的想法是,总线(bus)就像Spring Boot应用的一个分布式执行器,是可以扩展的。然而,它也可以被用作应用程序之间的通信 channel。这个项目提供了 AMQP broker 或 Kafka 作为传输的 starter。

Spring Cloud是在非限制性的Apache 2.0许可下发布的。如果你想为这部分文档做出贡献,或者发现错误,请在 github 的项目中找到源代码和 issue tracker。

1. 快速入门

如果Spring Cloud Bus在classpath上检测到自己,它就会通过添加Spring Boot的自动配置来工作。要启用总线(Bus),请将 spring-cloud-starter-bus-amqpspring-cloud-starter-bus-kafka 添加到你的依赖管理中。Spring Cloud 会处理剩下的事情。确保代理(broker)(RabbitMQ 或 Kafka)可用并已配置。当在本地主机上运行时,你不需要做任何事情。如果你远程运行,请使用Spring Cloud连接器(Connector)或Spring Boot约定来定义 broker 凭证,如下面Rabbit的例子中所示。

application.yml
spring:
  rabbitmq:
    host: mybroker.com
    port: 5672
    username: user
    password: secret

总线目前支持向所有监听的节点或某一特定服务的所有节点发送消息(由Eureka定义)。/bus/* actuator 命名空间有一些HTTP端点。目前,有两个已经实现。第一个,/bus/env,发送 key/value 对以更新每个节点的Spring Environment。第二个,/bus/refresh,重新加载每个应用程序的 configuration,就像它们都被 ping 到了它们的 /refresh 端点。

Spring Cloud Bus 的 starter 涵盖了Rabbit和Kafka,因为这是最常见的两种实现方式。然而,Spring Cloud Stream 是相当灵活的,而且该 binder 与 spring-cloud-bus 一起工作。

2. Bus 端点(Endpoint)

Spring Cloud Bus 提供了两个端点,/actuator/busrefresh/actuator/busenv,它们分别对应于 Spring Cloud Commons 中的单个 actuator 端点,/actuator/refresh/actuator/env

2.1. Bus Refresh 端点

/actuator/busrefresh 端点清除 RefreshScope 缓存并重新绑定 @ConfigurationProperties。更多信息请参见 Refresh Scope 文档。

为了暴露 /actuator/busrefresh 端点,你需要向你的应用程序添加以下配置。

management.endpoints.web.exposure.include=busrefresh

2.2. Bus Env 端点

/actuator/busenv 端点用指定的 key/value 对在多个实例中更新每个实例 environment。

为了暴露 /actuator/busenv 端点,你需要向你的应用程序添加以下配置。

management.endpoints.web.exposure.include=busenv

/actuator/busenv 端点接受 POST 请求,如下。

{
    "name": "key1",
    "value": "value1"
}

3. 找到一个实例

应用程序的每个实例都有一个服务 ID,其值可以用 spring.cloud.bus.id 设置,其值应是一个用冒号分隔的标识符列表,按从最不具体到最具体的顺序。默认值从环境中构建,作为 spring.application.nameserver.port(或 spring.application.index,如果设置)的组合。ID的默认值以 app:index:id 的形式构建,其中。

  • appvcap.application.name(如果存在的话),或者是 spring.application.name

  • index 是指 vcap.application.instance_index(如果存在的话)、spring.application.indexlocal.server.portserver.port0(按这个顺序)。

  • idvcap.application.instance_id,如果它存在的话,或者是一个随机值。

HTTP端点接受一个 “destination” 路径参数,如 /busrefresh/customers:9000,其中 destination 是一个服务ID。如果该ID为总线上的一个实例所拥有,它就会处理该消息,而其他所有实例都会忽略它。

4. 找到一个服务的所有实例

“destination” 参数在Spring PathMatcher 中使用(路径分隔符为冒号-:),以确定一个实例是否处理该消息。使用前面的例子,/busenv/customers:** 针对 “customers” 服务的所有实例,而不考虑服务ID的其他部分。

5. Service ID 必须唯一

总线尝试两次来消除对一个事件的处理—​一次来自原始 ApplicationEvent,一次来自队列。为此,它对照当前的服务ID检查发送的服务ID。如果一个服务的多个实例具有相同的ID,事件就不会被处理。在本地机器上运行时,每个服务都在一个不同的端口上,该端口是 ID 的一部分。Cloud Foundry 提供了一个索引来进行区分。为确保 ID 在 Cloud Foundry 之外是唯一的,请将 spring.application.index 设置为服务的每个实例的唯一内容。

6. 自定义 Message Broker

Spring Cloud Bus 使用 Spring Cloud Stream 来广播消息。因此,为了让消息流动起来,你只需要在classpath中包含你选择的 binder 实现。有一些方便的 starter 用于AMQP(RabbitMQ)和Kafka(spring-cloud-starter-bus-[amqp|kafka])的总线。一般来说,Spring Cloud Stream 依靠 Spring Boot 的自动配置约定来配置中间件。例如,AMQP Broker 地址可以通过 spring.rabbitmq.* 配置属性来改变。Spring Cloud Bus 在 spring.cloud.bus.* 中有少量的本地配置属性(例如,spring.cloud.bus.destination 是作为外部中间件使用的 topic 名称)。通常情况下,默认值就足够了。

要了解更多关于如何定制 message broker 设置的信息,请查阅 Spring Cloud Stream 文档。

7. 追踪总线事件

可以通过设置 spring.cloud.bus.trace.enabled=true 追踪总线事件(RemoteApplicationEvent 的子类)。如果你这样做,Spring Boot TraceRepository(如果它存在的话)会显示每个服务实例发送的每个事件和所有的acks。下面的例子来自于 /trace 端点。

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}

前面的追踪显示,一个 RefreshRemoteApplicationEventcustomers:9000 发出,广播给所有服务,并由 customers:9000stores:8081 接收(acked)。

要自己处理Ack信号,你可以为 AckRemoteApplicationEventSentApplicationEvent 类型添加一个 @EventListener 到你的应用程序(并启用跟踪)。另外,你也可以进入 TraceRepository,从那里挖掘数据。

任何总线应用都可以跟踪acks。然而,有时候,在一个中央服务中进行这项工作是很有用的,它可以对数据进行更复杂的查询,或者将其转发给专门的追踪服务。

8. 广播你自己的事件

总线可以携带任何类型的 RemoteApplicationEvent 的事件。默认的传输方式是JSON,deserializer 需要提前知道哪些类型将被使用。要注册一个新的类型,你必须把它放在 org.springframework.cloud.bus.event 的一个子包中。

要自定义事件名称,你可以在你的自定义类上使用 @JsonTypeName,或者依靠默认策略,即使用类的简单名称。

生产者和消费者都需要访问类的定义。

8.1. 在自定义包中注册事件

如果你不能或不想为你的自定义事件使用 org.springframework.cloud.bus.event 的子包,你必须通过使用 @RemoteApplicationEventScan 注解指定哪些包要扫描 RemoteApplicationEvent 类型的事件。用 @RemoteApplicationEventScan 指定的包,包括子包。

例如,考虑以下自定义事件,称为 MyEvent

package com.acme;

public class MyEvent extends RemoteApplicationEvent {
    ...
}

你可以通过以下方式向 deserializer 注册该事件。

package com.acme;

@Configuration
@RemoteApplicationEventScan
public class BusConfiguration {
    ...
}

如果不指定值,则使用 @RemoteApplicationEventScan 的类的包被注册。在这个例子中, com.acme 是通过使用 BusConfiguration 的包注册的。

你也可以通过使用 @RemoteApplicationEventScan 上的 valuebasePackagesbasePackageClasses 属性显式地指定要扫描的包,如下面的例子所示。

package com.acme;

@Configuration
//@RemoteApplicationEventScan({"com.acme", "foo.bar"})
//@RemoteApplicationEventScan(basePackages = {"com.acme", "foo.bar", "fizz.buzz"})
@RemoteApplicationEventScan(basePackageClasses = BusConfiguration.class)
public class BusConfiguration {
    ...
}

所有前面的 @RemoteApplicationEventScan 的例子都是等价的,即通过明确指定 @RemoteApplicationEventScan 上的包来注册 com.acme 包。

你可以指定多个 base package 进行扫描。

9. 配置属性

要查看所有与总线相关的配置属性列表,请查看 附录页面