Why does streaming aggregation delay until two batches of data always?
Question
I use Spark 2.3.0.
My issue is that whenever I add a third batch of data to my input directory, the first batch of data gets processed and printed to the console. Why?
import java.util.Calendar
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder()
  .appName("micro1")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .config("spark.sql.parquet.cacheMetadata", "false")
  .getOrCreate()

import spark.implicits._

// Left side of the join (streaming source)
val mySchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("year", IntegerType)
  .add("rating", DoubleType)
  .add("duration", IntegerType)
val xmlData = spark
  .readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("tostack")

// Right side of the join (static dataset)
val mappingSchema = new StructType()
  .add("id", StringType)
  .add("megavol", StringType)
val staticData = spark
  .read
  .option("sep", ",")
  .schema(mappingSchema)
  .csv("input_tost_static.csv")

xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")

spark
  .sql("select * from xmlupdates a join mappingdata b on a.id=b.id")
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")
  .drop("window")
  .writeStream
  .outputMode("append")
  .format("console")
  .start
My output shows the data below. I started the streaming query first and later added data into the particular folder. When I add my third file, the aggregated results of the first file get printed. Why?
-------------------------------------------
Batch: 0
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
+----+------+--------+-----------+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
+----+------+--------+-----------+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
|1963| 2.8| 5126| 46.0|
|1921| 6.0| 15212| 3600.0|
+----+------+--------+-----------+
The input data is as follows:
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
The input_tost_static.csv dataset is as follows:
3,3000
4,600
5,46
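For reference, the Batch 2 numbers above follow directly from this sample data: the inner join on id keeps only ids 3, 4 and 5, and the per-year sums of rating, duration and megavol give exactly the two printed rows. A plain-Scala sketch (no Spark; the collection and class names are illustrative, not from the query) reproduces the arithmetic:

```scala
// Reproduce the join + groupBy aggregation on the sample data, without Spark.
case class Movie(id: Int, name: String, year: Int, rating: Double, duration: Int)

val movies = Seq(
  Movie(1, "The Nightmare Before Christmas", 1993, 3.9, 4568),
  Movie(2, "The Mummy", 1993, 3.5, 4388),
  Movie(3, "Orphans of the Storm", 1921, 3.2, 9062),
  Movie(4, "The Object of Beauty", 1921, 2.8, 6150),
  Movie(5, "Night Tide", 1963, 2.8, 5126),
  Movie(6, "One Magic Christmas", 1963, 3.8, 5333),
  Movie(7, "Muriel's Wedding", 1963, 3.5, 6323),
  Movie(8, "Mother's Boys", 1963, 3.4, 5733))

// input_tost_static.csv: id -> megavol
val megavol = Map(3 -> 3000.0, 4 -> 600.0, 5 -> 46.0)

// Inner join on id: movies without a megavol mapping are dropped,
// which is why only years 1921 and 1963 appear in the console output.
val joined = movies.flatMap(m => megavol.get(m.id).map(v => (m, v)))

// Group by year and sum rating, duration and megavol.
val agg: Map[Int, (Double, Int, Double)] =
  joined.groupBy(_._1.year).map { case (year, rows) =>
    (year, (rows.map(_._1.rating).sum, rows.map(_._1.duration).sum, rows.map(_._2).sum))
  }
// agg(1921) -> (6.0, 15212, 3600.0); agg(1963) -> (2.8, 5126, 46.0)
```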
Can someone help me understand why Spark Structured Streaming shows this behaviour? Do I need to add any settings here? UPDATE: I get results in batch 1 itself if I print the val before the JOIN operation... the issue appears after joining... it delays more than 3 batches....
Recommended answer
I started the streaming query first
Batch: 0 is executed right after you start the query; since no events had been streamed yet, there is no output.
At this point, the event-time watermark is not set at all.
and later added data into the particular folder.
That was likely Batch: 1. The event-time watermark was then set to current_timestamp. In order to get any output, we have to wait "10 seconds" (according to withWatermark("event_time", "10 seconds")).
When I add my third file, the aggregated results of the first file get printed. Why?
That was likely Batch: 2. I assume the next time you added new files it was after the previous current_timestamp + "10 seconds", and so you got the output.
Please note that the watermark delay can be just 0, which means that no late data is expected.
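The timing above can be sketched with plain Scala. This is a simplified model of append-mode semantics, not Spark's actual state-store logic, and the timestamps are made up: the watermark is the maximum event time seen so far minus the delay, and a window is only emitted once the watermark passes the window's end.

```scala
// Simplified model of why append mode waits for a later batch.
val delaySec  = 10L  // withWatermark("event_time", "10 seconds")
val windowSec = 10L  // window($"event_time", "10 seconds", "10 seconds")

// End of the tumbling window containing an event.
def windowEnd(eventSec: Long): Long = (eventSec / windowSec) * windowSec + windowSec

// Watermark after having seen these event times (undefined before any event).
def watermark(seen: Seq[Long]): Long = seen.max - delaySec

// Batch 1: the first file arrives; say its rows are stamped t = 100s.
// Its window [100, 110) is NOT emitted: watermark = 100 - 10 = 90 < 110.
val afterBatch1 = watermark(Seq(100L))

// Batch 2: a later file stamped t = 125s moves the watermark to 115 >= 110,
// so only now is the first file's window finalized and printed.
val afterBatch2 = watermark(Seq(100L, 125L))
```

With delaySec = 0 the window [100, 110) would still only close once some later event pushes the watermark past 110, which is why a subsequent batch is always needed before the previous one's aggregate appears.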