Print parquet schema using Spark Streaming


Question

Following is an extract of the Scala code written to read Parquet files and print the schema and the first few records from each Parquet file.

But nothing is getting printed.

    import org.apache.avro.generic.GenericRecord
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetInputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
    import org.apache.spark.{SparkConf, SparkContext}

    val batchDuration = 2
    val inputDir = "file:///home/samplefiles"

    val conf = new SparkConf().setAppName("gpParquetStreaming").setMaster("local[*]")
    val sc   = new SparkContext(conf)
    sc.hadoopConfiguration.set("spark.streaming.fileStream.minRememberDuration", "600000")
    val ssc  = new StreamingContext(sc, Seconds(batchDuration))

    // Read Parquet records as Avro GenericRecords.
    ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.parquet.avro.AvroReadSupport")

    val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
      inputDir, { path: Path => path.toString.endsWith("parquet") }, true,
      ssc.sparkContext.hadoopConfiguration)

    val dataStream = stream.transform(rdd => rdd.map(tuple => tuple._2)).persist
    val countData = dataStream.count

    // SQLContextSingleton is a user-defined, lazily initialized helper
    // (the usual pattern from the Spark Streaming programming guide).
    dataStream.transform(rdd => rdd.map(record => record.toString)).foreachRDD((rdd: RDD[String], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val dataDF = sqlContext.read.json(rdd)
      dataDF.printSchema
      dataDF.show
    })

    countData.print
    dataStream.print
    stream.print

    ssc.start()
    ssc.awaitTermination()

I am also trying to read files that already exist in the directory, so I have set the parameter minRememberDuration to a large number, but it still does not help.
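
Note that spark.streaming.fileStream.minRememberDuration is a Spark property, not a Hadoop one, so setting it on the Hadoop Configuration as in the snippet above most likely has no effect. A minimal sketch of setting it on the SparkConf, where Spark Streaming actually reads it:

    // Sketch: the property belongs on the SparkConf, not on the Hadoop
    // Configuration. Spark parses the value as a time string; a bare
    // number is interpreted as seconds.
    val conf = new SparkConf()
      .setAppName("gpParquetStreaming")
      .setMaster("local[*]")
      .set("spark.streaming.fileStream.minRememberDuration", "600000")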

Any help is appreciated!

Thanks

Dependencies used:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-common</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-format</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-encoding</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.8.0</version>
</dependency>

Error:

17/08/25 02:00:45 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
Serialization stack:
    - object not serializable (class: org.apache.avro.generic.GenericData$Record, value: {"firstName": "Abhishek"})
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
17/08/25 02:00:46 ERROR TaskSetManager: Task 0.0 in stage 8.0 (TID 6) had a not serializable result: org.apache.avro.generic.GenericData$Record

Answer

It seems the problem is that org.apache.avro.generic.GenericData$Record does not implement java.io.Serializable, so Spark's default Java serializer fails as soon as the persisted records are written to the memory store. Since that is an Avro library class, you cannot simply add extends Serializable to it; instead, convert the records to a serializable form before anything that serializes them (persist, count), or switch to a serializer that does not require Serializable. Sketches of both approaches follow.
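
A minimal sketch of the first approach, assuming SQLContextSingleton is the asker's lazily initialized helper: map each GenericRecord to its String form before persist and count, so Spark only ever serializes plain Strings.

    // Sketch of a workaround: convert GenericRecord -> String *before*
    // persist and count, so the cached data is plain Strings.
    // GenericRecord.toString renders the record as JSON-like text, which
    // is exactly what the json() call below parses.
    val dataStream = stream
      .transform(rdd => rdd.map(_._2.toString))
      .persist()

    val countData = dataStream.count()

    dataStream.foreachRDD { (rdd: RDD[String], time: Time) =>
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      val dataDF = sqlContext.read.json(rdd)
      dataDF.printSchema()
      dataDF.show()
    }

    countData.print()

With only Strings being persisted, the count, printSchema, and show calls should no longer trigger the NotSerializableException.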

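Alternatively, a hedged sketch of the second approach: switch to Kryo, which does not require java.io.Serializable. Spark ships an Avro-aware Kryo serializer that is enabled by registering the Avro schema on the SparkConf; the schema below is a hypothetical one matching the record shown in the error ({"firstName": "Abhishek"}).

    import org.apache.avro.Schema
    import org.apache.spark.SparkConf

    // Hypothetical Avro schema for the records in the Parquet files.
    val personSchema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"Person","fields":[{"name":"firstName","type":"string"}]}""")

    // Sketch: use Kryo instead of Java serialization and register the
    // schema so Spark's built-in Avro serializer handles GenericRecord.
    val conf = new SparkConf()
      .setAppName("gpParquetStreaming")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerAvroSchemas(personSchema)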
