Spark SQL/Hive Query Takes Forever With Join
Problem Description
So I'm doing something that should be simple, but apparently it's not in Spark SQL.
If I run the following query in MySQL, the query finishes in a fraction of a second:
SELECT ua.address_id
FROM user u
INNER JOIN user_address ua ON ua.address_id = u.user_address_id
WHERE u.user_id = 123;
However, running the same query in HiveContext under Spark (1.5.1) takes more than 13 seconds. Adding more joins makes the query run for a very very long time (over 10 minutes). I'm not sure what I'm doing wrong here and how I can speed things up.
The tables are MySQL tables that are loaded into the Hive context as temporary tables. This is running on a single instance, with the database on a remote machine.
- user table has about 4.8 Million rows.
- user_address table has 350,000 rows.
The tables have foreign key fields, but no explicit FK relationships are defined in the database. I'm using InnoDB.
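For context, registering the MySQL tables with the HiveContext as temporary tables (as described above) would look roughly like the sketch below. The connection URL, credentials, and host are placeholders, not values from the original setup:

```scala
import org.apache.spark.sql.hive.HiveContext

// Assumes an existing SparkContext `sc`; all connection details are placeholders.
val sqlContext = new HiveContext(sc)

val user = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://db-host:3306/mydb",  // hypothetical host/schema
  "dbtable"  -> "user",
  "user"     -> "???",
  "password" -> "???"
)).load()

val userAddress = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://db-host:3306/mydb",
  "dbtable"  -> "user_address",
  "user"     -> "???",
  "password" -> "???"
)).load()

// Register as temporary tables so they can be queried with SQL.
user.registerTempTable("user")
userAddress.registerTempTable("user_address")

val result = sqlContext.sql(
  """SELECT ua.address_id
    |FROM user u
    |INNER JOIN user_address ua ON ua.address_id = u.user_address_id
    |WHERE u.user_id = 123""".stripMargin)
```

With only this registration, each query goes back to MySQL over JDBC, which is relevant to the execution plan shown next.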
The execution plan in Spark:
Plan:

== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, {user=, password=, url=jdbc:mysql://, dbtable=user})[address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address,[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]
Solution

First of all, the type of query you perform is extremely inefficient. As for now (Spark 1.5.0*), to perform a join like this, both tables have to be shuffled / hash-partitioned each time the query is executed. It shouldn't be a problem in the case of the users table, where the user_id = 123 predicate is most likely pushed down, but it still requires a full shuffle on user_address.
Moreover, if the tables are only registered and not cached, then every execution of this query will fetch the whole user_address table from MySQL to Spark.

> I'm not sure what I'm doing wrong here and how I can speed things up.
It is not exactly clear why you want to use Spark for this application, but the single-machine setup, small data, and the type of queries suggest that Spark is not a good fit here.
Generally speaking, if the application logic requires single-record access, then Spark SQL won't perform well. It is designed for analytical queries, not as an OLTP database replacement.
If a single table / DataFrame is much smaller, you could try broadcasting.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val user: DataFrame = ???
val user_address: DataFrame = ???

val userFiltered = user.where(???)

user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")
* This should change in Spark 1.6.0 with SPARK-11410 which should enable persistent table partitioning.
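As a side note, Spark SQL can also broadcast small tables automatically when it can estimate their size; the cutoff is controlled by the spark.sql.autoBroadcastJoinThreshold setting. The value below is illustrative, not a recommendation:

```scala
// Broadcast any table smaller than ~10 MB automatically (illustrative value).
// JDBC-backed tables typically have unknown size statistics, so the explicit
// broadcast() hint above is usually still needed for them.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=10485760")
```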