Why does my simple Spark application work so slowly?


Problem description

I am trying to count the frequent itemsets generated by MLlib's FP-growth using the Spark API. My Spark version is 1.5.1. The following is my code:

#!/usr/bin/python
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'
appName = "FP_growth"
sc = SparkContext()
sql_context = HiveContext(sc)

def read_spu(prod):
    # Sum item quantities per order for a single prod_code.
    sql = """
        select
        t.orderno_nosplit,
        t.prod_code,
        t.item_code,
        sum(t.item_qty) as item_qty
        from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
        where t.prod_code = '%s'
        group by t.prod_code, t.orderno_nosplit, t.item_code """ % prod
    spu_result = sql_context.sql(sql)
    return spu_result.cache()

if __name__ == '__main__':
    spu = read_spu('6727780')
    conf = 0.7
    # One transaction per order: the list of item codes it contains.
    trans = spu.rdd.repartition(100).map(lambda x: (x[0], x[2])).groupByKey().mapValues(list).values().cache()
    model = FPGrowth.train(trans, 0.01, 100)
    freq_count = model.freqItemsets().count()
    print 'freq_count:', freq_count
    sc.stop()

The input data is read from Hadoop and is not very large, only about 20,000 rows. However, the script runs very slowly in the .count() stage, and I don't know why. From the performance it looks like data skew, but the output data is not large (only about 100 KB per task).
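To check whether the slowness really comes from skewed partitions, one quick diagnostic is to count the records in each partition and compare the extremes. This is a minimal sketch, assuming the trans RDD from the code above:

# Minimal sketch: count how many records land in each partition of `trans`
# to see whether a few partitions hold most of the data (i.e. skew).
sizes = trans.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print 'partitions:', trans.getNumPartitions()
print 'min/max records per partition:', min(sizes), max(sizes)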

The cluster has 8 nodes with 320 cores and 1.56 TB of total memory (shared with other users, not just me). My spark-submit command is: spark-submit --master yarn-cluster --executor-memory 30g --num-executors 20 --executor-cores 5 FP_growth.py

Attached are screenshots of the Spark UI taken while the job was running: Resources Used, Active Stages, and Tasks.

Solution

repartition(100) doesn't look like a good idea; check which stages are taking the most time. Since there are only 20,000 records, repartitioning into 100 partitions leaves only about 200 records per partition.

If the data size is not huge, you do not need to repartition at all. Alternatively, try 40-60 partitions, i.e. (2 or 3) * the number of executors (see the sketch below).
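A minimal sketch of that suggestion, assuming the 20 executors from the question's spark-submit command (the 2-3x multiplier is the rule of thumb above, not a measured optimum):

# Minimal sketch: derive the partition count from the executor count instead of
# a fixed repartition(100). Assumes 20 executors, as in --num-executors 20.
num_executors = 20
num_partitions = 3 * num_executors   # 40-60 partitions instead of 100
trans = (spu.rdd
            .repartition(num_partitions)
            .map(lambda x: (x[0], x[2]))
            .groupByKey()
            .mapValues(list)
            .values()
            .cache())
model = FPGrowth.train(trans, 0.01, num_partitions)

If the data really is only about 20,000 rows, dropping the repartition call entirely and letting FPGrowth's numPartitions argument control the parallelism is also an option, as noted above.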
