Why does streaming aggregation always delay until two batches of data have arrived?

Problem Description

I use Spark 2.3.0.

My issue is that whenever I add a third batch of data to my input directory, only then does the first batch of data get processed and printed to the console. Why?

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("micro1")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .config("spark.sql.parquet.cacheMetadata","false")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._
import java.util.Calendar   // needed for the time-zone lookup below

// Left side of a join
import org.apache.spark.sql.types._
val mySchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("year", IntegerType)
  .add("rating", DoubleType)
  .add("duration", IntegerType)
val xmlData = spark
  .readStream                 // streaming side: watches the "tostack" directory for new CSV files
  .option("sep", ",")
  .schema(mySchema)
  .csv("tostack")

// Right side of a join
val mappingSchema = new StructType()
  .add("id", StringType)
  .add("megavol", StringType)
val staticData = spark
  .read                       // static (batch) side of the stream-static join
  .option("sep", ",")
  .schema(mappingSchema)
  .csv("input_tost_static.csv")

xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")

spark
  .sql("select * from xmlupdates a join mappingdata b on a.id = b.id")
  // event_time is derived from processing time, not from the data itself
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")   // megavol is read as a string; sum casts it to double
  .drop("window")
  .writeStream
  .outputMode("append")   // append mode emits a window only after the watermark passes its end
  .format("console")
  .start

My output shows the data as below. I started the streaming query first and later added data to the monitored folder. When I add the third file, the aggregated results for the first file get printed. Why?

     -------------------------------------------
     Batch: 0
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 1
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 2
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     |1963|   2.8|    5126|       46.0|
     |1921|   6.0|   15212|     3600.0|
     +----+------+--------+-----------+

The input data is as follows:

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733

The input_tost_static.csv dataset is as follows:

3,3000
4,600
5,46

Can someone help me understand why Spark Structured Streaming shows this behaviour? Do I need to add any settings here? UPDATE: I get results in batch 1 itself if I print the val before the JOIN operation... the issue appears after joining... it delays more than 3 batches...
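For reference, a minimal sketch of that pre-join check (the checkpoint path below is hypothetical): the raw stream needs no watermark, so its rows show up in the first non-empty batch.

val debugQuery = xmlData
  .writeStream
  .outputMode("append")                            // no aggregation, so rows are emitted as they arrive
  .option("checkpointLocation", "/tmp/debug_ckpt") // hypothetical path, separate from the main checkpoint
  .format("console")
  .start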

Recommended Answer

"I started the streaming query first"

Batch: 0 is executed right after you start the query; given that no events were streamed, there is no output.

At this point, the event-time watermark is not set at all.
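You can observe this at runtime (a sketch, using the spark session from the question; the watermark is reported in Spark's StreamingQueryProgress):

// Every active query reports the current watermark in its progress updates.
// Until a batch with data completes, "watermark" typically stays at the
// epoch, 1970-01-01T00:00:00.000Z, i.e. it has not advanced at all.
spark.streams.active.foreach { q =>
  Option(q.lastProgress).foreach { p =>
    println(s"query ${q.id}: watermark = ${p.eventTime.get("watermark")}")
  }
}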

"and later added data to the monitored folder"

That was likely Batch: 1.

The event-time watermark was then set to current_timestamp. In order to get any output, we have to wait "10 seconds" (per withWatermark("event_time", "10 seconds")).
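A plausible timeline (the wall-clock times are hypothetical; only the 10-second arithmetic matters, and note that the watermark computed from one batch only takes effect in the next):

     12:00:00  batch reads file 1, event_time = 12:00:00
               window [12:00:00, 12:00:10) opens; watermark not set yet -> no output
               after this batch: watermark = 12:00:00 - 10s = 11:59:50
     12:00:30  batch reads file 2, event_time = 12:00:30
               watermark in effect is still 11:59:50 < 12:00:10 -> no output
               after this batch: watermark = 12:00:30 - 10s = 12:00:20
     12:01:00  batch reads file 3; watermark 12:00:20 >= window end 12:00:10,
               so the window holding file 1's rows is finally emitted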

"When I add the third file, the aggregated results for the first file get printed. Why?"

That was likely Batch: 2.

I assume that the next time you added new files, it was after the previous current_timestamp + "10 seconds", and so you got the output.

Please note that a watermark can be just 0, which means that no late data is expected.
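As a sketch, here is the same pipeline with a zero-second watermark (only the withWatermark argument differs from the question; the temp views are the ones registered above). A window then becomes eligible for output as soon as any later batch pushes the watermark past its end:

spark
  .sql("select * from xmlupdates a join mappingdata b on a.id = b.id")
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "0 seconds")  // watermark = max event_time seen; no late data tolerated
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(sum($"rating") as "rating")
  .writeStream
  .outputMode("append")
  .format("console")
  .start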
