如何为Pyspark createDataFrame(rdd,schema)定义架构? [英] How to define schema for Pyspark createDataFrame(rdd, schema)?
问题描述
我查看了 spark-rdd到数据框.
我将gzip压缩的json读入rdd
I read my gziped json into rdd
rdd1 =sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
我想将其转换为spark数据框.链接的SO问题中的第一种方法不起作用.这是文件的第一行
I want to convert it to spark dataframe. The first method from the linked SO question does not work. This is the first row form the file
{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
如何推断模式?
SO回答
schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])
为什么范围(32)?
推荐答案
range(32)
只是一个示例-它们正在生成具有32列的架构,每个列都有编号作为一个名字.如果您确实要定义架构,则需要显式定义每列:
range(32)
in that example is just an example - they are generating schema with 32 columns, each of them having the number as a name. If you really want to define schema, then you need to explicitly define every column:
from pyspark.sql.types import *
schema = StructType([
StructField('code_event', IntegerType(), True),
StructField('code_event_system', StringType(), True),
...
])
但是更好的方法是避免使用RDD API,并使用以下代码将文件直接读取到数据帧中(请参见
But better way would be to avoid use of the RDD API, and directly read the file into a dataframe with following code (see documentation):
>>> data = spark.read.json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
>>> data.printSchema()
root
|-- code_event: string (nullable = true)
|-- code_event_system: string (nullable = true)
|-- company_id: string (nullable = true)
|-- date_event: string (nullable = true)
|-- date_event_real: string (nullable = true)
|-- ecode_class: string (nullable = true)
|-- ecode_event: string (nullable = true)
|-- eperiod_event: string (nullable = true)
|-- etl_date: string (nullable = true)
....
这篇关于如何为Pyspark createDataFrame(rdd,schema)定义架构?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!