Filter RDD based on row_number


Question

sc.textFile(path) allows reading an HDFS file, but it does not accept parameters (such as skipping a number of rows, has_headers, ...).

In the "Learning Spark" O'Reilly e-book, the following function is suggested for reading a CSV (Example 5-12. Python load CSV example):

import csv
import StringIO

def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

My question is about how to be selective about the rows that are taken:


  1. How to avoid loading the first row (the header)

  2. How to drop a specific row (for example, row 5)

I see some decent solutions here: select a range of elements (http://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd), but I'd like to see if there is anything simpler.

THX!

Answer

Don't worry about loading the rows/lines you don't need. When you do:

input = sc.textFile(inputFile)

you are not loading the file. You are just getting an object that will let you operate on the file. So, to be efficient, it is better to think in terms of getting only what you want. For example:

header = input.take(1)[0]
rows = input.filter(lambda line: line != header)

Note that here I am not using an index to refer to the line I want to drop, but rather its value. This has the side effect that other lines with this value will also be ignored, but it is more in the spirit of Spark: Spark distributes your text file in parts across the nodes, and the concept of line numbers gets lost within each partition. This is also why this is not easy to do in Spark (Hadoop), since each partition should be treated as independent and a global line number would break that assumption.
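
If you only want to drop the physical first line (the header) rather than every line that happens to equal it, one common alternative is mapPartitionsWithIndex, which can skip the first element of the first partition. A minimal sketch, assuming a single input file whose header lands in partition 0 (the function name skip_header is just for illustration):

def skip_header(partition_index, iterator):
    # The header of a single text file ends up in partition 0,
    # so skip the first element of that partition only.
    if partition_index == 0:
        next(iterator, None)
    return iterator

rows = sc.textFile(inputFile).mapPartitionsWithIndex(skip_header)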

If you really need to work with line numbers, I recommend that you add them to the file outside of Spark (see here) and then just filter by this column inside of Spark.
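
As a rough illustration of that approach (the file names, the tab delimiter, and the dropped indices below are assumptions, not part of the original answer), you could number the lines with plain Python first and then filter on that column in Spark:

# Outside Spark: write a copy of the file with a leading line number.
with open('input.txt') as src, open('input_numbered.txt', 'w') as dst:
    for i, line in enumerate(src):
        dst.write('%d\t%s' % (i, line))

# Inside Spark: split off the number and keep only the rows you want.
numbered = sc.textFile('input_numbered.txt').map(lambda line: line.split('\t', 1))
rows = numbered.filter(lambda kv: int(kv[0]) not in (0, 5)).map(lambda kv: kv[1])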

Edit: Added the zipWithIndex solution as suggested by @Daniel Darabos.

(sc.textFile('test.txt')
   .zipWithIndex()               # [(u'First', 0), (u'Second', 1), ...]
   .filter(lambda x: x[1] != 5)  # drop the line with index 5
   .map(lambda x: x[0])          # [u'First', u'Second', ...]
   .collect())
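
The same pattern also lets you drop the header together with any other specific line; for example, assuming you want to discard indices 0 and 5:

(sc.textFile('test.txt')
   .zipWithIndex()
   .filter(lambda x: x[1] not in (0, 5))  # drop the header (index 0) and line 5
   .map(lambda x: x[0])
   .collect())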
