Mule File入站流:控制线程数 [英] Mule File Inbound Flow : Control Number of threads
问题描述
我想控制文件入站和消息处理器中的线程数.假设如果我的输入目录中有5个文件,那么我应该能够一次处理2个文件.一旦处理了这些文件(文件内容由消息处理器处理),则只能选择其他文件.我曾尝试在流级别上使用syncnus处理策略,但是它只处理一个文件,我想要多个线程,但是每个线程将从接收文件开始就处理消息,以发送响应.我尝试了大卫建议的方法,但是它也不起作用.一次只拾取一个文件.
I want to control the number of threads in file inbound and message processor . Suppose if i have 5 files in my input directory then i should be able to process 2 files at a time . Once these files are processed (files content is processed by message processor ) , then onlly it should pick up other files . I have tried using synchronus processing strategy at flow level , but it processes only one file , i want multiple threads but each thread will process the message right from receiving file to send the response . I tried the method which david has suggested , but it is also not working . picks up only one file at a time .
<flow name="fileInboundTestFlow2" doc:name="fileInboundTestFlow2" processingStrategy="synchronous">
<poll frequency="1000">
<component class="FilePollerComponent" doc:name="File Poller"></component>
</poll>
<collection-splitter />
<request-reply >
<vm:outbound-endpoint path="out"/>
<vm:inbound-endpoint path="response">
<collection-aggregator/>
</vm:inbound-endpoint>
</request-reply>
<file:outbound-endpoint path="E:/fileTest/processed" />
</flow>
public class FilePollerComponent implements Callable{
private String pollDir="E://fileTest" ;
private int numberOfFiles = 3;
public String getPollDir()
{
return pollDir;
}
public void setPollDir(String pollDir)
{
this.pollDir = pollDir;
}
public int getNumberOfFiles()
{
return numberOfFiles;
}
public void setNumberOfFiles(int numberOfFiles)
{
this.numberOfFiles = numberOfFiles;
}
@Override
public Object onCall(MuleEventContext eventContext) throws Exception
{
File f = new File(pollDir);
List<File> filesToReturn = new ArrayList<File>(numberOfFiles);
if(f.isDirectory())
{
File[] files = f.listFiles();
int i = 0;
for(File file : files)
{
if(file.isFile())
filesToReturn.add(file);
if(i==numberOfFiles)
break ;
i++;
}
}
else
{
throw new Exception("Invalid Directory");
}
return filesToReturn;
}}
推荐答案
文件入站端点是一个轮询器,因此它使用一个线程.如果使流同步,则意味着要搭载该单个线程,从而一次处理一个文件.
The file inbound endpoint is a poller so it uses one thread. If you make the flow synchronous, you're piggybacking this single thread, thus process one file at a time.
您需要创建仅允许2个线程的流处理策略.使用允许500个线程的以下示例为例:
You need to create a flow processing strategy that allows 2 threads only. Use the following that allows 500 threads as an example: http://www.mulesoft.org/documentation/display/current/Flow+Processing+Strategies#FlowProcessingStrategies-Fine-TuningaQueued-AsynchronousProcessingStrategy
编辑:以上建议不满足此要求:
EDIT: The above proposal doesn't satisfy this requirement:
如果我配置了3个线程,则将从输入目录中选择三个文件,直到处理完这些文件,再不选择其他文件
if i configure 3 threads , then three files will be picked from input directory and and untill these files are processed , no other file should be picked
实际上,以上建议始终将并行处理3个文件,而不是每3个文件进行处理.
Indeed, the above proposal would always have 3 files being processed in parallel instead of being processed in groups of 3.
所以我正在提议这种替代方法:
So I'm proposing this alternative approach:
- 将流处理策略配置为同步
- 使用
poll
元素作为来源 - 在其中放置一个自定义组件,该组件将从可配置目录中选择3个不同的文件.无需锁定任何内容,因为流的同步策略可防止重新进入.返回
java.util.List
的java.io.File
s. - 在其后添加
collection-splitter
. - 在入站端点中添加带有
aggregator
的request-reply
,以实现fork-join模式(
- Configure the flow processing strategy to synchronous
- Use a
poll
element as the source - Place a custom component in it, which selects 3 different files from a configurable directory. No need to lock anything because the synchronous strategy of the flow prevents re-entrance. Return a
java.util.List
ofjava.io.File
s. - Add a
collection-splitter
after it. - Add a
request-reply
with anaggregator
in the inbound endpoint to implement a fork-join pattern (http://blogs.mulesoft.org/aggregation-with-mule-fork-and-join-pattern/). Processing of the files will happen in another flow, while the polling flow will block until all 3 files have been processed.
这篇关于Mule File入站流:控制线程数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!