Spark Streaming From Kafka and Write to HDFS in Avro Format
Problem Description
I basically want to consume data from Kafka and write it to HDFS, but it is not writing any files to HDFS; it only creates empty files.
Please also guide me on how to modify the code if I want to write to HDFS in Avro format.
For the sake of simplicity I am writing to the local C drive.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaStreaming extends App{
val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
val conext = new SparkContext(conf)
val ssc = new StreamingContext(conext, org.apache.spark.streaming.Milliseconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean))
val topics = Array("topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
val lines = stream.map(_.value)
stream.foreachRDD(rdd => {
rdd.coalesce(1).saveAsTextFile("C:/data/spark/")
})
ssc.start()
ssc.awaitTermination()}
And below is the build.sbt:
name := "spark-streaming"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.1"
Looking at your code, you can simply append the current timestamp to the path you are writing to, so that each batch goes to a fresh directory instead of colliding with the previous one.
That should solve your problem. :)
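As a minimal sketch of that suggestion (the output base path is just your existing `C:/data/spark/` location; the `batch-` prefix is an assumption for illustration), the `foreachRDD` body could build a timestamped path per batch and also skip empty batches, which is what produces your empty files:

```scala
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {                  // skip batches with no Kafka records
    val ts = System.currentTimeMillis()  // unique suffix per batch
    // each batch writes to its own directory, e.g. C:/data/spark/batch-1699999999999
    rdd.map(_.value).coalesce(1).saveAsTextFile(s"C:/data/spark/batch-$ts")
  }
}
```

The `rdd.isEmpty()` guard matters because with a very small batch interval most micro-batches contain no records, and `saveAsTextFile` on an empty RDD still creates an output directory with empty part files.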
==========
If you want to append all the files into one file, then you can use DataFrames as below.
I would not recommend using append in HDFS because of the way this filesystem is designed, but here is what you can try:
- Create a DataFrame from your RDD
- Use the DataFrame's save mode ("append") and then write the file.
e.g.:
val dataframe = yourRdd.toDF()
dataframe.write.mode(SaveMode.Append).format(FILE_FORMAT).save(path)
See if that helps.
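To address the Avro part of the question: Spark 2.2 has no built-in Avro writer, but you can pull in the external spark-avro package. As a sketch under stated assumptions (the `spark-avro` 4.0.0 coordinates match Spark 2.2, but the HDFS URL, column name, and output path below are placeholders you must adapt), add these lines to build.sbt:

```scala
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.0"
libraryDependencies += "com.databricks" % "spark-avro_2.11" % "4.0.0"
```

and then convert each batch to a DataFrame and write it as Avro:

```scala
import com.databricks.spark.avro._
import org.apache.spark.sql.{SaveMode, SparkSession}

// reuse the same SparkConf that backs the StreamingContext
val spark = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val df = rdd.map(_.value).toDF("value")  // one string column per Kafka record
    df.write
      .mode(SaveMode.Append)
      .avro("hdfs://namenode:8020/data/spark/avro")  // hypothetical HDFS path
  }
}
```

The `.avro(...)` method is an extension added by `import com.databricks.spark.avro._`; it is equivalent to `.format("com.databricks.spark.avro").save(...)`.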