Multiple messages processed


Problem description

I have a Spring XD source module that pulls files from S3 and splits them line by line; my Spring config is below. I have 3 containers and 1 admin server, and I now see duplicate messages being processed, because each container downloads its own copy of the file. I can work around this by setting the source S3 module's deployment count to 1, but then my message processing slows down. Any inputs to solve this?

    <int:poller fixed-delay="${fixedDelay}" default="true">
        <int:advice-chain>
            <ref bean="pollAdvise"/>
        </int:advice-chain>
    </int:poller>

    <!-- the class attribute of this bean was missing in the original post -->
    <bean id="pollAdvise">
    </bean>






    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
        <property name="accessKey" value="#{encryptedDatum.decryptBase64Encoded('${accessKey}')}"/>
        <property name="secretKey" value="${secretKey}"/>
    </bean>


    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
        <property name="proxyHost" value="${proxyHost}"/>
        <property name="proxyPort" value="${proxyPort}"/>
    <property name="preemptiveBasicProxyAuth" value="false"/> 
    </bean>

    <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
        <constructor-arg index="0" ref="credentials"/>
        <constructor-arg index="1" ref="clientConfiguration"/>
        <property name="awsEndpoint" value="s3.amazonaws.com"/>
        <property name="temporaryDirectory" value="${temporaryDirectory}"/>
        <property name="awsSecurityKey"  value="${awsSecurityKey}"/>
    </bean>

    <bean id="encryptedDatum" class="abc"/>

    <!-- aws-endpoint="https://s3.amazonaws.com"  -->
    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                        bucket="${bucket}"
                                        s3-operations="s3Operations"
                                        credentials-ref="credentials"
                                        file-name-wildcard="${fileNameWildcard}"
                                        remote-directory="${remoteDirectory}"
                                        channel="splitChannel"
                                        local-directory="${localDirectory}"
                                        accept-sub-folders="false"
                                        delete-source-files="true"
                                        archive-bucket="${archiveBucket}"
                                        archive-directory="${archiveDirectory}">
    </int-aws:s3-inbound-channel-adapter>

    <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>
    </int-file:splitter>

    <int:channel id="output"/>

[Update] I added the idempotency you suggested, with a metadata store. But since my XD is running in a 3-container cluster with Rabbit, will a SimpleMetadataStore work? I think I should use a Redis/Mongo metadata store. If I use a Mongo/Redis metadata store, how can I evict/remove the entries, since the keys will pile up over time?

    <int:idempotent-receiver id="expressionInterceptor" endpoint="output"
                             metadata-store="store"
                             discard-channel="nullChannel"
                             throw-exception-on-rejection="false"
                             key-expression="payload"/>

    <bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

Answer

I can suggest you take a look at the Idempotent Receiver.

With that you can use a shared MetadataStore and avoid accepting duplicate files.

The <idempotent-receiver> should be configured for your <int-file:splitter>. And yes: use the discard logic to avoid duplicate messages.
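A minimal sketch of that wiring, assuming the splitter is given an id (it has none in the original config) so the interceptor can target it, and keying on the file name; the advice chain from the original splitter is omitted for brevity:

    <int-file:splitter id="s3FileSplitter" input-channel="splitChannel"
                       output-channel="output" markers="false" charset="UTF-8"/>

    <!-- skip files whose name has already been recorded in the metadata store -->
    <int:idempotent-receiver id="fileDedupInterceptor"
                             endpoint="s3FileSplitter"
                             metadata-store="store"
                             discard-channel="nullChannel"
                             key-expression="payload.name"/>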

UPDATE

"But since my XD is running in a 3-container cluster with Rabbit, will a SimpleMetadataStore work?"

That doesn't matter, because you start the stream from the S3 MessageSource, so you should filter the files already at that point. Therefore you need an external, shared MetadataStore.
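A minimal sketch of such a shared store, assuming spring-integration-redis is on the classpath; the redisConnectionFactory bean and the ${redisHost}/${redisPort} placeholders are not part of the original post:

    <bean id="redisConnectionFactory"
          class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="${redisHost}"/>
        <property name="port" value="${redisPort}"/>
    </bean>

    <!-- shared by all containers; replaces the in-memory SimpleMetadataStore -->
    <bean id="store"
          class="org.springframework.integration.redis.metadata.RedisMetadataStore">
        <constructor-arg ref="redisConnectionFactory"/>
    </bean>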

"If I use a Mongo/Redis metadata store, how can I evict/remove the entries, since the keys will pile up over time?"

That's correct. It is a side effect of the Idempotent Receiver logic. Not sure how that is a problem for you if you use a database...

You can clean the collection/keys with some periodic task. Maybe once a week...
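As a rough illustration (not from the original answer), such a weekly cleanup could be wired up with Spring's task namespace; the MetadataStoreCleaner class and its clean() method are hypothetical placeholders you would implement yourself, e.g. to remove stale keys from the shared store:

    <task:scheduled-tasks>
        <!-- run the hypothetical cleaner once a week, Sunday at 03:00 -->
        <task:scheduled ref="metadataCleaner" method="clean" cron="0 0 3 * * SUN"/>
    </task:scheduled-tasks>

    <!-- hypothetical helper bean that evicts old entries from the shared metadata store -->
    <bean id="metadataCleaner" class="com.example.MetadataStoreCleaner">
        <constructor-arg ref="store"/>
    </bean>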
