Combine two rows in Spark based on a condition in PySpark


Problem description

I have input records in the following format: Input data format

I want the data to be transformed into the following format: Output data format

I want to combine the two rows based on the type condition.

As far as I understand, I need to build a composite key from the 3 data fields and compare the type fields where the keys are equal.

Can someone please help me with the implementation in Spark using Python?

Following is my attempt using RDDs in PySpark:

# read the CSV into an RDD of rows
record = spark.read.csv("wasb:///records.csv", header=True).rdd
print("Total records: %d" % record.count())
# count distinct values of the candidate key columns
private_ip = record.map(lambda fields: fields[2]).distinct().count()
private_port = record.map(lambda fields: fields[3]).distinct().count()
destination_ip = record.map(lambda fields: fields[6]).distinct().count()
destination_port = record.map(lambda fields: fields[7]).distinct().count()
print("private_ip:%d, private_port:%d, destination_ip:%d, destination_port:%d"
      % (private_ip, private_port, destination_ip, destination_port))
# key on the composite of the 4 fields and concatenate the dates per key
types = record.map(lambda fields: ((fields[2], fields[3], fields[6], fields[7]), fields[0])) \
              .reduceByKey(lambda a, b: a + ',' + b)
print(types.first())

And the following is my output so far:

((u'100.79.195.101', u'54835', u'58.96.162.33', u'80'), u'22-02-2016 13:11:03,22-02-2016 13:13:53')

Recommended answer

Hope this helps!
(Edit note: tweaked code after getting the updated requirement)

import pyspark.sql.functions as func
#create RDD
rdd = sc.parallelize([(22,'C','xxx','yyy','zzz'),(23,'D','xxx','yyy','zzz'),(24,'C','xxx1','yyy1','zzz1'),(25,'D','xxx1','yyy1','zzz1')])

#convert RDD to dataframe
df = rdd.toDF(['Date','Type','Data1','Data2','Data3'])
df.show()

# group by the 3 data columns to create lists of type & date per composite key
df1 = (df.sort("Data1", "Data2", "Data3", "Type")
         .groupBy("Data1", "Data2", "Data3")
         .agg(func.collect_list("Type"), func.collect_list("Date"))
         .withColumnRenamed("collect_list(Type)", "Type_list")
         .withColumnRenamed("collect_list(Date)", "Date_list"))
# keep only keys whose type list is ['C', 'D'] and split the date list into start & end columns
df2 = (df1.where((func.col("Type_list")[0] == 'C') & (func.col("Type_list")[1] == 'D'))
          .withColumn("Start Date", df1.Date_list[0])
          .withColumn("End Date", df1.Date_list[1]))
#select only relevant columns as an output
df2.select("Data1","Data2","Data3","Start Date","End Date").show()
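
Note that this relies on the order of the collect_list results, which Spark does not guarantee after a shuffle. For reference, a minimal order-independent sketch of the same idea, assuming the same df with columns Date, Type, Data1, Data2 and Data3, is to pivot on the Type column:

# pivot on Type so the 'C' and 'D' dates land in separate Start/End columns
df_pivot = (df.groupBy("Data1", "Data2", "Data3")
              .pivot("Type", ["C", "D"])
              .agg(func.first("Date"))
              .withColumnRenamed("C", "Start Date")
              .withColumnRenamed("D", "End Date")
              .dropna(subset=["Start Date", "End Date"]))  # keep only keys that have both a C and a D row
df_pivot.show()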



Alternate solution using RDD:
(Edit note: added the snippet below as @AnmolDave is interested in an RDD solution as well)

import pyspark.sql.types as typ
rdd = sc.parallelize([('xxx','yyy','zzz','C',22),('xxx','yyy','zzz','D',23),('xxx1','yyy1','zzz1','C', 24),('xxx1','yyy1','zzz1','D', 25)])
# key on the 3 data fields, collect (type, date) pairs per key, and sort each
# pair list by type so 'C' comes before 'D' (assumes each key has one 'C' and one 'D' row)
reduced = rdd.map(lambda row: ((row[0], row[1], row[2]), [(row[3], row[4])]))\
    .reduceByKey(lambda x, y: x + y)\
    .map(lambda row: (row[0], sorted(row[1], key=lambda text: text[0])))\
    .map(lambda row: (
            row[0][0],
            row[0][1],
            row[0][2],
            ','.join([str(e[0]) for e in row[1]]),
            str(row[1][0][1]),   # cast to str so the value matches the StringType schema below
            str(row[1][1][1])
        )
    )\
    .filter(lambda row: row[3] == "C,D")

schema_red = typ.StructType([
        typ.StructField('Data1', typ.StringType(), False),
        typ.StructField('Data2', typ.StringType(), False),
        typ.StructField('Data3', typ.StringType(), False),
        typ.StructField('Type', typ.StringType(), False),
        typ.StructField('Start Date', typ.StringType(), False),
        typ.StructField('End Date', typ.StringType(), False)
    ])

df_red = sqlContext.createDataFrame(reduced, schema_red)
df_red.show()
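
The same pairing can also be written as a self-join on the composite key described in the question: match each 'C' row with the corresponding 'D' row. A minimal sketch, assuming the df with columns Date, Type, Data1, Data2 and Data3 from the first snippet:

# pair each 'C' row with the matching 'D' row on the composite key (Data1, Data2, Data3)
c_rows = df.where(df.Type == 'C').select("Data1", "Data2", "Data3", df.Date.alias("Start Date"))
d_rows = df.where(df.Type == 'D').select("Data1", "Data2", "Data3", df.Date.alias("End Date"))
df_joined = c_rows.join(d_rows, ["Data1", "Data2", "Data3"])
df_joined.show()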
