Improve speed of Spark app
Problem description
This is part of my python-spark code, parts of which run too slow for my needs. In particular I would really like to improve the speed of this part of the code, but I don't know how. It currently takes around 1 minute for 60 million data rows and I would like to get it under 10 seconds.
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
More context of my spark app:
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .options(table="article_by_created_at", keyspace=source) \
    .load().where(range_expr).select('article', 'created_at').repartition(64*2)

axes = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .options(table="axes", keyspace=source).load()

speed_df = article_ids.join(axes, article_ids.article == axes.article) \
    .select(axes.article, axes.at, axes.comments, axes.likes, axes.reads, axes.shares) \
    .map(lambda x: (x.article, [x])) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0], sorted(x[1], key=lambda y: y.at, reverse=False))) \
    .filter(lambda x: len(x[1]) >= 2) \
    .map(lambda x: x[1][-1]) \
    .map(lambda x: (x.article, (x, (x.comments if x.comments else 0)
                                 + (x.likes if x.likes else 0)
                                 + (x.reads if x.reads else 0)
                                 + (x.shares if x.shares else 0))))
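For reference, the lambda chain boils down to the following per-article logic; this is a plain-Python sketch with made-up sample rows (no Spark needed), only to make the intent of the pipeline easier to follow:

```python
from collections import defaultdict, namedtuple

Row = namedtuple("Row", ["article", "at", "comments", "likes", "reads", "shares"])

# Made-up sample events; the real data comes from the Cassandra tables above.
rows = [
    Row("a1", 1, 3, None, 10, 1),
    Row("a1", 2, 5, 2, 20, 0),
    Row("a2", 1, 1, 1, 1, 1),   # only one event, so this article is filtered out
]

groups = defaultdict(list)
for r in rows:                                 # like reduceByKey: collect per article
    groups[r.article].append(r)

result = {}
for article, events in groups.items():
    events.sort(key=lambda e: e.at)            # sort ascending by timestamp
    if len(events) >= 2:                       # keep articles with >= 2 events
        last = events[-1]                      # take the latest event
        score = sum(v or 0 for v in
                    (last.comments, last.likes, last.reads, last.shares))
        result[article] = (last, score)        # latest row plus engagement total
```

Here `result` maps each article with at least two events to its latest row and the null-safe sum of comments, likes, reads and shares.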
Thanks a lot for your suggestions.
EDIT:
The count takes up most of the time (50s), not the join.
I also tried increasing parallelism, but it didn't have any obvious effect:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
and
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
Solution

First you should figure out what's actually taking the most time.
For example, determine how long just reading the data takes:
axes = sqlContext.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="axes", keyspace=source) \
    .load() \
    .count()
Increasing the parallelism or the number of parallel readers may help here, but only if you aren't already maxing out the IO of your Cassandra cluster.
Second, see if you can do everything with the DataFrame API. Every time you use a Python lambda you incur serialization costs between the Python and Scala types.
Edit:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number)
This will only take effect after the load has completed, so it won't help you.
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()
numPartitions is not a valid parameter for the Spark Cassandra Connector, so this won't do anything.
See https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters for the connector's read-tuning parameters. The input split size determines how many C* partitions are put into one Spark partition.
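As a sketch of using that tuning knob (a config fragment, not runnable without a live Cassandra cluster; the exact option name depends on the connector version, so verify it against the reference page above):

```python
# Smaller input splits -> more Spark partitions reading in parallel.
# Option name taken from the connector's read-tuning reference; check the
# spelling for the connector version you actually run.
axes = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .options(table="axes", keyspace=source) \
    .option("spark.cassandra.input.split.size_in_mb", "32") \
    .load()
```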