Flush size when using kafka-connect-transform-archive with HdfsSinkConnector

Question

I have data in a Kafka topic which I want to preserve on my data lake.

Before worrying about the keys, I was able to save the Avro values in files on the data lake using the HdfsSinkConnector. The number of message values in each file was determined by the flush.size property of the HdfsSinkConnector.

All good. Next I wanted to preserve the keys as well. To do this I used kafka-connect-transform-archive, which wraps the String key and Avro value into a new Avro schema.

This works great ... except that the flush.size for the HdfsSinkConnector is now being ignored. Each file saved in the data lake contains exactly one message.

So, the two cases are 1) save values only, with the number of values in each file determined by flush.size, and 2) save keys and values, with each file containing exactly one message and flush.size being ignored.

The only difference between the two situations is the configuration for the HdfsSinkConnector which specifies the archive transform.

"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"

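For reference, a minimal illustrative sink configuration combining flush.size with the archive transform might look like the sketch below (the connector name, topic, HDFS URL, and flush.size value are placeholders, not values from the original setup):

{
  "name": "hdfs-sink-with-keys",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "my-topic",
    "hdfs.url": "hdfs://namenode:8020",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "flush.size": "1000",
    "transforms": "tran",
    "transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"
  }
}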
Does the kafka-connect-transform-archive ignore flush size by design, or is there some additional configuration I need in order to save multiple key/value messages per file on the data lake?

Answer

I had the same problem when using the Kafka GCS sink connector.

In the com.github.jcustenborder.kafka.connect.archive.Archive code, a new Schema is created per message.

private R applyWithSchema(R r) {
    // A brand-new Schema instance is built for every record that passes
    // through the transform, so the sink can treat each record as a schema
    // change and roll a new file.
    final Schema schema = SchemaBuilder.struct()
        .name("com.github.jcustenborder.kafka.connect.archive.Storage")
        .field("key", r.keySchema())
        .field("value", r.valueSchema())
        .field("topic", Schema.STRING_SCHEMA)
        .field("timestamp", Schema.INT64_SCHEMA);
    Struct value = new Struct(schema)
        .put("key", r.key())
        .put("value", r.value())
        .put("topic", r.topic())
        .put("timestamp", r.timestamp());
    return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}

If you look at the Kafka InsertField$Value transform, you will see that it uses a SynchronizedCache in order to retrieve the same schema every time.

https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L170

So, you just need to create the schema once (outside the apply function) or use the same SynchronizedCache approach.
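For illustration, here is a minimal sketch of what a cached-schema variant of applyWithSchema could look like, using the same org.apache.kafka.common.cache utilities that InsertField relies on. The schemaCache field name and the cache size of 16 are assumptions, and keying the cache on the record's value schema assumes the key schema does not vary independently of it:

import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Hypothetical cache field added to the transform; 16 entries is an
// arbitrary size, mirroring what InsertField uses.
private final Cache<Schema, Schema> schemaCache =
    new SynchronizedCache<>(new LRUCache<>(16));

private R applyWithSchema(R r) {
    // Reuse the wrapper schema if we have already built one for this
    // value schema, instead of building a fresh Schema per record.
    Schema schema = schemaCache.get(r.valueSchema());
    if (schema == null) {
        schema = SchemaBuilder.struct()
            .name("com.github.jcustenborder.kafka.connect.archive.Storage")
            .field("key", r.keySchema())
            .field("value", r.valueSchema())
            .field("topic", Schema.STRING_SCHEMA)
            .field("timestamp", Schema.INT64_SCHEMA)
            .build();
        schemaCache.put(r.valueSchema(), schema);
    }
    Struct value = new Struct(schema)
        .put("key", r.key())
        .put("value", r.value())
        .put("topic", r.topic())
        .put("timestamp", r.timestamp());
    return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}

With a single Schema instance reused across records, the sink should no longer see a schema change on every record, and flush.size-based batching applies again.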
