What is the most efficient way to do a sorted reduce in PySpark?

Question

I am analyzing on-time performance records of US domestic flights from 2015. I need to group by tail number, and store a date sorted list of all the flights for each tail number in a database, to be retrieved by my application. I am not sure which of two options for achieving this is the best one.

# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')

# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x: 
  (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
  )

I can achieve this in a reduce sort...

# Group flights by tail number, sorted by date, then flight number, then origin/dest
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))

Or I can achieve it in a subsequent map job...

# Do same in a map step, more efficient or does pySpark know how to optimize the above?
flights_per_airplane = flights\
  .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
  .reduceByKey(lambda a, b: a + b)\
  .map(lambda tuple: 
    (
      tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4])))
    )

Doing this in the reduce seems really inefficient, but in fact both are very slow. sorted() looks like the way to do this in the PySpark docs, so I'm wondering if PySpark doesn't make this kosher internally? Which option is the most efficient or the best choice for some other reason?

My code is also in a gist here: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

If you're curious about the data, it is from the Bureau of Transportation Statistics, here: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

Answer

Unfortunately, both ways are wrong before you even start sorting, and there is no effective and simple way of doing this in Spark. Still, the first one is significantly worse than the other.

Why are both ways wrong? Because each is just another groupByKey, and that is simply an expensive operation. There are some ways you can try to improve things (especially by avoiding map-side reduction), but at the end of the day you just have to pay the price of a full shuffle, and if you don't see any failures it is probably not worth all the fuss.

Still, the second approach is much better algorithmically*. If you want to keep a sorted structure all the way through, like in the first attempt, you should use dedicated tools (aggregateByKey with bisect.insort would be a good choice), but there is really nothing to gain here.
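
For completeness, a minimal sketch of what that could look like (my own illustration, not code from the question or the answer; the helper names insert_sorted and merge_sorted are made up), keeping each key's list sorted as records are folded in:

import bisect

# Keep each tail number's accumulator sorted by (FlightDate, FlightNum,
# Origin, Dest) -- the same ordering the question uses.
def insert_sorted(acc, flight):
    bisect.insort(acc, (flight[1:5], flight))
    return acc

# Merge two per-partition accumulators, keeping the result sorted.
def merge_sorted(left, right):
    for item in right:
        bisect.insort(left, item)
    return left

flights_per_airplane = (flights
    .keyBy(lambda x: x[5])
    .aggregateByKey([], insert_sorted, merge_sorted)
    .mapValues(lambda pairs: [flight for _, flight in pairs]))

As noted above, though, this buys you nothing over sorting once at the end.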

If the grouped output is a hard requirement the best thing you can do is to keyBy, groupByKey and sort. It won't improve performance over the second solution but arguably will improve readability:

(flights
    .keyBy(lambda x: x[5])
    .groupByKey()
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))
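
If you want to eyeball the result, one way (assuming the expression above is assigned to flights_per_airplane, as in the question's code) is:

# Pull a single (tail number, date-sorted flights) pair back to the driver
# to sanity-check the ordering; the slice just keeps the printout small.
tail_num, sorted_flights = flights_per_airplane.first()
print(tail_num, sorted_flights[:3])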


* Even if you assume the best case scenario for Timsort, the first approach is N times O(N), while the second one is O(N log N) in the worst case scenario.
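
To make that concrete, here is a small local (non-Spark) sketch I put together; the list size is arbitrary and the timings only illustrate the shape of the difference:

import random
import timeit

records = [random.random() for _ in range(20000)]

def resort_every_merge(xs):
    # Mimics the first approach: fold elements in one at a time and re-sort
    # the accumulated list after every merge (roughly sum of i = O(N^2) work,
    # even though Timsort handles the nearly sorted input cheaply).
    acc = []
    for x in xs:
        acc = sorted(acc + [x])
    return acc

def sort_once(xs):
    # Mimics the second approach: collect everything first, then sort once (O(N log N)).
    return sorted(xs)

print(timeit.timeit(lambda: resort_every_merge(records), number=1))
print(timeit.timeit(lambda: sort_once(records), number=1))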
