如何使用Spark结构化流将数据从Kafka主题流式传输到Delta表 [英] How to stream data from Kafka topic to Delta table using Spark Structured Streaming

查看:103
本文介绍了如何使用Spark结构化流将数据从Kafka主题流式传输到Delta表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图了解数据块增量,并考虑使用Kafka进行POC.基本上,计划是使用来自Kafka的数据并将其插入到databricks增量表中.

I'm trying to understand databricks delta and thinking to do a POC using Kafka. Basically the plan is to consume data from Kafka and insert it to the databricks delta table.

这些是我执行的步骤:

  1. 在数据块上创建增量表.

%sql
CREATE TABLE hazriq_delta_trial2 (
  value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'

  1. 使用Kafka的数据.

import org.apache.spark.sql.types._
    
val kafkaBrokers = "broker1:port,broker2:port,broker3:port"
val kafkaTopic = "kafkapoc"
    
val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 100)
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .table("hazriq_delta_trial2")

但是,当我查询该表时,它是空的.

我可以确认数据即将到来.当我向Kafka主题发送消息时,通过查看图中的峰值来验证这一点.

I can confirm that the data is coming. I verify it by seeing the spike in the graph when I produce a message to the Kafka topic.

我想念什么吗?

我需要有关如何将从卡夫卡(Kafka)获得的数据插入表中的帮助.

I need help on how I can insert the data that I get from Kafka into the table.

推荐答案

下面是一个有效的示例,说明如何从Kafka中读取数据并将其流式传输到增量表中.我使用的是Spark 3.0.1和增量核心0.7.0(如果您使用的是Spark 2.4版本,则需要使用0.6.0).

Below is a working example on how to read data from Kafka and stream it into a delta table. I was using Spark 3.0.1 and delta-core 0.7.0 (if you are on Spark 2.4 version you need to use 0.6.0).

val spark = SparkSession.builder()
  .appName("Kafka2Console")
  .master("local[*]")
  .getOrCreate()

// in production this should be a more reliable location such as HDFS
val deltaPath = "file:///tmp/delta/table"

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING) as value")

val query: StreamingQuery = df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .start(deltaPath)

query.awaitTermination()

为了进行测试,我只产生了字符"a","b","c".和"d"表示作为Kafka主题的价值.显然,如果Kafka输入数据例如是JSON字符串.

For testing, I have simply produced characters "a", "b", "c" and "d" as values into the Kafka topic. Obviously, you can build some more sophisticated Dataframes if the Kafka input data is e.g. a JSON string.

val table = spark.read
  .format("delta")
  .load(deltaPath)
  .createOrReplaceTempView("testTable")

spark.sql("SELECT * FROM testTable").show(false)

// result
+-----+
|value|
+-----+
|a    |
|b    |
|c    |
|d    |
+-----+

在deltaPath中创建的文件

>/tmp/delta/table$ ll
total 44
drwxrwxr-x 3 x x 4096 Jan 11 17:12 ./
drwxrwxr-x 3 x x 4096 Jan 11 17:10 ../
drwxrwxr-x 2 x x 4096 Jan 11 17:12 _delta_log/
-rw-r--r-- 1 x x  414 Jan 11 17:12 part-00000-0a0ae7fb-2995-4da4-8284-1ab85899fe9c-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-0a0ae7fb-2995-4da4-8284-1ab85899fe9c-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  306 Jan 11 17:12 part-00000-37eb0bb2-cd27-42a4-9db3-b79cb046b638-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-37eb0bb2-cd27-42a4-9db3-b79cb046b638-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  414 Jan 11 17:12 part-00000-8d6b4236-1a12-4054-b016-3db7a007cbab-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-8d6b4236-1a12-4054-b016-3db7a007cbab-c000.snappy.parquet.crc
-rw-r--r-- 1 x x  407 Jan 11 17:12 part-00000-d2612eaa-3f48-4708-bf90-31dd3d83f124-c000.snappy.parquet
-rw-r--r-- 1 x x   12 Jan 11 17:12 .part-00000-d2612eaa-3f48-4708-bf90-31dd3d83f124-c000.snappy.parquet.crc


这篇关于如何使用Spark结构化流将数据从Kafka主题流式传输到Delta表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆