使用来自 MQ 的消息并在 Spring JMS 中合并 [英] Consuming messages from MQ and merging in Spring JMS

查看:28
本文介绍了使用来自 MQ 的消息并在 Spring JMS 中合并的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 spring + MQ + Websphere 应用服务器.

I use spring + MQ + Websphere Application server.

我想异步使用来自 MQ 的消息并将这些消息组合起来,以便在每次提交中轻松将 N 个实体持久化到数据库中.(没有强调我的目标 Oracle 数据库太多提交)

I want to consume messages from MQ asynchronously and combine the messages to have the List of messages to be easy to persist N number of Entities to Database in every single commit. (without stressing my target Oracle database with too many commits)

我使用 DefaultMessageListenerContainer 并使 onMessage 方法同步以添加消息(直到批量大小),并创建线程以等待条件满足(时间/大小)并将消息推送到另一个线程业务逻辑和数据库保持不变.

I use The DefaultMessageListenerContainer and I made the onMessage method synchronized to add on the messages(Till batch size) and I create the Thread to wait for the condition to meet(time/size) and push the messages to another thread which does Business Logic and DB persist.

线程启动条件:

一旦第一条消息到达 onMessage 方法中,线程必须等待看到在 1000 毫秒内收到 25 条消息,如果在 1000 毫秒内没有收到 25 条消息,它会将可用数量的消息推送到另一个线程.

Once the first message arrived inside onMessage method Thread has to wait to see 25 messages are received within 1000 milliseconds and If 25 messages are not reached within 1000 milliseconds it pushes the available number messages to another thread.

问题:

我看到线程仅在服务器启动期间启动,而不是在第一次调用 onMessage 方法时启动.

I see the Thread is started only during the server stat up and not when onMessage method is invoked first time.

有什么建议/其他方式可以实现从队列中收集消息吗?

Any suggestions/ other way please to achieve collecting messages from Queue?

applicationContext.xml

applicationContext.xml

<bean id="myMessageListener" class="org.mypackage.MyMessageListener">

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name ="concurrentConsumers" value ="10"/>
    <property name ="maxConcurrentConsumers" value ="50"/>        
</bean>

听众:

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    private volatile long startTime = 0;
    private volatile int messageCount;
    private volatile List<String> messagesFromQueue = null;

    private int batchSize = 25;
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });

    {
       thread.start();
    }

    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            if (messageCount == 0) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }
}

推荐答案

您也需要同步对 messagesFromQueue 的访问.

you need to synchronize access to messagesFromQueue too.

List messagesFromQueue = Collections.synchronizedList(new ArrayList());
      ...
  synchronized (messagesFromQueue) {
      Iterator i = messagesFromQueue.iterator(); // Must be in synchronized block
      while (i.hasNext())
      ...
  }

https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)

在每次调用 processMsgsFromQueue 时,您都会遇到 NullPointerException!

on each call to processMsgsFromQueue you'll have a NullPointerException!!

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()/*messageFromQueue is null!!*/) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }

最好保留消息,当提交没问题时,您清除列表并重置计数器.

it is better to persist messages and when commit is ok you clear the list and reset the counter.

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    private volatile long startTime = 0;
    private volatile int messageCount;
    private volatile List<String> messagesFromQueue = null;

    private int batchSize = 25;
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });


    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
       messageCount = 0;
       messagesFromQueue =  null;
   }
}

这篇关于使用来自 MQ 的消息并在 Spring JMS 中合并的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆