How to decode a byte[] of List<Objects> to Dataset<Row> in Spark?

Question
I am using spark-sql-2.3.1v and Kafka with Java 8 in my project. I am trying to convert the byte[] received from a topic into a Dataset on the Kafka consumer side.
Here are the details.

I have
class Company{
String companyName;
Integer companyId;
}
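(Note: for Encoders.bean(Company.class), used further down, to work, Company must be a proper JavaBean: a public class with a public no-arg constructor and getter/setter pairs. A fuller sketch of the class, assuming only the two fields from the question, with the rest being standard bean boilerplate:)

```java
import java.io.Serializable;

// Bean-style version of Company; Spark's bean encoder discovers the
// schema through the public getters and setters.
public class Company implements Serializable {
    private String companyName;
    private Integer companyId;

    public Company() {}  // public no-arg constructor required by the bean encoder

    public String getCompanyName() { return companyName; }
    public void setCompanyName(String companyName) { this.companyName = companyName; }

    public Integer getCompanyId() { return companyId; }
    public void setCompanyId(Integer companyId) { this.companyId = companyId; }
}
```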
whose schema I define as
public static final StructType companySchema = new StructType()
        .add("companyName", DataTypes.StringType)
        .add("companyId", DataTypes.IntegerType);
But the message is defined as
class Message{
private List<Company> companyList;
private String messageId;
}
so I tried to define the message schema as
StructType messageSchema = new StructType()
.add("companyList", DataTypes.createArrayType(companySchema , false),false)
.add("messageId", DataTypes.StringType);
I serialized the Message and sent it to the Kafka topic as a byte[].
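(Since the consumer below parses the value with from_json(col("value").cast("string"), ...), the bytes on the topic must be a UTF-8 JSON string whose field names match messageSchema. A minimal, hand-rolled sketch of producing such bytes; the helper name is hypothetical, and a real producer would use a JSON library such as Jackson or a Kafka JSON serializer instead of String.format:)

```java
import java.nio.charset.StandardCharsets;

public class ProduceMessage {
    // Builds the JSON payload for one Message with a single Company entry.
    // Field names must match messageSchema/companySchema exactly.
    public static byte[] toJsonBytes(String messageId, String companyName, int companyId) {
        String json = String.format(
            "{\"companyList\":[{\"companyName\":\"%s\",\"companyId\":%d}],\"messageId\":\"%s\"}",
            companyName, companyId, messageId);
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```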
I successfully received the message byte[] on the consumer side. How do I convert it into a Dataset?
Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");
messagesDs.printSchema();
root
|-- companyList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- companyId: integer (nullable = true)
|-- messageId: string (nullable = true)
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));
comapanyListDs.printSchema();
root
|-- col: struct (nullable = true)
| |-- companyName: string (nullable = true)
| |-- companyId: integer (nullable = true)
Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));
I am getting the error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];
How do I get the Dataset records?
Answer
When you explode, the resulting struct column is named "col".
Since your bean class has no "col" attribute, it fails with the error you mentioned:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];
You can do the following select to pull the relevant fields out of the struct as plain columns:
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")))
        .select(col("col.companyName").as("companyName"), col("col.companyId").as("companyId"));
I haven't tested the syntax, but your next step should work once you have plain columns extracted from the struct for each row.