Spark Dataframe write to kafka topic in avro format?


Problem description

I have a Dataframe in Spark that looks like

eventDF

   Sno|UserID|TypeExp
    1|JAS123|MOVIE
    2|ASP123|GAMES
    3|JAS123|CLOTHING
    4|DPS123|MOVIE
    5|DPS123|CLOTHING
    6|ASP123|MEDICAL
    7|JAS123|OTH
    8|POQ133|MEDICAL
    .......
    10000|DPS123|OTH

I need to write it to a Kafka topic in Avro format. Currently I am able to write to Kafka as JSON using the following code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{column, struct, to_json}

// Pack all columns into a single JSON string column named "value"
val kafkaUserDF: DataFrame = eventDF.select(
  to_json(struct(eventDF.columns.map(column): _*)).alias("value"))

kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()

Now I want to write this in Avro format to the Kafka topic.

Answer

Spark >= 2.4:

You can use the to_avro function from the spark-avro library.

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{column, struct}

eventDF.select(
  to_avro(struct(eventDF.columns.map(column): _*)).alias("value")
)
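
For completeness, the resulting binary "value" column can be written to Kafka the same way as the JSON version above; a minimal sketch, reusing the placeholder host and topic from the question:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{column, struct}

// Serialize all columns into one Avro-encoded binary "value" column and write it out
eventDF.select(to_avro(struct(eventDF.columns.map(column): _*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()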

Spark < 2.4:

You have to do it the same way:

  • Create a function which writes a serialized Avro record to a ByteArrayOutputStream and returns the result. A naive implementation (this supports only flat objects) could be similar to the following (adapted from Kafka Avro Scala Example by Sushil Kumar Singh):

import java.io.ByteArrayOutputStream

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.spark.sql.Row

def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
  // Copy each Row field into an Avro record (flat schemas only)
  val gr: GenericRecord = new GenericData.Record(schema)
  row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))

  // Serialize the record to a byte array with a binary encoder
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(gr, encoder)
  encoder.flush()
  out.close()

  out.toByteArray()
}

  • Convert it to a udf:

    import org.apache.spark.sql.functions.udf

    val schema: org.apache.avro.Schema = ???  // your Avro schema here
    val encodeUDF = udf(encode(schema) _)


  • Use it in place of to_json (a complete end-to-end sketch follows this list):

    eventDF.select(
      encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
    )
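
Putting the pieces together, a minimal end-to-end sketch. The Avro schema below is an assumption that simply mirrors the three example columns from the question (the field types are guesses); substitute your real schema, bootstrap servers, and topic:

import org.apache.spark.sql.functions.{column, struct, udf}

// Hypothetical schema mirroring the example columns (Sno, UserID, TypeExp)
val schemaJson =
  """{
    |  "type": "record",
    |  "name": "UserEvent",
    |  "fields": [
    |    {"name": "Sno",     "type": "long"},
    |    {"name": "UserID",  "type": "string"},
    |    {"name": "TypeExp", "type": "string"}
    |  ]
    |}""".stripMargin

val schema: org.apache.avro.Schema =
  new org.apache.avro.Schema.Parser().parse(schemaJson)

val encodeUDF = udf(encode(schema) _)

// Same write as the JSON version, but with Avro-encoded binary values
eventDF.select(encodeUDF(struct(eventDF.columns.map(column): _*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "Host:port")
  .option("topic", "eventdf")
  .save()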
    
