How to read streaming data in XML format from Kafka?

Problem description

I am trying to read XML data from Kafka topic using Spark Structured streaming.

I tried using the Databricks spark-xml package, but I got an error saying that this package does not support streamed reading. Is there any way I can extract XML data from Kafka topic using structured streaming?

My current code:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

The error:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)

Recommended answer

.format("kafka") \
.format('com.databricks.spark.xml') \

The last one with com.databricks.spark.xml wins and becomes the streaming source (hiding Kafka as the source).

In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.

As you may have experienced, the Databricks spark-xml package does not support streaming reading (i.e. cannot act as a streaming source). The package is not for streaming.

Is there any way I can extract XML data from Kafka topic using structured streaming?

You are left with accessing and processing the XML yourself with a standard function or a UDF. There's no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.

That should not be a big deal anyway. The Scala code could look as follows.

val input = spark.
  readStream.
  format("kafka").
  ...
  load

val values = input.select('value cast "string")

val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))

// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start
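
In case it helps, below is a minimal sketch of what extractValuesFromXML could look like. It assumes the Kafka value carries a document such as <MainElement><number>42</number></MainElement> (the element and field names are made up for illustration) and that scala-xml is available on the classpath:

import org.apache.spark.sql.functions.udf
import scala.xml.XML

// Hypothetical parser: load the XML payload and pull a single numeric field out of it.
// The <number> element is an assumption about the message layout, not part of the question.
val extractValuesFromXML = udf { (xml: String) =>
  val root = XML.loadString(xml)      // parse the string with scala-xml
  (root \ "number").text.trim.toInt   // extract the <number> child as an Int
}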

Another possible solution could be to write your own custom streaming Source that would deal with the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That is supposed to work.
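
For orientation, here is roughly the shape of the Source contract in Spark 2.x that such a class would have to fill in. The class name and schema are invented for this sketch, and the actual offset tracking and XML-to-row conversion are left as ??? just like above:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical custom streaming source that would read XML records (e.g. from Kafka)
// and expose them as rows of the declared schema.
class XmlStreamSource(sqlContext: SQLContext) extends Source {

  // Schema of the rows this source produces (illustrative only).
  override def schema: StructType =
    StructType(StructField("number", StringType) :: Nil)

  // Latest offset available in the underlying system.
  override def getOffset: Option[Offset] = ???

  // Rows between start (exclusive) and end (inclusive), with each
  // XML payload parsed into the schema above.
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def stop(): Unit = ()
}

Note that for .format(...) to pick such a source up you would also need a StreamSourceProvider (and typically a DataSourceRegister entry), which is why this route is considerably more work than the UDF approach above.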
