Read a file in pyspark with custom column and record delimiter


Problem Description


Is there any way to use a custom record delimiter while reading a csv file in pyspark? In my file, records are separated by ** instead of newlines. Is there any way of using this custom line/record separator when reading the csv into a PySpark dataframe? Also, my column separator is ';'. The code below gets the columns correctly, but it counts everything as only one row.

from pyspark.sql import SparkSession

sc = SparkSession.builder.appName('temp').getOrCreate()
# Gets the columns, but the whole file lands in a single row (newline is still the record delimiter)
df = sc.read.format('csv').option("header", "false").option("delimiter", ';').option("inferSchema", "true").load("some-file-on-s3")

Solution

I would read it as plain text into an RDD and then split on the string that serves as your record separator. Afterwards, convert it to a dataframe like this:

# sc here is a SparkContext (e.g. spark.sparkContext), not the SparkSession from the question
rdd1 = (sc
        .textFile("/jupyter/nfs/test.txt")
        .flatMap(lambda line: line.split("**"))   # split each text line on the ** record delimiter
        .map(lambda x: x.split(";"))              # split each record on the ; column delimiter
       )
df1 = rdd1.toDF(["a", "b", "c"])
df1.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
| a1| b1| c1|
| a2| b2| c2|
| a3| b2| c3|
+---+---+---+
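
As a quick check against the original problem (an illustrative aside, assuming the sample test.txt shown at the end of the answer), the dataframe now contains one row per **-separated record rather than a single row:

print(df1.count())   # 3 rows, one per **-separated record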

Or, alternatively, like this:


import pyspark.sql.functions as f

rdd2 = (sc
        .textFile("/jupyter/nfs/test.txt")
        .flatMap(lambda line: line.split("**"))   # split on the ** record delimiter
        .map(lambda x: [x])                       # keep each record as a single-column row
       )
df2 = (rdd2
       .toDF(["abc"])
       .withColumn("a", f.split(f.col("abc"), ";")[0])   # column functions come from pyspark.sql.functions
       .withColumn("b", f.split(f.col("abc"), ";")[1])
       .withColumn("c", f.split(f.col("abc"), ";")[2])
       .drop("abc")
      )
df2.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
| a1| b1| c1|
| a2| b2| c2|
| a3| b2| c3|
+---+---+---+

where test.txt looks like this:

a1;b1;c1**a2;b2;c2**a3;b2;c3
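
For completeness, here is a minimal self-contained sketch of the first approach; the SparkSession setup and the local file path are illustrative assumptions, not part of the original answer:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("custom-record-delimiter").getOrCreate()
sc = spark.sparkContext   # the snippets above use a SparkContext named sc

rdd = (sc
       .textFile("/tmp/test.txt")                 # illustrative path to a copy of test.txt
       .flatMap(lambda line: line.split("**"))    # ** is the record delimiter
       .map(lambda x: x.split(";")))              # ; is the column delimiter

df = rdd.toDF(["a", "b", "c"])
df.show()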
