operation inside map function in pyspark


Problem description

I want to take the data from the file name (as it contains some info) and write it to the csvfile_info file without using a loop. I am new to pyspark. Could someone please help me with the code and let me know how to proceed? This is what I tried...

Code:

c = os.path.join("-------")
input_file = sc.textFile(fileDir)
file1= input_file.split('_')
csvfile_info= open(c,'a')
details= file1.map(lambda p:
    name=p[0], 
    id=p[1],
    from_date=p[2],
    to_date=p[3],
    TimestampWithExtension=p[4]\
    file_timestamp=TimestampWithExtension.split('.')[0]\
    info = '{0},{1},{2},{3},{4},{5} \n'.\
    format(name,id,from_date,to_date,file_timestamp,input_file)\
    csvfile_info.write(info)
    )

Answer

Don't try to write the data inside of the map() function. You should instead map each record to the appropriate string, and then dump the resultant rdd to a file. Try this:

input_file = sc.textFile(fileDir)  # returns an RDD

def map_record_to_string(x):
    # x is a single record, i.e. one line of the file
    p = x.split('_')
    name = p[0]
    id = p[1]
    from_date = p[2]
    to_date = p[3]
    TimestampWithExtension = p[4]

    # keep only the timestamp, dropping the file extension
    file_timestamp = TimestampWithExtension.split('.')[0]
    info = '{0},{1},{2},{3},{4},{5} \n'.format(
        name,
        id,
        from_date,
        to_date,
        file_timestamp,
        input_file  # note: this is the RDD itself, so its repr ends up in each row
    )
    return info

details = input_file.map(map_record_to_string)  # returns a different RDD
details.saveAsTextFile("path/to/output")

Note: I haven't tested this code, but this is one approach you could take.
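
For a rough sense of what the function produces, here is a minimal, hypothetical check (the sample file name and its five underscore-separated fields are made up, and it assumes input_file has already been created with sc.textFile() as above):

sample = 'report_123_20200101_20200131_1612345678.csv'
print(map_record_to_string(sample))
# prints something like: report,123,20200101,20200131,1612345678,<repr of the input_file RDD>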

Explanation

From the docs, input_file = sc.textFile(fileDir) will return an RDD of strings with the file contents.
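
A quick way to see this for yourself (a sketch, assuming fileDir points at a real text file):

lines = sc.textFile(fileDir)  # RDD of strings, one element per line
print(type(lines))            # <class 'pyspark.rdd.RDD'>
print(lines.first())          # the first line, as a plain Python string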

All of the operations you want to do are on the contents of the RDD, the elements of the file. Calling split() on the RDD doesn't make sense, because split() is a string function. What you want to do instead is call split() and the other operations on each record (line in the file) of the RDD. This is exactly what map() does.
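
To make the distinction concrete, here is a small sketch (sc is an existing SparkContext, and the sample data is invented for illustration):

rdd = sc.parallelize(['a_1.csv', 'b_2.csv'])
# rdd.split('_')  # AttributeError: 'RDD' object has no attribute 'split'
rdd.map(lambda line: line.split('_')).collect()  # [['a', '1.csv'], ['b', '2.csv']]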

An RDD is like an iterable, but you don't operate on it with a traditional loop. It's an abstraction that allows for parallelization. From the user's perspective, the map(f) function applies the function f to each element in the RDD, as it would be done in a loop. Functionally, calling input_file.map(f) is equivalent to the following:

# let rdd_as_list be a list of strings containing the contents of the file
map_output = []
for record in rdd_as_list:
    map_output.append(f(record))

Or equivalently:

# let rdd_as_list be a list of strings containing the contents of the file
map_output = list(map(f, rdd_as_list))  # list() because map() returns an iterator in Python 3

Calling map() on an RDD returns a new RDD, whose contents are the results of applying the function. In this case, details is a new RDD and it contains the rows of input_file after they have been processed by map_record_to_string.
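
You can peek at both to confirm this (a sketch, assuming the code above has been run):

print(input_file.take(2))  # the first two raw lines, unchanged
print(details.take(2))     # the same lines after map_record_to_string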

You could have also written the map() step as details = input_file.map(lambda x: map_record_to_string(x)) if that makes it easier to understand.

