Spark Streaming - read json from Kafka and write json to other Kafka topic


Problem Description

I'm trying to prepare an application for Spark Streaming (Spark 2.1, Kafka 0.10).

I need to read data from the Kafka topic "input", find the correct data, and write the result to the topic "output".

I can read data from Kafka using the KafkaUtils.createDirectStream method.

I converted the RDD to JSON and prepared the filters:

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>
  import spark.implicits._
  // Parse the JSON strings using the predefined schema
  val PeopleDf = spark.read.schema(schema1).json(rdd)
  PeopleDf.show()
  // Keep only the records that match the conditions
  val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1")) || ($"value2" === 2))
  PeopleDfFilter.show()
}

I can load data from Kafka and write it "as is" back to Kafka using a KafkaProducer:

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val kafkaTopic = "output"
    // Create one producer per partition, so the (non-serializable) producer
    // is built on the executors rather than shipped from the driver
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    partition.foreach { record: ConsumerRecord[String, String] =>
      System.out.print("########################" + record.value())
      // Forward the message unchanged to the "output" topic
      val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
      producer.send(messageResult)
    }
    producer.close()
  }
}

But I cannot integrate those two actions: find the proper values in the JSON and write the findings to Kafka, i.e. write PeopleDfFilter in JSON format to the "output" Kafka topic.

I have a lot of input messages in Kafka, which is why I want to use foreachPartition to create the Kafka producer.
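
In other words, the intended combination is roughly the sketch below, which simply joins the two snippets above (it reuses schema1, the same filter condition, and the same producer settings; it is only a sketch of the goal, not tested code):

messages.map(v => v.value).foreachRDD { rdd =>
  import spark.implicits._
  // Parse and filter, exactly as in the first snippet
  val PeopleDf = spark.read.schema(schema1).json(rdd)
  val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1")) || ($"value2" === 2))

  // Serialize the filtered rows back to JSON and send them partition by partition
  PeopleDfFilter.toJSON.rdd.foreachPartition { partition =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { json =>
      producer.send(new ProducerRecord[String, String]("output", json))
    }
    producer.close()
  }
}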

Thank you a lot for any advice.

Recommended Answer

The process is very simple, so why not use Structured Streaming all the way?

import org.apache.spark.sql.functions.{from_json, to_json}

spark
  // Read the data
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", inservers)
  .option("subscribe", intopic)
  .load()
  // Transform / filter
  .select(from_json($"value".cast("string"), schema).alias("value"))
  .filter(...)  // Add the condition
  .select(to_json($"value").alias("value"))
  // Write back
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", outservers)
  .option("topic", outtopic)
  .start()
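
Note that in practice the Kafka sink also requires a checkpoint location, for example .option("checkpointLocation", "/path/to/checkpoint"), and start() returns a StreamingQuery that you would normally keep a handle on and block on with awaitTermination().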

