Can't read CSV string using PySpark


Problem description

The scenario is: EventHub -> Azure Databricks (using PySpark)

File format: CSV (quoted, pipe-delimited, with a custom schema)

I am trying to read CSV strings coming from EventHub. Spark successfully creates the dataframe with the proper schema, but the dataframe ends up empty after every message.

I managed to do some tests outside the streaming environment: when the data comes from a file everything works, but it fails when the data comes from a string.

So I found some links that seemed helpful, but none of them worked:

Pyspark - converting JSON string to DataFrame

Right now I have the following code:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("Decisao", StringType(), True),
                     StructField("PedidoID", StringType(), True),
                     StructField("De_LastUpdated", StringType(), True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

df = spark.read \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .option("delimiter", "|") \
    .schema(schema) \
    .csv(csvData)

df.show()  # shows the correct schema, but zero rows

Is this even possible to do with CSV files?
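For what it's worth, one hedged guess at the cause: sc.parallelize([body]) produces a single RDD element that contains both the header and the data row, so with header set to true the CSV reader consumes the whole element as the header and leaves zero data rows. A minimal sketch of a workaround under that assumption, splitting the message into lines before parallelizing:

# Sketch (assumption): make each CSV line its own RDD element before reading,
# so the header and the data row become separate records.
csvData = sc.parallelize(body.split("\r\n"))

df = spark.read \
    .option("header", "true") \
    .option("delimiter", "|") \
    .schema(schema) \
    .csv(csvData)

df.show()  # the data row should now appear instead of an empty dataframe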

Recommended answer

You can construct the dataframe like this via Row and a split on the | delimiter:

from pyspark.sql import Row

body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

# The single string splits on "|" into 5 pieces: the "\r\n" keeps the header's
# last field and the first data value fused inside one piece.
schemaDF = csvData \
    .map(lambda x: x.split("|")) \
    .map(lambda x: Row(x[0], x[1], x[2], x[3], x[4])) \
    .toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])

for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')

schemaDF.printSchema()
root
 |-- Decisao: string (nullable = true)
 |-- PedidoID: string (nullable = true)
 |-- De_LastUpdated: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: string (nullable = true)
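As a follow-up sketch (not part of the original answer): if the target is the three-column layout from the custom schema, one way to extend the same split-on-| idea is to break the message into lines first, strip the surrounding quotes, and drop the header record. The variable names below are illustrative assumptions:

from pyspark.sql import Row

body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'

# Split into lines first so the header and the data row become separate
# records, then split each line on "|" and strip the surrounding quotes.
rows = sc.parallelize(body.split("\r\n")) \
    .map(lambda line: [c.strip('"') for c in line.split("|")])

header = rows.first()                      # ['DECISAO', 'PEDIDOID', 'DE_LASTUPDATED']
data = rows.filter(lambda r: r != header)  # drop the header record

cleanDF = data.map(lambda r: Row(*r)).toDF(["Decisao", "PedidoID", "De_LastUpdated"])
cleanDF.show()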
