Spark SQL + Window + Streaming Issue - Spark SQL query is taking long to execute when running with Spark Streaming

Problem Description

We are looking to implement a use case using Spark Streaming (with Flume) and Spark SQL with windowing that allows us to perform CEP calculations over a set of data (see below for how the data is captured and used). The idea is to use SQL to perform some action when certain conditions are matched. Executing the query on each incoming event batch seems to be very slow, and it gets slower as it progresses.

By slow I mean this: I have configured a window size of 600 seconds and a batch interval of 20 seconds (pumping data at a rate of 1 input every 2 seconds). So after, say, 10 minutes, when the rate of incoming input is constant, executing the SQL query should take about the same time on every batch.

But as time elapses the query starts taking more time, and the increase is gradual: for about 300 records, the SELECT COUNT(*) query initially takes 1 second, and after about 15 minutes it starts taking 2 to 3 seconds and keeps growing.

I would appreciate it if anyone could suggest a better approach to implementing this use case. Given below are the steps we perform in order to achieve this:

    // Creating the Spark and streaming contexts
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(20));
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", 55555);

    //Adding the events on window
    JavaDStream<SparkFlumeEvent> windowDStream =
        flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
    {

        public Void call(JavaRDD<SparkFlumeEvent> eventsData)
        throws Exception
        {
            long t2 = System.currentTimeMillis();
            long lTempTime = System.currentTimeMillis();

            JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
            {
                @Override
                public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
                {
                ...
                    return avevent;
                }
            });
            DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
            schemaevents.registerTempTable("avevents" + lTempTime);
            sqlContext.cacheTable("avevents" + lTempTime);

            // here the time taken by query is increasing gradually
            long t4 = System.currentTimeMillis();
            Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
            System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds \n");

            sqlContext.dropTempTable("avevents"  + lTempTime);
            sqlContext.clearCache();

            return null;

        }
    });
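
The snippet above references WINDOW_LENGTH and SLIDE_INTERVAL without showing where they are defined. Here is a minimal sketch of how they could be declared as fields of the driver class, assuming Duration values that match the 600-second window and 20-second batch interval described above (the values are assumptions, not taken from the original code):

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;

    // Assumed values, chosen to match the description above:
    // a 600-second window that slides every 20 seconds (once per batch).
    private static final Duration WINDOW_LENGTH = Durations.seconds(600);
    private static final Duration SLIDE_INTERVAL = Durations.seconds(20);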

Solution

For example, suppose we want to determine, by log level, the count of events across time. In SQL, we would issue a query of the form:

SELECT level, COUNT(1) FROM ambari GROUP BY level

But using the Scala DataFrame API, you could issue the following query:

ambari.groupBy("level").count()

At which point, something very close to native SQL can be used for the query, for example:

sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")

This returns the same data structure as the DataFrame API call; the data structure returned is itself a DataFrame.

At this point, no execution has occurred: the operations on DataFrames get mapped to the appropriate operations on the underlying RDD (in this case

RDD.groupBy(...).aggregateByKey(...))

We can force execution by calling, say, collect() on the result to bring the results of the execution into driver memory.
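
To make the lazy-evaluation point concrete, here is a minimal sketch in Java (the language used in the question) showing that both the DataFrame API form and the SQL form only build a plan, and that an action such as collect() is what triggers execution. The ambari table and level column come from the example above; the surrounding SQLContext setup and temp table registration are assumed to already exist.

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public class LazyCountByLevel {
        // sqlContext is assumed to be configured and "ambari" registered as a temp table.
        static void countByLevel(SQLContext sqlContext) {
            // Both forms only build a logical plan; nothing executes yet.
            DataFrame viaApi = sqlContext.table("ambari").groupBy("level").count();
            DataFrame viaSql = sqlContext.sql("SELECT level, COUNT(1) AS cnt FROM ambari GROUP BY level");

            // Actions force execution and bring the results into driver memory.
            for (Row row : viaApi.collect()) {
                System.out.println(row.get(0) + " -> " + row.getLong(1));
            }
            System.out.println("groups via SQL: " + viaSql.count());
        }
    }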
