Improve speed of spark app


Problem description


This is part of my python-spark code, parts of which run too slowly for my needs. In particular, I would really like to improve the speed of the read below, but I don't know how. It currently takes around 1 minute for 60 million data rows, and I would like to get it under 10 seconds.

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

More context of my spark app:

article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
     .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
     .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
     .filter(lambda x:len(x[1])>=2) \
     .map(lambda x:x[1][-1]) \
     .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))    

Thanks a lot for your suggestions.

EDIT:

The count takes up most of the time (50 s), not the join.

I also tried increasing the parallelism, but it didn't have any obvious effect:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

and

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()

Solution

First, you should figure out what's actually taking the most time.

For example, determine how long just reading the data takes:

axes_count = (sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(table="axes", keyspace=source)
  .load()
  .count())
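
If it helps, here is a rough way to split the timing between the bare read and the full pipeline. This is only a sketch; speed_df and source are the names from the question's code, and wall-clock timing of count() actions is an approximation.

import time

# Sketch: time the bare Cassandra read and the full pipeline separately
# (speed_df is the joined/aggregated result defined in the question).
t0 = time.time()
rows = (sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(table="axes", keyspace=source)
  .load()
  .count())
t1 = time.time()
results = speed_df.count()  # forces the join and the python lambdas as well
t2 = time.time()
print("read only:     %.1f s (%d rows)" % (t1 - t0, rows))
print("full pipeline: %.1f s (%d results)" % (t2 - t1, results))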

Increasing the parallelism or the number of parallel readers may help, but only if you aren't already maxing out the IO of your Cassandra cluster.

Second, see if you can do everything with the DataFrames API. Every time you use a Python lambda you incur serialization costs moving rows between the Python and Scala/JVM types.
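
For example, the grouping, sorting, and scoring in the question could be expressed entirely with DataFrame operations. This is only a sketch, assuming Spark 1.6+ window functions and the column names from the question's schema; it is an illustration, not a drop-in replacement.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch: the same idea as the question's pipeline, kept in the DataFrame API
# so no rows cross the Python/JVM boundary.
joined = (article_ids
  .join(axes, article_ids.article == axes.article)
  .select(axes.article, axes.at, axes.comments, axes.likes, axes.reads, axes.shares))

w_latest = Window.partitionBy("article").orderBy(F.col("at").desc())
w_all = Window.partitionBy("article")

speed_df = (joined
  .withColumn("rn", F.row_number().over(w_latest))       # 1 = most recent row per article
  .withColumn("cnt", F.count("article").over(w_all))      # number of rows per article
  .filter((F.col("rn") == 1) & (F.col("cnt") >= 2))        # latest row, articles with >= 2 rows
  .withColumn("score",
              F.coalesce(F.col("comments"), F.lit(0)) +
              F.coalesce(F.col("likes"), F.lit(0)) +
              F.coalesce(F.col("reads"), F.lit(0)) +
              F.coalesce(F.col("shares"), F.lit(0))))

Expressed this way, the whole job is planned and executed in the JVM, which avoids exactly the Python/Scala serialization cost described above.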

Edit:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

will only take effect after the load has completed, so this won't help you.

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()

numPartitions is not a valid parameter for the Spark Cassandra Connector, so this won't do anything.

See https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters for the read tuning parameters. The input split size determines how many C* partitions are placed in a Spark partition.
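
For illustration, a minimal sketch of setting that read-tuning parameter through SparkConf. The setting name, spark.cassandra.input.split.size_in_mb, is the one documented on the reference page above; check it against your connector version. The keyspace still uses the source variable from the question.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Sketch: a smaller input split size means fewer C* partitions per Spark partition,
# i.e. more and smaller read tasks. 64 MB is the documented default.
conf = SparkConf().set("spark.cassandra.input.split.size_in_mb", "32")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

axes = (sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(table="axes", keyspace=source)
  .load())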
