Kafka Connect S3 Connector OutOfMemory errors with TimeBasedPartitioner
Question
I'm currently working with the Kafka Connect S3 Sink Connector 3.3.1 to copy Kafka messages over to S3, and I get OutOfMemory errors when processing late data.
I know it looks like a long question, but I tried my best to make it clear and simple to understand. I highly appreciate your help.
- The connector does a simple byte-to-byte copy of the Kafka messages and adds the length of the message at the beginning of the byte array (for decompression purposes).
  - This is the role of the `CustomByteArrayFormat` class (see configs below).
- The `CustomTimeBasedPartitioner` extends `io.confluent.connect.storage.partitioner.TimeBasedPartitioner`, and its sole purpose is to override the `generatePartitionedPath` method to put the topic at the end of the path.
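- These errors only happen when the connector has been down for several hours and has to catch up on data.
- When the connector is turned back on, it starts catching up but fails very quickly with OutOfMemory errors.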
- The `timestamp.extractor` configuration of the connector is set to `Record` when those OOM errors happen.
- Switching this configuration to `Wallclock` (i.e. the time of the Kafka Connect process) does not throw OOM errors and all of the late data can be processed, but the late data is no longer correctly bucketed.
  - All of the late data is bucketed in the `YYYY/MM/dd/HH/mm/topic-name` path of the time at which the connector was turned back on.
- The `"partition.duration.ms": "600000"` parameter makes the connector bucket data into six 10-minute paths per hour (`2018/06/20/12/[00|10|20|30|40|50]` for 2018-06-20 at 12pm).
  - Thus, with 24h of late data, the connector would have to output data to `24h * 6 = 144` different S3 paths.
  - Each 10-minute folder contains 10,000 messages/sec * 600 seconds = 6,000,000 messages, for a size of 6 GB.
  - If it does indeed read in parallel, that would put 864 GB of data into memory.
- The `"flush.size": "100000"` setting implies that once more than 100,000 messages have been read, they should be committed to files (and thus free memory).
  - With messages of 1 KB, this means committing every 100 MB.
  - But even with 144 parallel reads, that would still only give a total of 14.4 GB, which is less than the 24 GB of heap size available.
  - Is `"flush.size"` the number of records to read per partition before committing? Or maybe per connector task?
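The back-of-the-envelope figures in the list above can be reproduced with a short calculation. This is only a sketch of the question's own arithmetic, assuming 1 KB messages (taken as 1,000 bytes) and decimal gigabytes; the function and key names are illustrative and are not part of Kafka Connect.

```python
def late_data_estimate(msgs_per_sec, msg_bytes, partition_duration_ms,
                       late_hours, flush_size):
    """Reproduce the question's rough figures (illustrative helper only)."""
    # Number of time buckets (S3 paths) created per hour of data.
    buckets_per_hour = 3_600_000 // partition_duration_ms
    # Distinct S3 paths touched while catching up on the late data.
    paths = late_hours * buckets_per_hour
    # Messages and bytes that land in a single bucket.
    msgs_per_bucket = msgs_per_sec * partition_duration_ms // 1000
    bucket_bytes = msgs_per_bucket * msg_bytes
    return {
        "paths": paths,
        "messages_per_bucket": msgs_per_bucket,
        "gb_per_bucket": bucket_bytes / 1e9,
        # Worst case imagined in the question: every bucket fully held in memory.
        "gb_if_all_buffered": paths * bucket_bytes / 1e9,
        # If each path instead holds at most flush_size records before committing.
        "gb_if_flushed": paths * flush_size * msg_bytes / 1e9,
    }


if __name__ == "__main__":
    estimate = late_data_estimate(
        msgs_per_sec=10_000,
        msg_bytes=1_000,
        partition_duration_ms=600_000,
        late_hours=24,
        flush_size=100_000,
    )
    for key, value in estimate.items():
        print(f"{key}: {value}")
```

With the values from the question this gives 144 paths, 6 GB per bucket, 864 GB if everything were buffered, and 14.4 GB if each path were capped at `flush.size` records.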
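My main question is: what is the math that gives the memory usage, given:
- the number of records per second
- the size of the records
- the number of Kafka partitions of the topics I read from
- the number of connector tasks (if relevant)
- the number of buckets written to per hour (here 6, because of the `"partition.duration.ms": "600000"` config)
- the maximum number of hours of late data to process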
S3 Sink Connector configurations
    {
      "name": "xxxxxxx",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "s3.region": "us-east-1",
        "partition.duration.ms": "600000",
        "topics.dir": "xxxxx",
        "flush.size": "100000",
        "schema.compatibility": "NONE",
        "topics": "xxxxxx,xxxxxx",
        "tasks.max": "16",
        "s3.part.size": "52428800",
        "timezone": "UTC",
        "locale": "en",
        "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
        "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "name": "xxxxxxxxx",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "s3.bucket.name": "xxxxxxx",
        "rotate.schedule.interval.ms": "600000",
        "path.format": "YYYY/MM/dd/HH/mm",
        "timestamp.extractor": "Record"
      }
    }
Worker configurations
    bootstrap.servers=XXXXXX
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    consumer.auto.offset.reset=earliest
    consumer.max.partition.fetch.bytes=2097152
    consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    group.id=xxxxxxx
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-status
    rest.advertised.host.name=XXXX
Edit:
I forgot to add an example of the errors I get:
    [2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
    java.lang.OutOfMemoryError: Java heap space
    [2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
    [2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Recommended answer
I was finally able to understand how heap size usage works in the Kafka Connect S3 Connector.
- The S3 connector writes the data of each Kafka partition into partitioned paths.
  - The way those paths are partitioned depends on the `partitioner.class` parameter;
  - By default, it is by timestamp, and the value of `partition.duration.ms` then determines the duration of each partitioned path.
- Example with 20 partitions read, `timestamp.extractor` set to `Record`, `partition.duration.ms` set to 1h, and `s3.part.size` set to 50 MB:
  - The heap size needed each hour is then equal to `20 * 50 MB` = 1 GB;
  - But with `timestamp.extractor` set to `Record`, messages whose timestamp falls in an earlier hour than the one in which they are read are buffered in that earlier hour's buffer. Therefore, in reality, the connector needs at least `20 * 50 MB * 2h` = 2 GB of memory, because there are always late events, and more if some events are more than 1 hour late;
  - Note that this isn't true if `timestamp.extractor` is set to `Wallclock`, because there will virtually never be late events as far as Kafka Connect is concerned.
- `rotate.schedule.interval.ms` time has passed
  - This flush condition is always triggered.
- This means that if `timestamp.extractor` is set to `Record`, 10 minutes of `Record` time can pass in less or more than 10 minutes of actual time.
  - For instance, when processing late data, 10 minutes' worth of data will be processed in a few seconds, and if `rotate.interval.ms` is set to 10 minutes then this condition will trigger every second (as it should);
  - Conversely, if there is a pause in the flow of events, this condition will not trigger until it sees an event with a timestamp showing that more than `rotate.interval.ms` has passed since the condition last triggered.
- As for `rotate.interval.ms`, this condition might never trigger if there are not enough messages.
- If you partition on the `Record` timestamp, multiply the heap estimate (Kafka partitions × `s3.part.size`, as in the example above) by `max lateness in milliseconds / partition.duration.ms`.
  - This is a worst-case scenario in which you constantly have late events in all partitions and across the whole range of `max lateness in milliseconds`.
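- This is set to 2.1 MB by default.
- A safe rule is to make sure that the buffering of Kafka messages does not exceed 50% of the total available heap size.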
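The sizing rules in this answer can be combined into a rough estimator. The sketch below is an interpretation, not the connector's actual accounting: it assumes one `s3.part.size` buffer per Kafka partition and per open time bucket, rounds the lateness ratio up and never lets it drop below 1, and treats the 2.1 MB figure as a per-partition consumer fetch buffer (for example `consumer.max.partition.fetch.bytes` from the worker configuration), which is an assumption. The function and parameter names are hypothetical.

```python
import math

MIB = 1024 * 1024


def estimate_heap_bytes(kafka_partitions, s3_part_size, partition_duration_ms,
                        max_lateness_ms=0, fetch_bytes_per_partition=0,
                        usable_heap_fraction=0.5):
    """Rough heap estimate following the answer's rules (hypothetical helper)."""
    # Time buckets that may be open at once per Kafka partition. With Wallclock
    # there is effectively no lateness, so a single bucket is assumed.
    open_buckets = max(1, math.ceil(max_lateness_ms / partition_duration_ms))
    # One s3.part.size buffer per Kafka partition and per open bucket.
    s3_buffers = kafka_partitions * s3_part_size * open_buckets
    # Assumed per-partition consumer fetch buffer.
    fetch_buffers = kafka_partitions * fetch_bytes_per_partition
    buffered = s3_buffers + fetch_buffers
    # Keep message buffering under the given fraction of the total heap.
    return {
        "open_buckets": open_buckets,
        "buffered_bytes": buffered,
        "suggested_heap_bytes": math.ceil(buffered / usable_heap_fraction),
    }


if __name__ == "__main__":
    # The answer's example: 20 partitions, 50 MB parts, 1h buckets, 2h of lateness.
    example = estimate_heap_bytes(20, 50 * MIB, 3_600_000, max_lateness_ms=7_200_000)
    print(example["buffered_bytes"] // MIB, "MiB buffered")
    print(example["suggested_heap_bytes"] // MIB, "MiB heap suggested")
```

For the answer's example this yields 2,000 MiB of buffers, in line with the 2 GB figure, and about twice that as a heap target under the 50% rule. Plugging in 10-minute buckets with 24 hours of lateness gives 144 open buckets per partition, which illustrates why catching up with `Record` timestamps can exhaust the heap.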