How to define schema for Pyspark createDataFrame(rdd, schema)?


Problem description

I looked at spark - rdd to dataframe.

I read my gzipped JSON into an RDD:

rdd1 = sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')

I want to convert it to a Spark dataframe. The first method from the linked SO question does not work. This is the first row from the file:

{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}

How to infer the schema?

The SO answer suggests:

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

Why range(32)?

Recommended answer

The range(32) in that example is just a placeholder: it generates a schema with 32 columns, each named by its index ("0", "1", ..., "31"). If you really want to define the schema, you need to define every column explicitly:

from pyspark.sql.types import *
schema = StructType([
    StructField('code_event', IntegerType(), True),
    StructField('code_event_system', StringType(), True),
    ...
    ])
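
For completeness, here is a minimal sketch of feeding such a schema to createDataFrame(rdd, schema). The two-field schema and the json.loads parsing step are illustrative assumptions, not part of the original answer:

import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Abbreviated for the sketch: only two of the file's fields are declared;
# the remaining fields would be listed the same way.
schema = StructType([
    StructField('code_event', StringType(), True),
    StructField('code_event_system', StringType(), True),
])

# Each line of the gzipped file is one JSON object. Parsing it into a dict
# lets createDataFrame match the dict keys against the schema field names.
rdd1 = sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
df = spark.createDataFrame(rdd1.map(json.loads), schema)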

But a better way would be to avoid using the RDD API and read the file directly into a dataframe with the following code (see the documentation):

>>> data = spark.read.json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
>>> data.printSchema()
root
 |-- code_event: string (nullable = true)
 |-- code_event_system: string (nullable = true)
 |-- company_id: string (nullable = true)
 |-- date_event: string (nullable = true)
 |-- date_event_real: string (nullable = true)
 |-- ecode_class: string (nullable = true)
 |-- ecode_event: string (nullable = true)
 |-- eperiod_event: string (nullable = true)
 |-- etl_date: string (nullable = true)
....
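
Note that every column is inferred as string here. If you need concrete types (for example kafka_epoch as a long), the two approaches can be combined: pass an explicit schema to the reader instead of relying on inference. A sketch with an abbreviated, illustrative field list; columns not declared in the schema are simply dropped:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Declare only the columns you need, with the types you want.
typed_schema = StructType([
    StructField('code_event', StringType(), True),
    StructField('name_event', StringType(), True),
    StructField('kafka_epoch', LongType(), True),
])

data = spark.read.schema(typed_schema).json(
    's3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')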
