Spark Structured Streaming exception: Append output mode not supported without watermark


Problem description

I have performed a simple group-by operation on year and some aggregation as shown below. I tried to append the result to an HDFS path, but I am getting an error saying:

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds

Below is my code. Can someone please help?

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().appName("mddd").
      enableHiveSupport().config("hive.exec.dynamic.partition", "true").
      config("hive.exec.dynamic.partition.mode", "nonstrict").
      config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
      config("spark.debug.maxToStringFields", 100).
      getOrCreate()
    import spark.implicits._

    val mySchema = StructType(Array(
      StructField("id", IntegerType),
      StructField("name", StringType),
      StructField("year", IntegerType),
      StructField("rating", DoubleType),
      StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/")

    import java.util.Calendar
    val df_agg_without_time = xmlData.withColumn("event_time",
      to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").
      groupBy($"year").
      agg(sum($"rating").as("rating"), sum($"duration").as("duration"))

    val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

    df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
      option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()

My input is in CSV format:

    id,name,year,rating,duration
    1,The Nightmare Before Christmas,1993,3.9,4568
    2,The Mummy,1993,3.5,4388
    3,Orphans of the Storm,1921,3.2,9062
    4,The Object of Beauty,1921,2.8,6150
    5,Night Tide,1963,2.8,5126
    6,One Magic Christmas,1963,3.8,5333
    7,Muriel's Wedding,1963,3.5,6323
    8,Mother's Boys,1963,3.4,5733

My expected output should be in HDFS, partitioned by year:

    year,rating,duration
    1993,7.4,8956
    1921,6.0,15212
    1963,10.7,17389

I am really not sure what's wrong with my approach. Please help.

Recommended answer

This is a question with many aspects:

  • The Structured Streaming API has its limitations, imho.
  • One can pipeline multiple queries, and technically it runs, but it produces no output, so there is no value in doing that; it cannot perform such other functionality even though you can specify it.
  • The manual states: withWatermark must be called on the same column as the timestamp column used in the aggregate.

For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, because the watermark is defined on a different column from the one used in the aggregation. Simply stated, for Append you need a watermark. I think you have an issue here.
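
To make that rule concrete, here is a minimal sketch of the pattern Append mode does accept, where the watermark and the window are defined on the same event-time column (events, time and symbol are illustrative names, not taken from the question):

    import org.apache.spark.sql.functions._
    import spark.implicits._   // assumes the SparkSession named `spark` from the question

    // `events` is an assumed streaming DataFrame with columns `time: Timestamp` and `symbol: String`.
    val validForAppend = events
      .withWatermark("time", "1 minute")                   // watermark on the event-time column
      .groupBy(window($"time", "10 minutes"), $"symbol")   // window over that same column
      .count()

    // Watermarking "time" but grouping only by some other column ("time2"), as in the
    // example above, is exactly what Append mode rejects with this AnalysisException.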

  • Is the following relevant when using a path?

      .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
    

  • Also, your final use case is not known. The question here is whether this is a good approach, but there is too little insight for me to assess; we simply assume it to be so.
  • We assume that ratings for the same movie will be part of a future micro-batch.
  • There is no event_time in the feed, but you create it yourself. That is somewhat unrealistic, but OK, we can live with it, although the timestamp is a little problematic.
  • I advise you to look at this blog http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-12/ for an excellent assessment of Structured Streaming.
So, in general:

  • Of the Complete, Append and Update options, I think you selected the correct one, Append. Update could be used, but I leave it out of scope.
  • But you did not place event_time in a window; you should do this (see the sketch right after this list). I provide an example at the end that runs in the Spark Shell, where I could not get a case class to work, which is why it took so long; in a compiled program that is not an issue, nor is it on Databricks.
  • Functionally, you cannot write multiple queries to do the aggregation you attempted. In my case it just produced some errors.
  • I suggest you use the timestamp approach that I use, as it is easier; I could not test all of your material.
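
To make that concrete, here is a rough sketch, not a tested rewrite, of what the aggregation from the question could look like with event_time pulled into the groupBy through a window (the 10-second window duration is an arbitrary choice):

    // Sketch only: reuses the names, imports and SparkSession from the question and assumes
    // `df_agg_without_time` already carries the generated `event_time` column.
    val df_agg_windowed = df_agg_without_time
      .withWatermark("event_time", "10 seconds")
      .groupBy(window($"event_time", "10 seconds"), $"year")   // event_time now appears in the groupBy
      .agg(sum($"rating").as("rating"), sum($"duration").as("duration"))

    // With the watermarked window in place, Append mode can emit each window
    // once the watermark has passed it.
    df_agg_windowed.writeStream
      .outputMode("append")
      .partitionBy("year")
      .format("csv")
      .option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/")
      .start()

Each window then yields a partial result per year, which is why the two options below about a second aggregation or a view layer matter.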

Then:

  • Either, given that you can get ratings for the same movie in different micro-batches, write the output of this module to a KAFKA topic, read that topic in another module, do the second aggregation there, and write the result out (a sketch follows this list).
  • Or, write the data out with a count field included, and then provide a view layer for querying that takes into account the fact that there were multiple writes.
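
A minimal sketch of the first option, continuing from the windowed aggregation sketched above (df_agg_windowed), with a purely illustrative broker, topic name and checkpoint path:

    // First stage: publish the per-window partial aggregates to a Kafka topic.
    // The Kafka sink expects a `value` column, so each row is serialized to JSON first.
    df_agg_windowed
      .select(to_json(struct($"window", $"year", $"rating", $"duration")).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")                   // illustrative broker
      .option("topic", "movie_ratings_partial")                          // illustrative topic
      .option("checkpointLocation", "/user/sa/sparkCheckpoint/kafka")    // illustrative path
      .start()

    // A second module would read `movie_ratings_partial` with spark.readStream.format("kafka"),
    // parse the JSON back into columns, and perform the final per-year aggregation.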

Here is a sample using socket input and the Spark Shell that you can extrapolate to your own data, together with the output of a micro-batch (note that there are delays in seeing the data):

        import java.sql.Timestamp
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.streaming.OutputMode
        
        val sparkSession = SparkSession.builder
          .master("local")
          .appName("example")
          .getOrCreate()
        //create stream from socket
        
        import sparkSession.implicits._
        sparkSession.sparkContext.setLogLevel("ERROR")
        val socketStreamDs = sparkSession.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
          .as[String]
        
        val stockDs = socketStreamDs.map(value => (value.trim.split(","))).map(entries=>(new java.sql.Timestamp(entries(0).toLong),entries(1),entries(2).toDouble)).toDF("time","symbol","value")//.toDS() 
        
        val windowedCount = stockDs
          .withWatermark("time", "20000 milliseconds")
          .groupBy( 
            window($"time", "10 seconds"),
                   $"symbol" 
          )
          .agg(sum("value"), count($"symbol"))
        
        val query =
          windowedCount.writeStream
            .format("console")
            .option("truncate", "false")
            .outputMode(OutputMode.Append())
        
        query.start().awaitTermination()
        

Resulting in:

        Batch: 14
        +---------------------------------------------+------+----------+-------------+
        |window                                       |symbol|sum(value)|count(symbol)|
        +---------------------------------------------+------+----------+-------------+
        |[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0    |6            |
        |[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0     |2            |
        |[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0    |1            |
        |[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0    |4            |
        +---------------------------------------------+------+----------+-------------+
        

It's quite a big topic and you need to look at it holistically.

For the output you can see that having a count could be handy in some cases, even though the avg output could also be used to compute an overall avg. Success.
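
For instance, if each micro-batch writes out its partial sums and counts, a downstream batch query can recombine them into exact totals or an overall average per key. This is only a sketch, continuing the Spark Shell session from the sample above, with an illustrative path and file format:

    // Illustrative sketch: assumes the streaming query wrote its per-window partial
    // results (columns sum(value) and count(symbol)) somewhere readable as a batch source.
    val partials = sparkSession.read.parquet("/tmp/partial_aggregates")   // illustrative path/format

    val overall = partials
      .groupBy($"symbol")
      .agg(
        sum($"sum(value)").as("total_value"),         // summing partial sums gives the exact total
        sum($"count(symbol)").as("total_count")       // summing partial counts gives the exact count
      )
      .withColumn("overall_avg", $"total_value" / $"total_count")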
