Kafka Connect S3 sink connector with custom Partitioner strange behavior
Problem Description
I plan to use a custom Field and TimeBased partitioner to partition my data in s3 as follow:
/part_<field_name>=<field_value>/part_date=YYYY-MM-dd/part_hour=HH/....parquet.
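For reference, wiring such a partitioner into the connector would look roughly like this. The property names are the S3 sink connector's standard ones; the partitioner class is the one from this question, and `partition.field` is the key the custom code reads, so treat the exact values as illustrative:

```properties
# Sketch of the relevant S3 sink properties (values are illustrative).
connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=test.FieldAndTimeBasedPartitioner
# Key read by this custom partitioner (not a built-in S3 sink property):
partition.field=my_field
partition.duration.ms=3600000
path.format='part_date'=YYYY-MM-dd/'part_hour'=HH
locale=en-US
timezone=UTC
timestamp.extractor=Record
```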
My Partitioner works fine, everything is as expected in my S3 bucket.
The problem is linked to the performance of the sink.
I have 400kB/s per broker = ~1.2MB/s in my input topic, but the sink writes in spikes and commits only a small number of records at a time.
If I use the classic TimeBasedPartitioner: [screenshot]
So my problem seems to be in my custom partitioner. Here is the code:
package test;

import ...;

public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
    private static final String FIELD_SUFFIX = "part_";
    private static final String FIELD_SEP = "=";
    private long partitionDurationMs;
    private DateTimeFormatter formatter;
    private TimestampExtractor timestampExtractor;
    private PartitionFieldExtractor partitionFieldExtractor;
    // Keep the configured field name so encodedPartitionForFieldAndTime()
    // does not need access to the config map.
    private String partitionFieldName;

    protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
        this.delim = (String) config.get("directory.delim");
        this.partitionDurationMs = partitionDurationMs;
        this.partitionFieldName = (String) config.get("partition.field");
        try {
            this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
            this.timestampExtractor = this.newTimestampExtractor((String) config.get("timestamp.extractor"));
            this.timestampExtractor.configure(config);
            this.partitionFieldExtractor = new PartitionFieldExtractor(this.partitionFieldName);
        } catch (IllegalArgumentException e) {
            ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage());
            ce.initCause(e);
            throw ce;
        }
    }

    private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) {
        return DateTimeFormat.forPattern(str).withZone(timeZone);
    }

    public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
        long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp);
        long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs;
        return timeZone.convertLocalToUTC(partitionedTime, false);
    }

    public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
        final Long timestamp = this.timestampExtractor.extract(sinkRecord, nowInMillis);
        final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
        return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
    }

    public String encodePartition(SinkRecord sinkRecord) {
        final Long timestamp = this.timestampExtractor.extract(sinkRecord);
        final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
        return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
    }

    private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionFieldValue) {
        if (timestamp == null) {
            String msg = "Unable to determine timestamp using timestamp.extractor " + this.timestampExtractor.getClass().getName() + " for record: " + sinkRecord;
            log.error(msg);
            throw new ConnectException(msg);
        } else if (partitionFieldValue == null) {
            String msg = "Unable to determine partition field using partition.field '" + this.partitionFieldName + "' for record: " + sinkRecord;
            log.error(msg);
            throw new ConnectException(msg);
        } else {
            DateTime recordTime = new DateTime(getPartition(this.partitionDurationMs, timestamp.longValue(), this.formatter.getZone()));
            return FIELD_SUFFIX
                    + this.partitionFieldName
                    + FIELD_SEP
                    + partitionFieldValue
                    + this.delim
                    + recordTime.toString(this.formatter);
        }
    }

    static class PartitionFieldExtractor {

        private final String fieldName;

        PartitionFieldExtractor(String fieldName) {
            this.fieldName = fieldName;
        }

        String extract(ConnectRecord<?> record) {
            Object value = record.value();
            if (value instanceof Struct) {
                Struct struct = (Struct) value;
                return (String) struct.get(fieldName);
            } else {
                FieldAndTimeBasedPartitioner.log.error("Value is not a Struct!");
                throw new PartitionException("Error encoding partition.");
            }
        }
    }

    public long getPartitionDurationMs() {
        return partitionDurationMs;
    }

    public TimestampExtractor getTimestampExtractor() {
        return timestampExtractor;
    }
}
It's more or less a merge of FieldPartitioner and TimeBasedPartitioner.
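Stripped of the Joda zone handling, the time bucketing in `getPartition()` above is just integer floor division. A plain-JDK sketch of that arithmetic (UTC case, where the zone conversions are no-ops; the class name is illustrative):

```java
public final class TimeBucketSketch {

    // Floor a millisecond timestamp to the start of its bucket, as
    // getPartition() does: integer division truncates, multiplying back
    // yields the bucket's start.
    static long bucketStart(long timestampMs, long granularityMs) {
        return timestampMs / granularityMs * granularityMs;
    }

    public static void main(String[] args) {
        long ts = 1551781065000L; // 2019-03-05T10:17:45Z
        // Hourly buckets: floors to 2019-03-05T10:00:00Z.
        System.out.println(bucketStart(ts, 3_600_000L)); // 1551780000000
    }
}
```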
Any clue as to why I get such bad performance while sinking messages? Could deserializing the record and extracting the field used for partitioning cause this issue? And since I have around 80 distinct field values, could it be a memory issue, as the connector will maintain 80 times more buffers in the heap?
Thanks for your help.
Recommended Answer
FYI, the problem was the partitioner itself. My partitioner needed to decode the entire message to get the field value, and as I have a lot of messages, handling all these events takes time.
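One way to cut the per-record cost (an assumption on my part, not what the answer above did) is to memoize the encoded path per (field value, time bucket), so the date formatter only runs once per bucket instead of once per record. A self-contained sketch with plain `java.time` instead of Joda, illustrative names throughout:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

public final class PartitionPathCache {

    // Same path shape as the custom partitioner, UTC only.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("'part_date='yyyy-MM-dd/'part_hour='HH")
                             .withZone(ZoneOffset.UTC);

    private final long durationMs;
    // One cached path per (field value, bucket start): with ~80 field values
    // and hourly buckets this map stays small.
    private final Map<String, String> cache = new HashMap<>();

    public PartitionPathCache(long durationMs) {
        this.durationMs = durationMs;
    }

    public String pathFor(String fieldName, String fieldValue, long timestampMs) {
        long bucket = timestampMs / durationMs * durationMs; // floor to bucket start
        String key = fieldValue + '@' + bucket;
        return cache.computeIfAbsent(key, k ->
                "part_" + fieldName + "=" + fieldValue + "/"
                        + FMT.format(Instant.ofEpochMilli(bucket)));
    }
}
```

Records in the same hour and with the same field value then reuse the already-built string rather than re-running the formatter.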