Unable to execute HTTP request: Timeout waiting for connection from pool in Flink


Problem description

I'm working on an app which uploads some files to an s3 bucket and, at a later point, reads those files back from the s3 bucket and pushes them to my database.

I'm using Flink 1.4.2 and the fs.s3a API for reading and writing files from the s3 bucket.
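For context, this is roughly the read path involved. A minimal sketch, where the bucket, path, and POJO fields are made up for illustration; readCsvFile goes through PojoCsvInputFormat, which is the code path visible in the stack trace below:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3CsvReadJob {

    // Hypothetical POJO matching the CSV columns; the field names are assumptions.
    public static class Record {
        public String id;
        public long value;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // readCsvFile(...) uses PojoCsvInputFormat under the hood,
        // the same input format that appears in the stack trace below.
        DataSet<Record> records = env
                .readCsvFile("s3a://my-bucket/some/path/")   // hypothetical location
                .pojoType(Record.class, "id", "value");

        records.print();
    }
}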

Uploading files to the s3 bucket works fine without any issues, but when the second phase of my app starts reading those uploaded files back from s3, it throws the following error:

Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:155)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:702)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:490)
at org.apache.flink.api.common.io.GenericCsvInputFormat.open(GenericCsvInputFormat.java:301)
at org.apache.flink.api.java.io.CsvInputFormat.open(CsvInputFormat.java:53)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:160)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:37)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

I was able to control this error by increasing the max connections parameter for the s3a API.
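Concretely, that means raising the s3a HTTP connection-pool limit. A sketch of the Hadoop-side setting in core-site.xml, assuming the plain fs.s3a key; the value 3000 mirrors my current setting, and Flink's bundled flink-s3-fs-hadoop typically picks this up from the Hadoop configuration on the classpath:

<property>
  <name>fs.s3a.connection.maximum</name>
  <value>3000</value>
</property>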

As of now, I have around 1000 files in the s3 bucket that are pushed and pulled by my app, and my max connections setting is 3000. I'm using Flink's parallelism to upload/download these files from the s3 bucket. My task manager count is 14. The failure is intermittent; the same scenario also succeeds at other times.

My questions are:

  1. Why am I getting this failure only intermittently? If the max connections value I configured were too low, my app should throw this error on every run.
  2. Is there any way to calculate the optimal number of connections my app needs to work without running into the connection-pool timeout error? Or is this error related to something else that I'm not aware of?

Thanks in advance.

Recommended answer

Some comments, based on my experience with processing lots of files from S3 via Flink (batch) workflows:

  1. When you are reading the files, Flink will calculate "splits" based on the number of files and each file's size. Each split is read separately, so the theoretical max number of simultaneous connections isn't based on the number of files alone, but on the combination of file count and file sizes (see the sketch after this list).
  2. The connection pool used by the HTTP client releases connections after some amount of time, because being able to reuse an existing connection is a win (the server/client handshake doesn't have to happen). So that introduces a degree of randomness into how many connections are available in the pool.
  3. The size of the connection pool doesn't impact memory much, so I typically set it pretty high (e.g. 4096 for a recent workflow).
  4. When using AWS connection code (e.g. on EMR), the setting to bump is fs.s3.maxConnections, which isn't the same key as in a pure Hadoop configuration (there it's fs.s3a.connection.maximum).
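To make point 1 concrete, here is a back-of-the-envelope sketch. The file sizes, split size, and slot count are made-up numbers, and this is not Flink's exact split-calculation logic, just the shape of the estimate:

import java.util.Collections;
import java.util.List;

public class ConnectionEstimate {

    // Rough split count: every file yields at least one split, and large
    // files yield roughly ceil(size / splitSize) splits.
    static long estimateSplits(List<Long> fileSizesBytes, long splitSizeBytes) {
        long splits = 0;
        for (long size : fileSizesBytes) {
            splits += Math.max(1, (size + splitSizeBytes - 1) / splitSizeBytes);
        }
        return splits;
    }

    public static void main(String[] args) {
        // Assumptions: 1000 files of ~64 MB each, 32 MB splits,
        // 14 task managers with 4 slots each (the slot count is hypothetical).
        List<Long> sizes = Collections.nCopies(1000, 64L << 20);
        long splits = estimateSplits(sizes, 32L << 20);
        long slots = 14L * 4;

        // Each slot reads one split at a time, so concurrent reads are
        // bounded by the slot count rather than the split count; the pool
        // still needs headroom because released connections linger (point 2).
        long concurrentReads = Math.min(splits, slots);
        System.out.printf("splits=%d, concurrent reads <= %d%n", splits, concurrentReads);
    }
}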
