How can I improve the reduceByKey part of my Spark app?


Problem description



I have 64 Spark cores. I have over 80 million rows of data, amounting to 4.2 GB, in my Cassandra cluster. It currently takes 82 seconds to process this data, and I want to get that down to 8 seconds. Any thoughts on this? Is it even possible? Thanks.

This is the part of my Spark app I want to improve:

axes = sqlContext.read.format("org.apache.spark.sql.cassandra")\
    .options(table="axes", keyspace=source, numPartitions="192").load()\
    .repartition(64*3)\
    .reduceByKey(lambda x,y:x+y,52)\
    .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
    .filter(lambda x:len(x[1])>=2) \
    .map(lambda x:x[1][-1])

Edit:

This is the code I am currently running; the one posted above was an experiment, sorry for the confusion. The question above relates to this code.

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(64*3) \
                    .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)])).reduceByKey(lambda x,y:x+y)\
                    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
                    .filter(lambda x:len(x[1])>=2) \
                    .map(lambda x:x[1][-1])

Thanks

Solution

Issues:

(Why this code cannot work correctly, assuming an unmodified Spark distribution)

Step-by-step:

  1. These two lines should create a Spark DataFrame. So far so good:

    sqlContext.read.format("org.apache.spark.sql.cassandra")
      .options(table="axes", keyspace=source, numPartitions="192").load()
    

    The only possible concern is numPartitions, which, as far as I remember, is not a recognized option.

  2. This is pretty much junk code. Shuffling data around without doing any actual work is unlikely to get you anywhere.

    .repartition(64*3)
    

  3. At this point you switch to an RDD. Since Row is actually a subclass of tuple, and reduceByKey works only on pair RDDs, each element has to be a tuple of size 2 (see the sketch just before the summary below). I am not sure why you chose 52 partitions, though.

    .reduceByKey(lambda x,y:x+y,52)
    

  4. Since reduceByKey always results in an RDD of tuples of size 2, the following part simply shouldn't work:

    .map(lambda x: (x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\
    

    In particular, x cannot have attributes like article or comments. Moreover, this piece of code

    [Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)] 
    

    creates a list of size 1 (see below).

    The following part

    Row(article=x.article, ...)
    

    smells fishy for one more reason. If there are obsolete columns, they should be filtered out before the data is converted to an RDD, to avoid excessive traffic and reduce memory usage. If there are no obsolete columns, there is no reason to put more pressure on the Python GC by creating new objects.

  5. Since x[1] has only one element, sorting it doesn't make sense:

    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
    

  6. And this filter should always return an empty RDD:

    .filter(lambda x:len(x[1])>=2) \
    

  7. And this doesn't perform any useful operations:

    .map(lambda x:x[1][-1])
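
To make points 3 and 4 concrete, here is a minimal sketch with hypothetical toy data; it assumes a running SparkContext named sc and reuses the column names from the question:

from pyspark.sql import Row

rows = sc.parallelize([
    Row(article="a1", at=1, likes=10),
    Row(article="a1", at=2, likes=20),
    Row(article="a2", at=5, likes=7),
])

# Rows coming straight from a DataFrame are not (key, value) pairs, so they
# have to be keyed explicitly before reduceByKey can do anything with them.
pairs = rows.map(lambda r: (r.article, r))

# Every output element of reduceByKey is again a (key, value) 2-tuple, which
# is why a later x.article or x.comments access on it raises AttributeError.
latest = pairs.reduceByKey(lambda a, b: max(a, b, key=lambda r: r.at))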
    

Summary:

If you use some version of this code, most likely the order shown in the question is mixed up and the map from point 4:

.map(lambda x: (x.article,[Row(....)]))

precedes reduceByKey:

.reduceByKey(lambda x,y:x+y,52)

If that's the case, you are actually using .reduceByKey to perform a groupByKey, which is either equivalent to groupByKey with all its issues (Python) or less efficient (Scala). Moreover, it would make the reduction in the number of partitions highly suspicious.
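To see the equivalence concretely, here is a minimal sketch with made-up data, again assuming a running SparkContext named sc:

pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])

# Concatenating singleton lists keeps every value alive until the shuffle
# completes; this is groupByKey in disguise.
grouped_in_disguise = (pairs
    .map(lambda kv: (kv[0], [kv[1]]))
    .reduceByKey(lambda x, y: x + y))   # ('a', [1, 2]), ('b', [3])

# A genuine reduction keeps a single value per key on each side of the
# shuffle, so far less data is moved and buffered.
truly_reduced = pairs.reduceByKey(max)  # ('a', 2), ('b', 3)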

If that's true, there is no good reason to move the data out of the JVM (the DataFrame -> RDD conversion) with the corresponding serialization and deserialization, and even if there were, it can easily be solved by an actual reduction with max, not a group-by-key.

from operator import attrgetter

(sqlContext.read.format(...).options(...).load()
  .select(...)  # Only the columns you actually need
  .rdd          # keyBy / reduceByKey are RDD operations, so switch to the underlying RDD
  .keyBy(attrgetter("article"))
  # keep the newest record per article; "at" is the field the question sorts on
  .reduceByKey(lambda r1, r2: max(r1, r2, key=attrgetter("at"))))
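
If you would rather avoid the DataFrame -> RDD hop altogether, the same keep-the-newest-row-per-article logic can be sketched with plain DataFrame aggregations. This is only a sketch under a few assumptions: the column names are the ones from the question, at is the ordering field, and your Spark version supports max over a struct column (structs compare field by field, so putting at first selects the latest row):

from pyspark.sql import functions as F

df = (sqlContext.read.format("org.apache.spark.sql.cassandra")
      .options(table="axes", keyspace=source).load()
      .select("article", "at", "comments", "likes", "reads", "shares"))

# max over the struct picks, per article, the row with the largest "at",
# entirely inside the JVM.
latest = (df.groupBy("article")
            .agg(F.max(F.struct("at", "comments", "likes", "reads", "shares"))
                  .alias("latest"))
            .select("article", "latest.at", "latest.comments",
                    "latest.likes", "latest.reads", "latest.shares"))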

