Spark SQL + Window + Streaming Issue - Spark SQL query is taking long to execute when running with Spark Streaming

Problem Description

We are looking to implement a use case using Spark Streaming (with Flume) and Spark SQL with windowing that allows us to perform CEP calculations over a set of data (see below for how the data is captured and used). The idea is to use SQL to perform some action that matches certain conditions. Executing the query on each incoming event batch seems to become very slow as time goes on.

By slow I mean: I have configured a window size of 600 seconds and a batch interval of 20 seconds (pumping data at a rate of 1 input every 2 seconds), so after about 10 minutes, once the amount of input per window is constant, executing the SQL query should take roughly the same amount of time on every batch.
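For reference, these settings translate into the following Spark Streaming durations (a minimal sketch; the constant names are the ones referenced in the code below, and the slide interval equalling the batch interval is an assumption, not stated above):

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;

    // Illustrative constants for the configuration described above
    static final Duration BATCH_INTERVAL = Durations.seconds(20);   // 20-second micro-batches
    static final Duration WINDOW_LENGTH  = Durations.seconds(600);  // 600-second (10-minute) window
    static final Duration SLIDE_INTERVAL = Durations.seconds(20);   // assumed: window advances once per batch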

But here, as time elapses, the query starts taking longer and the duration increases gradually: for about 300 records, the select count(*) query initially takes 1 second, then after 15 minutes it starts taking 2 to 3 seconds and keeps increasing.

Would appreciate it if anyone could suggest a better approach to implementing this use case. Given below are the steps we perform in order to achieve this -

    //Creating spark and streaming context
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(20)); // 20-second batch interval
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", 55555);

    //Adding the events on window
    JavaDStream<SparkFlumeEvent> windowDStream =
        flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
    {

        public Void call(JavaRDD<SparkFlumeEvent> eventsData)
        throws Exception
        {
            long t2 = System.currentTimeMillis();
            // Timestamp used to give this batch's temp table a unique name
            lTempTime = System.currentTimeMillis();

            JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
            {
                @Override
                public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
                {
                ...
                    return avevent;
                }
            });
            DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
            schemaevents.registerTempTable("avevents" + lTempTime);
            sqlContext.cacheTable("avevents" + lTempTime);

            // here the time taken by query is increasing gradually
            long t4 = System.currentTimeMillis();
            Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
            System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds \n");

            sqlContext.dropTempTable("avevents"  + lTempTime);
            sqlContext.clearCache();

            return null;

        }
    });

Recommended Answer

For example, suppose we want to determine the count of events over time by log level. In SQL, we would issue a query of the form:

SELECT level, COUNT(1) from ambari GROUP BY level

But using the Scala DataFrame API, you could issue the following query:

ambari.groupBy("level").count()

At that point, something very close to native SQL can be used for querying, for example:

sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")

This returns the same data structure as the DataFrame API call above; the returned structure is itself a DataFrame.
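As a minimal Java sketch (the byLevel and cnt names are illustrative, and the ambari temp table is assumed to be registered as in the example above), the value returned by sqlContext.sql(...) is an ordinary DataFrame, so further transformations can be chained onto it:

    DataFrame byLevel = sqlContext.sql("SELECT level, COUNT(1) AS cnt FROM ambari GROUP BY level");
    // The result is itself a DataFrame, so additional (still lazy) operations can be chained:
    DataFrame frequent = byLevel.filter(byLevel.col("cnt").gt(100));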

At this point, no execution has occurred: the operations on the DataFrame are mapped to the appropriate operations on the underlying RDD (in this case

RDD.groupBy(...).aggregateByKey(...))

We can force execution by calling, say, collect() on the result to bring the results of the execution into driver memory.
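For example, a minimal sketch continuing the hypothetical byLevel DataFrame above (Row is org.apache.spark.sql.Row):

    // collect() is an action: it triggers execution of the whole lazy plan and
    // returns the aggregated rows to the driver.
    Row[] rows = byLevel.collect();
    for (Row row : rows) {
        System.out.println(row.getString(0) + " -> " + row.getLong(1)); // level -> count
    }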
