异步接收来自MQ的多条消息 [英] Receiving multiple messages from MQ asynchronously

查看:95
本文介绍了异步接收来自MQ的多条消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在应用程序中使用Spring + Hibernate + JPA.

I use Spring + Hibernate + JPA in my application.

我需要从Websphere MQ中读取消息,并将消息插入DB. 有时可能有连续的消息可用,有时消息的数量很少,有时我们期望没有来自Queue的消息.

I need to read the message from Websphere MQ and insert the message to DB. Sometimes there may be continuous messages available and sometimes very less number of messages and sometimes we can expect no message from Queue.

当前,我正在逐一阅读消息并将其插入数据库.但这对性能没有太大帮助.

Currently I'm reading the message one by one and inserting them to Database. But it does not help much in terms of performance.

我的意思是当我有大量消息时(例如Queue中的300k消息),我无法更快地插入它们.每秒插入数据库的实体数量不是很高.因为我确实对每个实体都进行了承诺.

I mean when I have chunk of messages(Example 300k messages in Queue) I could not insert them faster. Number of entities inserted to DB per second is not so high. Because I do commit for every single entity.

我想使用休眠批处理,以便可以在一次提交中插入实体列表. (例如:每次提交30至40条消息)

I want to use hibernate batch processing, so that I can insert list of entities in a single commit. (Example: 30 to 40 messages per commit)

问题:

  1. 如何从Queue接收多条消息? (我检查过BatchMessageListenerContainer可能会有所帮助.但是我无法获得一些参考)

  1. How to receive multiple messages from Queue? (I have checked that BatchMessageListenerContainer may be helpful. But I could not get some reference)

我应该在onMessage方法之外分离数据库插入过程吗?这样该线程将被释放到池中,并且可用于从Queue中挑选下一条消息吗?

Should I separate the db insertion process out side onMessage method? So that thread will be released to pool and be available for picking next messages from Queue?

并行线程使用情况?

当前实施:

消息监听器:

<bean id="myMessageListener" class="org.mypackage.MyMessageListener">

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name ="concurrentConsumers" value ="10"/>
    <property name ="maxConcurrentConsumers" value ="50"/>        
</bean>

监听器类:

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    @Autowired
    private MyService myService;

    @Override
    public void onMessage(Message message) {
        try {
             TextMessage textMessage = (TextMessage) message;
             // parse the message
             // Process the message to DB
        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }
}

推荐答案

目前尚不清楚您的要求是什么.

It's not clear what your requirements are.

Spring Batch项目提供了 BatchMessageListenerContainer .

The Spring Batch project provides a BatchMessageListenerContainer.

消息侦听器容器,用于通过配置提供的建议来拦截消息接收. 要在单个事务中启用消息批处理,请在通知链中使用TransactionInterceptor和RepeatOperationsInterceptor(在基类中设置或不设置事务管理器).然后,容器将不使用接收单个消息并对其进行处理,而将使用RepeatOperations在同一线程中接收多个消息.与RepeatOperations和事务拦截器一起使用.如果事务拦截器使用XA,则使用XA连接工厂,或者使用TransactionAwareConnectionFactoryProxy来将JMS会话与正在进行的事务同步(在失败后增加重复消息的可能性).在后一种情况下,您将不需要在基类中提供事务管理器-它仅在进行时就阻止了JMS会话与数据库事务的同步.

Message listener container adapted for intercepting the message reception with advice provided through configuration. To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.

这篇关于异步接收来自MQ的多条消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆