How to convert Array of Json Strings into Dataset of specific columns in Spark 2.2.0?
Question
I have a Dataset<String> ds which consists of json rows.
Sample Json row (this is just an example of one row in the dataset)
[
"{\"name\": \"foo\", \"address\": {\"state\": \"CA\", \"country\": \"USA\"}, \"docs\":[{\"subject\": \"english\", \"year\": 2016}]}",
"{\"name\": \"bar\", \"address\": {\"state\": \"OH\", \"country\": \"USA\"}, \"docs\":[{\"subject\": \"math\", \"year\": 2017}]}"
]
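Note that each element of the array above is itself a JSON string, so the quotes inside it must be escaped with backslashes, otherwise the outer array is not valid JSON. A minimal sketch of that escaping in plain Java (the class and method names here are only for illustration):

```java
public class EscapeDemo
{
    // Wraps a JSON object as a JSON string element: escapes backslashes
    // first, then quotes, then surrounds the result with double quotes.
    static String quoteAsJsonString(String json)
    {
        return "\"" + json.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args)
    {
        // prints "{\"name\": \"foo\"}"
        System.out.println(quoteAsJsonString("{\"name\": \"foo\"}"));
    }
}
```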
ds.printSchema()
root
|-- value: string (nullable = true)
Now I want to convert it into the following dataset using Spark 2.2.0
name | address | docs
----------------------------------------------------------------------------------
"foo" | {"state": "CA", "country": "USA"} | [{"subject": "english", "year": 2016}]
"bar" | {"state": "OH", "country": "USA"} | [{"subject": "math", "year": 2017}]
Preferably Java, but Scala is also fine as long as the functions are available in the Java API.
Here is what I have tried so far:
val df = Seq("""["{\"name\": \"foo\", \"address\": {\"state\": \"CA\", \"country\": \"USA\"}, \"docs\":[{\"subject\": \"english\", \"year\": 2016}]}", "{\"name\": \"bar\", \"address\": {\"state\": \"OH\", \"country\": \"USA\"}, \"docs\":[{\"subject\": \"math\", \"year\": 2017}]}"]""").toDF
df.show(false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                         |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|["{\"name\": \"foo\", \"address\": {\"state\": \"CA\", \"country\": \"USA\"}, \"docs\":[{\"subject\": \"english\", \"year\": 2016}]}", "{\"name\": \"bar\", \"address\": {\"state\": \"OH\", \"country\": \"USA\"}, \"docs\":[{\"subject\": \"math\", \"year\": 2017}]}"]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Answer
I have found a workaround in Java. I hope this helps.
Create a Bean class (TempBean in my case):
import java.util.List;
import java.util.Map;

public class TempBean
{
    String name;
    Map<String, String> address;
    List<Map<String, String>> docs;

    public String getName()
    {
        return name;
    }

    public void setName(String name)
    {
        this.name = name;
    }

    public Map<String, String> getAddress()
    {
        return address;
    }

    public void setAddress(Map<String, String> address)
    {
        this.address = address;
    }

    public List<Map<String, String>> getDocs()
    {
        return docs;
    }

    public void setDocs(List<Map<String, String>> docs)
    {
        this.docs = docs;
    }
}
Use the following code with the imports below:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
ObjectMapper mapper = new ObjectMapper();
List<String> dfList = ds.collectAsList(); // using your Dataset<String>; note this pulls all rows to the driver
List<TempBean> tempList = new ArrayList<TempBean>();
try
{
    for (String json : dfList)
    {
        // each row is a JSON array of objects with name/address/docs keys
        List<Map<String, Object>> mapList = mapper.readValue(json, new TypeReference<List<Map<String, Object>>>() {});
        for (Map<String, Object> map : mapList)
        {
            TempBean temp = new TempBean();
            temp.setName(map.get("name").toString());
            temp.setAddress((Map<String, String>) map.get("address"));
            temp.setDocs((List<Map<String, String>>) map.get("docs"));
            tempList.add(temp);
        }
    }
}
catch (JsonParseException e)
{
    e.printStackTrace();
}
catch (JsonMappingException e)
{
    e.printStackTrace();
}
catch (IOException e)
{
    e.printStackTrace();
}
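Jackson's readValue with a Map target deserializes nested JSON objects as Map<String, Object>, which is why the address and docs entries above need unchecked casts. A small self-contained sketch of that pattern using plain collections (class and method names here are only for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class CastDemo
{
    // Mimics the shape Jackson produces: a nested object stored as Object,
    // read back with an unchecked cast, like temp.setAddress(...) above.
    @SuppressWarnings("unchecked")
    static String stateOf(Map<String, Object> row)
    {
        Map<String, String> address = (Map<String, String>) row.get("address");
        return address.get("state");
    }

    public static void main(String[] args)
    {
        Map<String, String> address = new HashMap<>();
        address.put("state", "CA");
        address.put("country", "USA");
        Map<String, Object> row = new HashMap<>();
        row.put("name", "foo");
        row.put("address", address);
        System.out.println(stateOf(row)); // prints CA
    }
}
```

The cast is unchecked because the generic type is erased at runtime; it only fails later, on access, if the values are not actually Strings.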
Create the dataframe:
Dataset<Row> dff = spark.createDataFrame(tempList, TempBean.class);
Show the dataframe:
dff.show(false);
+--------------------------------+---------------------------------------+----+
|address |docs |name|
+--------------------------------+---------------------------------------+----+
|Map(state -> CA, country -> USA)|[Map(subject -> english, year -> 2016)]|foo |
|Map(state -> OH, country -> USA)|[Map(subject -> math, year -> 2017)] |bar |
+--------------------------------+---------------------------------------+----+
Print the schema:
dff.printSchema();
root
|-- address: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- docs: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- name: string (nullable = true)