Kafka Connect S3 Connector OutOfMemory errors with TimeBasedPartitioner


Problem description

I'm currently working with the Kafka Connect S3 Sink Connector 3.3.1 to copy Kafka messages over to S3, and I get OutOfMemory errors when processing late data.

I know it looks like a long question, but I tried my best to make it clear and simple to understand. I highly appreciate your help.

• The connector does a simple byte-to-byte copy of the Kafka messages and adds the length of each message at the beginning of the byte array (for decompression purposes).
  • This is the role of the CustomByteArrayFormat class (see the configs below; an illustration of the framing follows the connector configuration).
• The CustomTimeBasedPartitioner extends io.confluent.connect.storage.partitioner.TimeBasedPartitioner, and its sole purpose is to override the generatePartitionedPath method to put the topic at the end of the path (a sketch of what such an override might look like follows this list).
• The errors only occur when the connector has been down for several hours and has to catch up on data.
• When the connector is turned back on, it starts catching up but fails very quickly with OutOfMemory errors.
• The connector's timestamp.extractor configuration is set to Record when those OOM errors happen.
• Switching this configuration to Wallclock (i.e. the time of the Kafka Connect process) does not throw OOM errors, and all of the late data gets processed, but the late data is no longer correctly bucketed.
  • All of the late data ends up in the YYYY/MM/dd/HH/mm/topic-name bucket for the time at which the connector was turned back on.
• The "partition.duration.ms": "600000" parameter makes the connector bucket data into six 10-minute paths per hour (2018/06/20/12/[00|10|20|30|40|50] for 2018-06-20 at 12pm).
• Thus, with 24h of late data, the connector would have to output data into 24h * 6 = 144 different S3 paths.
• Each 10-minute folder contains 10,000 messages/sec * 600 seconds = 6,000,000 messages, for a size of 6 GB.
• If it does indeed read all of this in parallel, that would put 864 GB of data into memory.
• The "flush.size": "100000" setting implies that once more than 100,000 messages have been read, they should be committed to files (and thus free the memory).
  • With 1 KB messages, this means committing every 100 MB.
  • But even with 144 parallel readers, that would still only give a total of 14.4 GB, which is less than the 24 GB of heap available.
  • Is "flush.size" the number of records to read per partition before committing? Or per connector task?
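As a point of reference, here is a minimal sketch of what the generatePartitionedPath override described above might look like. It is illustrative only, not the asker's actual code: the package name mirrors the redacted config value, the "/" delimiter is hard-coded for brevity (a real partitioner would reuse its configured directory delimiter), and it assumes the Partitioner API of kafka-connect-storage-common 3.3.x.

package xxx.xxxx.xxx; // hypothetical package, mirroring the redacted partitioner.class value

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;

public class CustomTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    @Override
    public String generatePartitionedPath(String topic, String encodedPartition) {
        // The stock partitioner puts the topic before the time-based part of the path;
        // this variant appends it after instead, e.g. "YYYY/MM/dd/HH/mm/topic-name".
        return encodedPartition + "/" + topic;
    }
}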

My main question is: what is the math for estimating the memory usage, given:

• the number of records per second
• the size of the records
• the number of Kafka partitions of the topics I read from
• the number of connector tasks (if that is relevant)
• the number of buckets written to per hour (here 6, because of the "partition.duration.ms": "600000" config)
• the maximum number of hours of late data to process

(The back-of-envelope figures from the bullets above are reproduced in the sketch after this list.)
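For concreteness, the following sketch simply reproduces the question's own back-of-envelope arithmetic. All figures are the assumptions stated above (10,000 msg/s, roughly 1 KB records, 10-minute buckets, 24 h of backlog, flush.size = 100,000), not measurements, and the class is a hypothetical helper.

public class LateDataEstimate {
    public static void main(String[] args) {
        double msgPerSec = 10_000;        // records per second (question's assumption)
        double msgSizeBytes = 1_000;      // ~1 KB per record
        int bucketsPerHour = 6;           // partition.duration.ms = 600000 -> six 10-minute paths per hour
        int hoursLate = 24;               // backlog to catch up on
        long flushSize = 100_000;         // "flush.size" from the connector config

        int paths = hoursLate * bucketsPerHour;                   // 144 S3 paths to fill
        double bytesPerPath = msgPerSec * 600 * msgSizeBytes;     // ~6 GB per 10-minute path
        double allBuffered = paths * bytesPerPath;                // ~864 GB if everything sat in memory at once
        double flushBounded = paths * flushSize * msgSizeBytes;   // ~14.4 GB if flush.size capped each path

        System.out.printf("paths=%d, perPath=%.1f GB, worst case=%.0f GB, flush-bounded=%.1f GB%n",
                paths, bytesPerPath / 1e9, allBuffered / 1e9, flushBounded / 1e9);
    }
}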

S3 Sink Connector configuration

{
  "name": "xxxxxxx",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"
  }
}
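The format.class above is the CustomByteArrayFormat described at the top of the question. Purely to illustrate the framing it is said to apply (a length header in front of the untouched record bytes), here is a minimal standalone sketch; the 4-byte big-endian header and the class and method names are assumptions, and the real class would plug into the connector's Format API rather than being a helper like this.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class LengthPrefixFraming {

    private LengthPrefixFraming() {
    }

    // Prepends the record length so a reader can split the S3 object back into
    // individual (possibly compressed) records, as described in the question.
    public static byte[] frame(byte[] recordValue) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream(4 + recordValue.length);
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(recordValue.length); // assumed 4-byte big-endian length header
        data.write(recordValue);           // record bytes copied as-is
        data.flush();
        return out.toByteArray();
    }
}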

Worker configuration

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX

Edit:

I forgot to add an example of the errors I get:

[2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Recommended answer

I was finally able to understand how heap size usage works in the Kafka Connect S3 connector:

• The S3 connector writes the data of each Kafka partition into partitioned paths.
  • How those paths are partitioned depends on the partitioner.class parameter;
  • By default it partitions by timestamp, and the value of partition.duration.ms then determines the duration of each partitioned path.
• The connector keeps one buffer of s3.part.size bytes per Kafka partition it reads and per open partitioned path.
  • Example with 20 partitions read, timestamp.extractor set to Record, partition.duration.ms set to 1h and s3.part.size set to 50 MB:
    • The heap needed each hour is then 20 * 50 MB = 1 GB;
    • But with timestamp.extractor set to Record, a message whose timestamp belongs to an earlier hour than the one in which it is read is buffered in that earlier hour's buffer. In practice the connector therefore needs at least 20 * 50 MB * 2h = 2 GB of memory, because there are always some late events, and more if events arrive more than one hour late;
    • Note that this is not the case when timestamp.extractor is set to Wallclock, because then there are virtually never late events as far as Kafka Connect is concerned.
• Those buffers are flushed (i.e. leave memory) when one of three conditions is met:
  • rotate.schedule.interval.ms (wall-clock time) has passed.
    • This flush condition is always triggered.
  • rotate.interval.ms has passed, measured in timestamp.extractor time.
    • This means that if timestamp.extractor is set to Record, 10 minutes of Record time can pass in less or more than 10 minutes of actual time.
      • For instance, when processing late data, 10 minutes' worth of data is processed in a few seconds, and if rotate.interval.ms is set to 10 minutes this condition triggers every second (as it should);
      • Conversely, if there is a pause in the flow of events, this condition does not trigger until the connector sees an event whose timestamp shows that more than rotate.interval.ms has passed since the condition last triggered.
  • flush.size messages have been read.
    • As with rotate.interval.ms, this condition might never trigger if there are not enough messages.
• So you should plan for at least Kafka partitions * s3.part.size of heap just for the connector's write buffers.
  • If you are using the Record timestamp for partitioning, you should multiply that by max lateness in milliseconds / partition.duration.ms.
    • This is the worst-case scenario, with constantly late events in all partitions across the whole range of max lateness in milliseconds.
• On top of that, the consumer side buffers up to consumer.max.partition.fetch.bytes per partition when reading from Kafka.
  • With the worker configuration above (consumer.max.partition.fetch.bytes=2097152), that is about 2.1 MB per partition.
• Finally, do not assume that the entire heap is available for buffering Kafka messages.
  • A safe rule of thumb is to make sure that the buffering of Kafka messages does not exceed 50% of the total available heap size.

(These rules are put together in the sizing sketch after this list.)
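Putting these rules together, a rough sizing helper might look like the sketch below. It is only a back-of-envelope aid under the worst-case assumptions described above; the partition count and maximum lateness are placeholder values, and the other figures come from the configurations in the question.

public class S3SinkHeapEstimate {
    public static void main(String[] args) {
        long kafkaPartitions = 20;                 // total partitions read by the connector (placeholder)
        long s3PartSizeBytes = 52_428_800L;        // "s3.part.size" = 50 MB
        long partitionDurationMs = 600_000L;       // "partition.duration.ms" = 10 minutes
        long maxLatenessMs = 24L * 60 * 60 * 1000; // worst-case lateness to absorb, e.g. 24 h (placeholder)
        long fetchBytesPerPartition = 2_097_152L;  // consumer.max.partition.fetch.bytes (~2.1 MB)

        // One s3.part.size buffer per Kafka partition and per time bucket that can be open at
        // once; with the Record extractor that is max lateness / partition.duration.ms buckets.
        long openBucketsPerPartition = Math.max(1, maxLatenessMs / partitionDurationMs);
        long writeBuffers = kafkaPartitions * s3PartSizeBytes * openBucketsPerPartition;

        // Plus the consumer-side fetch buffers.
        long fetchBuffers = kafkaPartitions * fetchBytesPerPartition;

        // Keep message buffering under roughly 50% of the heap, per the rule of thumb above.
        long suggestedHeap = 2 * (writeBuffers + fetchBuffers);

        System.out.printf("write buffers: %.1f GB, fetch buffers: %.1f MB, suggested heap >= %.1f GB%n",
                writeBuffers / 1e9, fetchBuffers / 1e6, suggestedHeap / 1e9);
    }
}

With the question's 10-minute buckets and a full day of Record-time lateness, this worst case comes out far above the 24 GB heap mentioned in the question, which is consistent with the OOM errors seen while catching up.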

