How to aggregate CSV lines with Apache Camel?

Question

I have a CSV similar to this:

County  City  Area  Street
county1 city1 area1 street1
county1 city1 area2 street2
county1 city1 area3 street7
county1 city2 area2 street2
county1 city2 area6 street1
county2 city1 area3 street3
county2 city1 area3 street2
...

During the CSV parsing, I need to aggregate the same County/City to create a final structure like this:

county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]

Basically, a grouping by county/city.
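For reference, the target structure is an ordinary group-by on a composite "county/city" key. The sketch below is plain Java (no Camel), written in the language of the answer; the class `CountyCityGrouping` and its `Row` type are hypothetical stand-ins for the Bindy `CsvRow`. It assumes the whole file fits in memory and uses a `LinkedHashMap` so groups come out in first-seen order.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CountyCityGrouping {

    // Hypothetical stand-in for the Bindy CsvRow: one parsed CSV line.
    static final class Row {
        final String county, city, area, street;

        Row(String county, String city, String area, String street) {
            this.county = county;
            this.city = city;
            this.area = area;
            this.street = street;
        }
    }

    // Groups rows by "county/city" (first-seen key order via LinkedHashMap)
    // and maps each row to its [area, street] pair.
    static Map<String, List<List<String>>> group(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                r -> r.county + "/" + r.city,
                LinkedHashMap::new,
                Collectors.mapping(r -> Arrays.asList(r.area, r.street), Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("county1", "city1", "area1", "street1"),
                new Row("county1", "city1", "area2", "street2"),
                new Row("county1", "city1", "area3", "street7"),
                new Row("county1", "city2", "area2", "street2"),
                new Row("county1", "city2", "area6", "street1"),
                new Row("county2", "city1", "area3", "street3"),
                new Row("county2", "city1", "area3", "street2"));
        group(rows).forEach((key, pairs) -> System.out.println(key + ": " + pairs));
    }
}
```

Run on the sample data, this prints `county1/city1: [[area1, street1], [area2, street2], [area3, street7]]` followed by the `county1/city2` and `county2/city1` lines, matching the structure above.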

I tried different things with Camel; this is the latest:

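import org.apache.camel.builder.RouteBuilder
import org.apache.camel.dataformat.bindy.annotation.CsvRecord
import org.apache.camel.dataformat.bindy.annotation.DataField
import org.apache.camel.model.dataformat.BindyType
import org.apache.camel.util.toolbox.AggregationStrategies

class CsvAppender {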
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}

@CsvRecord(separator = "\\t")
class CsvRow {
    @DataField(pos = 1)
    private String county

    @DataField(pos = 2)
    private String city

    @DataField(pos = 3)
    private String area

    @DataField(pos = 4)
    private String street

    CsvRow previous

    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }

    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}

class CsvRouteBuilder extends RouteBuilder {

    void configure() {
        CsvAppender appender = new CsvAppender()

        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
            def nextAggregate = exchange.getIn().getHeader('NextAggregate')

            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('AggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }

            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }
            exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                         'NextAggregate': nextAggregate,
                                         'AggregateComplete': aggregateComplete])
            aggregateComplete
        }

        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
                .aggregate(constant(true), AggregationStrategies.bean(appender, "append")).completionPredicate(predicate)
                .process({
                    it.getOut().setBody(it.getIn().getHeader('CurrentAggregate')) })
                .convertBodyTo(String.class)
            .to("jms:myCsvSplitter")
    }
}

Anyway, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks too verbose.
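The "previous" bookkeeping is the classic consecutive-grouping problem. Most likely, `previous` is null because after each completion the aggregator starts a fresh group with no old exchange, so the first row of the next group is never linked to anything; and a predicate that fires only on a key change would also never complete the last group, since no row follows it. The plain-Java sketch below (hypothetical class `ConsecutiveGrouper`, not a Camel API) handles both edge cases explicitly. It assumes the input is already sorted by county/city, as in the sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

class ConsecutiveGrouper {

    private final BiConsumer<String, List<List<String>>> sink; // receives each finished group
    private String currentKey;                  // null until the first row: nothing to compare with yet
    private List<List<String>> currentPairs = new ArrayList<>();

    ConsecutiveGrouper(BiConsumer<String, List<List<String>>> sink) {
        this.sink = sink;
    }

    // Feed one parsed row; assumes rows arrive sorted by county/city.
    void accept(String county, String city, String area, String street) {
        String key = county + "/" + city;
        if (currentKey != null && !currentKey.equals(key)) {
            emit(); // key changed, so the previous group is complete
        }
        currentKey = key;
        currentPairs.add(Arrays.asList(area, street));
    }

    // Call once at end of input; otherwise the last group is never emitted.
    void finish() {
        if (currentKey != null) {
            emit();
            currentKey = null;
        }
    }

    private void emit() {
        sink.accept(currentKey, currentPairs);
        currentPairs = new ArrayList<>(); // start a new list; the emitted one is not touched again
    }

    public static void main(String[] args) {
        ConsecutiveGrouper g = new ConsecutiveGrouper(
                (key, pairs) -> System.out.println(key + ": " + pairs));
        g.accept("county1", "city1", "area1", "street1");
        g.accept("county1", "city1", "area2", "street2");
        g.accept("county1", "city2", "area2", "street2");
        g.accept("county2", "city1", "area3", "street3");
        g.finish();
    }
}
```

Unlike a single map holding the whole file, this only keeps one group in memory at a time, at the cost of requiring sorted input and an explicit end-of-input signal.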

Any idea how to aggregate the CSV file properly?

Answer

I've got some rough code that works and should hopefully be enough to help you along. It's in Java rather than Groovy, because my Groovy isn't up to much, but it should be easy enough to translate.

First, the aggregator:

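import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAgregationStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        CsvRow newBody = newExchange.getIn().getBody(CsvRow.class);
        if (oldExchange == null) {
            // First row: start a new map keyed by county and make it the body
            Map<String, List<CsvRow>> map = new HashMap<>();
            List<CsvRow> list = new ArrayList<>();
            list.add(newBody);
            map.put(newBody.getCounty(), list);
            newExchange.getIn().setBody(map);
            return newExchange;
        }
        // Subsequent rows: append the row to its county's list in the existing map
        Map<String, List<CsvRow>> map = oldExchange.getIn().getBody(Map.class);
        map.computeIfAbsent(newBody.getCounty(), k -> new ArrayList<>()).add(newBody);

        // We return oldExchange, so copy the splitter's "last line" flag
        // (CamelSplitComplete) onto it; otherwise the completion predicate never sees it
        oldExchange.setProperty(Exchange.SPLIT_COMPLETE, newExchange.getProperty(Exchange.SPLIT_COMPLETE));
        return oldExchange;
    }
}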

This stores the rows in a map of lists, keyed by county.

Then the route:

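import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file:/c:/dev/test?noop=true")
        .split(body().tokenize("\n"))
        .log("Read line ${body}")
        .unmarshal()
        .bindy(BindyType.Csv, CsvRow.class)
            // One shared group (constant correlation key), completed on the last split line.
            // On Camel 3.x the predicate is written ${exchangeProperty.CamelSplitComplete}.
            .aggregate(constant(true), new MyAgregationStrategy()).completionPredicate(simple("${property.CamelSplitComplete} == true"))
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                // The body is the Map<String, List<CsvRow>> built by MyAgregationStrategy
                Map<?, ?> results = exchange.getIn().getBody(Map.class);
                System.out.println("Got results for " + results.size() + " counties");
            }
        });
    }
}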

It uses the CamelSplitComplete property, which the splitter sets to true on the last line, to detect when the splitting is finished. In the processor at the end you can then do what you like with the map. Alternatively, you can change the aggregation strategy to aggregate the results however you need.
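To get the county/city grouping from the question rather than per-county lists, only the map key in the strategy needs to change. The plain-Java sketch below (hypothetical class `CountyCityMerge`, no Camel dependency) mirrors the shape of `aggregate()`: the accumulator is null on the first call, like `oldExchange`, and each row is appended under a composite "county/city" key. A `LinkedHashMap` is used here instead of the answer's `HashMap` so the output order is predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CountyCityMerge {

    // Same shape as AggregationStrategy.aggregate(): 'acc' is null on the first
    // call (like oldExchange == null), otherwise it is the map built so far.
    static Map<String, List<List<String>>> merge(Map<String, List<List<String>>> acc,
                                                 String county, String city,
                                                 String area, String street) {
        if (acc == null) {
            acc = new LinkedHashMap<>(); // keeps groups in first-seen order
        }
        // Composite key instead of the county alone, matching the county/city grouping
        acc.computeIfAbsent(county + "/" + city, k -> new ArrayList<>())
           .add(Arrays.asList(area, street));
        return acc;
    }

    public static void main(String[] args) {
        String[][] rows = {
                {"county1", "city1", "area1", "street1"},
                {"county1", "city2", "area2", "street2"},
                {"county1", "city1", "area3", "street7"},
        };
        Map<String, List<List<String>>> acc = null;
        for (String[] r : rows) {
            acc = merge(acc, r[0], r[1], r[2], r[3]);
        }
        acc.forEach((key, pairs) -> System.out.println(key + ": " + pairs));
    }
}
```

Inside `MyAgregationStrategy` the equivalent change would be along the lines of `map.computeIfAbsent(newBody.getCounty() + "/" + newBody.getCity(), k -> new ArrayList<>()).add(newBody);`, assuming the Java `CsvRow` has a `getCity()` getter alongside `getCounty()`.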

Hope this helps.
