Spark Dataframe 以 avro 格式写入 kafka 主题? [英] Spark Dataframe write to kafka topic in avro format?

查看:50
本文介绍了Spark Dataframe 以 avro 格式写入 kafka 主题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在 Spark 中有一个 Dataframe 看起来像

I have a Dataframe in Spark that looks like

eventDF

   Sno|UserID|TypeExp
    1|JAS123|MOVIE
    2|ASP123|GAMES
    3|JAS123|CLOTHING
    4|DPS123|MOVIE
    5|DPS123|CLOTHING
    6|ASP123|MEDICAL
    7|JAS123|OTH
    8|POQ133|MEDICAL
    .......
    10000|DPS123|OTH

我需要将它以Avro格式写入Kafka主题目前我可以使用以下代码在 Kafka 中写入 JSON

I need to write it to Kafka topic in Avro format currently i am able to write in Kafka as JSON using following code

val kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
  kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
    .option("kafka.bootstrap.servers", "Host:port")
    .option("topic", "eventdf")
    .save()

现在我想把这个以Avro格式写到Kafka主题中

Now I want to write this in Avro format to Kafka topic

推荐答案

Spark >= 2.4:

您可以使用to_avro 函数来自 spark-avro 图书馆.

You can use to_avro function from spark-avro library.

import org.apache.spark.sql.avro._

eventDF.select(
  to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)

火花<2.4

你必须以同样的方式去做:

You have to do it the same way:

  • 创建一个函数,将序列化的 Avro 记录写入 ByteArrayOutputStream 并返回结果.一个简单的实现(这仅支持平面对象)可能类似于(采用自 Kafka Avro Scala示例来自Sushil Kumar Singh)

  • Create a function which writes serialized Avro record to ByteArrayOutputStream and return the result. A naive implementation (this supports only flat objects) could be similar to (adopted from Kafka Avro Scala Example by Sushil Kumar Singh)

import org.apache.spark.sql.Row

def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
  val gr: GenericRecord = new GenericData.Record(schema)
  row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))

  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(gr, encoder)
  encoder.flush()
  out.close()

  out.toByteArray()
}

  • 将其转换为udf:

    import org.apache.spark.sql.functions.udf
    
    val schema: org.apache.avro.Schema
    val encodeUDF = udf(encode(schema) _)
    

  • 用它代替 to_json

    eventDF.select(
      encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
    )
    

  • 这篇关于Spark Dataframe 以 avro 格式写入 kafka 主题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆