How to read streaming data in XML format from Kafka?
Question
I am trying to read XML data from a Kafka topic using Spark Structured Streaming.
I tried using the Databricks spark-xml package, but I got an error saying that the package does not support streamed reading. Is there any way I can extract XML data from a Kafka topic using Structured Streaming?
My current code:
df = spark \
    .readStream \
    .format("kafka") \
    .format('com.databricks.spark.xml') \
    .options(rowTag="MainElement") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option(subscribeType, "test") \
    .load()
Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
Recommended answer
.format("kafka") \
.format('com.databricks.spark.xml') \
The last one, with com.databricks.spark.xml, wins and becomes the streaming source (hiding Kafka as the source).
In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.
As you may have experienced, the Databricks spark-xml package does not support streaming reads (i.e. it cannot act as a streaming source). The package is not for streaming.
Is there any way I can extract XML data from a Kafka topic using Structured Streaming?
You are left with accessing and processing the XML yourself with a standard function or a UDF. There is no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.
That should not be a big deal anyway. The Scala code could look as follows:
val input = spark.
readStream.
format("kafka").
...
load
val values = input.select('value cast "string")
val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))
// print XMLs and numbers to the stdout
val q = numbersFromXML.
writeStream.
format("console").
start
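As a sketch of what the udf body above could do, here is one way to parse a single XML record using the JDK's built-in DOM parser, so no extra library is required. The function name extractNumber and the payload layout (a MainElement root with a number child) are assumptions for illustration, not part of the original answer:

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory

// Assumed payload shape: <MainElement><number>42</number></MainElement>
// Parses one XML string with the JDK DOM parser and returns the text
// content of the first <number> element as an Int.
def extractNumber(xml: String): Int = {
  val builder = DocumentBuilderFactory.newInstance.newDocumentBuilder
  val doc = builder.parse(
    new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
  doc.getElementsByTagName("number").item(0).getTextContent.trim.toInt
}
```

In the streaming job, a function like this could back the UDF, e.g. val extractValuesFromXML = udf(extractNumber _), applied to the value column after casting it to a string.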
Another possible solution could be to write your own custom streaming Source that would deal with the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That is supposed to work.