Implementing a Custom Process Strategy with the Apache Camel File Component


Problem description

I am currently working on a Camel-based ETL application that processes groups of files as they appear in a dated directory. The files need to be processed together as a group, determined by the beginning of the file name. A group can only be processed once its done file (".flag") has been written to the directory. I know the Camel file component has a done-file option, but that only lets you retrieve files with the same name as the done file. The application needs to run continuously and start polling the next day's directory when the date rolls over.

Sample directory structure:

 /process-directory
      /03-09-2011
      /03-10-2011
           /GROUPNAME_ID1_staticfilename.xml
           /GROUPNAME_staticfilename2.xml
           /GROUPNAME.flag
           /GROUPNAME2_ID1_staticfilename.xml
           /GROUPNAME2_staticfilename2.xml
           /GROUPNAME2_staticfilename3.xml
           /GROUPNAME2.flag
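
The grouping rule described above can be sketched in plain Java, independent of Camel. This is only an illustration under assumptions: the class name `FlagGroups`, the method `completedGroups`, and the hard-coded `.flag` suffix are hypothetical, and a data file is assumed to belong to a group when its name starts with the group name followed by an underscore, as in the sample listing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlagGroups {

    // Assumed suffix; the question's code reads the extension from a property.
    static final String FLAG_SUFFIX = ".flag";

    /**
     * Maps each group that has a flag file to the data files that belong to it.
     * A file belongs to a group when its name starts with "<group>_".
     */
    static Map<String, List<String>> completedGroups(List<String> fileNames) {
        Map<String, List<String>> groups = new LinkedHashMap<>();

        // First pass: every flag file marks one completed group.
        for (String name : fileNames) {
            if (name.endsWith(FLAG_SUFFIX)) {
                String group = name.substring(0, name.length() - FLAG_SUFFIX.length());
                groups.put(group, new ArrayList<>());
            }
        }

        // Second pass: attach each data file to the group whose prefix it carries.
        for (String name : fileNames) {
            if (name.endsWith(FLAG_SUFFIX)) {
                continue;
            }
            for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
                if (name.startsWith(entry.getKey() + "_")) {
                    entry.getValue().add(name);
                }
            }
        }
        return groups;
    }
}
```

Matching on the group name plus the underscore keeps `GROUPNAME.flag` from also claiming the `GROUPNAME2_...` files. Groups whose flag file has not arrived yet are simply absent from the result, so their files are left alone until a later poll.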


I have the following route (names obfuscated) that kicks off the processing:

@Override
public void configure() throws Exception 
{
    getContext().addEndpoint("processShare", createProcessShareEndpoint());

    from("processShare")
        .process(new InputFileRouter())
        .choice()
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE1 + "'")
                .to("seda://type1?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE2 + "'")
                .to("seda://type2?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE3 + "'")
                .to("seda://type3?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE4 + "'")
                .to("seda://type4?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE5 + "'")
                .to("seda://type5?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE6 + "'")
                .to("seda://type6?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE7 + "'")
                .to("seda://type7?size=1")
            .otherwise()
                .log(LoggingLevel.FATAL, "Unknown file type encountered during processing! --> ${body}");
}


My problems are around how to configure the file endpoint. I'm currently trying to configure the endpoint programmatically, without much luck. My experience with Camel so far has been predominantly with the Spring DSL rather than the Java DSL.

I went down the route of trying to instantiate a FileEndpoint object, but whenever the route builds I get an error saying that the file property is null. I believe this is because I should be creating a FileComponent and not an endpoint. I'm creating the endpoint without a URI because I am not able to specify the dynamic date in the directory name using the URI.

private FileEndpoint createProcessShareEndpoint() throws ConfigurationException
    {
        FileEndpoint endpoint = new FileEndpoint();

        //Custom directory "ready to process" implementation.
        endpoint.setProcessStrategy(getContext().getRegistry().lookup(
                "inputFileProcessStrategy", MyFileInputProcessStrategy.class));

        try 
        {
            //Controls the number of files returned per directory poll.
            endpoint.setMaxMessagesPerPoll(Integer.parseInt(
                    PropertiesUtil.getProperty(
                            AdapterConstants.OUTDIR_MAXFILES, "1")));
        } 
        catch (NumberFormatException e) 
        {
            throw new ConfigurationException(String.format(
                    "Property %s is required to be an integer.", 
                    AdapterConstants.OUTDIR_MAXFILES), e);
        }

        Map<String, Object> consumerPropertiesMap = new HashMap<String, Object>();

        //Controls the delay between directory polls.
        consumerPropertiesMap.put("delay", PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_POLLING_MILLIS));

        //Controls which files are included in directory polls.
        //Regex that matches the flag-file extension (e.g. {SOME_FILE}.flag)
        consumerPropertiesMap.put("include", "^.*(\\." + PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_FLAGFILE_EXTENSION, "flag") + ")");

        endpoint.setConsumerProperties(consumerPropertiesMap);

        GenericFileConfiguration configuration = new GenericFileConfiguration();

        //Controls the directory to be polled by the endpoint.
        if(CommandLineOptions.getInstance().getInputDirectory() != null)
        {
            configuration.setDirectory(CommandLineOptions.getInstance().getInputDirectory());
        }
        else
        {
            SimpleDateFormat dateFormat = new SimpleDateFormat(PropertiesUtil.getProperty(AdapterConstants.OUTDIR_DATE_FORMAT, "MM-dd-yyyy"));

            configuration.setDirectory(
                    PropertiesUtil.getProperty(AdapterConstants.OUTDIR_ROOT) + "\\" +
                    dateFormat.format(new Date()));
        }

        endpoint.setConfiguration(configuration);

        return endpoint;
    }


  1. Is implementing a GenericFileProcessingStrategy the correct thing to do in this situation? If so, is there an example of this somewhere? I have looked through the Camel file unit tests and didn't see anything that jumped out at me.

  2. What am I doing wrong with configuring the endpoint? I feel like the answer to cleaning up this mess is tied in with question 3.

  3. Can you configure the file endpoint to roll over to the next dated folder when the date changes during polling?

As always, thanks for the help.

Recommended answer

You can refer to a custom ProcessStrategy from the endpoint URI using the processStrategy option, e.g. file:xxxx?processStrategy=#myProcess. Notice how the value is prefixed with # to indicate that it should be looked up in the registry. So in Spring XML you just add a <bean id="myProcess" ...> tag.

In Java it's probably easier to grab the endpoint from the CamelContext API:

FileEndpoint file = context.getEndpoint("file:xxx?aaa=123&bbb=456", FileEndpoint.class);

This allows you to preconfigure the endpoint through the URI. Afterwards you can, of course, use the API on FileEndpoint to set other configuration.
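
On the dated-folder part of the question: the code in the question formats `new Date()` once, while the endpoint is being built, so the directory stays fixed for the lifetime of the route. As a rough sketch only, the directory name could instead be recomputed whenever it is needed. The class `DatedDirectory` and its methods are hypothetical helpers, not part of Camel, and how the result is fed back into the endpoint is left open here.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

class DatedDirectory {

    private final Path root;
    private final DateTimeFormatter format;

    // root and pattern correspond to the OUTDIR_ROOT and OUTDIR_DATE_FORMAT
    // properties in the question (the mapping is an assumption).
    DatedDirectory(String root, String pattern) {
        this.root = Paths.get(root);
        this.format = DateTimeFormatter.ofPattern(pattern);
    }

    // Directory for a given day, e.g. <root>/03-10-2011 with the pattern MM-dd-yyyy.
    Path forDate(LocalDate day) {
        return root.resolve(day.format(format));
    }

    // Directory for today; evaluated on every call, so it follows the date roll.
    Path current() {
        return forDate(LocalDate.now());
    }
}
```

Calling `current()` at poll time rather than at route-build time is what makes the directory follow the calendar. Using `Path.resolve` also avoids hard-coding a path separator.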
