Kafka Connect S3 sink connector with custom Partitioner strange behavior


Problem Description


I plan to use a custom Field and TimeBased partitioner to partition my data in S3 as follows: /part_<field_name>=<field_value>/part_date=YYYY-MM-dd/part_hour=HH/....parquet.
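
For context, a minimal sink configuration producing this layout might look like the sketch below (abbreviated: bucket, flush, and converter settings omitted; partition.field is the custom key my partitioner reads, the rest are standard Confluent S3 sink properties):

connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=test.FieldAndTimeBasedPartitioner
partition.field=<field_name>
partition.duration.ms=3600000
path.format='part_date'=YYYY-MM-dd/'part_hour'=HH
locale=en-US
timezone=UTC
timestamp.extractor=Record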

My Partitioner works fine; everything is as expected in my S3 bucket.

The problem is linked to the performance of the sink: I have 400 kB/s per broker (~1.2 MB/s in total across the three brokers) coming into my input topic, but the sink works in spikes and commits only a small number of records at a time.

If I use the classic TimeBasedPartitioner instead, the problem disappears. (The original post showed a consumption-rate screenshot here; the image is not available.)

So my problem seems to be in my custom partitioner. Here is the code:

package test;

import java.util.Locale;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;

public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
private static final String FIELD_SUFFIX = "part_";
private static final String FIELD_SEP = "=";
private long partitionDurationMs;
private DateTimeFormatter formatter;
private TimestampExtractor timestampExtractor;
private PartitionFieldExtractor partitionFieldExtractor;
// Name of the record field to partition on, kept so encodePartition()
// can rebuild the "part_<field_name>=" prefix without re-reading the config.
private String partitionFieldName;

// Called by TimeBasedPartitioner.configure() once the connector config is parsed.
protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {

    this.delim = (String)config.get("directory.delim");
    this.partitionDurationMs = partitionDurationMs;

    try {
        this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
        this.timestampExtractor = this.newTimestampExtractor((String)config.get("timestamp.extractor"));
        this.timestampExtractor.configure(config);
        this.partitionFieldName = (String) config.get("partition.field");
        this.partitionFieldExtractor = new PartitionFieldExtractor(this.partitionFieldName);
    } catch (IllegalArgumentException e) {
        ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage());
        ce.initCause(e);
        throw ce;
    }
}

private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) {
    return DateTimeFormat.forPattern(str).withZone(timeZone);
}

// Floors the timestamp (in local time) to the partition duration, then converts back to UTC.
public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
    long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp);
    long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs;
    return timeZone.convertLocalToUTC(partitionedTime, false);
}

public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord, nowInMillis);
    final String partitionField = this.partitionFieldExtractor.extract(sinkRecord);
    return this.encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionField);
}

public String encodePartition(SinkRecord sinkRecord) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord);
    final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
    return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
}

private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionField) {

    if (timestamp == null) {
        String msg = "Unable to determine timestamp using timestamp.extractor " + this.timestampExtractor.getClass().getName() + " for record: " + sinkRecord;
        log.error(msg);
        throw new ConnectException(msg);
    } else if (partitionField == null) {
        String msg = "Unable to extract partition field '" + this.partitionFieldName + "' for record: " + sinkRecord;
        log.error(msg);
        throw new ConnectException(msg);
    } else {
        DateTime recordTime = new DateTime(getPartition(this.partitionDurationMs, timestamp.longValue(), this.formatter.getZone()));
        // Yields e.g. part_<field_name>=<field_value>/part_date=YYYY-MM-dd/part_hour=HH
        return FIELD_SUFFIX
                + this.partitionFieldName
                + FIELD_SEP
                + partitionField
                + this.delim
                + recordTime.toString(this.formatter);
    }
    }
}

static class PartitionFieldExtractor {

    private final String fieldName;

    PartitionFieldExtractor(String fieldName) {
        this.fieldName = fieldName;
    }

    String extract(ConnectRecord<?> record) {
        Object value = record.value();
        if (value instanceof Struct) {
            Struct struct = (Struct)value;
            return (String) struct.get(fieldName);
        } else {
            FieldAndTimeBasedPartitioner.log.error("Record value is not a Struct: {}", record);
            throw new PartitionException("Error encoding partition: record value is not a Struct.");
        }
    }
}

public long getPartitionDurationMs() {
    return partitionDurationMs;
}

public TimestampExtractor getTimestampExtractor() {
    return timestampExtractor;
}
}

It's more or less a merge of FieldPartitioner and TimeBasedPartitioner.

Any clue as to why I get such bad performance while sinking messages? Could deserializing the message and extracting the partition field from each record cause this issue? And since I have around 80 distinct values for that field, could it be a memory issue, as the sink will maintain 80 times more buffers in the heap?
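
To put a rough number on the memory concern (my own back-of-the-envelope, assuming the Confluent S3 sink's default s3.part.size of 25 MB as the buffer held per open partition file): 80 concurrent part_<field_name> partitions × 25 MB ≈ 2 GB of heap for buffers alone, versus a single open partition per time slice with the plain TimeBasedPartitioner.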

Thanks for your help.

Answer

FYI, the problem was the partitioner itself. My partitioner needed to decode the entire message to get the info. As I have a lot of messages, it took time to handle all these events.
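
The answer stops there, but one plausible way to shave the per-record cost (my own sketch, not from the original post; the class name and caching strategy are illustrative) is to resolve the Struct field handle once per schema instead of doing a by-name lookup for every record:

// Drop-in variant of PartitionFieldExtractor for the same file: caches the
// resolved Field per schema so records sharing a schema skip the string lookup.
// Needs two extra imports: org.apache.kafka.connect.data.Field and
// org.apache.kafka.connect.data.Schema.
static class CachingPartitionFieldExtractor {

    private final String fieldName;
    private Schema lastSchema;   // schema of the previously seen record
    private Field cachedField;   // field handle resolved against lastSchema

    CachingPartitionFieldExtractor(String fieldName) {
        this.fieldName = fieldName;
    }

    String extract(ConnectRecord<?> record) {
        Object value = record.value();
        if (!(value instanceof Struct)) {
            throw new PartitionException("Error encoding partition: record value is not a Struct.");
        }
        Struct struct = (Struct) value;
        Schema schema = struct.schema();
        if (schema != lastSchema) {      // Connect schemas are typically reused across records
            cachedField = schema.field(fieldName);
            lastSchema = schema;
        }
        if (cachedField == null) {
            throw new PartitionException("Field '" + fieldName + "' not found in record schema.");
        }
        return String.valueOf(struct.get(cachedField));
    }
}

That said, if most of the time goes into deserializing the message payload itself, as the answer suggests, this cache only trims a constant factor; the structural fix is to avoid decoding the full record just to read one field.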
