spring-integration-aws动态文件下载 [英] spring-integration-aws dynamic file download

本文介绍了spring-integration-aws动态文件下载的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要根据消息内容从S3下载文件.换句话说,要下载的文件以前是未知的,我必须在运行时搜索并找到它.

I've a requirement to download a file from S3 based on a message content. In other words, the file to download is previously unknown, I've to search and find it at runtime. S3StreamingMessageSource doesn't seem to be a good fit because:

  1. 它依赖于在我需要等待消息的地方进行轮询.
  2. 我找不到在流中动态创建 S3StreamingMessageSource 的任何方法. gateway(IntegrationFlow)看起来很有趣,但是我需要的是一个不存在的 gateway(Function< Message<?> ;, IntegrationFlow>).
  1. It relies on polling where as I need to wait for the message.
  2. I can't find any way to create a S3StreamingMessageSource dynamically in the middle of a flow. gateway(IntegrationFlow) looks interesting but what I need is a gateway(Function<Message<?>, IntegrationFlow>) that doesn't exist.

另一个候选人是

Another candidate is S3MessageHandler but it has no support for listing files which I need for finding the desired file.

我可以直接使用AWS API实现自己的消息处理程序,只是想知道我是否丢失了某些东西,因为这似乎不是一个不寻常的要求.毕竟,并非每个应用程序都坐在那里,并不断轮询S3中是否有新文件.

I can implement my own message handler using AWS API directly, just wondering if I'm missing something, because this doesn't seem like an unusual requirement. After all, not every app just sits there and keeps polling S3 for new files.

推荐答案

对于遇到此问题的任何人,这就是我所做的.诀窍是:

For anyone coming across this question, this is what I did. The trick is to:

  1. 稍后(而不是在构造时)设置过滤器.请注意,由于没有 addFilters getFilters 方法,因此过滤器只能设置一次,以后不能添加.@ artem-bilan,这很不方便.
  2. 手动调用 S3StreamingMessageSource.receive .

  1. Set filters later, not at construction time. Note that there is no addFilters or getFilters method, so filters can only be set once, and can't be added later. @artem-bilan, this is inconvenient.
  2. Call S3StreamingMessageSource.receive manually.

.handle(String.class, (fileName, h) -> {
if (messageSource instanceof S3StreamingMessageSource) {
    S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;

    ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
    chainFileListFilter.addFilters(
            new S3SimplePatternFileListFilter("**/*/*.json.gz"),
            new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
            new S3FileListFilter(fileName)
    );
    s3StreamingMessageSource.setFilter(chainFileListFilter);

    return s3StreamingMessageSource.receive();
}
log.warn("Expected: {} but got: {}.",
        S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
return messageSource.receive();
}, spec -> spec
    .requiresReply(false) // in case all messages got filtered out
)

这篇关于spring-integration-aws动态文件下载的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆