Spark: Using iterator lambda function in RDD map()


Problem description

I have a simple dataset on HDFS that I'm loading into Spark. It looks like this:

1 1 1 1 1 1 1
1 1 1 1 1 1 1
1 1 1 1 1 1 1
...

Basically, a matrix. I'm trying to implement something that requires grouping matrix rows, so I'm trying to add a unique key to every row, like so:

(1, [1 1 1 1 1 ... ])
(2, [1 1 1 1 1 ... ])
(3, [1 1 1 1 1 ... ])
...

I tried something somewhat naive: set a global variable and write a lambda function to iterate over the global variable:

# initialize global index
global global_index
global_index = 0

# function to generate keys
def generateKeys(x):
    global_index+=1
    return (global_index,x)

# read in data and operate on it
data = sc.textFile("/data.txt")

...some preprocessing...

data.map(generateKeys)

It did not seem to recognize the existence of the global variable.

Is there an easy way that comes to mind to do this?

Thanks, Jack

Solution

>>> lsts = [
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 2],
...     [1, 1, 1, 2, 1, 2]
...     ]
...
>>> list(enumerate(lsts))
[(0, [1, 1, 1, 1, 1, 1]),
 (1, [1, 1, 1, 1, 1, 1]),
 (2, [1, 1, 1, 1, 1, 1]),
 (3, [1, 1, 1, 1, 1, 1]),
 (4, [1, 1, 1, 1, 1, 1]),
 (5, [1, 1, 1, 1, 1, 1]),
 (6, [1, 1, 1, 1, 1, 2]),
 (7, [1, 1, 1, 2, 1, 2])]

enumerate generates a unique index for each item in the iterable and yields tuples of the form (index, original_item).

If you want the numbering to start at a value other than 0, pass the starting value to enumerate as the second parameter.

>>> list(enumerate(lsts, 1))
[(1, [1, 1, 1, 1, 1, 1]),
 (2, [1, 1, 1, 1, 1, 1]),
 (3, [1, 1, 1, 1, 1, 1]),
 (4, [1, 1, 1, 1, 1, 1]),
 (5, [1, 1, 1, 1, 1, 1]),
 (6, [1, 1, 1, 1, 1, 1]),
 (7, [1, 1, 1, 1, 1, 2]),
 (8, [1, 1, 1, 2, 1, 2])]

Note that list is used here to pull the actual values out of enumerate, which returns an iterator rather than a list.
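
Applied to the matrix from the question, the same idea could look like the sketch below. It assumes the rows are already available as a local list of strings and does not touch Spark; the names raw_lines and key_rows are made up for illustration.

```python
def key_rows(lines, start=1):
    """Parse whitespace-separated rows and pair each with a running key."""
    # Generator expression: rows are parsed lazily, one per input line.
    rows = ([int(tok) for tok in line.split()] for line in lines)
    # list() materializes the (key, row) pairs produced by enumerate.
    return list(enumerate(rows, start))


raw_lines = ["1 1 1 1 1 1 1", "1 1 1 1 1 1 1", "1 1 1 1 1 1 1"]
keyed = key_rows(raw_lines)
for key, row in keyed:
    print(key, row)
```

Passing start=1 gives keys that begin at 1, as in the output the question asks for.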

Alternative: globally available id assigner

enumerate is easy to use, but if you need to assign ids in different pieces of your code, it becomes difficult or impossible. For such a case, a globally available generator (as drafted in the OP) would be the way to go.

itertools provides count, which can serve our need:

>>> from itertools import count
>>> idgen = count()

Now we have a globally available idgen generator, ready to yield unique ids.

We can test it with a function prid (print id):

>>> def prid():
...     id = next(idgen)
...     print(id)
...
>>> prid()
0
>>> prid()
1
>>> prid()
2
>>> prid()
3

Since it works, we can try it on a list of values:

>>> lst = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109']

and define the actual function, which, when called with a value, returns the tuple (id, value):

>>> def assignId(val):
...     return (next(idgen), val)
...

Note that there is no need to declare idgen as global, because we never rebind the name: advancing idgen only changes its internal state, and it remains the same generator object.

Test whether it works:
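
The difference between rebinding a global name and advancing an iterator can be seen in a small experiment. This is only a sketch in plain Python 3, and all function names are hypothetical; bump_without_global mirrors the attempt in the question.

```python
from itertools import count

counter = 0
idgen = count()


def bump_without_global():
    # `counter += 1` rebinds the name, so Python treats `counter` as a
    # local variable, and it has no local value yet when it is read.
    counter += 1
    return counter


def bump_with_global():
    global counter  # the declaration has to be inside the function
    counter += 1
    return counter


def next_id():
    # Only reads the name `idgen`; the iterator advances its own state.
    return next(idgen)


try:
    bump_without_global()
except UnboundLocalError as exc:
    print("failed:", type(exc).__name__)

print(bump_with_global(), bump_with_global())
print(next_id(), next_id())
```

A global statement at module level, as in the question, has no effect; it only matters inside the function that assigns to the name.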

>>> assignId("ahahah")
(4, 'ahahah')

and try it on the list:

>>> list(map(assignId, lst))
[(5, '100'),
 (6, '101'),
 (7, '102'),
 (8, '103'),
 (9, '104'),
 (10, '105'),
 (11, '106'),
 (12, '107'),
 (13, '108'),
 (14, '109')]

The main difference from the enumerate solution is that we can assign ids one by one anywhere in the code, rather than doing it all inside a single all-encompassing enumerate call.

>>> assignId("lonely line")
(15, 'lonely line')
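
One caveat for the Spark setting in the question: functions passed to an RDD's map() run in separate worker processes, so a module-level counter is not shared between them and would not be expected to produce globally unique keys. PySpark's RDD offers zipWithIndex() for this purpose; it yields (item, index) pairs, so the pair would need swapping to get the layout from the question. The sketch below is not Spark code. It is a plain-Python simulation of the underlying idea (offset each partition by the number of rows in the partitions before it), with ordinary lists standing in for partitions and a hypothetical helper zip_with_index.

```python
from itertools import accumulate


def zip_with_index(partitions, start=0):
    """Assign consecutive keys across partitions without a shared counter."""
    sizes = [len(part) for part in partitions]
    # Offset of a partition = start + number of rows in earlier partitions.
    offsets = [start] + [start + total for total in accumulate(sizes)][:-1]
    return [
        [(offset + i, row) for i, row in enumerate(part)]
        for offset, part in zip(offsets, partitions)
    ]


# Three simulated partitions holding two, one, and two rows.
partitions = [[[1, 1, 1], [1, 1, 1]], [[1, 1, 2]], [[1, 2, 2], [2, 2, 2]]]
keyed = zip_with_index(partitions, start=1)
for part in keyed:
    print(part)
```

Each partition only needs its own offset, so the keys can be computed independently per partition once the partition sizes are known.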
