Extract value from structure within an array of arrays in spark using scala



I am reading JSON data into a Spark DataFrame using Scala. The schema is as follows:

 root
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- playerId: string (nullable = true)
 |    |    |-- sources: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- matchId: long (nullable = true)

The data looks as follows:

{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 1 } ] }, { "playerId": "1235", "sources": [ { "matchId": 1 } ] } ] }
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 2 } ] }, { "playerId": "1248", "sources": [ { "score": 12.2 , "matchId": 1 } ] } ] }
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 3 } ] }, { "playerId": "1248", "sources": [ { "matchId": 3 } ] } ] }
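For reference, line-delimited JSON in this shape can be loaded with spark.read.json, which infers the nested schema shown above. A minimal sketch (the file path and app name are assumptions):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session; in a real job the session usually already exists.
val spark = SparkSession.builder()
  .appName("players")
  .master("local[*]")
  .getOrCreate()

// Each line of players.json is one JSON document, as in the sample above.
// Schema inference produces: metadata: array<struct<playerId, sources: array<struct<matchId, ...>>>>
val df = spark.read.json("players.json")
df.printSchema()
```

Note that because the second record carries an extra score field inside sources, the inferred element struct will include it as a nullable column; the other rows simply hold null there.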

The goal is to find whether playerId is 1234 and matchId is 1, and if so return isPlayed as true. The structure of sources is not fixed; there may be fields other than matchId.

I wrote a UDF that treats metadata as type WrappedArray[String], and I am able to read the playerId column:

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.{col, udf}

def hasPlayer = udf((metadata: WrappedArray[String], playerId: String) => {
  metadata.contains(playerId)
})

df.withColumn("hasPlayer", hasPlayer(col("metadata"), col("superPlayerId")))

But I am not able to figure out how to query the matchId field given a playerId. I tried reading the field as WrappedArray[WrappedArray[Long]], but it gives a typecasting exception in withColumn on the metadata.sources.matchId column.
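On the typecasting exception: an array-of-struct column arrives in a Scala UDF as a WrappedArray[Row] (i.e. Seq[Row]), not as WrappedArray[String] or WrappedArray[WrappedArray[Long]]. One hedged sketch of a UDF that walks the nested Rows by field name (field names taken from the schema above; combining the two conditions with exists is an assumption about the intended semantics):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, lit, udf}

// metadata arrives as Seq[Row]; each Row has playerId and sources,
// and sources is itself a Seq[Row] carrying matchId (plus optional extras).
def isPlayed = udf((metadata: Seq[Row], playerId: String, matchId: Long) => {
  metadata.exists { player =>
    player.getAs[String]("playerId") == playerId &&
      player.getAs[Seq[Row]]("sources")
        .exists(_.getAs[Long]("matchId") == matchId)
  }
})

df.withColumn("isPlayed", isPlayed(col("metadata"), lit("1234"), lit(1L)))
```

Because the lookup goes by field name via getAs, extra fields such as score in sources are simply ignored, which matches the "structure of sources is not fixed" constraint.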

I am relatively new to Spark. Any help would be deeply appreciated.

Cheers!

Solution

When you are dealing with JSON, get to know the built-in function explode, which turns a single cell containing a WrappedArray into multiple rows representing the structs inside. I think it helps here (twice):

df.select(explode($"metadata").as("metadata"))
  .select($"metadata.playerId", explode($"metadata.sources.matchId").as("matchId"))
  .filter($"matchId".equalTo(1))
  .select($"matchId", lit(true).as("isPlayed"))

Basically, I use explode to create multiple rows (renaming the column to something convenient), navigate the object tree down to the JSON fields I want, repeat the explode/rename process for matchId, and filter out everything that isn't 1. That finally lets me use the lit function to "hardcode" a value of true for a brand-new column called isPlayed, since every row where matchId isn't 1 is gone.

If this isn't exactly what you're looking for, hopefully it gives you some pointers. The functions library can be very helpful to you as you get to know Spark.
