Java JMS 读写 IBM MQ 队列

1、简介

本文将会带你了解如何使用 Java JMS(Java Message Service)从 IBM MQ 队列读写消息。

2、设置环境

我们可以在 Docker 容器中运行 IBM MQ,以避免手动安装和配置的复杂性。

使用以下命令以基本配置运行容器:

docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq

接下来,需要在 pom.xml 文件中添加 IBM MQ 客户端

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.4.0.0</version>
</dependency>

3、配置 JMS Connection

首先,我们需要用 QueueConnectionFactory 建立 JMS Connection(连接),用于创建与队列管理器(Queue Manager)的连接:

public class JMSSetup {
    public QueueConnectionFactory createConnectionFactory() throws JMSException {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName("localhost");
        factory.setPort(1414);
        factory.setQueueManager("QM1");
        factory.setChannel("SYSTEM.DEF.SVRCONN"); 
        
        return factory;
    }
}

首先创建一个 MQQueueConnectionFactory 实例,用于配置和创建与 IBM MQ 服务器的连接。我们将主机名设置为 localhost,因为 MQ 服务器是在本地 Docker 容器内运行的。暴露的映射端口为 1414

然后,使用默认的 SYSTEM.DEF.SVRCONN channel。这是客户端连接 IBM MQ 的常用 channel。

4、写入消息到 IBM MQ 队列

本节将带你了解向 IBM MQ 队列发送消息的过程。

4.1、建立 JMS 连接

首先,创建 MessageSender 类。该类负责设置与 IBM MQ 服务器的连接、管理会话和处理消息发送操作。我们声明 QueueConnectionFactoryQueueConnectionQueueSessionQueueSender 实例的变量,这些变量将用于与 IBM MQ 服务器交互。

下面是 IBM MQ 连接设置、会话创建和消息发送的示例:

public class MessageSender {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueSender sender;

    public MessageSender() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        sender = session.createSender(queue);
        connection.start();
    }

    // ...
}

接下来,在 MessageSender 的构造函数中,我们使用 JMSSetup 类初始化 QueueConnectionFactory。然后使用该工厂创建 QueueConnection。该连接允许我们与 IBM MQ 服务器交互。

连接建立后,我们使用 createQueueSession() 创建一个 QueueSession。该会话允许我们与队列通信。这里,我们通过 false 参数表示会话是非事务性的,通过 Session.AUTO_ACKNOWLEDGE 表示在收到消息时自动确认。

然后,定义特定队列 QUEUE1,并创建一个 QueueSender 来处理消息发送。最后,启动连接,以确保会话处于活动状态并准备好发送消息。

4.2、写入文本消息

现在,我们已经建立了连接、创建了会话、定义了队列并创建了消息生产者,可以向队列发送文本消息了:

public void sendMessage(String messageText) {
    try {
        TextMessage message = session.createTextMessage();
        message.setText(messageText);
        sender.send(message);
    } catch (JMSException e) {
        // 异常处理
    } finally {
        // 资源释放
    }
}

首先,创建一个接收 messageText 参数的 sendMessage() 方法。sendMessage() 方法负责向队列发送文本消息。它会创建一个 TextMessage 对象,并使用 setText() 方法设置消息内容。

接下来,使用 QueueSender 对象的 send() 方法将消息发送到定义的队列。由于只要 MessageSender 对象存在,连接和会话就会一直处于打开状态,因此这种设计可以实现高效的消息传输。

4.3、消息类型

除了 TextMessage 之外,IBM MQ 还支持其他多种消息类型,以满足不同的使用情况。例如,我们可以发送以下消息:

  • BytesMessage:以字节形式保存的原始二进制消息。
  • ObjectMessage:序列化的 Java 对象消息。
  • MapMessage:包含键值对的消息。
  • StreamMessage:包含原始数据类型流的消息。

5、从 IBM MQ 队列读取消息

接下来看看如何从队列中读取消息。

5.1、建立 JMS 连接并创建会话

首先,需要建立连接并创建会话,这与发送消息时的操作类似。

首先创建一个 MessageReceiver 类。该类处理与 IBM MQ 服务器的连接,并设置消息消费所需的组件:

public class MessageReceiver {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueReceiver receiver;

    public MessageReceiver() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        receiver = session.createReceiver(queue);
        connection.start();
    }

    // ...
}

在该类中,我们首先创建一个 QueueConnectionFactory 来建立与 IBM MQ 服务器的连接。然后,使用此连接创建一个 QueueSession,它允许我们与队列交互。

最后,定义了特定队列 QUEUE1,并创建了一个 QueueReceiver 来处理从队列传入的消息。

5.2、读取文本消息

连接、会话和 receiver 设置完成后,就可以开始从队列中接收消息了。

使用 QueueReceiverreceive() 方法从指定队列中读取消息:

public void receiveMessage() {
    try {
        Message message = receiver.receive(1000);
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
        } else {
            // ...
        }
    } catch (JMSException e) {
        // 异常处理
    } finally {
        // 资源释放
    }
}

receiveMessage() 方法中,我们使用 receive() 函数等待队列中的消息,超时时间为 1000 毫秒。收到消息后,会检查它是否是 TextMessage 类型。

如果是,则可以使用 getText() 方法获取实际的消息内容,该方法会以字符串形式返回文本内容。

6、消息属性和 Header

本节将介绍一些常用的 消息属性和 Header,我们可以在发送或接收消息时使用它们。

6.1、消息属性

消息属性(Message properties)可用于存储和检索消息正文以外的其他信息。这对于过滤消息或向消息中添加上下文数据特别有用。

以下是在发送消息时设置自定义属性的示例:

TextMessage message = session.createTextMessage();
message.setText(messageText);

// 设置属性值
message.setStringProperty("OrderID", "12345");

接下来,可以在接收消息时检索属性:

Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;

    // 读取属性值
    String orderID = message.getStringProperty("OrderID");
} 

6.2、消息 Header

消息 Header 供预定义字段,其中包括有关消息的元数据。一些常用的 消息 Header 包括:

  • JMSMessageID:由 JMS Provider 分配给每条消息的唯一标识符。可以使用此 ID 跟踪和记录消息。
  • JMSExpiration:定义消息过期时间(毫秒)。如果消息在此时间内未送达,则会被丢弃。
  • JMSTimestamp:消息发送时间。
  • JMSPriority:消息的优先级。

在接收消息时检索消息 Header:

Message message = receiver.receive(1000);

if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    String messageId = message.getJMSMessageID();
    long timestamp = message.getJMSTimestamp();
    long expiration = message.getJMSExpiration();
    int priority = message.getJMSPriority();
}

7、使用 Mockito 进行模拟测试

在本节中,我们使用 Mockito 来模拟依赖,并验证 MessageSenderMessageReceiver 类的交互。

首先,使用 @Mock 注解创建依赖的 Mock 实例。

接下来,验证 sendMessage() 方法是否正确与 Mock(模拟)的 QueueSender 交互。我们模拟了 QueueSendersend() 方法,并验证 TextMessage 是否被正确创建:

String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));

messageSender.sendMessage(messageText);

verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);

最后,验证 receiveMessage() 方法能否与 Mock(模拟)的 QueueReceiver 正确交互。我们模拟 receive() 方法来返回预定义的 TextMessage,结果如我们所料,消息文本被检索到了:

when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");

messageReceiver.receiveMessage();
verify(textMessage).getText();

8、总结

本文介绍了如何使用 Java JMS 读写 IBM MQ 队列,详细介绍了如何设置 JMS 连接、会话以及消息生产者/接收者,还介绍了 IBM MQ 支持的几种消息类型以及如何使用自定义消息属性和消息 Header。


Ref:https://www.baeldung.com/java-message-service-ibm-mq-read-write