Kafka Connect S3 Sink Connector: Partitioning large topics by id field

Question

We've been working on adding Kafka Connect to our data platform for the last few weeks and think it would be a useful way of extracting data from Kafka into an S3 data lake. We've played around with the FieldPartitioner and the TimeBasedPartitioner and seen some pretty decent results.

We also need to partition by user id - but having tried the FieldPartitioner on a user id field, the connector is extremely slow - especially compared to partitioning by date. I understand that partitioning by an id will create a lot of output partitions and thus won't be as fast - which is fine, but it needs to be able to keep up with the producers.

So far we've tried increasing memory and heap - but we don't usually see any memory issues unless we bump flush.size to a large number. We've also tried small flush sizes, and both very small and large rotate.schedule.interval.ms values. We've also looked at networking, but that seems fine - with other partitioners the network keeps up.

Before potentially wasting a lot of time on this: has anyone attempted, or succeeded in, partitioning by an id field using the S3 Sink Connector, especially on larger topics? Or does anyone have any suggestions about configuration or setup that might be a good place to look?
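For concreteness, a FieldPartitioner setup of the kind described above might look like the following connector properties (the topic, bucket, region, and tuning values are placeholder assumptions, not the actual settings):

```properties
# Confluent S3 sink connector, partitioning output by an id field
connector.class=io.confluent.connect.s3.S3SinkConnector
# Placeholder topic and bucket names
topics=user-events
s3.bucket.name=my-datalake
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
# Partition S3 objects by the record's user id field
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=user_id
# Tuning values experimented with (placeholders)
flush.size=1000
rotate.schedule.interval.ms=60000
tasks.max=1
```

Note that with a high-cardinality field like a user id, the connector keeps one open file per distinct partition value and each flush writes many small objects, which is one reason this partitioner tends to be much slower than date-based partitioning.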

Answer

I'm not experienced with Kafka's connectors, but I'll at least try to help.

I don't know whether you can configure the connector at the level of a Kafka topic's partitions; I'm assuming there is some way to do that.

One possible approach would focus on the step where your clients produce to the Kafka brokers. My suggestion is to implement your own Partitioner, in order to have finer control over where the data is sent on the Kafka side.

This is an example/simplification of your custom partitioner. Suppose the key your producers send has the format id_name_date. This custom partitioner extracts the first element (id) and then chooses the desired partition.

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class IdPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this example.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        try {
            // Keys look like "id_name_date"; take everything before the first "_".
            String pKey = (String) key;
            int id = Integer.parseInt(pKey.substring(0, pKey.indexOf('_')));

            /* getPartitionForId decides which partition number corresponds
               to the received id. You could also implement the logic directly here. */
            return getPartitionForId(id);
        } catch (Exception e) {
            // Fall back to partition 0 on malformed keys.
            return 0;
        }
    }

    @Override
    public void close() {
        // Maybe some cleanup here if needed.
    }
}
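To wire this in, the partitioner class is registered in the producer configuration. A minimal sketch (the broker address, serializers, and the com.example package are assumptions for illustration):

```java
import java.util.Properties;

public class ProducerSetup {
    public static Properties producerProps() {
        Properties props = new Properties();
        // Broker address and serializers are placeholders.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Point the producer at the custom partitioner
        // ("com.example" is an assumed package name).
        props.put("partitioner.class", "com.example.IdPartitioner");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```

These properties would then be passed to `new KafkaProducer<>(props)` as usual.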

Even if you may need some more tuning on the Kafka Connect side, I believe this option could be helpful. Assume you have a topic with 5 partitions, and that getPartitionForId just checks the first digit of the id in order to decide the partition (for simplicity, the minimum id is 100 and the maximum is 599).
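Under those assumptions, the getPartitionForId helper referenced above (but not implemented in the original answer) could be sketched as:

```java
public class IdPartitionExample {
    // First digit of a three-digit id (100..599) maps to partitions 0..4.
    static int getPartitionForId(int id) {
        return (id / 100) - 1;
    }

    public static void main(String[] args) {
        System.out.println(getPartitionForId(123)); // 1xx -> partition 0
        System.out.println(getPartitionForId(599)); // 5xx -> partition 4
    }
}
```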

So if the received key is, e.g., 123_tempdata_20201203, the partition method returns 0, that is, the first partition.

(The original diagram showed P1 instead of P0 because the example looks more natural that way, but be aware that the first partition is in fact defined as partition 0.)

Basically this would be a pre-adjustment, or accommodation, of the data before the S3 upload.

I'm aware this may not be the ideal answer, as I don't know the exact specifications of your system. My guess is that there is some way to point topic partitions directly at S3 locations.

If that's not possible, I hope this at least gives you some further ideas. Cheers!
