如何异步从Activemq提取消息 [英] How can I pull messages from Activemq Asynchronously
问题描述
我想编写代码以从Activemq
提取消息.我不想一次从Activemq
提取所有消息,因为我的要求是每当我的Java应用程序从Activemq
收到1条消息时,基于消息正文,我将找到对应的HTTP Link
并转发到该链接.对于整个逻辑,我写了2个.java
文件名
I want to write code for pulling messages from Activemq
.I don't want to pull all the messages from Activemq
at a time,because my requirement is whenever my Java Application receives 1 message from Activemq
,based on message body I will find corresponding HTTP Link
and forward to that Link. For this entire logic I wrote 2 .java
files names are
MessageConsumer.java
MessageConsumer.java
MyListener.java
MyListener.java
MessageConsumer.java文件仅用于建立连接.相应的代码在下面.
MessageConsumer.java file only for connection establishing.The corresponding code is in below.
package PackageName;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MessageConsumer extends HttpServlet {
@Override
protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
throws ServletException, IOException {
try {
//creating connectionfactory object for way
ConnectionFactory connectionFactory=new
ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
//establishing the connection b/w this Application and Activemq
Connection connection=connectionFactory.createConnection();
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue=session.createQueue("MessageTesing");
javax.jms.MessageConsumer consumer=session.createConsumer(queue);
//fetching queues from Activemq
MessageListener listener = new MyListener();
consumer.setMessageListener(listener);
connection.start();
System.out.println("Press a key to terminate");
}
catch (Exception e) {
// TODO: handle exception
}
}
}
MyListener.java文件用于触发相应的应用程序.代码在下面
MyListener.java file is for triggering corresponding Applications.code is in below
package PackageName;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
try {
TextMessage msg1=(TextMessage)msg;
//just for your understanding I mention dummy code
System.out.println(msg1.getText());
if (msg1.getText()=="Google") {
System.out.println("Forwarding http link to Google");
}
else {
System.out.println("Forwarding http link to Facebook");
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
在我的帖子中,我正在触发Google和Facebook链接.但就我的要求而言,我将调用自己的应用程序.每个应用程序都需要20分钟以上的时间,因此我想一个一地提取消息.那么只有它会收到来自Activemq
的另一条消息.
in my post, I am triggering Google and Facebook links.But As far my requirements I will call my own Applications.each Application taking more than 20 min.So I want to pull messages one by one.once previous message process completed then only it will receive another message from Activemq
.
但是正确的知道我一次收到所有消息.我该如何解决.我看过Activemq-Hellowworld
程序.我不理解.
But right know I am getting all the messages at a time.How can I fix this.I seen Activemq-Hellowworld
program.I didn't understand.
对不起,我是Java
技术的新手,谁能帮助我.
Sorry I am new to Java
technology.can anyone help me.
谢谢.
推荐答案
如果使用的是 MessageListener ,则实际上是异步接收消息(在另一个线程中)
If you are using a MessageListener, then you are actually receiving messages asynchronously (in another thread).
您可能正在寻找同步消息接收,因此请在您的主线程中尝试此操作:
You are probably looking for synchronous message reception, so try this in your main thread:
final QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
while (true) {
Message message = queueReceiver.receive();
// Process your message: insert the code from MyListener.onMessage here
// Possibly add an explit message.acknowledge() here,
// if you want to make sure that in case of an exception no message is lost
// (requires Session.CLIENT_ACKNOWLEDGE, when you create the queue session)
// Possibly terminate program, if a certain condition/situation arises
}
没有MessageListener
.
receive()
阻塞,直到消息可用为止,因此您的主线程(以及您的程序)将在receive
方法中等待.如果有消息到达,它将接收并处理它.
receive()
blocks until a message is available, so your main thread (and thus your program) waits in the receive
method. If a message arrives, it will receive and process it.
更新
如果您使用
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
那你应该打电话
message.acknowledge()
消息完全处理后.
在Session.AUTO_ACKNOWLEDGE
情况下,该消息立即从队列中删除(因此,如果程序在处理消息时终止,则消息会丢失).
Whereas in case of Session.AUTO_ACKNOWLEDGE
the message is removed from the queue immediately (and is therefore lost, if the program terminates whilte processing the message).
这篇关于如何异步从Activemq提取消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!