Python Spark: How to Map Fields of One RDD to Another RDD


Problem description


I am very new to Python Spark. As per the subject above, I want to map the fields of one RDD to the fields of another RDD. Here is an example:

RDD 1:

c_id    name 
121210  abc
121211  pqr

RDD 2:

c_id   cn_id cn_value
121211  0     0
121210  0     1


So each matched c_id should be replaced by its name, kept alongside cn_id, with cn_value aggregated. The output would look like this: abc 0 1, pqr 0 0.

from pyspark import SparkContext
import csv

sc = SparkContext("local", "spark-App")

# First RDD: local CSV split on commas, dropping malformed lines.
file1 = sc.textFile('/home/hduser/sample.csv').map(lambda line: line.split(',')).filter(lambda line: len(line) > 1)
# Second RDD: comma-separated part file on HDFS.
file2 = sc.textFile('hdfs://localhost:9000/sample2/part-00000').map(lambda line: line.split(','))

file1_fields = file1.map(lambda x: (x[0], x[1]))
file2_fields = file2.map(lambda x: (x[0], x[1], float(x[2])))

How can I achieve my goal by putting some code here?

Any help will be highly appreciated. Thank you.

Answer


The operation you're looking for is called a join. Given your structure, it is probably best to use DataFrames and spark-csv (I assume that the second file is comma-separated as well, but has no header). Let's start with dummy data:

file1 = ... # path to the first file
file2 = ... # path to the second file

with open(file1, "w") as fw:
    fw.write("c_id,name\n121210,abc\n121211,pqr")

with open(file2, "w") as fw:
    fw.write("121211,0,0\n121210,0,1")

Read the first file:

df1 = (sqlContext.read 
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load(file1))

Load the second file:

from pyspark.sql.types import StructType, StructField, LongType

schema = StructType(
    [StructField(x, LongType(), False) for x in ("c_id", "cn_id", "cn_value")])

df2 = (sqlContext.read 
    .format('com.databricks.spark.csv')
    .schema(schema)
    .options(header='false')
    .load(file2))

Finally, the join:

combined = df1.join(df2, df1["c_id"] == df2["c_id"])
combined.show()

## +------+----+------+-----+--------+
## |  c_id|name|  c_id|cn_id|cn_value|
## +------+----+------+-----+--------+
## |121210| abc|121210|    0|       1|
## |121211| pqr|121211|    0|       0|
## +------+----+------+-----+--------+

Edit


With the RDDs you have, you can do something like this:

file1_fields.join(file2_fields.map(lambda x: (x[0], x[1:])))
