org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame


Question

I'm trying to write a Spark Structured Streaming (2.3) dataset to ScyllaDB (Cassandra).

The code I use to write the dataset:

  def saveStreamSinkProvider(ds: Dataset[InvoiceItemKafka]) = {
    ds
      .writeStream
      .format("cassandra.ScyllaSinkProvider")
      .outputMode(OutputMode.Append)
      .queryName("KafkaToCassandraStreamSinkProvider")
      .options(
        Map(
          "keyspace" -> namespace,
          "table" -> StreamProviderTableSink,
          "checkpointLocation" -> "/tmp/checkpoints"
        )
      )
      .start()
  }
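
The format string above has to resolve to the sink provider class, so "cassandra.ScyllaSinkProvider" is the fully qualified name of the provider shown next. For context, a minimal, hypothetical way to start and wait on the query (invoiceItems is only a placeholder for the Kafka-sourced Dataset[InvoiceItemKafka]; it is not in the original post):

  // invoiceItems: Dataset[InvoiceItemKafka], parsed from the Kafka topic seen
  // in the stack trace below; the variable name is illustrative only.
  val query = saveStreamSinkProvider(invoiceItems)
  query.awaitTermination()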

My ScyllaDB Streaming Sinks:

class ScyllaSinkProvider extends StreamSinkProvider {
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): ScyllaSink =
    new ScyllaSink(parameters)
}

class ScyllaSink(parameters: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit =
    data.write
      .cassandraFormat(
        parameters("table"),
        parameters("keyspace")
        //parameters("cluster")
      )
      .mode(SaveMode.Append)
      .save()
}

However, when I run this code, I receive an exception:

...
[error]       +- StreamingExecutionRelation KafkaSource[Subscribe[transactions_load]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
[error]     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
[error]     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[error] Caused by: org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame;
[error]     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[error]     at org.apache.spark.sql.Dataset.write(Dataset.scala:3103)
[error]     at cassandra.ScyllaSink.addBatch(CassandraDriver.scala:113)
[error]     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
...


I have also seen a similar question, but that one is for CosmosDB.

Answer

The DataFrame that Spark hands to addBatch still carries a streaming logical plan, which is why Dataset.write refuses it. You could convert it to an RDD first and then write:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters

class ScyllaSink(parameters: Map[String, String]) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema
    // this ensures that the same query plan will be used
    val rdd: RDD[Row] = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    // write the RDD to Cassandra (a sketch follows below)
  }
}
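
The answer leaves the actual Cassandra write as a comment. A minimal sketch of one way to complete it, assuming the DataFrame-based spark-cassandra-connector API already used in the question is on the classpath (this step is not part of the original answer), is to rebuild a non-streaming DataFrame from the converted RDD and reuse the ordinary batch writer:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._

// Inside addBatch, after `rdd` has been built as above. Recreating a DataFrame
// from the RDD produces a plan that is no longer marked as streaming, so the
// batch writer no longer throws the AnalysisException.
data.sparkSession
  .createDataFrame(rdd, schema)
  .write
  .cassandraFormat(parameters("table"), parameters("keyspace"))
  .mode(SaveMode.Append)
  .save()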
