How to aggregate CSV lines with Apache Camel?

Question

I have a CSV similar to this:

County  City  Area  Street
county1 city1 area1 street1
county1 city1 area2 street2
county1 city1 area3 street7
county1 city2 area2 street2
county1 city2 area6 street1
county2 city1 area3 street3
county2 city1 area3 street2
...

During the CSV parsing, I need to aggregate rows with the same County/City to create a final structure like this:

county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]

Basically, a grouping by county/city.
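To make the target structure concrete, here is a minimal Camel-free sketch in plain Java (Java 11+ is assumed) that groups the sample rows by "county/city" with Collectors.groupingBy. The Row class and the parse and group methods are hypothetical names introduced only for illustration; it also assumes whitespace-separated columns and a single header line, as in the sample above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Camel-free sketch of the target structure: group rows by "county/city" and
// keep the [area, street] pairs of each group in file order.
final class CountyCityGrouping {

    // Hypothetical stand-in for the question's Bindy-mapped CsvRow.
    static final class Row {
        final String county, city, area, street;

        Row(String county, String city, String area, String street) {
            this.county = county;
            this.city = city;
            this.area = area;
            this.street = street;
        }
    }

    // Assumes whitespace-separated columns, as in the sample data.
    static Row parse(String line) {
        String[] f = line.trim().split("\\s+");
        return new Row(f[0], f[1], f[2], f[3]);
    }

    static Map<String, List<List<String>>> group(List<String> lines) {
        return lines.stream()
                .skip(1)                        // drop the "County City Area Street" header
                .filter(line -> !line.isBlank())
                .map(CountyCityGrouping::parse)
                .collect(Collectors.groupingBy(
                        r -> r.county + "/" + r.city,
                        LinkedHashMap::new,     // keep groups in first-seen order
                        Collectors.mapping(r -> List.of(r.area, r.street), Collectors.toList())));
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "County  City  Area  Street",
                "county1 city1 area1 street1",
                "county1 city1 area2 street2",
                "county1 city1 area3 street7",
                "county1 city2 area2 street2",
                "county1 city2 area6 street1",
                "county2 city1 area3 street3",
                "county2 city1 area3 street2");
        group(lines).forEach((key, pairs) -> System.out.println(key + ": " + pairs));
        // county1/city1: [[area1, street1], [area2, street2], [area3, street7]]
        // county1/city2: [[area2, street2], [area6, street1]]
        // county2/city1: [[area3, street3], [area3, street2]]
    }
}
```

The LinkedHashMap factory is a design choice here: a plain groupingBy would return a HashMap with no guaranteed key order.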

I tried several things with Camel; this is my latest attempt:

class CsvAppender {
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}

@CsvRecord(separator = "\\t")
class CsvRow {
    @DataField(pos = 1)
    private String county

    @DataField(pos = 2)
    private String city

    @DataField(pos = 3)
    private String area

    @DataField(pos = 4)
    private String street

    CsvRow previous

    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }

    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}

class CsvRouteBuilder extends RouteBuilder {

    void configure() {
        CsvAppender appender = new CsvAppender()

        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
            def nextAggregate = exchange.getIn().getHeader('NextAggregate')

            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('AggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }

            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }
            exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                         'NextAggregate': nextAggregate,
                                         'AggregateComplete': aggregateComplete])
            aggregateComplete
        }

        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
                .aggregate(constant(true), AggregationStrategies.bean(appender, "append")).completionPredicate(predicate)
                .process({
                    it.getOut().setBody(it.getIn().getHeader('CurrentAggregate')) })
                .convertBodyTo(String.class)
            .to("jms:myCsvSplitter")
    }
}

Anyway, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks too verbose.

Any idea how to aggregate the CSV file properly?

Answer

I've got some rough code that works and should hopefully be good enough to help you along. It's in Java rather than Groovy, since my Groovy isn't up to much, but it should be easy enough to translate.

First, the aggregator:

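import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
// Camel 2.x location; in Camel 3+ the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Assumes CsvRow is the Bindy-annotated row class and exposes getCounty()
public class MyAgregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        CsvRow newBody = (CsvRow) newExchange.getIn().getBody();
        if (oldExchange == null) {
            // First row: start a new map and make it the body of the aggregated exchange
            Map<String, List<CsvRow>> map = new HashMap<>();
            List<CsvRow> list = new ArrayList<>();
            list.add(newBody);
            map.put(newBody.getCounty(), list);
            newExchange.getIn().setBody(map);
            return newExchange;
        }

        // Subsequent rows: append to the list for this county, creating it if needed
        @SuppressWarnings("unchecked")
        Map<String, List<CsvRow>> map = oldExchange.getIn().getBody(Map.class);
        List<CsvRow> list = map.get(newBody.getCounty());
        if (list == null) {
            list = new ArrayList<>();
            map.put(newBody.getCounty(), list);
        }
        list.add(newBody);

        // Carry the splitter's "last line" flag over to the aggregated exchange
        // so the completion predicate in the route can see it
        oldExchange.setProperty("CamelSplitComplete", newExchange.getProperty("CamelSplitComplete"));
        return oldExchange;
    }
}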

This stores the rows in a list in a map, keyed by the county.
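The merge step itself can be tried without Camel. The sketch below is a hypothetical pure-Java analogue of what the strategy does per row: the first call gets no previous state (like oldExchange == null), and each later call adds one row to the map built so far. As an assumed variant, it keys by "county/city" to match the question rather than by county alone, and uses Map.computeIfAbsent in place of the explicit null check. IncrementalGrouping and merge are illustrative names, not Camel API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Camel-free sketch of an AggregationStrategy-style merge: previous == null on
// the first call, otherwise the map aggregated so far is extended by one row.
final class IncrementalGrouping {

    // row = {county, city, area, street}; the "county/city" key is an assumed
    // variant (the strategy above keys by county only).
    static Map<String, List<List<String>>> merge(Map<String, List<List<String>>> previous, String[] row) {
        Map<String, List<List<String>>> map = (previous == null) ? new LinkedHashMap<>() : previous;
        // computeIfAbsent replaces the explicit "list == null" check plus map.put()
        map.computeIfAbsent(row[0] + "/" + row[1], k -> new ArrayList<>())
           .add(List.of(row[2], row[3]));
        return map;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"county1", "city1", "area1", "street1"},
            {"county1", "city2", "area2", "street2"},
            {"county1", "city1", "area2", "street2"},
        };
        Map<String, List<List<String>>> state = null;   // nothing aggregated yet
        for (String[] row : rows) {
            state = merge(state, row);
        }
        System.out.println(state);
        // {county1/city1=[[area1, street1], [area2, street2]], county1/city2=[[area2, street2]]}
    }
}
```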

Then the route:

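import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file:/c:/dev/test?noop=true")
            // One exchange per line; the splitter marks the last one with CamelSplitComplete
            .split(body().tokenize("\n"))
            .log("Read line ${body}")
            .unmarshal()
            .bindy(BindyType.Csv, CsvRow.class)
            // constant(true): every row goes into the same single aggregate.
            // ${property.X} is Camel 2.x syntax; Camel 3+ uses ${exchangeProperty.X}
            .aggregate(constant(true), new MyAgregationStrategy())
                .completionPredicate(simple("${property.CamelSplitComplete} == true"))
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // The aggregated body is the county -> rows map built by the strategy
                    Map<?, ?> results = exchange.getIn().getBody(Map.class);
                    System.out.println("Got results for " + results.size() + " counties");
                }
            });
    }
}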

It uses the CamelSplitComplete property to detect when the splitting is finished. In the processor at the end you can then do whatever you like with the map. Alternatively, you can change the aggregation strategy to aggregate the results however you need them.
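Why the strategy copies CamelSplitComplete onto oldExchange: as far as I understand Camel's aggregator, the completion predicate is evaluated against the aggregated exchange by default (against the incoming exchange only when eagerCheckCompletion is enabled), so the flag has to live on the aggregate. The following is a plain-Java model of that flow, not Camel code; Part, Aggregate, split and run are hypothetical names. Only the last split part is flagged, the flag is copied onto the aggregate, and one aggregate is released when the flag is true.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Camel code) of how the route releases a single aggregate.
final class SplitCompletionModel {

    // Hypothetical stand-in for one split exchange: a line plus the CamelSplitComplete flag.
    static final class Part {
        final String line;
        final boolean splitComplete;

        Part(String line, boolean splitComplete) {
            this.line = line;
            this.splitComplete = splitComplete;
        }
    }

    // Hypothetical stand-in for the aggregated exchange.
    static final class Aggregate {
        final List<String> lines = new ArrayList<>();
        boolean splitComplete;
    }

    static List<Part> split(String file) {
        String[] lines = file.split("\n");
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < lines.length; i++) {
            parts.add(new Part(lines[i], i == lines.length - 1)); // only the last part is flagged
        }
        return parts;
    }

    // Returns every aggregate released by the completion check.
    static List<List<String>> run(String file) {
        List<List<String>> released = new ArrayList<>();
        Aggregate current = null;
        for (Part part : split(file)) {
            if (current == null) {
                current = new Aggregate();                 // like oldExchange == null
            }
            current.lines.add(part.line);
            current.splitComplete = part.splitComplete;    // like copying CamelSplitComplete to oldExchange
            if (current.splitComplete) {                   // like ${property.CamelSplitComplete} == true
                released.add(current.lines);
                current = null;
            }
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println(run("row1\nrow2\nrow3\n"));
        // [[row1, row2, row3]]
    }
}
```

If the copy line is dropped from the model, the aggregate keeps the flag of its first part (false) and nothing is ever released, which mirrors what would happen to the route if the property were not carried over.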

Hope this helps.
