one SCDF source, 2 processors but only 1 processes each item


Problem Description


My use case is a variation on this:

Create Stream with one source, two parallel processors and one sink in Spring Cloud Data Flow

In the example, one source emits an item to RabbitMQ and both processors get it.

I want the opposite. I want the source to emit items to RabbitMQ, but only one processor handles each item.

Let's pretend I have:

1 source named source
2 processors named processor1 and processor2

So source emits A, B, C to RabbitMQ.

RabbitMQ will emit A.

Whichever processor gets A first will process it - let's say processor1 is the lucky one and handles A.

Then RabbitMQ will emit B.

Since processor1 is busy with A and processor2 is idle, processor2 handles B.

RabbitMQ will emit C.

processor1 has finished with A and is idle, so processor1 handles C.

The Spring Cloud Data Flow graph I came up with fans the source out to the two processors; in the graph, processorA is the one on top and processorB is the lower one.

When I deploy this and run it, source emits A, B and C, and then both processor1 and processor2 receive A, B and then C.

I'm confused about whether the behavior I want is something I can make happen in Spring Cloud Data Flow, OR whether there is a RabbitMQ setting for this, as implied by the answer which says that message removal

"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."

If that's the case, can I set it in my Spring Cloud Data Flow source, OR is it a RabbitMQ setting, or is it something else entirely?

UPDATE:

I have added

spring.cloud.stream.bindings.input.group=consumerGroup

to the application.properties file of my processor.

Unfortunately, both processors are receiving the exact same data.
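
For reference, the relevant binding section of each processor's application.properties now looks roughly like this (the destination names are placeholders I have filled in for illustration; the group line is the entry described above):

# both processors must share the same input destination AND the same group
# for RabbitMQ to treat them as competing consumers (destination names are placeholders)
spring.cloud.stream.bindings.input.destination=mystream.source
spring.cloud.stream.bindings.input.group=consumerGroup
spring.cloud.stream.bindings.output.destination=mystream.processed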

Do I need to add a similar entry to the application.properties of my source?

Do I need to change the annotation on the processor? Currently, it is:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
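
For context, the processor is wired along these lines (the class name, method name, and body are simplified placeholders, not my actual code):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class ItemProcessor {

    // placeholder logic: the real transformation is not shown here
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String payload) {
        return payload.toUpperCase();
    }
}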

Do I need to modify the annotation on the source in any fashion? Currently, it is:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = 
     @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

Does the inclusion of @Poller change this in any fashion?
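
For completeness, the source bean above sits in a class roughly like this (the counter payload and the names are illustrative placeholders around the annotations shown above):

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(Source.class)
public class ItemSource {

    private final AtomicInteger counter = new AtomicInteger();

    // polled once per second, emitting a single message per poll to the output binding
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller =
            @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> itemSource() {
        return () -> new GenericMessage<>("item-" + counter.getAndIncrement());
    }
}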

UPDATE:

Is the property named spring.cloud.stream.instanceCount?

Solution

For stream apps, you need to set the ...consumer.group property so they are both in the same group and compete for messages.

But that should happen automatically with SCDF.
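
As an illustration, if it does not happen automatically, the group can also be forced at deploy time via SCDF deployment properties (the app names processor1 and processor2 below are assumptions; use whatever labels the processors have in the stream definition):

app.processor1.spring.cloud.stream.bindings.input.group=sharedGroup
app.processor2.spring.cloud.stream.bindings.input.group=sharedGroup

With both processors in the same group on the same destination, the Rabbit binder creates one shared queue and the two instances compete for its messages.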
