Merge MultiLine records using pyspark
Problem description
I have a dataset that is multiline and uses a multi-character delimiter. I am using Spark 2.3 to read it.
I want to convert it into a file with one line per record and a different delimiter.
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
12223212|*|1212|*|0|*|0|*||*|ABD
[c0re] score
12-- 12--P|*|1234|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
2334343|*|1212|*|0|*|0|*||*|ABD
[c0re] score
12-- 12--P|*|1223|*|asda|##|
1223232|*|1212|*|0|*|0|*||*|ABDP|*|1234|*|asda|##|
Expected output
1223232~1212~0~0~~ABDP~1234~asda
12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda
1223232~1212~0~0~~ABDP~1234~asda
2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda
1223232~1212~0~0~~ABDP~1234~asda
This file was initially converted using UNIX sed commands. However, the file has grown to 50 GB and the server hangs.
while read myline
do
sed -i 's/\r//g' $myline            # strips carriage returns
sed -i ':a;N;$!ba;s/\n//g' $myline  # removes newlines, joining multi-line records
sed -i 's+|\#\#|+\n+g' $myline      # replaces every |##| with a newline
sed -i 's/~/-/g' $myline            # escapes literal ~ before it becomes the delimiter
sed -i 's/\\/ /g' $myline           # replaces backslashes with spaces
sed -i 's+|\*|+~+g' $myline         # converts every |*| to ~
sed -i 's+|\#\#|++g' $myline        # drops any remaining |##|
done < filename
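For reference, the sed pipeline above is just a chain of fixed-string substitutions, which can be sketched in plain Python (the function name and sample strings here are illustrative, not from the question):

```python
def sed_pipeline(text: str) -> str:
    """Mimics the sed chain: join lines, then turn |##| into the
    record separator and |*| into the ~ field delimiter."""
    text = text.replace("\r", "")      # s/\r//g
    text = text.replace("\n", "")      # join multi-line records
    text = text.replace("|##|", "\n")  # each |##| ends a record
    text = text.replace("~", "-")      # escape literal ~ first
    text = text.replace("\\", " ")     # backslashes become spaces
    text = text.replace("|*|", "~")    # |*| becomes the new delimiter
    return text

raw = "A|*|x|##|B|*|\ny|##|"
print(sed_pipeline(raw).splitlines())  # ['A~x', 'B~y']
```

The order matters: literal `~` must be escaped before `|*|` is rewritten to `~`, exactly as in the sed version.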
This has to be rewritten in Spark, but with the multi-character delimiters and the newlines embedded in the data it is not working out.
I have the Spark code written, but I am unsure how to treat |##| as the record terminator instead of the newline, and perhaps replace the embedded newlines with spaces:
df = sc.textFile(source_filename).map(lambda x: x.split("|*|")).toDF(header_column)
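One way to sidestep the line-grouping problem entirely is to tell the Hadoop input format to split records on |##| instead of \n, so each logical record arrives as a single element regardless of embedded newlines. A sketch, assuming a live SparkContext `sc` and a hypothetical path; the pure-Python cleanup helper is testable on its own:

```python
def clean_record(rec):
    # flatten a multi-line record into one line
    return rec.replace("\r", "").replace("\n", "")

def load_records(sc, path):
    # textinputformat.record.delimiter makes Hadoop split input on |##|
    rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": "|##|"},
    )
    return (rdd.values()                      # drop the byte-offset keys
               .map(clean_record)
               .filter(lambda rec: rec.strip() != "")
               .map(lambda rec: rec.split("|*|")))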
I managed to process your data partitioned by file, but it will require a big cluster as well, since you mentioned you have files over 30 GB.
from pyspark.sql.window import Window
import pyspark.sql.functions as f

df = spark.read.csv('your_path', schema='value string')
df = df.withColumn('filename', f.input_file_name())
df = df.repartition('filename')
df = df.withColumn('index', f.monotonically_increasing_id())

# A logical record starts right after a line that ended with |##|, so the
# running count of such lines assigns each physical line a record group id.
w = Window.partitionBy('filename').orderBy('index')
df = df.withColumn('group', f.sum(f.lag('value', default='').over(w).endswith(f.lit('|##|')).cast('int')).over(w))

# Raw strings keep the backslashes intact in the regex patterns.
df = df.withColumn('value', f.regexp_replace('value', r'\|\*\|', '~'))
df = df.withColumn('value', f.regexp_replace('value', r'\|##\|', ''))
df = df.groupBy('group').agg(f.concat_ws('', f.collect_list('value')).alias('value'))
(df
 .sort('group')
 .select('value')
 .show(truncate=False))
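The Window/lag/sum trick above gives each physical line the count of earlier lines (in file order) that ended with |##|, so all lines of the same logical record share one group id. The same idea in plain Python, with illustrative names of my own:

```python
from itertools import accumulate

def assign_groups(lines):
    if not lines:
        return []
    # flag[i] = 1 if the *previous* line closed a record with |##|
    flags = [0] + [int(prev.endswith("|##|")) for prev in lines[:-1]]
    # running sum of the flags = logical-record group id per line
    return list(accumulate(flags))

lines = ["A|*|x|##|", "B|*|", "y|##|", "C|*|z|##|"]
print(assign_groups(lines))  # [0, 1, 1, 2]
```

After grouping, concatenating the values within each group (as `concat_ws`/`collect_list` do above) reassembles the split records.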
Output
+-------------------------------------------------------+
|value |
+-------------------------------------------------------+
|1223232~1212~0~0~~ABDP~1234~asda |
|12223212~1212~0~0~~ABD[c0re] score 12-- 12--P~1234~asda|
|1223232~1212~0~0~~ABDP~1234~asda |
|2334343~1212~0~0~~ABD[c0re] score 12-- 12--P~1223~asda |
|1223232~1212~0~0~~ABDP~1234~asda |
+-------------------------------------------------------+