如何将消息从Activemq推送到Java应用程序 [英] How can I push messages from Activemq to Java Application
问题描述
嗨开发者,
我想用 JMS编写2
库名称是 .java
文件 MessageProducer
和 MessageConsumer
。
我添加了 activemq-all-5.8.0.jar
和 commons-io-2.4.jar
我的 lib
文件夹中的文件。我从 61616 $更改了
Activemq
的端口号c $ c>到 61617
。
I added activemq-all-5.8.0.jar
and commons-io-2.4.jar
files in my lib
folder.and I changed port number of Activemq
from 61616
to 61617
.
使用 MessageProducer.java
文件,我将发送消息到 Activemq
。为此,我编写的代码工作正常。如果你想看到点击这个Link 。
using MessageProducer.java
file,I will send messages to Activemq
.For this I wrote code it's working fine.If you want to see click on this Link.
我想从 Activemq
到 MessageConsumer.java
。这是应用程序在 Apache Tomcat
(的http://本地主机:8080 / ExecutableF ileProcess / MessageConsumer
)
I want to send messages from Activemq
to MessageConsumer.java
.This is Application is in Apache Tomcat
(http://localhost:8080/ExecutableFileProcess/MessageConsumer
)
一旦 MessageConsumer
收到消息,它就会将消息分开 - 来自消息的正文,它只是在控制台上打印(仅供我测试)。为此我编写了以下2 java
files.But它不起作用。
Once MessageConsumer
receives the message, it separates the message-body from message and it just print on console(just for my testing).For this I wrote the following 2 java
files.But it's not working.
MessageConsumer.java:
MessageConsumer.java :
package PackageName;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MessageConsumer extends HttpServlet {
@Override
protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
throws ServletException, IOException {
try {
//creating connectionfactory object for way
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61617");
//establishing the connection b/w this Application and Activemq
Connection connection=connectionFactory.createConnection();
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue=session.createQueue("MessageTesing");
javax.jms.MessageConsumer consumer=session.createConsumer(queue);
//fetching queues from Activemq
MessageListener listener = new MyListener();
consumer.setMessageListener(listener);
connection.start();
} catch (Exception e) {
// TODO: handle exception
}
}
}
MyListener.java:
MyListener.java :
package PackageName;
import javax.jms.Message;
import javax.jms.MessageListener;
public class MyListener implements MessageListener {
public void onMessage(Message msg) {
System.out.println(msg);
}
};
我没有在 Activemq控制台中为队列配置目的地
并且在从 MessageProducer.java
发送消息时我也没有提到目的地。
I didn't configure destination for Queue in Activemq console
and also I didn't mention "destination" while sending message from MessageProducer.java
.
I我正在使用Eclipse。如何在控制台中打印messagebody,实际上基于messagebody我将在我的 MessageConsumer.java
中执行一些操作。但是对于我的测试,我需要看到messagebody。
I am using Eclipse.How can I print messagebody in console,Actually based on messagebody I will do some operations in my MessageConsumer.java
.but for my testing I need to see messagebody.
我希望,你明白我在尝试什么。
I hope,you understand what I am trying.
我是的新手JMS
和 Java
,所以你能解释清楚。到目前为止,我用谷歌搜索编写了代码。但我没有找到任何问题。
I am new to JMS
and Java
,so can you explain clearly.So far I wrote the code using Google search.But I didn't find anywhere this issue.
任何人都可以建议我。
谢谢。
推荐答案
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
public class ActiveMQConnection {
private static final long timeout = 3000L;
private Logger logger = Logger.getLogger(ActiveMQConnection.class);
private String activeMQUser;
private String activeMQPassword;
private String activeMQURI;
private String activeMQQueueName;
private ActiveMQConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private boolean isConnected;
private boolean transacted = false;
private Destination destinationQueue;
private boolean isQueueAvailable;
private boolean isProducerAvailable;
private boolean isConsumerAvailable;
private MessageProducer producerForQueue;
private MessageConsumer consumerForQueue;
/**
* Create Default service
* @throws Exception
*/
public ActiveMQConnection() throws Exception
{
try {
throw new JMSException("No Parameters defined for creating connection. Try constructor with parameters.");
} catch (JMSException e) {
logger.error("JMS Exception in Active MQ Connection",e);
throw e;
} catch (Exception e) {
logger.error("JMS Exception in Active MQ Connection",e);
throw e;
}
}
/**
* Create a service with desired parameters.
* @param activeMQUser
* @param activeMQPassword
* @param activeMQURI
* @param activeMQQueueName
* @throws Exception
*/
public ActiveMQConnection(String activeMQUser, String activeMQPassword, String activeMQURI) throws Exception
{
try {
this.activeMQUser = activeMQUser;
this.activeMQPassword = activeMQPassword;
this.activeMQURI = activeMQURI;
setUpActiveMQConnection();
} catch (JMSException e) {
logger.error("JMS Exception in Active MQ Connection",e);
throw e;
} catch (Exception e) {
logger.error("Exception in Active MQ Connection",e);
throw e;
}
}
/**
* @throws JMSException, Exception
*/
private void setUpActiveMQConnection() throws JMSException, Exception
{
connectionFactory = new ActiveMQConnectionFactory(
this.activeMQUser,
this.activeMQPassword,
this.activeMQURI );
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
isConnected = true;
}catch (JMSException e) {
isConnected = false;
throw e;
}catch(Exception e){
isConnected = false;
throw e;
}
}
/**
* @throws Exception
*/
public void setUpQueue(String queueName ) throws Exception
{
this.activeMQQueueName = queueName;
createQueue();
createProducer();
createConsumer();
}
/**
* @throws Exception
*/
private void createQueue() throws Exception
{
try {
if(destinationQueue == null)
{
destinationQueue = session.createQueue(this.activeMQQueueName);
isQueueAvailable = true;
}
} catch (JMSException e) {
isQueueAvailable = false;
throw e;
}catch(Exception e){
isQueueAvailable = false;
throw e;
}
}
/**
* @throws JMSException
*
*/
private void createProducer() throws JMSException
{
if(producerForQueue == null)
{
try {
producerForQueue = session.createProducer(destinationQueue);
producerForQueue.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
isProducerAvailable = true;
} catch (JMSException e) {
isProducerAvailable = false;
throw e;
}
}
}
/**
* @throws JMSException
*/
private void createConsumer() throws JMSException
{
if(consumerForQueue == null)
{
try {
consumerForQueue = session.createConsumer(destinationQueue);
isConsumerAvailable = true;
} catch (JMSException e) {
isConsumerAvailable = false;
throw e;
}
}
}
/**
* @param objectToQueue
* @throws JMSException
*/
public void sendMessage(Serializable objectToQueue) throws JMSException
{
ObjectMessage message = session.createObjectMessage();
message.setObject(objectToQueue);
producerForQueue.send(message);
}
/**
* @param objectToQueue
* @throws JMSException
*/
public Serializable receiveMessage() throws JMSException
{
Message message = consumerForQueue.receive(timeout);
if (message instanceof ObjectMessage)
{
ObjectMessage objMsg = (ObjectMessage) message;
Serializable sobject = objMsg.getObject();
return sobject;
}
return null;
}
/**
* close-MQ-Connection
*/
public void closeMQConnection()
{
try
{
if(consumerForQueue != null)
{
consumerForQueue.close();
}
if(producerForQueue != null)
{
producerForQueue.close();
}
if(session != null)
{
session.close();
}
if(connection != null )
{
connection.close();
}
}
catch (JMSException e)
{
logger.info("Error while closing connection.",e);
}
finally
{
consumerForQueue = null;
producerForQueue = null;
destinationQueue = null;
session = null;
connection = null;
activeMQUser = null;
activeMQPassword = null;
activeMQQueueName = null;
activeMQURI = null;
}
}
public boolean isConnected() {
return isConnected;
}
public void setConnected(boolean isConnected) {
this.isConnected = isConnected;
}
public boolean isQueueAvailable() {
return isQueueAvailable;
}
public void setQueueAvailable(boolean isQueueAvailable) {
this.isQueueAvailable = isQueueAvailable;
}
public boolean isProducerAvailable() {
return isProducerAvailable;
}
public void setProducerAvailable(boolean isProducerAvailable) {
this.isProducerAvailable = isProducerAvailable;
}
public MessageConsumer getConsumerForQueue() {
return consumerForQueue;
}
public void setConsumerForQueue(MessageConsumer consumerForQueue) {
this.consumerForQueue = consumerForQueue;
}
public boolean isConsumerAvailable() {
return isConsumerAvailable;
}
public void setConsumerAvailable(boolean isConsumerAvailable) {
this.isConsumerAvailable = isConsumerAvailable;
}
}
以上是
-
创建连接
创建/连接到队列
发送消息
接收消息
- 您可以使用其中的方法发送或收到任何Serializable POJO。
creating connection
creating/connecting to queue
sending message
receiving message
- You can use methods in it to send or receive any Serializable POJO.
这篇关于如何将消息从Activemq推送到Java应用程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!