Spark - Creating Nested DataFrame


Problem Description


I'm starting with PySpark and I'm having trouble creating DataFrames with nested objects.

This is my example.

I have users.

$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}

Users have orders.

$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}

And I'd like to join them to get a structure where the orders are an array nested inside each user.

$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}

How can I do that? Is there any kind of nested join or something similar?

>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> order = sqlContext.read.json("order.json")
>>> order.printSchema();
root
 |-- id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- userid: long (nullable = true)

>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

EDIT: I know it's possible to do this using join and foldByKey, but is there a simpler way?
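For reference, a rough sketch of what I mean by the join/foldByKey approach (my own untested illustration): fold the orders into per-user lists at the RDD level, then join with the users:

user_by_id = user.rdd.keyBy(lambda r: r.id)
# Wrap each order in a list, then fold the lists together per userid.
orders_by_user = order.rdd.map(lambda r: (r.userid, [r])).foldByKey([], lambda a, b: a + b)
# (id, (userRow, [orderRows])) -> (id, name, orders)
nested = user_by_id.join(orders_by_user).map(
    lambda kv: (kv[1][0].id, kv[1][0].name, kv[1][1]))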

EDIT 2: I'm using the solution by @zero323:

def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType="left_outer"):
    # Group the right table's rows by the join key, producing (key, rows) pairs.
    tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
    # _1 is the grouping key, _2.data the grouped rows; expose them as
    # "joinColumn" and the nested column name.
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
    # Join onto the left table and drop the helper key column.
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")

I added a second nested structure, 'lines':

>>> lines = sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
 |-- id: long (nullable = true)
 |-- orderid: long (nullable = true)
 |-- product: string (nullable = true)

orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _1: long (nullable = true)
 |    |    |    |    |-- _2: long (nullable = true)
 |    |    |    |    |-- _3: string (nullable = true)

After this, the column names from lines are lost (they are replaced by _1, _2, _3, as the schema above shows). Any ideas?

EDIT 3: I tried to manually specify the schema.

from pyspark.sql.types import *
fields = []
fields.append(StructField("_1", LongType(), True))
inner = ArrayType(lines.schema)
fields.append(StructField("_2", inner))
new_schema = StructType(fields)
print(new_schema)

grouped = lines.rdd.groupBy(lambda r: r.orderid)
grouped = grouped.map(lambda x: (x[0], list(x[1])))
g = sqlCtx.createDataFrame(grouped, new_schema)

Error:

TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>
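A minimal sketch of one possible fix here (my assumption, not verified in the thread): createDataFrame seems to want plain tuples or lists where the schema declares a struct, so converting each Row to a tuple first should avoid this TypeError:

grouped = lines.rdd.groupBy(lambda r: r.orderid)
# Row is a tuple subclass, so tuple(r) yields its values in field order,
# matching the StructType declared in new_schema.
grouped = grouped.map(lambda x: (x[0], [tuple(r) for r in x[1]]))
g = sqlCtx.createDataFrame(grouped, new_schema)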

Solution

This will work only in Spark 2.0 or later.

First we'll need a couple of imports:

from pyspark.sql.functions import struct, collect_list

The rest is a simple aggregation and join:

orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")

combined = users.join(
    orders
        .groupBy("userId")
        .agg(collect_list(struct(*orders.columns)).alias("orders"))
        .withColumnRenamed("userId", "id"), ["id"])
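Here struct(*orders.columns) packs each order row into a single struct column, collect_list gathers those structs into one array per user, and renaming userId back to id lets the join use the shared column name. (Spark resolves column names case-insensitively by default, so groupBy("userId") matches the userid column.)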

For the example data the result is:

combined.show(2, False)

+---+-----+---------------------------+
|id |name |orders                     |
+---+-----+---------------------------+
|1  |UserA|[[1,202.3,1], [2,343.99,1]]|
|2  |UserB|[[3,399.99,2]]             |
+---+-----+---------------------------+

with schema:

combined.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

and JSON representation:

for x in combined.toJSON().collect():
    print(x)     

{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
