Bucket records based on time (kafka-hdfs-connector)


Problem description

I am trying to copy data from Kafka into Hive tables using the kafka-hdfs-connector provided by the Confluent platform. While I am able to do that successfully, I was wondering how to bucket the incoming data based on a time interval. For example, I would like a new partition to be created every 5 minutes.

I tried io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner with partition.duration.ms, but I think I am doing it the wrong way. I see only one partition in the Hive table, with all the data going into that particular partition. Something like this:

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03

And all the Avro objects are getting copied into this one partition.

Instead, I would like to have something like this:

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03
year=2016/month=03/day=15/hour=19/minute=08
year=2016/month=03/day=15/hour=19/minute=13

Initially the connector would create the path year=2016/month=03/day=15/hour=19/minute=03 and keep copying all incoming data into that directory for the next 5 minutes; at the start of the 6th minute it should create a new path, i.e. year=2016/month=03/day=15/hour=19/minute=08, copy the data for the next 5 minutes into that directory, and so on.
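
To make the intended windowing concrete, here is a minimal sketch (illustrative Java, not the connector's actual code) that rounds a record timestamp down to the start of its partition.duration.ms window and renders the window start through a path.format-style pattern. It assumes windows are aligned to the epoch, so 5-minute windows start at :00, :05, :10, and so on:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketSketch {
    public static void main(String[] args) {
        long partitionDurationMs = 300_000L; // partition.duration.ms = 5 minutes

        // Round the record timestamp down to the start of its 5-minute window.
        long recordTs = Instant.parse("2016-03-15T19:08:42Z").toEpochMilli();
        long windowStart = recordTs - (recordTs % partitionDurationMs);

        // Render the window start the way a path.format pattern would.
        DateTimeFormatter fmt = DateTimeFormatter
                .ofPattern("'year'=yyyy/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm")
                .withZone(ZoneOffset.UTC);
        System.out.println(fmt.format(Instant.ofEpochMilli(windowStart)));
        // -> year=2016/month=03/day=15/hour=19/minute=05
    }
}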

This is what my config file looks like:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
hdfs.url=hdfs://localhost:9000
flush.size=3
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=300000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/
locale=en
timezone=GMT
logs.dir=/kafka-connect/logs
topics.dir=/kafka-connect/topics
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD

It would be really helpful if someone could point me in the right direction. I would be glad to share more details if required; I just don't want to make this question look like one that never ends.

Thanks a lot!

Recommended answer

Your minute field in path.format is wrong:

path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/

It should be:

path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm/
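
This matters because in the date-time pattern syntax, MM means month-of-year while mm means minute-of-hour, which is exactly why every record landed in minute=03: the pattern was writing the month (March) into the minute directory. A quick illustration (hypothetical code using java.time, whose MM/mm letters carry the same meanings):

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class PatternLetterDemo {
    public static void main(String[] args) {
        ZonedDateTime t = ZonedDateTime.of(2016, 3, 15, 19, 8, 0, 0, ZoneOffset.UTC);

        // "MM" is month-of-year: every timestamp in March prints 03.
        System.out.println(DateTimeFormatter.ofPattern("'minute'=MM").format(t));
        // -> minute=03

        // "mm" is minute-of-hour: this is what path.format needs.
        System.out.println(DateTimeFormatter.ofPattern("'minute'=mm").format(t));
        // -> minute=08
    }
}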

