轮询对象的S3时等待池中的连接超时 [英] Timeout waiting for connection from pool while polling S3 for Objects

查看:421
本文介绍了轮询对象的S3时等待池中的连接超时的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个后端服务,该服务使用spring aws集成定期轮询S3存储桶,并处理来自S3的被轮询对象.下面是它的实现

I am working on a backend service which polls S3 bucket periodically using spring aws integration and processes the polled object from S3. Below is the implementation for it

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {

    //private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);

    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);   
        return messageSource;
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
    }

    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileReadingFlow() throws IOException {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
                .handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
                .get();
    }
}

我从对象上传的S3中获取消息,并且能够使用作为消息有效负载的一部分接收到的输入流来处理它.但是我在这里面临的问题是,收到少量消息后出现超时,等待池中的连接"异常

I am getting the messages from S3 on object upload and I am able to process it using the input stream received as part of message payload. But the problem I face here is that I get 'Time out waiting for connection from pool' exception after receiving few messages

2019-01-06 02:19:06.156 ERROR 11322 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
    at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:405)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:194)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:180)
    at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:70)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:153)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:155)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:236)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)

我知道问题与未关闭打开的S3Object有关,如此处所述 https://github.com/aws/aws-sdk-java/issues/1405 ,所以我实现了关闭作为消息有效负载的一部分接收到的S3Object的输入流.但这并不能解决问题,因此我不断收到例外.有人可以帮我解决此问题吗?

I know that the issue is related to not closing the opened S3Object like stated here https://github.com/aws/aws-sdk-java/issues/1405 so I have implemented closing the input stream of the S3Object received as part of message payload. But that does not solve the issue and I keep getting the exceptions. Can someone help me to fix this issue ?

推荐答案

您的问题仍然是在配置中将消息传递注释声明与Java DSL混合在一起.

Your problem that you still mix Messaging Annotations declarations with Java DSL in your configuration.

就像在fileReadingFlow中,您在代码processS3Object()方法中关闭了这些InputStream一样,但是对于@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))生成的InputStream则不执行任何操作. 你为什么要把它放在第一位呢?如果不使用该代码,什么使您保留该代码?

Looks like in the fileReadingFlow you close those InputStreams in your code processS3Object() method, but you do nothing with InputStreams produced by the @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")). Why do you have it in fist place at all? What makes you to keep that code if you don't use it?

S3StreamingMessageSource始终被两次轮询:由@InboundChannelAdapterIntegrationFlows.from()进行.

This S3StreamingMessageSource is polled all the time twice: by the @InboundChannelAdapter and IntegrationFlows.from().

您只需要从S3StreamingMessageSource bean定义中删除该@InboundChannelAdapter,仅此而已.

You just have to remove that @InboundChannelAdapter from the S3StreamingMessageSource bean definition and that's all.

请阅读更多参考手册,以确定这种注释的原因以及在使用Java DSL时如何不需要它:

Please, read more Reference Manual to determine the reason of such an annotation and how you don't need it when you use Java DSL:

https://docs.spring.io/spring -integration/reference/html/configuration.html#_using_the_literal_inboundchanneladapter_literal_annotation

https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-inbound-adapters

这篇关于轮询对象的S3时等待池中的连接超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆