How to write streaming dataset to Kafka?


Problem description

I'm trying to do some enrichment of the topic data, so I read from Kafka and write the enriched records back to Kafka using Spark Structured Streaming:

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)]
      .map(record => enrich(record._1, record._2, record._3))

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

But I ran into an exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Are there any workarounds?

Recommended answer

As T. Gawęda mentioned, there is no kafka format to write streaming datasets to Kafka (i.e. a Kafka sink).

The currently recommended solution in Spark 2.1 is to use the foreach operator.

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use it, implement the ForeachWriter interface (Scala/Java docs), whose methods get called whenever a sequence of rows is generated as output after a trigger.
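
Below is a minimal sketch of such a writer, assuming the enriched records are (key, value) string pairs and reusing the question's bootstrapServers and "desttopic"; the KafkaSink class name and the producer settings are illustrative, not part of the original answer.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// A Kafka-backed ForeachWriter for (key, value) string records.
class KafkaSink(bootstrapServers: String, topic: String)
    extends ForeachWriter[(String, String)] {

  private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Create the producer here, not on the driver: the writer instance is
    // serialized and shipped to the executors, where open() runs per partition.
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true // process the rows of this partition
  }

  override def process(record: (String, String)): Unit = {
    producer.send(new ProducerRecord(topic, record._1, record._2))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

With that writer in place (and enriched typed as a Dataset of (String, String) pairs), the failing writeStream call from the question becomes:

val query = enriched.writeStream
      .foreach(new KafkaSink(bootstrapServers, "desttopic"))
      .start()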
