Merge MultiLine records using pyspark


Problem Description


I have a dataset that is multiline and uses a multi-character delimiter. I am using Spark 2.3 to read it.

I want to convert it into a file with one record per line, using a different delimiter.

1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
12223212|*|1212|*|0|*|0|*||*|ABD
[c0re] score 
12-- 12--P|*|1234|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
2334343|*|1212|*|0|*|0|*||*|ABD
[c0re] score 
12-- 12--P|*|1223|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|

Expected output

1223232~1212~0~0~~ABDP~1234~asda
12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda
1223232~1212~0~0~~ABDP~1234~asda
2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda
1223232~1212~0~0~~ABDP~1234~asda

This file was initially converted using UNIX sed commands. However, the file has grown to 50 GB and the server hangs.

while read myline
do
sed -i 's/\r//g' $myline
sed -i ':a;N;$!ba;s/\n//g' $myline # removes new line
sed -i 's+|\#\#|+\n+g' $myline #replaces all |##| as new line
sed -i 's/~/-/g' $myline 
sed -i 's/\\/  /g' $myline
sed -i 's+|\*|+~+g' $myline # converts all |*| as ~
sed -i 's+|\#\#|++g' $myline
done < filename
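
For reference, the core of that sed pipeline amounts to: strip all newlines, split records on |##|, and turn |*| into ~. A minimal plain-Python sketch of that transformation (not Spark; the file paths are placeholders):

# Plain-Python sketch of the core of the sed pipeline above (placeholder paths;
# it loads the whole file into memory, which is why this breaks down at 50 GB).
with open('input.txt') as src, open('output.txt', 'w') as dst:
    data = src.read().replace('\r', '').replace('\n', '')   # strip all newlines
    data = data.replace('~', '-').replace('\\', '  ')       # existing ~ becomes -, backslashes become spaces
    for record in data.split('|##|'):                        # |##| terminates each record
        if record.strip():
            dst.write(record.replace('|*|', '~') + '\n')     # |*| becomes the new ~ delimiter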

This has to be rewritten in Spark, but with multi-character delimiters and embedded newlines in the data it is not working out.

I have written the Spark code below, but I am unsure how to treat |##| as the end-of-record marker instead of the newline, and perhaps replace the embedded newlines with spaces.

df = sc.textFile(source_filename).map(lambda x: x.split("|*|")).toDF(header_column)
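
One way to make Spark treat |##| as the end-of-record marker instead of the newline is Hadoop's textinputformat.record.delimiter setting. A sketch along those lines (not the accepted approach below; it assumes source_filename and header_column from the snippet above):

# Sketch: read records delimited by |##| rather than by newlines, then drop
# embedded newlines and split fields on |*|. Assumes source_filename and
# header_column are defined as in the snippet above.
rdd = sc.newAPIHadoopFile(
    source_filename,
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '|##|'}
)
df = (rdd.values()                                                   # keep only the record text
         .map(lambda rec: rec.replace('\r', '').replace('\n', ''))   # drop embedded newlines
         .filter(lambda rec: rec.strip())                            # skip the trailing empty record
         .map(lambda rec: rec.split('|*|'))                          # split fields on |*|
         .toDF(header_column))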

Solution

I'm managing to process your data partitioned by file, but it will require a big cluster as well, since you mentioned that you have files over 30 GB.

from pyspark.sql.window import Window
import pyspark.sql.functions as f


# Read each physical line as a single string column and remember which file it came from
df = spark.read.csv('your_path', schema='value string')
df = df.withColumn('filename', f.input_file_name())
df = df.repartition('filename')

# Preserve the original line order within each file
df = df.withColumn('index', f.monotonically_increasing_id())

# A new group starts whenever the previous line ended its record with |##|
w = Window.partitionBy('filename').orderBy('index')
df = df.withColumn('group', f.sum(f.lag('value', default=False).over(w).endswith(f.lit('|##|')).cast('int')).over(w))

# Replace the field delimiter |*| with ~ and drop the record terminator |##|
df = df.withColumn('value', f.regexp_replace('value', r'\|\*\|', '~'))
df = df.withColumn('value', f.regexp_replace('value', r'\|##\|', ''))

# Glue the lines of each group back together into a single record
df = df.groupBy('group').agg(f.concat_ws('', f.collect_list('value')).alias('value'))
(df
 .select('value')
 .sort('group')
 .show(truncate=False))

Output

+-------------------------------------------------------+
|value                                                  |
+-------------------------------------------------------+
|1223232~1212~0~0~~ABDP~1234~asda                       |
|12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda|
|1223232~1212~0~0~~ABDP~1234~asda                       |
|2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda |
|1223232~1212~0~0~~ABDP~1234~asda                       |
+-------------------------------------------------------+
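
If the goal is the single converted file from the question rather than a show(), the grouped DataFrame can be written straight out as text. A small sketch (the output path is a placeholder; coalesce(1) only forces a single output file and should be dropped for very large data):

# Write one converted record per line (placeholder output path).
(df.sort('group')
   .select('value')
   .coalesce(1)                                   # optional: single file; costly at this data size
   .write.mode('overwrite').text('output_path'))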
