Can't read CSV string using PySpark
Problem description
The scenario is: EventHub -> Azure Databricks (using PySpark)
File format: CSV (quoted, pipe-delimited, with a custom schema)
I am trying to read CSV strings coming from Event Hub. Spark successfully creates the dataframe with the proper schema, but the dataframe ends up empty after every message.
I managed to do some tests outside the streaming environment: when the data comes from a file everything works, but it fails when the data comes from a string.
I found some links that seemed to address this, but none of them worked.
Right now I have the following code:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Decisao", StringType(), True),
    StructField("PedidoID", StringType(), True),
    StructField("De_LastUpdated", StringType(), True),
])

body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

df = spark.read \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .option("delimiter", "|") \
    .schema(schema) \
    .csv(csvData)
df.show()
Is that even possible to do with CSV files?
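For reference, the same string parses cleanly with Python's standard csv module once it is split into individual lines, which suggests the data itself is well-formed and the problem is how the single-element RDD is fed to the reader. A minimal sketch outside Spark:

```python
import csv

# The message string exactly as used in the question
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'

# Split into individual lines first; csv.reader then handles the pipe
# delimiter and the surrounding quotes in one pass
reader = csv.reader(body.splitlines(), delimiter="|", quotechar='"')
rows = list(reader)

header, data = rows[0], rows[1:]
print(header)  # ['DECISAO', 'PEDIDOID', 'DE_LASTUPDATED']
print(data)    # [['asdasdas', '1015905177', 'sdfgsfgd']]
```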
Recommended answer
You can construct the schema like this via Row and split on the | delimiter:
from pyspark.sql import Row

body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

# split("|") on the whole string yields five fields, because the \r\n line
# break stays embedded inside the third field instead of starting a new record
schemaDF = csvData \
    .map(lambda x: x.split("|")) \
    .map(lambda x: Row(x[0], x[1], x[2], x[3], x[4])) \
    .toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])

for i in schemaDF.take(1):
    print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')
schemaDF.printSchema()
root
|-- Decisao: string (nullable = true)
|-- PedidoID: string (nullable = true)
|-- De_LastUpdated: string (nullable = true)
|-- col4: string (nullable = true)
|-- col5: string (nullable = true)
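Note that the split("|") above leaves the \r\n and the surrounding quotes embedded in the values, as the Row output shows. A minimal plain-Python sketch of the extra preprocessing the map steps would need (splitting into lines first, then stripping the quotes), assuming the same body string:

```python
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'

# Break the message into lines first, so the header and the data row become
# separate records; then split each line on the pipe and strip the quotes
records = [
    [field.strip('"') for field in line.split("|")]
    for line in body.splitlines()
]
header, data = records[0], records[1:]
print(header)  # ['DECISAO', 'PEDIDOID', 'DE_LASTUPDATED']
print(data)    # [['asdasdas', '1015905177', 'sdfgsfgd']]
```

In Spark, the same logic could run on the RDD itself, e.g. `csvData.flatMap(lambda x: x.splitlines())` before the existing `.map(...)` chain, so that each line becomes its own record.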