How to read streaming data in XML format from Kafka?

Question

I am trying to read XML data from Kafka topic using Spark Structured streaming.

I tried using the Databricks spark-xml package, but I got an error saying that this package does not support streamed reading. Is there any way I can extract XML data from Kafka topic using structured streaming?

My current code:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

The error:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)

Answer

.format("kafka") \
.format('com.databricks.spark.xml') \

The last one with com.databricks.spark.xml wins and becomes the streaming source (hiding Kafka as the source).

In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.

As you may have experienced, the Databricks spark-xml package does not support streaming reading (i.e. cannot act as a streaming source). The package is not for streaming.

Is there any way I can extract XML data from Kafka topic using structured streaming?

You are left with accessing and processing the XML yourself with a standard function or a UDF. There's no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.

That should not be a big deal anyway. The Scala code could look as follows.

val input = spark.
  readStream.
  format("kafka").
  ...
  load

val values = input.select('value cast "string")

val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))

// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start
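Since the question uses PySpark, here is a minimal sketch of what the body of such a UDF might look like in Python, parsing each Kafka message value with the standard library's `xml.etree.ElementTree`. The tag names (`MainElement`, `number`) are assumptions based on the `rowTag` in the question; adapt them to the real schema.

```python
import xml.etree.ElementTree as ET

def extract_number(xml_string):
    """Parse one XML payload and return the text of the <number> child
    of MainElement, or None if it is absent.

    Tag names here are hypothetical -- adjust to the actual schema.
    """
    root = ET.fromstring(xml_string)
    node = root.find("number")
    return node.text if node is not None else None

# Hypothetical wiring into Structured Streaming (requires a SparkSession
# and the `values` DataFrame from the answer above):
# from pyspark.sql.functions import udf, col
# from pyspark.sql.types import StringType
# extract_number_udf = udf(extract_number, StringType())
# numbers = values.withColumn("number", extract_number_udf(col("value")))

print(extract_number("<MainElement><number>42</number></MainElement>"))  # 42
```

Note that a plain Python UDF like this deserializes each value on the driver-defined function per row; for large volumes, a schema-aware parser or `from_xml` (available in later spark-xml releases) may be preferable.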


Another possible solution could be to write your own custom streaming Source that would deal with the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That is supposed to work.
