Spark SQL + Window + Streaming Issue - Spark SQL query is taking long to execute when running with Spark Streaming

Problem Description

We are looking to implement a use case using Spark Streaming (with Flume) and Spark SQL with windowing that allows us to perform CEP calculations over a set of data (see below for how the data is captured and used). The idea is to use SQL to perform some action when certain conditions match. Executing the query on each incoming event batch is very slow, and it gets slower as the job progresses.

By slow I mean the following: I have configured a window size of 600 seconds and a batch interval of 20 seconds, and I am pumping data in at a rate of one input every 2 seconds. So after 10 minutes, once the window is full, the number of incoming inputs per window is constant, and the SQL query should take the same time to execute on every batch.

But after some time elapses it starts taking more time, and the time increases gradually: for about 300 records a select count(*) query initially takes 1 second, and after 15 minutes it starts taking 2 to 3 seconds and keeps increasing.

I would appreciate it if anyone can suggest a better approach to implementing this use case. Given below are the steps we perform in order to achieve this:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    // Window and slide values as described above: 600 s window,
    // 20 s slide (the slide matches the 20 s batch interval)
    final Duration WINDOW_LENGTH = Durations.seconds(600);
    final Duration SLIDE_INTERVAL = Durations.seconds(20);

    // Creating the Spark and streaming contexts (20 s batch interval)
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(20));
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
        FlumeUtils.createStream(ssc, "localhost", 55555);

    // Collecting the events over the sliding window
    JavaDStream<SparkFlumeEvent> windowDStream =
        flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
    {
        public Void call(JavaRDD<SparkFlumeEvent> eventsData)
        throws Exception
        {
            // Timestamp used to give each batch a unique temp table name
            long lTempTime = System.currentTimeMillis();

            // Convert each Flume event into the POJO used as the table schema
            JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
            {
                @Override
                public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
                {
                ...
                    return avevent;
                }
            });

            // Register and cache the current window as a temp table
            DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
            schemaevents.registerTempTable("avevents" + lTempTime);
            sqlContext.cacheTable("avevents" + lTempTime);

            // Here the time taken by the query increases gradually
            long t4 = System.currentTimeMillis();
            Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
            System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds \n");

            // Drop the per-batch table and clear the cache before the next batch
            sqlContext.dropTempTable("avevents" + lTempTime);
            sqlContext.clearCache();

            return null;
        }
    });

Solution

For example, suppose we want to determine the count of events across time, grouped by log level. In SQL, we would have issued a query of the form:

SELECT level, COUNT(1) FROM ambari GROUP BY level

But using the Scala DataFrame API, you could issue the following query:

ambari.groupBy("level").count()
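
The question's code is Java, and the same query reads almost identically in the Java DataFrame API of Spark 1.x. A minimal sketch, assuming `ambari` is an existing DataFrame of log events with a `level` column:

    import org.apache.spark.sql.DataFrame;

    // `ambari` is assumed to be an existing DataFrame with a "level" column.
    // This only builds a logical plan; no Spark job runs yet.
    DataFrame countsByLevel = ambari.groupBy("level").count();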

At which point, something very close to native SQL can be used for querying, for example:

sqlContext.sql("SELECT level, COUNT(1) FROM ambari GROUP BY level")

This returns the same data structure as the DataFrame API call above; the returned data structure is itself a DataFrame.

At this point, no execution has occurred: the operations on DataFrames get mapped to appropriate operations on the underlying RDD, in this case:

RDD.groupBy(...).aggregateByKey(...)
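
To make that mapping concrete, here is a rough hand-written sketch (Java 8, Spark 1.x) of the kind of RDD aggregation the query boils down to; `levels`, a `JavaRDD<String>` of log-level values, is a hypothetical input that is not part of the original post:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // Pair each level with a count of 1, then aggregate per key:
    // zero value 0L, a function that folds one element into a running
    // count, and a function that merges two partial counts.
    JavaPairRDD<String, Long> counts = levels
        .mapToPair(level -> new Tuple2<String, Long>(level, 1L))
        .aggregateByKey(0L,
            (acc, one) -> acc + one,
            (a, b) -> a + b);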

We can force execution by calling, say, collect() on the results to bring the results of the execution into driver memory.
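
Continuing the hypothetical `countsByLevel` DataFrame from the sketch above, forcing execution might look as follows (in Spark 1.x, `DataFrame.collect()` returns an array of `Row`):

    import org.apache.spark.sql.Row;

    // collect() triggers the actual job and pulls every result row into
    // driver memory, so it is only appropriate for small results.
    Row[] rows = countsByLevel.collect();
    for (Row row : rows) {
        System.out.println(row.getString(0) + ": " + row.getLong(1));
    }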
