How to yield pandas dataframe rows to spark dataframe

Problem description

Hi, I'm writing a transformation. I created a some_function(iter) generator that yields Row(id=index, api=row['api'], A=row['A'], B=row['B']) to turn the rows of a pandas dataframe into an RDD and then a Spark dataframe, and I'm getting errors. (I must use pandas to transform the data, as there is a large amount of legacy code.)

Input Spark DataFrame

respond_sdf.show()
    +-------------------------------------------------------------------+
    |content                                                            |
    +-------------------------------------------------------------------+
    |{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
    |{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
    +-------------------------------------------------------------------+

Expected Spark Dataframe after transformation

transform_df.show()
    +-------------------+
    |  api   |  A  |  B |
    +-------------------+
    | api_1  |  1  |  4 |
    | api_1  |  2  |  5 |
    | api_1  |  3  |  6 |
    | api_2  |  7  | 10 |
    | api_2  |  8  | 11 |
    | api_2  |  9  | 12 |
    +-------------------+

Minimal example code

#### IMPORT PYSPARK ###

import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext


####### INPUT DATAFRAME WITH LIST OF JSONS ########################

rdd_list = [["{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"],
            ["{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }"]]

schema = StructType([StructField('content', StringType(), True)])

jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)


####### TRANSFORMATION DATAFRAME ########################

# Pandas transformation function returning pandas dataframe
def pandas_function(url_json):
    # Complex Pandas transformation
    url = url_json[0]
    json = url_json[1]
    df = pd.DataFrame(eval(json))
    return df

# Generator yielding Rows from the pandas dataframe
def some_function(iter):
  # Pandas generator
  pandas_df = pandas_function(iter)
  for index, row in pandas_df.iterrows():
      ## ERROR COMES FROM THIS ROW
      yield Row(id=index, api=row['api'], A=row['A'], B=row['B'])

# Creating transformation spark dataframe
schema = StructType([
  StructField('API', StringType(), True),
  StructField('A', IntegerType(), True),
  StructField('B', IntegerType(), True)
  ])


rdd = respond_sdf.rdd.map(lambda x: some_function(x))
transform_df = spark.createDataFrame(rdd,schema)
transform_df.show()

I'm getting the error below:

raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object <generator object some_function at 0x7f69b43def90> in type <class 'generator'>

Full error:

Py4JJavaError: An error occurred while calling o462.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 37.0 failed 1 times, most recent failure: Lost task 2.0 in stage 37.0 (TID 97, dpc, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 612, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1408, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object <generator object some_function at 0x7f69b43def90> in type <class 'generator'>

I'm following advice from the link below: pySpark convert result of mapPartitions to spark DataFrame
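For reference, the pattern in that link amounts to consuming the generator with flatMap (or mapPartitions) instead of map, so that individual rows rather than generator objects end up in the RDD. Below is only a minimal sketch of that idea, not the code from the linked answer; the helper rows_from_content is hypothetical and assumes the JSON-like strings can be eval'ed as in the question:

# Hypothetical sketch: flatMap flattens the rows yielded by the generator,
# whereas map would put the generator object itself into the RDD and
# trigger the "StructType can not accept object ..." error.
def rows_from_content(spark_row):
    pdf = pd.DataFrame(eval(spark_row['content']))
    for _, r in pdf.iterrows():
        # cast numpy integers to plain Python ints so schema verification passes
        yield (r['api'], int(r['A']), int(r['B']))

flat_rdd = respond_sdf.rdd.flatMap(rows_from_content)
spark.createDataFrame(flat_rdd, "api string, A int, B int").show()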

Answer

In Spark 3.0 there is also a mapInPandas function which should be more efficient because there is no need to group by.

import pyspark.sql.functions as F

def pandas_function(iterator):
    # mapInPandas passes an iterator of pandas DataFrames (one per Arrow batch);
    # each DataFrame yielded here becomes rows of the output DataFrame.
    for df in iterator:
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()

Another way: using a pandas_udf with apply:

import pyspark.sql.functions as F

# GROUPED_MAP pandas UDF: each group arrives as a pandas DataFrame and a
# pandas DataFrame is returned; grouping by monotonically_increasing_id()
# below makes every input row its own group.
@F.pandas_udf("api string, A int, B int", F.PandasUDFType.GROUPED_MAP)
def pandas_function(url_json):
    df = pd.DataFrame(eval(url_json['content'][0]))
    return df

transformed_df = respond_sdf.groupBy(F.monotonically_increasing_id()).apply(pandas_function)
transformed_df.show()

+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_2|  7| 10|
|api_2|  8| 11|
|api_2|  9| 12|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
+-----+---+---+
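For completeness, the same grouped transformation can also be written with GroupedData.applyInPandas, which in Spark 3.0+ supersedes the GROUPED_MAP pandas UDF type. This is only a sketch along the lines of the code above, not tested against the original data:

import pyspark.sql.functions as F

# Sketch using applyInPandas (Spark 3.0+): each group is handed to the
# function as a pandas DataFrame and a pandas DataFrame is returned.
def pandas_function(pdf):
    return pd.DataFrame(eval(pdf['content'].iloc[0]))

transformed_df = (respond_sdf
                  .groupBy(F.monotonically_increasing_id())
                  .applyInPandas(pandas_function, "api string, A int, B int"))
transformed_df.show()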

Old answer (not very scalable...):

def pandas_function(url_json):
    df = pd.DataFrame(eval(url_json))
    return df

transformed_df = spark.createDataFrame(pd.concat(respond_sdf.rdd.map(lambda r: pandas_function(r[0])).collect()))
transformed_df.show()
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_2|  7| 10|
|api_2|  8| 11|
|api_2|  9| 12|
+-----+---+---+
