Spark - Creating Nested DataFrame
Question
I'm starting with PySpark and I'm having trouble creating DataFrames with nested objects.
Here is my example.
I have users.
$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}
Users place orders.
$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}
I'd like to join them to get a structure like this, where the orders are nested in each user as an array.
$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
How can I do that? Is there any kind of nested join or something similar?
>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
>>> order = sqlContext.read.json("order.json")
>>> order.printSchema();
root
|-- id: long (nullable = true)
|-- price: double (nullable = true)
|-- userid: long (nullable = true)
>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- price: double (nullable = true)
| | |-- userid: long (nullable = true)
I know it is possible to do this using join and foldByKey, but is there a simpler way?
I'm using the solution by @zero323:
def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType="left_outer"):
    # Group the right-hand rows by the join key, then turn the groups back into a DataFrame.
    tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")
I added a second nested structure, 'lines':
>>> lines = sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
|-- id: long (nullable = true)
|-- orderid: long (nullable = true)
|-- product: string (nullable = true)
orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- price: double (nullable = true)
| | |-- userid: long (nullable = true)
| | |-- lines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _1: long (nullable = true)
| | | | |-- _2: long (nullable = true)
| | | | |-- _3: string (nullable = true)
After this, the column names from lines are lost. Any ideas?
EDIT 3: I tried to specify the schema manually.
from pyspark.sql.types import *
fields = []
fields.append(StructField("_1", LongType(), True))
inner = ArrayType(lines.schema)
fields.append(StructField("_2", inner))
new_schema = StructType(fields)
print(new_schema)
grouped = lines.rdd.groupBy(lambda r: r.orderid)
grouped = grouped.map(lambda x: (x[0], list(x[1])))
g = sqlCtx.createDataFrame(grouped, new_schema)
Error:
TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>
Recommended answer
This works only in Spark 2.0 or later.
First we'll need a couple of imports:
from pyspark.sql.functions import struct, collect_list
The rest is a simple aggregation and join:
orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")
combined = users.join(
    orders
        .groupBy("userid")
        .agg(collect_list(struct(*orders.columns)).alias("orders"))
        .withColumnRenamed("userid", "id"),
    ["id"])
For the example data the result is:
combined.show(2, False)
+---+-----+---------------------------+
|id |name |orders |
+---+-----+---------------------------+
|1 |UserA|[[1,202.3,1], [2,343.99,1]]|
|2 |UserB|[[3,399.99,2]] |
+---+-----+---------------------------+
with the schema:
combined.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- price: double (nullable = true)
| | |-- userid: long (nullable = true)
and the JSON representation:
for x in combined.toJSON().collect():
    print(x)
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}