Read file path from Kafka topic and then read file and write to DeltaLake in Structured Streaming

Question

I have a use case where the file paths of JSON records stored in S3 arrive as messages on a Kafka topic. I have to process the data using Spark Structured Streaming.

The design I have in mind is as follows:

  1. In Spark Structured Streaming, read the messages containing the data paths from Kafka.
  2. Collect the message records on the driver. (The messages are small.)
  3. Create a DataFrame from the data locations.

kafkaDf.select($"value".cast(StringType))
       .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
  //rough code
  //collect to driver
  val records = batchDf.collect()
  //create dataframe and process
  records foreach((rec: Row) =>{
    println("records:######################", rec.toString())
    val path = rec.getAs[String]("data_path")
    val dfToProcess = spark.read.json(path)
    ....
  })
}

I would like to know your views: is this approach fine? Specifically, is there some problem with creating the DataFrame after calling collect? If there is a better approach, please let me know.

Recommended Answer

Your idea works perfectly fine.

Actually, it is mandatory to collect your DataFrame to the driver, because you cannot create a distributed dataset by calling the SparkSession on each executor. Without the collect you would end up with a NullPointerException.
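To illustrate this point, here is a minimal sketch of my own (assuming batchDf is the Dataset[String] of file paths used further down): calling spark.read inside a distributed operation runs on the executors, where the driver's SparkSession cannot be used, whereas collecting first moves the small list of paths to the driver.

  // Anti-pattern: Dataset.foreach runs on the executors, where the driver's
  // SparkSession is not usable, so this is expected to fail (e.g. with a NullPointerException).
  batchDf.foreach((path: String) => {
    val df = spark.read.json(path)
  })

  // Collecting first brings the (small) list of paths to the driver,
  // where spark.read.json can be called safely.
  batchDf.collect().foreach((path: String) => {
    val df = spark.read.json(path)
  })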

I have slightly re-written your code skeleton and also implemented the part on how to write your DataFrame into a Delta table (based on your other question). In addition, I am using a Dataset[String] instead of a DataFrame[Row], which makes life a bit easier.

Using Spark 3.0.1 with delta-core 0.7.0 works fine. As an example, my test file looks like:

{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}

I sent the location of that file to a Kafka topic called "test" and applied the following code to parse the file and write its columns (based on the given schema) into a delta table:

  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.types.{StringType, StructType}

  val spark = SparkSession.builder()
    .appName("KafkaConsumer")
    .master("local[*]")
    .getOrCreate()

  val jsonSchema = new StructType()
    .add("a", StringType)
    .add("b", StringType)

  val deltaPath = "file:///tmp/spark/delta/test"

  import spark.implicits._
  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as data_path")
    .as[String]

  kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String], batchId:Long) => {
    // collect to driver
    val records = batchDf.collect()

    // create dataframe based on file location and process and write to Delta-Lake
    records.foreach((path: String) => {
      val dfToProcess = spark.read.schema(jsonSchema).json(path)
      dfToProcess.show(false) // replace this line with your custom processing logic
      dfToProcess.write.format("delta").save(deltaPath)
    })
  }).start()

  spark.streams.awaitAnyTermination()
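
One note of my own, not part of the original answer: save(deltaPath) uses the default ErrorIfExists save mode, so if a later micro-batch delivers another file path, a second write to the same location would fail. For repeated batches you would likely append instead:

  // assumed variant for subsequent batches: append to the existing Delta table
  dfToProcess.write.format("delta").mode("append").save(deltaPath)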

The output of the show call is as expected:

+----+----+
|a   |b   |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+

and the data has been written as a Delta table into the location specified through deltaPath:

/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x  595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x   16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc
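
As a quick sanity check (a small sketch of my own, assuming the same spark session and deltaPath as above), the table can also be read back and displayed:

  // read the Delta table back from its path and show its contents
  spark.read.format("delta").load(deltaPath).show(false)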
