Spark Dataframe write to Kafka topic in Avro format?
Question
I have a DataFrame in Spark that looks like:
eventDF
Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
.......
10000|DPS123|OTH
I need to write it to a Kafka topic in Avro format. Currently I am able to write to Kafka as JSON using the following code:
val kafkaUserDF: DataFrame = eventDF.select(
  to_json(struct(eventDF.columns.map(column): _*)).alias("value")
)

kafkaUserDF.selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()
Now I want to write this to the Kafka topic in Avro format.
Answer
Spark >= 2.4:

You can use the to_avro function from the spark-avro library.
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{column, struct}

eventDF.select(
  to_avro(struct(eventDF.columns.map(column): _*)).alias("value")
)
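Putting this together with the Kafka sink from the question, the full Spark >= 2.4 pipeline could look like the sketch below. The bootstrap servers and topic name are the same placeholders used in the JSON example; the Kafka sink accepts the binary column produced by to_avro directly, so no cast to string is needed.

```scala
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.functions.{column, struct}

// Serialize all columns into a single Avro-encoded "value" column
// and write it to Kafka ("Host:port" / "eventdf" are placeholders).
eventDF
  .select(to_avro(struct(eventDF.columns.map(column): _*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()
```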
Spark < 2.4:
You have to do it the hard way:
- Create a function which writes a serialized Avro record to a ByteArrayOutputStream and returns the result. A naive implementation (it supports only flat objects) could look similar to this (adapted from the Kafka Avro Scala Example by Sushil Kumar Singh):
import java.io.ByteArrayOutputStream
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.spark.sql.Row

def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
  // Copy each field of the Row into a generic Avro record
  val gr: GenericRecord = new GenericData.Record(schema)
  row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))
  // Serialize the record to a byte array using the binary encoder
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(gr, encoder)
  encoder.flush()
  out.close()
  out.toByteArray()
}
- Convert it to a udf:
import org.apache.spark.sql.functions.udf
val schema: org.apache.avro.Schema
val encodeUDF = udf(encode(schema) _)
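The schema value in the snippet above is left undefined in the answer. One way to supply it (an assumption on my part, with field names matching the eventDF columns shown in the question) is to parse an Avro schema from its JSON definition:

```scala
import org.apache.avro.Schema

// Hypothetical schema matching the Sno|UserID|TypeExp columns from the question;
// adjust types to match the actual DataFrame schema.
val schemaJson =
  """{
    |  "type": "record",
    |  "name": "Event",
    |  "fields": [
    |    {"name": "Sno", "type": "string"},
    |    {"name": "UserID", "type": "string"},
    |    {"name": "TypeExp", "type": "string"}
    |  ]
    |}""".stripMargin

val schema: Schema = new Schema.Parser().parse(schemaJson)
```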
- Use it instead of to_json:
eventDF.select(
encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
)
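As with the to_avro variant, the resulting binary column can then be written to Kafka with the same sink options as the JSON version in the question (host, port, and topic remain placeholders):

```scala
import org.apache.spark.sql.functions.{column, struct}

// Assumes encodeUDF defined above; the Kafka sink writes the
// Array[Byte] "value" column as the message payload as-is.
eventDF
  .select(encodeUDF(struct(eventDF.columns.map(column): _*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()
```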