如何在Spark中的RDD中跳过多于一行的标题 [英] How to skip more then one lines of header in RDD in Spark

查看:357
本文介绍了如何在Spark中的RDD中跳过多于一行的标题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的第一个RDD中的数据就像

Data in my first RDD is like

1253
545553
12344896
1 2 1
1 43 2
1 46 1
1 53 2

现在,前3个整数是我需要广播的一些计数器. 之后,所有行的格式都相同

Now the first 3 integers are some counters that I need to broadcast. After that all the lines have the same format like

1 2 1
1 43 2

在对它们进行函数计算之后,我会将3个计数器之后的所有这些值映射到新的RDD. 但是我不明白如何将前三个值分开并正常映射其余三个值.

I will map all those values after 3 counters to a new RDD after doing some computation with them in function. But I'm not able to understand how to separate those first 3 values and map the rest normally.

我的Python代码就是这样

My Python code is like this

documents = sc.textFile("file.txt").map(lambda line: line.split(" "))

final_doc = documents.map(lambda x: (int(x[0]), function1(int(x[1]), int(x[2])))).reduceByKey(lambda x, y: x + " " + y)

仅当前三个值不在文本文件中但与之对应时,它才出错.

It works only when first 3 values are not in the text file but with them it gives error.

我不想跳过前三个值,而是将它们存储在3个广播变量中,然后将剩余的数据集传递给map函数.

I don't want to skip those first 3 values, but store them in 3 broadcast variables and then pass the remaining dataset in map function.

是的,文本文件必须仅采用该格式.我无法删除这3个值/计数器

And yes the text file has to be in that format only. I cannot remove those 3 values/counters

Function1只是进行一些计算并返回值.

Function1 is just doing some computation and returning the values.

推荐答案

  1. Python 2的导入

  1. Imports for Python 2

from __future__ import print_function

  • 准备虚拟数据:

  • Prepare dummy data:

    s = "1253\n545553\n12344896\n1 2 1\n1 43 2\n1 46 1\n1 53 2"
    with open("file.txt", "w") as fw: fw.write(s)
    

  • 读取原始输入:

  • Read raw input:

    raw = sc.textFile("file.txt")
    

  • 提取标头:

  • Extract header:

    header = raw.take(3)
    print(header)
    ### [u'1253', u'545553', u'12344896']
    

  • 过滤行:

  • Filter lines:

    content = raw.zipWithIndex().filter(lambda kv: kv[1] > 2).keys()
    print(content.first())
    ## 1 2 1
    

  • 使用 mapPartitionsWithIndex

  • using mapPartitionsWithIndex

    from itertools import islice
    
    content = raw.mapPartitionsWithIndex(
        lambda i, iter: islice(iter, 3, None) if i == 0 else iter)
    
    print(content.first())
    ## 1 2 1
    

  • 注意:所有功劳归于 pzecevic

    NOTE: All credit goes to pzecevic and Sean Owen (see linked sources).

    这篇关于如何在Spark中的RDD中跳过多于一行的标题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆