What is the most efficient way to write from Kafka to HDFS with files partitioned by date


Question

I'm working on a project that should write from Kafka to HDFS. Suppose there is an online server that writes messages into Kafka, and each message includes a timestamp. I want to create a job whose output is a file (or files) according to the timestamps in the messages. For example, if the data in Kafka is

 {"ts":"01-07-2013 15:25:35.994", "data": ...}
 ...    
 {"ts":"01-07-2013 16:25:35.994", "data": ...}
 ... 
 {"ts":"01-07-2013 17:25:35.994", "data": ...}

I would like to get these 3 files as output:

  kafka_file_2013-07-01_15.json
  kafka_file_2013-07-01_16.json
  kafka_file_2013-07-01_17.json 

And of course, if I run this job once again and there are new messages in the queue like

 {"ts":"01-07-2013 17:25:35.994", "data": ...}

It should create a file

  kafka_file_2013-07-01_17_2.json // second  chunk of hour 17

I've seen some open-source projects, but most of them just read from Kafka into some HDFS folder. What is the best solution/design/open-source project for this problem?

Solution

You should definitely check out the Camus API implementation from LinkedIn. Camus is LinkedIn's Kafka->HDFS pipeline. It is a MapReduce job that does distributed data loads out of Kafka. Check out this post I have written for a simple example that fetches from the Twitter stream and writes to HDFS based on tweet timestamps.

The project is available on GitHub: https://github.com/linkedin/camus

Camus needs two main components: one for reading and decoding data from Kafka, and one for writing data to HDFS.

Decoding Messages read from Kafka

Camus has a set of Decoders which help in decoding messages coming from Kafka. Decoders basically extend com.linkedin.camus.coders.MessageDecoder, which implements the logic to partition data based on timestamp. A set of predefined Decoders is present in this directory, and you can write your own based on these: camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/
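For illustration, here is a rough sketch of such a decoder for the JSON messages in the question, modeled loosely on the bundled JsonStringMessageDecoder. The MessageDecoder/CamusWrapper signatures differ between Camus versions, and the class name, package, and org.json dependency below are assumptions, so treat it as a starting point rather than drop-in code.

  // Hypothetical decoder; adapt package, class name and JSON parsing to your setup.
  package com.example.camus;

  import java.text.SimpleDateFormat;

  import org.json.JSONObject; // assumes org.json is available on the classpath

  import com.linkedin.camus.coders.CamusWrapper;
  import com.linkedin.camus.coders.MessageDecoder;

  public class TsFieldMessageDecoder extends MessageDecoder<byte[], String> {

      // Timestamp format from the question: "01-07-2013 17:25:35.994" (dd-MM-yyyy)
      private final SimpleDateFormat tsFormat =
              new SimpleDateFormat("dd-MM-yyyy HH:mm:ss.SSS");

      @Override
      public CamusWrapper<String> decode(byte[] message) {
          try {
              String payload = new String(message, "UTF-8");
              // Extract the "ts" field and convert it to epoch milliseconds.
              long timestamp =
                      tsFormat.parse(new JSONObject(payload).getString("ts")).getTime();
              // Camus uses this timestamp to place the record into the matching
              // time-based partition, and hence the matching output file.
              return new CamusWrapper<String>(payload, timestamp);
          } catch (Exception e) {
              throw new RuntimeException("Could not decode message", e);
          }
      }
  }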

Writing messages to HDFS

Camus needs a set of RecordWriterProvider classes, which extend com.linkedin.camus.etl.RecordWriterProvider and tell Camus what payload should be written to HDFS. A set of predefined RecordWriterProviders is present in this directory, and you can write your own based on these:

camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common
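These two pieces are wired together through the properties file that drives the Camus MapReduce job (the CamusJob driver). A minimal, hedged fragment is sketched below; the exact property keys vary between Camus versions, and the paths, topic name, and decoder class are assumptions from the example above, so check them against the camus.properties template shipped in the repository.

  # Hypothetical camus.properties fragment; verify key names for your Camus version.
  camus.message.decoder.class=com.example.camus.TsFieldMessageDecoder
  etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
  etl.destination.path=/user/etl/kafka
  etl.execution.base.path=/user/etl/exec
  etl.execution.history.path=/user/etl/exec/history
  kafka.whitelist.topics=my_topic

The job itself is then launched like any other Hadoop job using the CamusJob driver class and this properties file; see the Camus README for the exact command line.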
