Synchronous Consumer with JMS Queue


Problem description

I want to process all the messages from a JMS queue in GlassFish 3 synchronously, one at a time. To do that, I tried changing the Maximum Active Consumers property of the JMS Physical Destination from -1 to 1 in the GlassFish admin window, expecting that only one consumer would then execute onMessage() at any given time. However, when I change that property I get this error:

[I500]: Caught JVM Exception: org.xml.sax.SAXParseException: Content is not allowed in prolog.

[I500]: Caught JVM Exception: com.sun.messaging.jms.JMSException: Content is not allowed in prolog.

sendMessage Error [C4038]: com.sun.messaging.jms.JMSException: Content is not allowed in prolog.

If anyone knows another way to make onMessage() run synchronously, it would be appreciated. This is my consumer class:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(mappedName = "QueueListener", activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageBean implements MessageListener {

    // write(...), raiseError(...) and AirProcessing are my own helpers (not shown here).
    @Override
    public void onMessage(Message message) {
        long t1 = System.currentTimeMillis();
        write("MessageBean has received " + message);
        try {
            // The message body is expected to be the numeric message ID as text.
            TextMessage result = (TextMessage) message;
            String text = result.getText();
            write("OTAMessageBean message ID has resolved to " + text);
            int messageID = Integer.parseInt(text);

            AirProcessing aP = new AirProcessing();
            aP.pickup(messageID);
        } catch (Exception e) {
            raiseError("OTAMessageBean error " + e.getMessage());
        }
        long t2 = System.currentTimeMillis();
        write("MessageBean has finished in " + (t2 - t1));
    }
}
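
For context on the error quoted above: "Content is not allowed in prolog" is the message the JDK's XML parser reports when it meets text before the XML declaration or root element. The following is a minimal, standalone sketch that reproduces that parser error with plain JDK classes. The class name PrologDemo and the sample strings are made up for illustration, and the sketch does not diagnose why GlassFish raises the error when the destination property is changed.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.InputSource;
import org.xml.sax.SAXParseException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical demo class; not part of GlassFish or Open Message Queue.
class PrologDemo {

    /** Returns true if the parser rejects the input with a SAXParseException. */
    static boolean failsToParse(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler rethrows fatal errors without printing them to stderr.
            builder.setErrorHandler(new DefaultHandler());
            builder.parse(new InputSource(new StringReader(xml)));
            return false;
        } catch (SAXParseException e) {
            System.out.println("Parser said: " + e.getMessage());
            return true;
        } catch (Exception e) {
            // Parser configuration or I/O problems are not what this demo is about.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stray characters before the XML declaration trigger the prolog error.
        System.out.println(failsToParse("junk<?xml version=\"1.0\"?><a/>"));
        // The same document without the leading text parses cleanly.
        System.out.println(failsToParse("<?xml version=\"1.0\"?><a/>"));
    }
}
```

In other words, the error suggests that something on the broker or client side is being parsed as XML while containing non-XML content at its start; which input that is in this setup remains an open question.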

Solution

I had the same problem. The only solution I found was to set up a Schedule that polls the queue every ten seconds and drains whatever messages are waiting:

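import java.io.Serializable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Schedule;
import javax.ejb.Stateless;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

@Stateless
public class MyReceiver {
    @Resource(mappedName = "jms/MyQueueFactory")
    private QueueConnectionFactory connectionFactory;
    @Resource(mappedName = "jms/MyQueue")
    private Queue myQueue;
    private QueueConnection qc;
    private QueueSession session;
    private MessageConsumer consumer;

    @PostConstruct
    void init() {
        try {
            // Non-transacted session; each message is acknowledged explicitly below.
            qc = connectionFactory.createQueueConnection();
            session = qc.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            consumer = session.createConsumer(myQueue);
            qc.start();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    @PreDestroy
    void cleanup() throws JMSException {
        qc.close();
    }

    // Fires every 10 seconds and drains the queue one message at a time.
    @Schedule(hour = "*", minute = "*", second = "*/10", persistent = false)
    public void onMessage() throws JMSException {
        Message message;
        // receiveNoWait() returns null as soon as no message is immediately available.
        while ((message = consumer.receiveNoWait()) != null) {
            ObjectMessage objMsg = (ObjectMessage) message;
            Serializable content;
            try {
                content = objMsg.getObject();

                // Do sth. with "content" here

                message.acknowledge();
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}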
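
The drain loop above can be tried outside a container with a plain-Java analogy. This is only a sketch under stated assumptions: a LinkedBlockingQueue stands in for the JMS queue, poll() plays the role of receiveNoWait(), a single-threaded scheduler stands in for the EJB timer, and the messages are assumed to carry numeric IDs as text, as in the question. The class and method names (PollingDrainSketch, drainOnce) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the polling receiver; it uses no JMS classes.
class PollingDrainSketch {

    /** One polling pass: handle each waiting message fully before fetching the next. */
    static List<Integer> drainOnce(BlockingQueue<String> queue) {
        List<Integer> processed = new ArrayList<>();
        String text;
        // poll() returns null when the queue is empty, much like receiveNoWait().
        while ((text = queue.poll()) != null) {
            try {
                processed.add(Integer.parseInt(text.trim()));
            } catch (NumberFormatException e) {
                System.err.println("Skipping malformed message: " + text);
            }
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("1");
        queue.add("2");
        queue.add("not-a-number");
        queue.add("3");

        // One thread plus a fixed delay: a pass never starts before the previous one ends.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(
                () -> System.out.println("Processed: " + drainOnce(queue)),
                0, 10, TimeUnit.SECONDS);

        Thread.sleep(500); // give the first pass time to run, then stop the demo
        timer.shutdownNow();
    }
}
```

The point of the design is that a single consumer pulls messages itself instead of having the container push them to a pool of listeners, so each message is handled to completion before the next one is fetched. The trade-off is latency: a message can wait up to one polling interval before it is picked up.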
