Executing separate streaming queries in Spark Structured Streaming


Problem description

I am trying to aggregate a stream with two different windows and print the results to the console. However, only the first streaming query is printed; tenSecsQ never appears in the console.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .config("spark.master", "local[*]")
    .getOrCreate();

Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
    .groupBy(
         functions.window(words.col("timestamp"), "5 seconds"),
         words.col("word")
    ).count().orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
    .groupBy(
          functions.window(words.col("timestamp"), "10 seconds"),
          words.col("word")
    ).count().orderBy("window");

Start the streaming queries for both the 5s and 10s aggregated streams. The output of the 10s stream is never printed; only the 5s stream appears in the console:

// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
    .queryName("5_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
    .queryName("10_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// start() is asynchronous, so awaiting one query keeps the driver alive for both
tenSecsQ.awaitTermination();
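
Since both queries are already running after start(), an alternative sketch is to block on the session's StreamingQueryManager instead of naming one query:

// Alternative: block until any active streaming query terminates
spark.streams().awaitAnyTermination();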

Recommended answer

I've been investigating this question.

Summary: each query in Structured Streaming consumes the source data independently. The socket source creates a new connection for every query that is defined. The behavior seen here arises because nc delivers the input data only to the first connection.

Hence, it is not possible to define multiple aggregations over a socket connection unless we can ensure that the socket source delivers the same data to every open connection.
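
By contrast, a replayable source such as Kafka does deliver the full data to every query, because each query acts as an independent reader. A minimal sketch of that approach (not part of the original answer), assuming a hypothetical local broker at localhost:9092, a hypothetical topic named "words", and the spark-sql-kafka-0-10 connector on the classpath:

// Each streaming query reading this source gets its own consumer,
// so both the 5s and 10s aggregations see the same records.
Dataset<Row> kafkaWords = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "words")
    .load()
    .selectExpr("CAST(value AS STRING) AS word", "timestamp");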

I discussed this question on the Spark mailing list. Databricks developer Shixiong Zhu answered:

Spark creates one connection for each query. The behavior you observed is because how "nc -lk" works. If you use netstat to check the tcp connections, you will see there are two connections when starting two queries. However, "nc" forwards the input to only one connection.

I verified this behavior with a small experiment: first, I created a SimpleTCPWordServer that delivers random words to each open connection, and a basic Structured Streaming job that declares two queries. The only difference between them is that the second query defines an extra constant column to differentiate its output:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.Trigger

// `spark` is the active SparkSession (e.g., the one provided by spark-shell)
val lines = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .option("includeTimestamp", true)
    .load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("7 seconds"))
  .start()
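
The SimpleTCPWordServer itself is not included in the post; a minimal Java sketch of such a server, writing one random word per second to every accepted connection (the port and word list are placeholders), could look like this:

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class SimpleTCPWordServer {
    // Placeholder vocabulary; the original server's word list is unknown
    private static final String[] WORDS = {"agenda", "bear", "cell", "champion"};

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9999)) {
            while (true) {
                Socket client = server.accept();
                // One thread per connection: each connection receives its own,
                // independent stream of random words
                new Thread(() -> serve(client)).start();
            }
        }
    }

    private static void serve(Socket client) {
        Random rnd = new Random();
        try (PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            while (true) {
                out.println(WORDS[rnd.nextInt(WORDS.length)]);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            // Client disconnected; stop serving this connection
        }
    }
}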

If Structured Streaming consumed only one stream, we would see the same words delivered to both queries. If each query consumes a separate stream, then each query reports different words.

This is the observed output:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+
|   value|          timestamp|
+--------+-------------------+
|champion|2017-08-14 13:54:51|
+--------+-------------------+

+------+-------------------+---+
| value|          timestamp|foo|
+------+-------------------+---+
|belong|2017-08-14 13:54:51|foo|
+------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+---+
|  value|          timestamp|foo|
+-------+-------------------+---+
| agenda|2017-08-14 13:54:52|foo|
|ceiling|2017-08-14 13:54:52|foo|
|   bear|2017-08-14 13:54:53|foo|
+-------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
|     value|          timestamp|
+----------+-------------------+
|    breath|2017-08-14 13:54:52|
|anticipate|2017-08-14 13:54:52|
|   amazing|2017-08-14 13:54:52|
|    bottle|2017-08-14 13:54:53|
| calculate|2017-08-14 13:54:53|
|     asset|2017-08-14 13:54:54|
|      cell|2017-08-14 13:54:54|
+----------+-------------------+

We can clearly see that the stream for each query is different. It therefore appears impossible to define multiple aggregations over the data delivered by the socket source unless we can guarantee that the TCP backend server delivers exactly the same data to every open connection.
