Spark SQL/Hive Query Takes Forever With Join


Problem Description

So I'm doing something that should be simple, but apparently it's not in Spark SQL.

If I run the following query in MySQL, the query finishes in a fraction of a second:

SELECT ua.address_id
FROM user u
INNER JOIN user_address ua ON ua.address_id = u.user_address_id
WHERE u.user_id = 123;

However, running the same query in HiveContext under Spark (1.5.1) takes more than 13 seconds. Adding more joins makes the query run for a very very long time (over 10 minutes). I'm not sure what I'm doing wrong here and how I can speed things up.

The tables are MySQL tables loaded into the HiveContext as temporary tables. This runs on a single instance, with the database on a remote machine.

  • The user table has about 4.8 million rows.
  • The user_address table has 350,000 rows.

The tables have foreign key fields, but no explicit FK relationships are defined in the database. I'm using InnoDB.
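
For context, registering a remote MySQL table as a temporary table in a HiveContext presumably looked something like the sketch below; the connection details are placeholders, not values from the question:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc) // sc is an existing SparkContext

// Load the remote MySQL table over JDBC and register it for SQL queries.
val user = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://db-host/schema",
  "dbtable"  -> "user",
  "user"     -> "...",
  "password" -> "...")).load()
user.registerTempTable("user")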

The execution plan in Spark:

== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user, [Lorg.apache.spark.Partition;@596f5dfc, {user=, password=, url=jdbc:mysql://, dbtable=user}) [address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3, {user=, password=, url=jdbc:mysql://, dbtable=user_address}) [address_id#52L]
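
For reference, a plan like this can be printed with DataFrame.explain(); a minimal sketch, assuming the temporary-table names used above:

// Print the physical plan Spark will execute (Spark 1.5, HiveContext).
sqlContext.sql(
  """SELECT ua.address_id
    |FROM user u
    |INNER JOIN user_address ua ON ua.address_id = u.user_address_id
    |WHERE u.user_id = 123""".stripMargin).explain()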

Solution

First of all, the type of query you perform is extremely inefficient. As of now (Spark 1.5.0*), a join like this requires both tables to be shuffled / hash-partitioned every time the query is executed. This shouldn't be a problem for the user table, where the user_id = 123 predicate is most likely pushed down, but it still requires a full shuffle of user_address.

Moreover, if the tables are only registered and not cached, then every execution of this query will fetch the whole user_address table from MySQL into Spark.
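
To avoid that repeated fetch, the registered tables can be cached; a minimal sketch, assuming the temporary-table names used in the question (Spark 1.5 API):

// Keep the JDBC-backed tables in memory across queries; the first query
// after caching still pays the fetch cost, later ones read from memory.
sqlContext.cacheTable("user")
sqlContext.cacheTable("user_address")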

"I'm not sure what I'm doing wrong here and how I can speed things up."

It is not exactly clear why you want to use Spark for this application, but the single-machine setup, the small data, and the type of queries all suggest that Spark is not a good fit here.

Generally speaking, if the application logic requires single-record access, Spark SQL won't perform well. It is designed for analytical queries, not as an OLTP database replacement.

If one table / data frame is much smaller than the other, you can try broadcasting it.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._ // enables the $"column" syntax

val user: DataFrame = ???
val user_address: DataFrame = ???

// Filter first so only the matching user rows are broadcast.
val userFiltered = user.where(???)

// Broadcasting the small, filtered side avoids shuffling user_address.
user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")
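
Beyond broadcasting, since Spark SQL is not meant for this access pattern, one more option worth noting (an assumption on my part, not from the original answer) is to let MySQL execute the selective join itself by passing a subquery as the JDBC dbtable; connection details are placeholders:

// Hypothetical: push the whole selective join down to MySQL, so Spark only
// receives the final result instead of shuffling full tables.
val joined = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://db-host/schema",
  "dbtable" -> """(SELECT ua.address_id
                   FROM user u
                   INNER JOIN user_address ua ON ua.address_id = u.user_address_id
                   WHERE u.user_id = 123) AS t""",
  "user" -> "...",
  "password" -> "...")).load()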


* This should change in Spark 1.6.0 with SPARK-11410, which should enable persistent table partitioning.
