Improve parallelism in Spark SQL
Question
I have the code below. I am using PySpark 1.2.1 with Python 2.7 (CPython):
for colname in shuffle_columns:
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))
The thing about this code is that it only operates on one column at a time. I have enough nodes in my cluster that I could be operating on many columns at once. Is there a way to do this in Spark? What if I used threads or the like? Could I kick off multiple registerTempTable calls (and the associated collection operations) in parallel that way?
Answer
Unfortunately, the approach below does not work well. It works in the sense that all of the individual iterations execute, but subsequent calls on the hive_context object fail with a NullPointerException.
This is possible with concurrent.futures:
from concurrent import futures

def make_col_temptable(colname):
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))

with futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures.wait([executor.submit(make_col_temptable, colname)
                  for colname in shuffle_columns])
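The submission pattern itself can be checked in isolation, without a Spark cluster. This is a minimal sketch where a hypothetical process_column function stands in for the per-column Spark work; submit returns a future immediately, and futures.wait blocks until every submitted task has finished:

```python
from concurrent import futures

# Hypothetical stand-in for the per-column Spark work above.
def process_column(colname):
    return 'processed_%s' % colname

columns = ['a', 'b', 'c']

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    # submit() schedules each call on a worker thread and returns at once;
    # futures.wait() blocks until all the futures complete.
    done, not_done = futures.wait(
        [executor.submit(process_column, c) for c in columns])

results = sorted(f.result() for f in done)
print(results)  # ['processed_a', 'processed_b', 'processed_c']
```

Note that with the real Spark code the threads only drive job submission; the heavy lifting still happens on the executors, which is why a thread pool is enough here even under the GIL.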