Reading schema of streaming Dataframe in Spark Structured Streaming


Question


I'm new to Apache Spark Structured Streaming. I'm trying to read some events from Event Hub (in XML format) and to create a new Spark DataFrame from the nested XML.

I'm using the code example described in https://github.com/databricks/spark-xml; it runs perfectly in batch mode, but not in Spark Structured Streaming.

Code chunk from the spark-xml GitHub library

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload' 
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

My batch code

val df = Seq(
  (8, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
  (64, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>6</tag1> <tag2>4</tag2>  <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
  (27, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>4</tag1> <tag2>4</tag2> <mode>3</mode> <Quantity>1</Quantity></AccountSetup>")
).toDF("number", "body")


val payloadSchema = schema_of_xml(df.select("body").as[String])
val parsed = df.withColumn("parsed", from_xml($"body", payloadSchema))

val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
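
Note that display is a Databricks notebook helper; outside Databricks, the same result can be inspected with plain Spark's show, for example:

final_df.select("parsed.*").show(truncate = false)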

I was trying to apply the same logic to Spark Structured Streaming, as in the following code:

Structured Streaming code

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()

val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
val parsed = streamingInputDF.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))

display(final_df.select("parsed.*"))

The line val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String]) throws the error Queries with streaming sources must be executed with writeStream.start();;
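
A common workaround here, sketched below under the assumption that a representative XML sample is available, is to infer the schema once from a static Dataset, since schema_of_xml runs a batch query over its input and therefore cannot accept a streaming Dataset:

import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

// Infer the payload schema once, in batch mode, from a static sample.
// The sample string below is taken from the batch example above and is
// only illustrative; any representative payload would do.
val sampleXml = Seq(
  "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"
).toDS()

val payloadSchema = schema_of_xml(sampleXml)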

Update

I tried:


val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
    .select(($"body").cast("string"))

val body_value = streamingInputDF.select("body").as[String]
body_value.writeStream
    .format("console")
    .start()

spark.streams.awaitAnyTermination()


val payloadSchema = schema_of_xml(body_value)
val parsed = body_value.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))

Now it does not run into the error, but Databricks stays in a "Waiting" status.

Thanks!!

Solution

There is nothing wrong with your code if it works in batch mode.

It is important not only to convert the source into a stream (by using readStream and load) but also to convert the sink part into a stream.

The error message you are getting is just reminding you to also look into the sink part. Your DataFrame final_df is actually a streaming DataFrame, which has to be started through start.

The Structured Streaming Guide gives you a good overview of all available Output Sinks; the easiest is to print the result to the console.

To summarize, you need to add the following to your program:

final_df.writeStream
    .format("console")
    .start()

spark.streams.awaitAnyTermination()
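
Applied to the streaming code above, a minimal end-to-end sketch might look like the following. It assumes payloadSchema is already in scope (e.g. inferred from a static sample via schema_of_xml, as sketched earlier, or defined by hand) and reuses the post's eventHubsConf:

import com.databricks.spark.xml.functions.from_xml
import org.apache.spark.sql.functions._
import spark.implicits._

// Read the Event Hub stream and keep the XML payload as a string column.
val streamingInputDF =
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
    .select($"body".cast("string"))

// payloadSchema: a StructType obtained beforehand -- assumed to be in scope.
val parsed = streamingInputDF
  .withColumn("parsed", from_xml($"body", payloadSchema))
  .select("parsed.*")

// Start the sink; without this, no streaming query ever runs.
val query = parsed.writeStream
  .format("console")
  .start()

query.awaitTermination()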
