Pyspark: How to transform json strings in a dataframe column


Question

The following is more or less straight Python code which functionally extracts exactly what I want. The data schema for the column I'm filtering on within the dataframe is basically a JSON string.

However, I had to greatly bump up the memory requirement for this, and I'm only running on a single node. Using a collect is probably bad, and creating all of this on a single node really isn't taking advantage of the distributed nature of Spark.

I'd like a more Spark-centric solution. Can anyone help me massage the logic below to better take advantage of Spark? Also, as a learning point: please provide an explanation for why/how the updates make it better.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json

from pyspark.sql.types import StructType, StructField, StringType


input_schema = StructType([
    StructField('scrubbed_col_name', StringType(), nullable=True)
])


output_schema = StructType([
    StructField('val01_field_name', StringType(), nullable=True),
    StructField('val02_field_name', StringType(), nullable=True)
])


example_input = [
    '''[{"val01_field_name": "val01_a", "val02_field_name": "val02_a"},
        {"val01_field_name": "val01_a", "val02_field_name": "val02_b"},
        {"val01_field_name": "val01_b", "val02_field_name": "val02_c"}]''',
    '''[{"val01_field_name": "val01_c", "val02_field_name": "val02_a"}]''',
    '''[{"val01_field_name": "val01_a", "val02_field_name": "val02_d"}]''',
]

desired_output = {
    'val01_a': ['val02_a', 'val02_b', 'val02_d'],
    'val01_b': ['val02_c'],
    'val01_c': ['val02_a'],
}


def capture(dataframe):
    # Capture column from data frame if it's not empty
    data = dataframe.filter('scrubbed_col_name is not null')\
                    .select('scrubbed_col_name')\
                    .rdd\
                    .collect()

    # Create a mapping of val1: list(val2)
    mapping = {}
    # For every row in the rdd
    for row in data:
        # For each json_string within the row
        for json_string in row:
            # For each item within the json string
            for val in json.loads(json_string):
                # Extract the data properly
                val01 = val.get('val01_field_name')
                val02 = val.get('val02_field_name')
                if val02 not in mapping.get(val01, []):
                    mapping.setdefault(val01, []).append(val02)
    return mapping

Answer

One possible solution:

(df
  .rdd  # Convert to rdd
  .flatMap(lambda x: x)  # Flatten rows
  # Parse JSON. In practice you should add proper exception handling
  .flatMap(lambda x: json.loads(x))
  # Get values
  .map(lambda x: (x.get('val01_field_name'), x.get('val02_field_name')))
  # Convert to final shape
  .groupByKey())
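
As a follow-on sketch (not part of the original answer), the grouped result can be deduplicated and brought back to the driver as a plain dict in the shape of desired_output. Only the small, already-aggregated mapping is collected; scrubbed_col_name is the column name from the question, and df is the input dataframe:

grouped = (df
  .rdd                                # Convert to an RDD of Rows
  .flatMap(lambda row: row)           # Flatten Rows into column values
  .filter(lambda s: s is not None)    # Drop null entries before parsing
  .flatMap(json.loads)                # One dict per JSON object
  .map(lambda d: (d.get('val01_field_name'),
                  d.get('val02_field_name')))
  .groupByKey())

# Deduplicate each key's values and collect only the aggregated mapping
mapping = grouped.mapValues(lambda vals: sorted(set(vals))).collectAsMap()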

Given the output specification, this groupByKey-based operation is not exactly efficient (do you really require grouped values?), but it is still much better than a plain collect.
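
An alternative that the original answer does not cover is to stay within the DataFrame API and let Spark parse the JSON itself. The following is only a sketch: it assumes a Spark version in which from_json accepts an array-of-structs schema (2.2 or later, as far as I recall), and it reuses the column and field names from the question:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

item_schema = ArrayType(StructType([
    StructField('val01_field_name', StringType(), nullable=True),
    StructField('val02_field_name', StringType(), nullable=True),
]))

grouped_df = (df
    .where(F.col('scrubbed_col_name').isNotNull())
    # Parse each JSON array and explode it into one row per object
    .select(F.explode(F.from_json('scrubbed_col_name', item_schema)).alias('item'))
    # Collect the distinct val02 values for every val01
    .groupBy('item.val01_field_name')
    .agg(F.collect_set('item.val02_field_name').alias('val02_field_names')))

Here the parsing, grouping, and deduplication all stay inside the Spark SQL engine, and nothing reaches the driver unless you explicitly collect the already-aggregated result.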
