Not able to write data to a Parquet file using Spark Structured Streaming


Question

I have a Spark Structured Streaming query:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .option("subscribe", "topic")
      .load()

I want to write the data to the file system using DataStreamWriter:

val query = df
          .writeStream
          .outputMode("append")
          .format("parquet")
          .start("data")

But no files are created in the data folder; only _spark_metadata is created.

However, I can see the data on the console when the format is console:

val query = df
          .writeStream
          .outputMode("append")
          .format("console")
          .start()

+--------------------+------------------+------------------+
|                time|              col1|              col2|
+--------------------+------------------+------------------+
|49368-05-11 20:42...|0.9166470338147503|0.5576946794171861|
+--------------------+------------------+------------------+

I cannot understand the reason behind this.

Spark 2.1.0

Answer

I resolved this issue. When I tried to run the Structured Streaming job in spark-shell, it threw an error saying that endingOffsets is not valid in streaming queries:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .option("subscribe", "topic")
      .load()


java.lang.IllegalArgumentException: ending offset not valid in streaming queries
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions$1.apply(KafkaSourceProvider.scala:374)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions$1.apply(KafkaSourceProvider.scala:373)
  at scala.Option.map(Option.scala:146)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:373)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:199)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
  ... 48 elided

So, I removed endingOffsets from the streaming query:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "topic")
      .load()
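
For reference, endingOffsets only applies to batch reads from Kafka (spark.read, supported since Spark 2.2), not to streaming reads. A rough sketch of what that batch variant would look like, reusing the same broker and topic, is:

// Batch read (spark.read, not spark.readStream); here endingOffsets is valid
// and bounds the range of offsets that is read. Requires Spark 2.2+.
val batchDf = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()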

Then I tried to save the streaming query's results to Parquet files, and learned that a checkpoint location must be specified:

val query = df
          .writeStream
          .outputMode("append")
          .format("parquet")
          .start("data")

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
  at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:207)
  at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:204)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:203)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:206)
  ... 48 elided

So, I added the checkpointLocation option:

val query = df
          .writeStream
          .outputMode("append")
          .format("parquet")
          .option("checkpointLocation", "checkpoint")
          .start("data")

After making these modifications, I was able to save the streaming query's results to Parquet files.
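
Putting the pieces together, here is a minimal end-to-end sketch of the corrected job. The application object name, the CAST of the Kafka value to STRING, and the output/checkpoint paths are illustrative assumptions, not part of the original question:

import org.apache.spark.sql.SparkSession

object KafkaToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaToParquet")
      .getOrCreate()

    // Streaming read: note that endingOffsets is NOT set here.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "topic")
      .load()

    // Kafka delivers key and value as binary; casting value to STRING is an
    // assumption about the payload format.
    val values = df.selectExpr("CAST(value AS STRING) AS value", "timestamp")

    // The file sink requires a checkpoint location.
    val query = values
      .writeStream
      .outputMode("append")
      .format("parquet")
      .option("checkpointLocation", "checkpoint")
      .start("data")

    query.awaitTermination()
  }
}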

But it is strange that when I ran the same code via an sbt application, it didn't throw any errors, whereas when I ran it via spark-shell these errors were thrown. I think Apache Spark should throw these errors when run via an sbt/maven app too. It seems like a bug to me!
