Overloaded method foreachBatch with alternatives
Problem description
I am trying to serialize a JSON file to Parquet format. I get this error:
Error:(34, 25) overloaded method foreachBatch with alternatives: (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame) askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
Here is my code:
package fr.fdj

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object serialize {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("serialize")
    .getOrCreate()

  def main(args: Array[String]) {
    spark.sparkContext.setLogLevel("ERROR")

    // schema definition
    val mySchema = StructType(Array(
      StructField("Species", StringType, true),
      StructField("Race", StringType, true),
      StructField("Color", StringType, true),
      StructField("Age", IntegerType, true)
    ))

    val askDF = spark
      .readStream
      .format("json")
      .option("header", "true")
      .schema(mySchema)
      .load("/src/main/scala/file.json")

    askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
      askDF.persist()
      askDF.write.parquet("/src/main/scala/file.json")
      askDF.unpersist()
    }.start().awaitTermination()
  }
}
Recommended answer
I suppose you are using Scala 2.12.
Due to some changes in Scala 2.12, the method DataStreamWriter.foreachBatch requires some updates to the code, otherwise this ambiguity happens.
You can check both foreachBatch methods here: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html
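For reference, the two overloads that clash (as quoted in the compiler error) look like this:

```scala
// Java-friendly overload, taking a SAM interface:
def foreachBatch(function: VoidFunction2[Dataset[Row], java.lang.Long]): DataStreamWriter[Row]

// Scala overload, taking a function value:
def foreachBatch(function: (Dataset[Row], Long) => Unit): DataStreamWriter[Row]
```

In Scala 2.12 a lambda can be converted to either shape. In your case there is an extra wrinkle: the last expression of your closure, `askDF.unpersist()`, returns a DataFrame, so the closure is inferred as `(DataFrame, Long) => DataFrame`, which matches neither overload, hence the "cannot be applied" error.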
I guess you could use Scala 2.11 instead, or check this link, where the issue has been addressed: https://docs.databricks.com/release-notes/runtime/7.0.html
In your code, you could try this:
def myFunc(askDF: DataFrame, batchID: Long): Unit = {
  askDF.persist()
  askDF.write.parquet("/src/main/scala/file.json")
  askDF.unpersist()
}

askDF.writeStream.foreachBatch(myFunc _).start().awaitTermination()
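If you prefer to keep the inline closure, here is a minimal sketch of another workaround, assuming the root cause is only the closure's inferred DataFrame return type: end the block with `()` so it is inferred as `(DataFrame, Long) => Unit` and matches the Scala overload.

```scala
askDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.parquet("/src/main/scala/file.json")
  batchDF.unpersist()
  ()  // force the closure's result type to Unit, matching the (Dataset[Row], Long) => Unit overload
}.start().awaitTermination()
```

Note that this writes the Parquet output to the same `/src/main/scala/file.json` path as in your question; you would normally point it at a separate output directory (and add a `checkpointLocation` option for a streaming query).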