streaming aggregate not writing into sink
Problem Description
I have to process some files that arrive daily. The records have a primary key of (date, client_id, operation_id), so I created a stream that appends only new data into a Delta table:
operations\
    .repartition('date')\
    .writeStream\
    .outputMode('append')\
    .trigger(once=True)\
    .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
    .format('delta')\
    .partitionBy('date')\
    .start('/mnt/sandbox/operations')
This is working fine, but I need to summarize this information grouped by (date, client_id), so I created another stream from this operations table into a new table. To be able to use append mode when writing the aggregated stream, I tried to convert my date field to a timestamp:
import pyspark.sql.functions as F

summarized = spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized = summarized.withColumn('timestamp_date', F.to_timestamp('date'))
summarized = summarized.withWatermark('timestamp_date', '1 second')\
    .groupBy('client_id', 'date', 'timestamp_date')\
    .agg(<lot of aggs>)
summarized\
    .repartition('date')\
    .writeStream\
    .outputMode('append')\
    .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
    .trigger(once=True)\
    .format('delta')\
    .partitionBy('date')\
    .start('/mnt/sandbox/summarized')
This code runs, but it does not write anything into the sink.

Why isn't it writing the results into the sink?
Recommended Answer
There could be two issues here.

I'm quite sure that the issue is with F.to_timestamp('date'), which gives null due to malformed input.
If so, withWatermark('timestamp_date', '1 second') can never be "materialized" and triggers no output.
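As a minimal illustration of how this can happen (with made-up sample strings, not your actual data): to_timestamp returns null whenever the input does not match the expected pattern, and a null event-time column never advances the watermark, so every group stays open forever.

import pyspark.sql.functions as F

# Hypothetical sample: one well-formed and one malformed date string
df = spark.createDataFrame([('2019-07-01',), ('01/07/2019',)], ['date'])

df.withColumn('ts_default', F.to_timestamp('date'))\
  .withColumn('ts_explicit', F.to_timestamp('date', 'dd/MM/yyyy'))\
  .show()
# ts_default is null for '01/07/2019'; an explicit pattern such as
# 'dd/MM/yyyy' parses it (and, conversely, nulls the ISO-style string)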
Could you do spark.read.format('delta').load('/mnt/sandbox/operations') (with read rather than readStream) and see if the conversion gives proper values?
spark.\
    read.\
    format('delta').\
    load('/mnt/sandbox/operations').\
    withColumn('timestamp_date', F.to_timestamp('date')).\
    show()
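If the conversion does produce nulls, a quick count shows how many rows are affected (a sketch reusing the same batch read and the path from the question):

import pyspark.sql.functions as F

checked = spark.read.format('delta').load('/mnt/sandbox/operations').\
    withColumn('timestamp_date', F.to_timestamp('date'))

# Any non-zero count means malformed 'date' values, which produce null
# event times and keep the watermark from advancing
checked.filter(F.col('timestamp_date').isNull()).count()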
All Rows Use Same Timestamp

It is also possible that withWatermark('timestamp_date', '1 second') never finishes (and so never "completes" an aggregation), because all rows carry the same timestamp and so event time does not advance.
You should have rows with different timestamps, so that the notion of time tracked through timestamp_date can get past the '1 second' lateness window.
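For contrast, here is a minimal, self-contained sketch of a watermarked append-mode aggregation that does emit output. It uses the built-in rate source and the console sink rather than your Delta tables, purely to show that distinct, advancing event times are what let groups close:

import pyspark.sql.functions as F

# The 'rate' source emits one row per tick with a distinct 'timestamp',
# so the watermark advances and finalized windows can be appended
demo = spark.readStream.format('rate').option('rowsPerSecond', 5).load().\
    withWatermark('timestamp', '1 second').\
    groupBy(F.window('timestamp', '5 seconds')).\
    count()

demo.writeStream.\
    outputMode('append').\
    format('console').\
    option('truncate', 'false').\
    start()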