Using Spark SQL with Spark Streaming
Problem description
Trying to make sense of Spark SQL with respect to Spark Structured Streaming.
A Spark session reads events from a Kafka topic, aggregates the data into counts grouped by different column names, and prints them to the console.
Raw input data is structured like this:
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|   sourceTypes|                Guid|  platform|datacenter| pageId|     eventTimestamp|              Id1234|  Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................|   ANDROID|       dev|     aa|2018-09-27 09:41:29|fce81f05-a085-392...| {"id":...|
| Notififcation|....................|   ANDROID|       dev|     ab|2018-09-27 09:41:29|fce81f05-a085-392...| {"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...| {"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|57c18964-40c9-311...| {"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...| {"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...| {"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...| {"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
Counts are required for sourceTypes, platform, datacenter, and pageId.
Aggregating data with the following code:
// Assumes: import static org.apache.spark.sql.functions.*;
Dataset<Row> query = sourceDataset
        .withWatermark("eventTimestamp", watermarkInterval)
        .select(
                col("eventTimestamp"),
                col("datacenter"),
                col("platform"),
                col("pageId")
        )
        .groupBy(
                window(col("eventTimestamp"), windowInterval),
                col("datacenter"),
                col("platform"),
                col("pageId")
        )
        .agg(
                max(col("eventTimestamp"))
        );
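Note that the .agg(...) above only computes max(eventTimestamp); it never produces a count column, so there is no count for the sink to print. Below is a minimal sketch of adding an explicit per-group count to the same pipeline; the alias names eventCount and lastEventTimestamp are illustrative, not from the question:

// Sketch: same watermark and grouping as the question, plus an explicit row count.
// Assumes: import static org.apache.spark.sql.functions.*;
Dataset<Row> countsPerGroup = sourceDataset
        .withWatermark("eventTimestamp", watermarkInterval)
        .groupBy(
                window(col("eventTimestamp"), windowInterval),
                col("datacenter"),
                col("platform"),
                col("pageId")
        )
        .agg(
                count(lit(1)).as("eventCount"),             // rows per group
                max(col("eventTimestamp")).as("lastEventTimestamp")
        );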
Here watermarkInterval = 45 seconds, windowInterval = 15 seconds, and triggerInterval = 15 seconds.
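For reference, a sketch of how those intervals are typically expressed, since withWatermark, window, and Trigger.ProcessingTime all accept duration strings; the values mirror the question, but these declarations are assumed rather than shown in the original post:

String watermarkInterval = "45 seconds"; // delay threshold for withWatermark
String windowInterval    = "15 seconds"; // tumbling window length for window()
String triggerInterval   = "15 seconds"; // micro-batch interval for Trigger.ProcessingTime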
Consuming the new aggregated dataset with:
aggregatedDataset
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("console")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start();
There are two issues:

- The output data is not printing the counts for each groupBy column, like platform, pageId, etc.

- How can the output be printed in JSON format? I tried select(to_json(struct("*")).as("value")) while outputting data to the console, but it doesn't work.
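For reference on the second point, a sketch of that JSON projection in context, assuming the aggregated columns above; to_json and struct come from org.apache.spark.sql.functions, and the window column produced by groupBy serializes as a nested JSON object:

// Sketch: serialize every column of each aggregated row into a single JSON string.
Dataset<Row> jsonOutput = aggregatedDataset
        .select(to_json(struct(col("*"))).as("value"));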
Answer
You can solve your problem using the following code snippet:
.outputMode("complete")
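In context, that means changing the output mode on the write from the question; a sketch with everything else left as in the original snippet:

aggregatedDataset
        .writeStream()
        .outputMode(OutputMode.Complete())  // equivalent to .outputMode("complete")
        .format("console")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start();

The reason this helps with the first issue: in append mode, a windowed aggregate row is only emitted once the watermark passes the end of its window, so with a 15-second window and a 45-second watermark nothing shows up for roughly a minute. Complete mode re-emits the entire result table on every trigger. The trade-off is that complete mode keeps all aggregation state, since the watermark no longer allows old groups to be dropped.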