Improve parallelism in Spark SQL


Question


I have the code below. I am using PySpark 1.2.1 with Python 2.7 (CPython).

for colname in shuffle_columns:
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))


The thing about this code is that it only operates on one column at a time. I have enough nodes in my cluster that I could be operating on many columns at once. Is there a way to do this in Spark? What if I used threads or the like: could I kick off multiple `registerTempTable` calls (and the associated collection operations) in parallel that way?

Answer


Unfortunately, the code below does not work well. It works in the sense that all of the individual iterations execute, but subsequent calls on the `hive_context` object fail with a NullPointerException.

This is possible with `concurrent.futures`:

from concurrent import futures

def make_col_temptable(colname):
    colrdd = hive_context.sql('select %s from %s' % (colname, temp_table))
    # zip_with_random_index is expensive
    colwidx = zip_with_random_index(colrdd).map(merge_index_on_row)
    (hive_context.applySchema(colwidx, a_schema)
        .registerTempTable(a_name))

with futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures.wait([executor.submit(make_col_temptable, colname) for colname in shuffle_columns])
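To illustrate the threading pattern itself in a self-contained way (this is a generic sketch, not the author's Spark code): one common workaround for a shared object that is not safe under concurrent calls, like the failing `hive_context` above, is to run the expensive per-item work in parallel but serialize access to the shared object with a lock. Here `registry` and `expensive_transform` are hypothetical stand-ins for the Hive context and the costly `zip_with_random_index` step:

```python
from concurrent import futures
import threading

# Stand-in for a shared object that is not thread-safe (like hive_context).
registry = {}
registry_lock = threading.Lock()

def expensive_transform(colname):
    # Placeholder for the expensive per-column work (zip_with_random_index
    # and map), which can safely run concurrently because it does not touch
    # the shared object.
    return [(i, colname) for i in range(3)]

def process_column(colname):
    rows = expensive_transform(colname)
    # Serialize only the part that mutates the shared object.
    with registry_lock:
        registry[colname] = rows

shuffle_columns = ['a', 'b', 'c']
with futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures.wait([executor.submit(process_column, c) for c in shuffle_columns])

print(sorted(registry))  # → ['a', 'b', 'c']
```

Whether this lock-based variant helps in Spark depends on why the NullPointerException occurs; if the context object itself is left in a broken state by concurrent use, serializing the context calls is the part that matters, while the parallelism is kept for the expensive transformation.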

