Read JSON from Kafka and write JSON to another Kafka topic
Problem description
I'm trying to prepare an application for Spark Streaming (Spark 2.1, Kafka 0.10).
I need to read data from the Kafka topic "input", find the correct data, and write the result to the topic "output".
I can read data from Kafka based on the KafkaUtils.createDirectStream method.
I converted the RDD to JSON and prepared filters:
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
messages.map(_.value).foreachRDD { rdd =>
  import spark.implicits._
  // Parse the JSON strings using the predefined schema
  val PeopleDf = spark.read.schema(schema1).json(rdd)
  PeopleDf.show()
  // Keep rows where value1 contains "1" or value2 equals 2
  val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1")) || ($"value2" === 2))
  PeopleDfFilter.show()
}
I can load data from Kafka and write it "as is" to Kafka using KafkaProducer:
messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val kafkaTopic = "output"
    // Create one producer per partition, so nothing non-serializable is shipped from the driver
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { record: ConsumerRecord[String, String] =>
      System.out.print("########################" + record.value())
      // Forward the value unchanged to the output topic
      val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
      producer.send(messageResult)
    }
    producer.close()
  }
}
However, I cannot integrate those two actions: find the proper values in the JSON and write the findings to Kafka, i.e. write PeopleDfFilter in JSON format to the "output" Kafka topic.
I have a lot of input messages in Kafka, which is why I want to use foreachPartition to create the Kafka producer.
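Roughly what I am after, as an untested sketch reusing ssc, spark, messages, schema1 and the producer settings from above (toJSON, which should turn each filtered Row back into a JSON string, is the only call not already in my snippets):

messages.map(_.value).foreachRDD { rdd =>
  import spark.implicits._
  // Parse and filter as in the first snippet
  val PeopleDfFilter = spark.read.schema(schema1).json(rdd)
    .filter(($"value1".rlike("1")) || ($"value2" === 2))
  // Serialize each surviving Row back to a JSON string, then write per partition
  PeopleDfFilter.toJSON.rdd.foreachPartition { partition =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach(json => producer.send(new ProducerRecord[String, String]("output", json)))
    producer.close()
  }
}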
Recommended answer
The process is very simple, so why not use Structured Streaming all the way?
import org.apache.spark.sql.functions.{from_json, to_json}
import spark.implicits._  // for the $"..." column syntax

spark
  // Read the data
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", inservers)
  .option("subscribe", intopic)
  .load()
  // Transform / filter
  .select(from_json($"value".cast("string"), schema).alias("value"))
  .filter(...)  // Add the condition
  .select(to_json($"value").alias("value"))
  // Write back
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", outservers)
  .option("topic", outtopic)
  .start()