How to write streaming dataset to Kafka?

Question

I'm trying to do some enrichment of the topic data, so I read from Kafka and sink the result back to Kafka using Spark Structured Streaming.

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)]
      .map(record => enrich(record._1, record._2, record._3))

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

But I'm getting an exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Is there any workaround?

Recommended answer

As T. Gawęda mentioned, there is no kafka format to write streaming datasets to Kafka (i.e. a Kafka sink).

The currently recommended solution in Spark 2.1 is to use the foreach operator.

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use it, you will have to implement the ForeachWriter interface (see the Scala/Java docs), which has methods that get called whenever a sequence of rows is generated as output after a trigger. Note the following important points.
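For reference, here is a minimal sketch of what such a writer could look like for this case. KafkaSinkWriter is a hypothetical name, and it assumes the enriched dataset has the (key, value, topic) tuple shape and the bootstrapServers value from the question:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Hypothetical writer: opens a Kafka producer per partition and sends
// each enriched (key, value, topic) tuple to a fixed destination topic.
class KafkaSinkWriter(bootstrapServers: String, targetTopic: String)
    extends ForeachWriter[(String, String, String)] {

  var producer: KafkaProducer[String, String] = _

  // Called once per partition of every trigger's output; returning true
  // means this partition should be processed.
  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  // Called for every output row produced after a trigger.
  override def process(record: (String, String, String)): Unit = {
    producer.send(new ProducerRecord(targetTopic, record._1, record._2))
  }

  // Called once the partition is fully processed (or on failure).
  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

val query = enriched.writeStream
      .foreach(new KafkaSinkWriter(bootstrapServers, "desttopic"))
      .start()

Creating the producer inside open (rather than in the constructor) matters because the writer is serialized and shipped to the executors; the non-serializable KafkaProducer is only instantiated after deserialization, on the executor side.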

