Aggregation with distinct count in Spark structured streaming throwing error


Question

I am trying to get the count of unique ids for each group of Parentgroup, childgroup and MountingType in Spark Structured Streaming.

Code (the snippet below throws the error):

    val JSONDF = df.withWatermark("timestamp", "1 minutes")
    val aggDF = JSONDF.groupBy("Parentgroup", "childgroup", "MountingType")
      .agg(countDistinct("id"))

Error:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Could someone please help me with how to aggregate and write the result to CSV in Structured Streaming? Thanks a lot.

Data:

{"id":"7CE3A7CA","Faulttime":1544362500,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
    {"id":"7CE3A7CA","Faulttime":1544362509,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
    {"id":"010004FF,"Faulttime":1551339188,"name":"Philips","Parentgroup":"Light","childgroup":"Other","MountingType":"Solder"}
    {"id":"010004FF","Faulttime":1551339188,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"Solder"}
    {"id":"010004FF,"Faulttime":1551339191,"name":"Sansui","Parentgroup":"AC","childgroup":"Other","MountingType":"SurfaceMount"}
    {"id":"CE361405","Faulttime":1552159061,"name":"Hyndai","Parentgroup":"SBAR","childgroup":"Other","MountingType":"SurfaceMount"}
    {"id":"CE361405","Faulttime":1552159061,"name":"sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
    {"id":"7BE446C0","Faulttime":1553022095,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"Solder"}
    {"id":"7BE446C0","Faulttime":1553022095,"name":"Philips","Parentgroup":"LIGHT","childgroup":"Other","MountingType":"Solder"}

Answer

Group-by aggregations on a streaming DataFrame need to be tied to a window or time period. In append output mode, Spark can emit a group's result only once the watermark guarantees no more late data will arrive for it, and that requires an event-time window among the grouping keys.

Try this:

Pseudo code:

    // Note: countDistinct is not supported on streaming DataFrames; Spark's
    // AnalysisException suggests approx_count_distinct() as the substitute.
    // Grouping columns belong in groupBy, not in agg.
    val JSONDF = df.withWatermark("timestamp", "1 minutes")
    val aggDF = JSONDF
      .groupBy(window($"timestamp", "5 minutes", "1 minutes"),
               $"Parentgroup", $"childgroup", $"MountingType")
      .agg(approx_count_distinct("id"))
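
Since the question also asks how to write the aggregate out as CSV, here is a minimal end-to-end sketch. It assumes an existing SparkSession named spark; the input path, output path, and checkpoint location are hypothetical placeholders, and the event-time column is derived from the Faulttime epoch seconds in the sample data:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    // Schema for the sample records (file sources need an explicit schema).
    val jsonSchema = new StructType()
      .add("id", StringType)
      .add("Faulttime", LongType)
      .add("name", StringType)
      .add("Parentgroup", StringType)
      .add("childgroup", StringType)
      .add("MountingType", StringType)

    val raw = spark.readStream
      .schema(jsonSchema)
      .json("/path/to/input")   // hypothetical input directory

    // Casting epoch seconds (long) to timestamp yields the event-time column.
    val JSONDF = raw
      .withColumn("timestamp", $"Faulttime".cast("timestamp"))
      .withWatermark("timestamp", "1 minutes")

    val aggDF = JSONDF
      .groupBy(window($"timestamp", "5 minutes", "1 minutes"),
               $"Parentgroup", $"childgroup", $"MountingType")
      .agg(approx_count_distinct("id").as("uniqueIds"))

    // The file (CSV) sink supports only append output mode, and CSV cannot
    // hold the nested window struct, so flatten it to start/end columns.
    aggDF
      .select($"window.start".as("windowStart"), $"window.end".as("windowEnd"),
              $"Parentgroup", $"childgroup", $"MountingType", $"uniqueIds")
      .writeStream
      .format("csv")
      .option("path", "/path/to/output")              // hypothetical
      .option("checkpointLocation", "/path/to/ckpt")  // hypothetical
      .outputMode("append")
      .start()

Because of the watermark, each window's row is appended only after the watermark passes the window end, so the CSV output contains finalized counts.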

Reference: https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
