Spark Streaming From Kafka and Write to HDFS in Avro Format


Problem Description


I basically want to consume data from Kafka and write it to HDFS. But what happens is that it is not writing any files to HDFS; it only creates empty files.

Please also guide me on how to modify the code if I want to write to HDFS in Avro format.

For the sake of simplicity, I am writing to the local C drive.

import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaStreaming extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
  val conext = new SparkContext(conf)
  // 1 ms micro-batch interval
  val ssc = new StreamingContext(conext, Milliseconds(1))
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean))
  val topics = Array("topic")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams))
  val lines = stream.map(_.value)
  // Write every micro-batch as a text file to the same output directory
  stream.foreachRDD(rdd => {
    rdd.coalesce(1).saveAsTextFile("C:/data/spark/")
  })
  ssc.start()
  ssc.awaitTermination()
}

And below is the build.sbt

name := "spark-streaming"
version := "1.0"
scalaVersion := "2.11.8" 
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-
10_2.11" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.1"

Solution

I saw your code; you can simply append the current timestamp to the path of the files you are writing, so that each micro-batch gets its own output location.

That should solve your problem. :)
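
For example, a minimal sketch of that timestamped-path idea, reusing the DStream from the question (the isEmpty guard and the exact path prefix are my assumptions, not part of the original suggestion):

// Give every micro-batch its own output directory, so saveAsTextFile
// never runs into an already-existing path.
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {          // skip empty micro-batches instead of writing empty output
    rdd.map(_.value)             // keep only the record values, as the lines DStream does
      .coalesce(1)
      .saveAsTextFile(s"C:/data/spark/batch-${System.currentTimeMillis()}")
  }
}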

==========

If you want to append all the files into one file, then you can use dataframes as below:

I would not recommend using append in HDFS because of the way this Filesystem is designed. But here is what you can try.

  1. Create a DataFrame from your RDD.
  2. Use the DataFrame's save mode SaveMode.Append and then write the file.

e.g.:

val dataframe = yourRdd.toDF()
dataframe.write.mode(SaveMode.Append).format(FILE_FORMAT).save(path)
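
For the Avro part of the question, one possible approach (my addition, not covered by the original answer) is the external Databricks spark-avro package, since Spark 2.2 ships no built-in Avro data source. A rough sketch, assuming the dependency "com.databricks" % "spark-avro_2.11" % "4.0.0" has been added to build.sbt and an HDFS output path of your choosing:

import org.apache.spark.sql.{SaveMode, SparkSession}

stream.foreachRDD { rdd =>
  // Re-obtain the singleton SparkSession inside each micro-batch
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Turn the Kafka record values into a single-column DataFrame and append it as Avro
  rdd.map(_.value).toDF("value")
    .write
    .mode(SaveMode.Append)
    .format("com.databricks.spark.avro")   // Avro data source from the spark-avro package
    .save("hdfs:///data/spark/avro")       // hypothetical output path on HDFS
}

On Spark 2.4 and later, Avro support comes from the org.apache.spark spark-avro module instead, and the format name is simply "avro".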

See if that helps
