Overloaded method foreachBatch with alternatives


Problem Description

I am trying to serialize a JSON file to Parquet format. I get this error:

Error:(34, 25) overloaded method foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
  askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>

Here is my code:

package fr.fdj
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


object serialize {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("serialize")
    .getOrCreate()


  def main(args : Array[String]) {

  spark.sparkContext.setLogLevel("ERROR")

  //schema definition
  val mySchema = StructType(Array(
    StructField("Species", StringType, true),
    StructField("Race", StringType, true),
    StructField("Color", StringType, true),
    StructField("Age", IntegerType, true)
  ))

  val askDF = spark
    .readStream
    .format("json")
    .option("header", "true")
    .schema(mySchema)
    .load("/src/main/scala/file.json")

  askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
  }.start().awaitTermination()


  }
}

Recommended Answer

I suppose you are using Scala 2.12.

Due to some changes in Scala 2.12, the method DataStreamWriter.foreachBatch requires some updates to the code; otherwise this ambiguity occurs.

You can check both foreachBatch overloads here: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html
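
For reference, here is a simplified sketch of the two overload shapes involved (paraphrased from the linked Scaladoc, not the actual Spark source); in Scala 2.12 a function literal can also be SAM-converted to the VoidFunction2 form, which is why the compiler needs the closure to line up cleanly with exactly one of them:

import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter

// Paraphrased overload shapes only; see the Scaladoc above for the real definitions.
trait ForeachBatchOverloads[T] {
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]
}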

I guess you could use Scala 2.11 instead, or check this link, where the issue has been addressed: https://docs.databricks.com/release-notes/runtime/7.0.html

In your code, you could try this:

// Declaring the handler as a method with an explicit Unit return type, then
// passing it as a method value (myFunc _), gives foreachBatch an unambiguous
// (DataFrame, Long) => Unit argument.
def myFunc(askDF: DataFrame, batchID: Long): Unit = {
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
}

askDF.writeStream.foreachBatch(myFunc _).start().awaitTermination()
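
Passing a method value with an explicit Unit result type, as above, hands foreachBatch a plain (DataFrame, Long) => Unit to match. Another option that sidesteps the overload resolution entirely is to target the Java overload explicitly with a VoidFunction2; the following is only a sketch, assuming the same askDF streaming DataFrame as above, and the output path is a placeholder:

import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.{Dataset, Row}

askDF.writeStream.foreachBatch(
  new VoidFunction2[Dataset[Row], java.lang.Long] {
    // call() is declared void on the Java side, so this variant can only return Unit.
    override def call(batchDF: Dataset[Row], batchId: java.lang.Long): Unit = {
      batchDF.persist()
      batchDF.write.parquet("/tmp/output.parquet") // placeholder output path
      batchDF.unpersist()
    }
  }
).start().awaitTermination()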
