如何从Java程序创建对ibm mq amqp主题的持久订阅? [英] How to create durable subscription to ibm mq amqp topic from java program?

查看:104
本文介绍了如何从Java程序创建对ibm mq amqp主题的持久订阅?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们通过提供 clientId 和订户名称,使用 createDurableSubscriber 以编程方式创建了IBM MQ AMQP主题的订户.

We have programmatically created subscriber to IBM MQ AMQP TOPIC with createDurableSubscriber by providing clientId and subscriber name.

我们启动了程序,因此它订阅了TOPIC并停止了该程序.然后将msg发送到主题,然后再次启动接收器程序,但是我们无法接收已发送的msg,并释放消息(如果持久订阅则不会发生此消息).

We start the program so it subscribes to TOPIC and stop the program. Then send the msgs to topic and again start the receiver program again but we cannot receive the msgs sent and loose the messages which should not happen in case of durable subscription..

使用mqsc命令连接订户时,我们可以看到amqp主题及其持久订阅. DISPLAY TOPIC , DISPLAY TPSTATUS DISPLAY TPSTATUS SUB DISPLAY SUB SUBID ,但在停止订户程序时不显示.我们定义了属性 DEFPSIST(YES),并且客户端(生产者到主题)正在发送持久消息.

We can see amqp topic and its durable subscription when subscriber is connected using mqsc commands DISPLAY TOPIC, DISPLAY TPSTATUS, DISPLAY TPSTATUS SUB, DISPLAY SUB SUBID but not when subscriber program is stopped. We have defined attribute DEFPSIST(YES) and client(producer to topic) is sending persistent messages.

消息消失了,因为我们无法在订阅者的持久队列中看到消息?是否取决于到期属性?

Where are the messages gone as we cannot see messages in durable queues of subscriber? Does it depends on expiry attribute?

我们的订户在连接时 DISPLAY SUB SUBID 的输出.

Output of DISPLAY SUB SUBID for our subscriber when it is connected.

AMQ8096: WebSphere MQ subscription inquired.


SUBID("hex sub id")
   SUB(:private:CLINET01:TOPIC01)            TOPICSTR(TOPIC01)
   TOPICOBJ(SYSTEM.BASE.TOPIC)             DISTYPE(RESOLVED)
   DEST(SYSTEM.MANAGED.DURABLE.5F6B5C2524FB9AED)
   DESTQMGR(qm.name)                   PUBAPPID( )
   SELECTOR( )                             SELTYPE(NONE)
   USERDATA(010)
   PUBACCT(***************************************************)
   DESTCORL(***************************************************)
   DESTCLAS(MANAGED)                       DURABLE(YES)
   EXPIRY(0)                               PSPROP(MSGPROP)
   PUBPRTY(ASPUB)                          REQONLY(NO)
   SUBSCOPE(ALL)                           SUBLEVEL(1)
   SUBTYPE(API)                            VARUSER(FIXED)
   WSCHEMA(TOPIC)                          SUBUSER(mqm)
   CRDATE(2020-09-28)                      CRTIME(04:14:09)
   ALTDATE(2020-09-28)                     ALTTIME(04:14:09)

订户ID具有私有(不确定原因)和客户端ID,但没有订户名称Sub4.

Subscriber id has private(not sure why) and client id but not subscriber name which is sub4

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;

import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;

public class AMQPQueueExample1 implements Runnable  {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;

public void run(){
try{
 Connection connection = null;
 Context context = new InitialContext();
 ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
 connection = connectionFactory.createConnection();
 connection.setClientID("123");//("WHATS_MY_PURPOSE3"); // Why do we need clientID while publishing the TOPIC from consumer / publisher
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 Topic priceTopic = (Topic) context.lookup("myTopicLookup1");
 MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic,"sub420"); //"sub3");
System.out.println("TOPIC "+priceTopic);

connection.start();
while(true){
TextMessage   message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
           System.out.println("Subscriber 1 received : " + message1.getText());


}
}catch(Exception e){
e.printStackTrace();
}
}

 public static void main(String[] args)  {

AMQPQueueExample1 amp=new AMQPQueueExample1();
 Thread thread = new Thread(amp);
thread.start();


 }
}

值从jndi.properties文件中获取,用于上下文工厂和提供者url.

Values are taken from jndi.properties file for context factory and provider url.

推荐答案

从注释中看起来好像您正在使用MQ 8.0.0.5?如果是这种情况,则该版本的MQ不支持Apache Qpid JMS客户端.我相信使用该版本可以实现非常持久的非持久订阅 工作,但是其他任何JMS方法都不太可能.

It looks from the comments like you're using MQ 8.0.0.5? If that's the case then Apache Qpid JMS clients aren't supported with that version of MQ. I believe with that version a very basic non-durable subscribe might work, but any other JMS methods are unlikely to.

我怀疑那个版本的MQ无法完全理解Qpid JMS的AMQP 1.0流,所以订阅的有效期设置为0,而不是无限.

I suspect what is happening is the AMQP 1.0 flows from Qpid JMS aren't fully understood by that version of MQ, so the expiry of the subscription is being set to 0 rather than unlimited.

MQ 9.2添加了对更多JMS 2.0规范的支持-尽管不是每个JMS功能都支持.这里提供了有关所支持方法的更多信息:

MQ 9.2 added support for more of the JMS 2.0 spec - although not every JMS feature. There is more information about the methods which are supported here:

https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.2.0/com.ibm.mq.dev.doc/q125050_.htm

创建持久的订户和/或消费者应该可以按预期工作.

Creating durable subscribers and/or consumers should work as you expect.

这篇关于如何从Java程序创建对ibm mq amqp主题的持久订阅?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆