What is the most efficient way to write from Kafka to HDFS with files partitioned by date?


Problem Description


I'm working on a project that should write via Kafka to HDFS. Suppose there is an online server that writes messages into Kafka, and each message includes a timestamp. I want to create a job whose output will be a file (or files) according to the timestamp in the messages. For example, if the data in Kafka is

 {"ts":"01-07-2013 15:25:35.994", "data": ...}
 ...    
 {"ts":"01-07-2013 16:25:35.994", "data": ...}
 ... 
 {"ts":"01-07-2013 17:25:35.994", "data": ...}

I would like to get these 3 files as output:

  kafka_file_2013-07-01_15.json
  kafka_file_2013-07-01_16.json
  kafka_file_2013-07-01_17.json 

And of course, if I run this job again and there are new messages in the queue, like

 {"ts":"01-07-2013 17:25:35.994", "data": ...}

It should create a file

  kafka_file_2013-07-01_17_2.json // second  chunk of hour 17
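
To make the intended mapping concrete, here is a minimal Java sketch of how a message timestamp could map to an hourly output file name. This is an illustration only; the dd-MM-yyyy input format and the "_2" chunk suffix are assumptions taken from the examples above:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class HourlyBucket {
        // Input timestamp format used in the messages above: "01-07-2013 15:25:35.994"
        private static final SimpleDateFormat IN  = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss.SSS");
        // Output file name part: "2013-07-01_15"
        private static final SimpleDateFormat OUT = new SimpleDateFormat("yyyy-MM-dd_HH");

        // Maps a message timestamp (and a run/chunk counter) to the hourly output file name.
        static String fileFor(String ts, int chunk) throws ParseException {
            Date d = IN.parse(ts);
            String suffix = (chunk > 1) ? "_" + chunk : "";
            return "kafka_file_" + OUT.format(d) + suffix + ".json";
        }

        public static void main(String[] args) throws Exception {
            System.out.println(fileFor("01-07-2013 15:25:35.994", 1)); // kafka_file_2013-07-01_15.json
            System.out.println(fileFor("01-07-2013 17:25:35.994", 2)); // kafka_file_2013-07-01_17_2.json
        }
    }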

I've seen some open-source projects, but most of them just read from Kafka into some HDFS folder. What is the best solution/design/open-source project for this problem?

Solution

You should definitely check out the Camus API implementation from LinkedIn. Camus is LinkedIn's Kafka->HDFS pipeline. It is a MapReduce job that does distributed data loads out of Kafka. Check out this post I have written for a simple example that fetches from the Twitter stream and writes to HDFS based on tweet timestamps.

Project is available at github at - https://github.com/linkedin/camus

Camus needs two main components for reading and decoding data from Kafka and writing data to HDFS –

Decoding Messages read from Kafka

Camus has a set of Decoders that help decode messages coming from Kafka. A Decoder basically extends com.linkedin.camus.coders.MessageDecoder and implements the logic to partition data based on the timestamp. A set of predefined Decoders is present in this directory, and you can write your own based on these: camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/
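
For orientation, here is a rough sketch of what such a custom decoder could look like, modeled loosely on the bundled JsonStringMessageDecoder. This is an assumption-laden illustration, not code from the Camus project: the exact generic parameters of MessageDecoder (raw byte[] vs. a Message wrapper) and the CamusWrapper constructor vary between Camus versions, and org.json is used here only for brevity, so check the predefined decoders in the directory above for the real API.

    import java.nio.charset.StandardCharsets;
    import java.text.SimpleDateFormat;

    import org.json.JSONObject;

    import com.linkedin.camus.coders.CamusWrapper;
    import com.linkedin.camus.coders.MessageDecoder;

    // Hypothetical decoder that reads the "ts" field from each JSON message and
    // hands Camus the record together with its timestamp, so Camus can place it
    // in the right time-based partition.
    public class TsJsonMessageDecoder extends MessageDecoder<byte[], String> {

        // Timestamp format from the question: "01-07-2013 15:25:35.994"
        private final SimpleDateFormat tsFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss.SSS");

        @Override
        public CamusWrapper<String> decode(byte[] payload) {
            String record = new String(payload, StandardCharsets.UTF_8);
            try {
                JSONObject json = new JSONObject(record);
                long timestamp = tsFormat.parse(json.getString("ts")).getTime();
                // The timestamp passed here is what drives the time-based partitioning.
                return new CamusWrapper<String>(record, timestamp);
            } catch (Exception e) {
                throw new RuntimeException("Could not extract timestamp from: " + record, e);
            }
        }
    }

In practice you would read the timestamp field name and format from the job Properties in init() rather than hard-coding them.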

Writing messages to HDFS

Camus needs a set of RecordWriterProvider classes that extend com.linkedin.camus.etl.RecordWriterProvider and tell Camus what payload should be written to HDFS. A set of predefined RecordWriterProviders is present in this directory, and you can write your own based on these:

camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common
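
To tie the two pieces together, the decoder and the writer provider are selected in the Camus job properties file. The snippet below is only a sketch: the property names are assumed to follow the example camus.properties in the repository, and the HDFS paths, broker list, topic name, and timestamp settings are placeholders you would replace with your own.

    # Decoder for incoming messages and writer for the HDFS output
    camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
    etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

    # Timestamp field and format inside each JSON message (placeholders matching the question)
    camus.message.timestamp.field=ts
    camus.message.timestamp.format=dd-MM-yyyy HH:mm:ss.SSS

    # HDFS locations for output data and Camus bookkeeping (placeholders)
    etl.destination.path=/data/camus/topics
    etl.execution.base.path=/data/camus/exec
    etl.execution.history.path=/data/camus/exec/history

    # Kafka connection and topic selection (placeholders)
    kafka.brokers=broker1:9092,broker2:9092
    kafka.whitelist.topics=my_topic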

