Flush size when using kafka-connect-transform-archive with HdfsSinkConnector


Question

I have data in a Kafka topic which I want to preserve on my data lake.

Before worrying about the keys, I was able to save the Avro values in files on the datalake using HdfsSinkConnector. The number of message values in each file was determined by the "flush.size" property of the HdfsSinkConnector.

All good. Next I wanted to preserve the keys as well. To do this I used the kafka-connect-transform-archive which wraps the String key and Avro value into a new Avro schema.

This works great ... except that the flush.size for the HdfsSinkConnector is now being ignored. Each file saved in the data lake has exactly 1 message only.

So, the two cases are 1) save values only, with the number of values in each file determined by the flush.size and 2) save keys and values with each file containing exactly one message and flush.size being ignored.

The only difference between the two situations is the configuration for the HdfsSinkConnector which specifies the archive transform.

"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"

Does the kafka-connect-transform-archive ignore flush size by design, or is there some additional configuration that I need in order to be able to save multiple key, value messages per file on the data lake?

Answer

I had the same problem when using the Kafka GCS sink connector.

In the com.github.jcustenborder.kafka.connect.archive.Archive code, a new Schema is created per message.

private R applyWithSchema(R r) {
  // A brand-new wrapper Schema is built for every record that passes through the transform.
  final Schema schema = SchemaBuilder.struct()
      .name("com.github.jcustenborder.kafka.connect.archive.Storage")
      .field("key", r.keySchema())
      .field("value", r.valueSchema())
      .field("topic", Schema.STRING_SCHEMA)
      .field("timestamp", Schema.INT64_SCHEMA);
  Struct value = new Struct(schema)
      .put("key", r.key())
      .put("value", r.value())
      .put("topic", r.topic())
      .put("timestamp", r.timestamp());
  return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}

If you look at the Kafka InsertField$Value transform, you will see that it uses a SynchronizedCache to retrieve the same schema every time.

https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L170

So you just need to create the schema once (outside the apply function) or use the same SynchronizedCache code, as in the sketch below.
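
A minimal sketch of that caching approach, adapted to the Archive code shown above (this is not the upstream implementation; the cache keyed by a Map.Entry of the incoming key and value schemas, and the added .build() call, are my own assumptions):

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Reuse the same Schema instance for every record with the same key/value schemas.
private final Cache<Map.Entry<Schema, Schema>, Schema> schemaCache =
    new SynchronizedCache<>(new LRUCache<>(16));

private R applyWithSchema(R r) {
  Map.Entry<Schema, Schema> cacheKey = new SimpleEntry<>(r.keySchema(), r.valueSchema());
  Schema schema = schemaCache.get(cacheKey);
  if (schema == null) {
    // Build the wrapper schema only once per distinct key/value schema pair.
    schema = SchemaBuilder.struct()
        .name("com.github.jcustenborder.kafka.connect.archive.Storage")
        .field("key", r.keySchema())
        .field("value", r.valueSchema())
        .field("topic", Schema.STRING_SCHEMA)
        .field("timestamp", Schema.INT64_SCHEMA)
        .build();
    schemaCache.put(cacheKey, schema);
  }
  Struct value = new Struct(schema)
      .put("key", r.key())
      .put("value", r.value())
      .put("topic", r.topic())
      .put("timestamp", r.timestamp());
  return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}

With this, every record carrying the same key and value schemas is wrapped with the same Schema object, so the sink should batch records per flush.size as it does in the values-only case.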
