Read file path from Kafka topic and then read file and write to Delta Lake in Structured Streaming


Problem Description

I have a use case where the file paths of JSON records stored in S3 arrive as Kafka messages. I have to process the data using Spark Structured Streaming.

The design I have in mind is as follows:

  1. Read the messages containing the data paths from Kafka in Spark Structured Streaming.
  2. Collect the message records on the driver (the messages are small).
  3. Create a dataframe from each data location.

kafkaDf.select($"value".cast(StringType))
       .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
  //rough code
  //collect to driver
  val records = batchDf.collect()
  //create dataframe and process
  records foreach((rec: Row) =>{
    println("records:######################", rec.toString())
    val path = rec.getAs[String]("data_path")
    val dfToProcess = spark.read.json(path)
    ....
  })
}

I would like to know your views: is this approach fine? Specifically, is there any problem with creating the Dataframe after calling collect? If there is a better approach, please let me know.

Recommended Answer

Your idea will work fine.

Actually, it is mandatory to collect your Dataframe to the driver: the SparkSession can only be used on the driver, so you cannot create a distributed dataset from within the executors. Without the collect you would end up with a NullPointerException.

I have slightly rewritten your code skeleton and also implemented the part on how to write your Dataframe into a delta table (based on your other question). In addition, I am using a Dataset[String] instead of a DataFrame (i.e. Dataset[Row]), which makes life a bit easier.

This works fine with Spark 3.0.1 and delta-core 0.7.0. As an example, my test file looks like this:

{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}

I sent the location of that file to a Kafka topic called "test" and applied the following code to parse the file and write its columns (based on the given schema) into a delta table:

  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.types.{StringType, StructType}

  val spark = SparkSession.builder()
    .appName("KafkaConsumer")
    .master("local[*]")
    .getOrCreate()

  val jsonSchema = new StructType()
    .add("a", StringType)
    .add("b", StringType)

  val deltaPath = "file:///tmp/spark/delta/test"

  import spark.implicits._
  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as data_path")
    .as[String]

  kafkaDf.writeStream.foreachBatch((batchDf: Dataset[String], batchId: Long) => {
    // collect the (small) path messages to the driver
    val records = batchDf.collect()

    // create a dataframe per file location, process it and write it to the delta table
    records.foreach((path: String) => {
      val dfToProcess = spark.read.schema(jsonSchema).json(path)
      dfToProcess.show(false) // replace this line with your custom processing logic
      dfToProcess.write.format("delta").mode("append").save(deltaPath) // append so later paths/batches do not fail on the existing table
    })
  }).start()

  spark.streams.awaitAnyTermination()

The output of the show call is as expected:

+----+----+
|a   |b   |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+

and the data has been written as a delta table into the location specified through deltaPath:

/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x  595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x   16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc
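
As a quick check (a sketch, not part of the original answer; it reuses the spark session and deltaPath from the snippet above), the table can be read back with the standard delta reader:

  // load the delta table and verify that the two rows are there
  val deltaDf = spark.read.format("delta").load(deltaPath)
  deltaDf.show(false)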
