sparksql.sql.codegen is not giving any improvement


Problem description


I am executing a query in Spark SQL like the one below. The tables' data is stored in Hive tables on 2 different nodes.

But because the query is a bit slow, I tried to find some options in Spark so the query can execute faster. I found that we can set sparksql.sql.codegen and spark.sql.inMemoryColumnarStorage.compressed to true instead of the default false, as sketched below.
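These options can be set on a Spark 1.x context like this (a minimal sketch; note that the property Spark actually recognizes is spelled spark.sql.codegen):

// toggle both options on the existing hiveContext
hiveContext.setConf("spark.sql.codegen", "true")
hiveContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")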

But I'm not seeing any improvement: the query takes 4.1 minutes to execute with these two options set to true, and also 4.1 minutes with them set to false.

Do you understand why these options aren't working?

   query = hiveContext.sql("""select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus""");

query.collect();

Solution

  • spark.sql.codegen.wholeStage is enabled by default in Spark 2.0, and it performs all the internal optimizations possible on the Spark Catalyst side.

  • spark.sql.codegen (a feature in Spark 1.3+) is false by default. Even if you set it to true, you can cross-check the effect with DF.explain / debug.
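For instance, a minimal cross-check might look like this (a sketch assuming the hiveContext and the query DataFrame from the question):

// turn the flag on before building the DataFrame
hiveContext.setConf("spark.sql.codegen", "true")

// explain(true) prints both the logical and physical plans;
// operators that use code generation show up in the physical plan
query.explain(true)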

However, please revisit the approach explained for Spark 2+ below.

If you are using a lower version of Spark, i.e. 1.3 or 1.4+, the same DataFrame approach is valid, except that we have to use it with a hiveContext.
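On those 1.x versions, the entry point would be created roughly like this (a minimal sketch; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("Spark Hive Aggregations") // placeholder app name
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)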


  • In my experience, the Dataset[Row] (aka DataFrame) approach to the above query is a bit faster than the plain Hive query.

Please try the pseudo code below.

First, create a DataFrame without any aggregation, grouping, or ordering, like this:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Aggregations")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

// these imports need the spark session created above, so they come after it
import spark.implicits._
import spark.sql


val df: DataFrame = sql("""select l_returnflag, l_linestatus, l_quantity, l_extendedprice, l_discount, l_tax
    from lineitem where l_shipdate <= '1998-09-16'""")

// a spark udf or when(cond, evaluation) can be used instead of the direct
// expressions below -- see the sketch after this block
val df1 = df
  .withColumn("sum_disc_price", df.col("l_extendedprice") * (1 - df.col("l_discount")))
  .withColumn("sum_charge", df.col("l_extendedprice") * (1 - df.col("l_discount")) * (1 + df.col("l_tax")))

// now SUM, AVG and group by on the dataframe
val groupeddf = df1.groupBy(
    df1.col("l_returnflag")
  , df1.col("l_linestatus")
  ) // end group by
  .agg(
      avg(df1.col("l_quantity"))
    , avg(df1.col("l_extendedprice"))
    , avg(df1.col("l_discount"))
    , sum(df1.col("l_quantity"))
    , sum(df1.col("l_extendedprice"))
    , sum(df1.col("sum_disc_price"))
    , sum(df1.col("sum_charge"))
    , count(df1.col("l_linestatus")).as("cnt")
  ) // end agg
  // order by on the dataframe
  .orderBy("l_returnflag", "l_linestatus")
val finalDF = groupeddf.select("l_returnflag", "l_linestatus", ............. etc)
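For illustration, here is what the when(cond, value) alternative mentioned in the comment above might look like; the null-discount fallback is an assumption added for this sketch, not part of the original answer:

import org.apache.spark.sql.functions.when

// hypothetical variant: apply the discount only when one is present,
// otherwise keep the raw extended price
val dfWhen = df.withColumn("sum_disc_price",
  when(df.col("l_discount").isNotNull,
       df.col("l_extendedprice") * (1 - df.col("l_discount")))
    .otherwise(df.col("l_extendedprice")))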

  • Also, parameters like executor memory and the number of executors/cores need to be considered to find the exact issue; a sketch of setting them follows below.
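For example, these knobs can be supplied when building the session (the values below are placeholders, not recommendations; on a cluster they are more commonly passed via spark-submit, and they only take effect at session startup):

// placeholder values -- tune per workload and cluster
val tunedSpark = SparkSession.builder()
  .appName("Spark Hive Aggregations")
  .config("spark.executor.instances", "4")
  .config("spark.executor.cores", "4")
  .config("spark.executor.memory", "8g")
  .enableHiveSupport()
  .getOrCreate()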
