将文件名附加到RDD [英] Append Filename to RDD

查看:90
本文介绍了将文件名附加到RDD的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含我的数据文件的文件夹.每个文件的大小约为1 GB.

I have a folder containing my data files. Each file has a size of about 1 GB.

我需要的是RDD中的文件名.

What I need is the filename inside the RDD.

以下操作无法按预期进行:

Following does not work as expected:

import glob
rdds = []
for filename in glob.iglob('/data/*'):
    rdd = sc.textFile(filename).map(lambda row: (filename, row))
    rdds.append(rdd)

allData = sc.union(rdds)

使用此代码,文件名始终是最后读取文件的文件名

using this, filename is always the filename of the last read file

我也尝试过:

import glob
rdds = []
for filename in glob.iglob('/data/*'):
    def f(name=filename):
        return name
    rdd = sc.textFile(filename).map(lambda row: (f(), row))
    rdds.append(rdd)

allData = sc.union(rdds)

但这会产生错误:广播只能在驱动程序中序列化

sc.wholeTextFile(),因为单个文件太大.

sc.wholeTextFile() is not an option because the single files are to big.

推荐答案

我想您想要这样的东西:

I guess you want something like this:

import functools

def proc(f):
    return sc.textFile(f).map(lambda x: (f, x))

rdd = functools.reduce(
    lambda rdd1, rdd2: rdd1.union(rdd2),
    (proc(f) for f in glob.glob("/data/*")))

或使用 sc.union :

sc.union([proc(f) for f in glob.glob("/data/*")])

这篇关于将文件名附加到RDD的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆