Spring集成:带有标题丰富的自定义拆分器 [英] Spring Integration: Custom Splitter with Header Enrichment

查看:27
本文介绍了Spring集成:带有标题丰富的自定义拆分器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以实现可以返回Iterator添加自定义标头信息的消息拆分器?

例如,如果我有以下类

public class CsvFileToIteratorSplitter extends AbstractMessageSplitter {

    @Override
    protected Object splitMessage(Message<?> message) {
        Object payload = message.getPayload();
        Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");

        try {
            InputStream source = new FileInputStream((File) payload);
            BufferedReader reader = new BufferedReader(new InputStreamReader(source));

            String header = reader.lines().findFirst().orElse(null);

            return MessageBuilder.withPayload(reader.lines().iterator())
                    .setHeaderIfAbsent("HEADER", header)
                    .build();

        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

然后我可以添加到标头,但有效负载实际上是Iterator的实例,拆分失败

如果我修改使类现在是

public class CsvFileToIteratorSplitter extends AbstractMessageSplitter {

    @Override
    protected Object splitMessage(Message<?> message) {
        log.debug("{}", message.toString());

        Object payload = message.getPayload();
        Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");

        try {
            InputStream source = new FileInputStream((File) payload);
            BufferedReader reader = new BufferedReader(new InputStreamReader(source));

            return reader.lines().iterator();

        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

拆分正常,但我丢失了标题信息。

有没有办法让功能正常的拆分能够添加到标题中?

推荐答案

您应该返回一个Iterator<MessageBuilder<String>>.

@SpringBootApplication
public class So44604817Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So44604817Application.class, args);
        context.getBean("in", MessageChannel.class).send(new GenericMessage<>(new File("/tmp/foo.txt")));
        context.close();
    }

    @Bean
    @Splitter(inputChannel = "in")
    public MySplitter splitter() {
        MySplitter splitter = new MySplitter();
        splitter.setOutputChannelName("out");
        return splitter;
    }

    @Bean
    public MessageChannel out() {
        return new MessageChannel() {

            @Override
            public boolean send(Message<?> message) {
                return send(message, -1);
            }

            @Override
            public boolean send(Message<?> message, long timeout) {
                System.out.println(message);
                return true;
            }

        };
    }

    public static class MySplitter extends AbstractMessageSplitter {

        @SuppressWarnings("resource")
        @Override
        protected Object splitMessage(Message<?> message) {
            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");

            try {
                InputStream source = new FileInputStream((File) payload);
                final BufferedReader reader = new BufferedReader(new InputStreamReader(source));
                final String header = reader.lines().findFirst().orElse(null);
                final Iterator<String> iterator = reader.lines().iterator();
                Iterator<MessageBuilder<String>> builderIterator = new Iterator<MessageBuilder<String>>() {

                    private String next;

                    @Override
                    public boolean hasNext() {
                        if (this.next != null) { // handle multiple hasNext() calls.
                            return true;
                        }
                        if (!iterator.hasNext()) {
                            try {
                                reader.close();
                            }
                            catch (IOException e) {
                                e.printStackTrace();
                            }
                            return false;
                        }
                        else {
                            this.next = iterator.next();
                            // Handle empty last line
                            if (next.length() == 0 && !iterator.hasNext()) {
                                try {
                                    reader.close();
                                }
                                catch (IOException e) {
                                    e.printStackTrace();
                                }
                                return false;
                            }
                            return true;
                        }
                    }

                    @Override
                    public MessageBuilder<String> next() {
                        String line = this.next;
                        this.next = null;
                        return MessageBuilder
                                .withPayload(line).setHeaderIfAbsent("HEADER", header);
                    }

                };
                return builderIterator;
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

    }

}

请注意,您的skip(1)是不正确的,因为第一行已经从阅读器中使用。

包含文件:

FOO,BAR
foo,bar
baz.qux

结果:

GenericMessage [payload=foo,bar, headers={sequenceNumber=1, HEADER=FOO,BAR, correlationId=42ce2e1f-5337-1f75-d4fe-0d7f366f76f1, id=94e98261-fd49-b4d0-f6a0-3181b27f145b, sequenceSize=0, timestamp=1497713691192}]
GenericMessage [payload=baz.qux, headers={sequenceNumber=2, HEADER=FOO,BAR, correlationId=42ce2e1f-5337-1f75-d4fe-0d7f366f76f1, id=c0b1edd6-adb9-3857-cb7c-70f603f376bc, sequenceSize=0, timestamp=1497713691192}]

JIRA Issue INT-4297 to add this functionality to FileSplitter

这篇关于Spring集成:带有标题丰富的自定义拆分器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆