How to decode a byte[] of List&lt;Objects&gt; to Dataset&lt;Row&gt; in Spark?


Problem description

I am using spark-sql-2.3.1v and Kafka with Java 8 in my project. I am trying to convert the byte[] received from the topic into a Dataset on the Kafka consumer side.

Here are the details.

I have

class Company{
    String companyName;
    Integer companyId;
}
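
For the Encoders.bean step further down to work, Spark expects Company to be a JavaBean, so presumably the real class also has a public no-arg constructor and getters/setters; a minimal sketch of what that might look like:

public class Company implements java.io.Serializable {
    private String companyName;
    private Integer companyId;

    public Company() { }  // no-arg constructor required by Encoders.bean

    public String getCompanyName() { return companyName; }
    public void setCompanyName(String companyName) { this.companyName = companyName; }

    public Integer getCompanyId() { return companyId; }
    public void setCompanyId(Integer companyId) { this.companyId = companyId; }
}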

I defined its schema as

public static final StructType companySchema = new StructType()
              .add("companyName", DataTypes.StringType)
              .add("companyId", DataTypes.IntegerType);

But the Message is defined as

class Message{
    private List<Company> companyList;
    private String messageId;
}

so I tried to define the message schema as

StructType messageSchema = new StructType()
            .add("companyList", DataTypes.createArrayType(companySchema , false),false)
            .add("messageId", DataTypes.StringType);

I sent the Message to the Kafka topic as a byte[] using serialization.
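
The serializer itself isn't shown; since from_json on a casted string is used below, the bytes are presumably JSON. A minimal sketch of what the producer side might look like, assuming Jackson is used (the bootstrap servers and topic name are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.ObjectMapper;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// Serialize the Message object to JSON bytes and publish it
byte[] payload = new ObjectMapper().writeValueAsBytes(message);
try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("message-topic", payload));  // placeholder topic
}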

I successfully received the message byte[] at the consumer, and I am now trying to convert it into a Dataset. How do I do that?
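
For context, the kafkaReceivedStreamDs used below is not shown in the question; presumably it comes from Spark's Kafka source, something like this sketch (servers and topic are placeholders):

Dataset<Row> kafkaReceivedStreamDs = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
        .option("subscribe", "message-topic")                 // placeholder topic
        .load();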

Dataset<Row> messagesDs = kafkaReceivedStreamDs
        .select(from_json(col("value").cast("string"), messageSchema).as("messages"))
        .select("messages.*");

  messagesDs.printSchema();

root
 |-- companyList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- companyName: string (nullable = true)
 |    |    |-- companyId: integer (nullable = true)
 |-- messageId: string (nullable = true)

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
 |-- col: struct (nullable = true)
 |    |-- companyName: string (nullable = true)
 |    |-- companyId: integer (nullable = true)



Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

I am getting the error:

线程"main" org.apache.spark.sql.AnalysisException中的异常:给定输入列,无法解析"companyName":[col];

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];

How do I get the Dataset&lt;Company&gt; records?

Answer

Your struct got named "col" when exploding.

Since your bean class doesn't have a "col" attribute, it fails with the mentioned error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];

You can do the following select to pull the relevant struct fields out as plain columns, something like this:

Dataset<Row> comapanyListDs = messagesDs
        .select(explode_outer(col("companyList")))
        .select(col("col.companyName").as("companyName"),
                col("col.companyId").as("companyId"));

I haven't tested the syntax, but your next step should work as soon as you have plain columns from the struct for each row.
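
For completeness, a sketch of that next step, reusing the question's own Encoders.bean line (assuming Company follows JavaBean conventions, i.e. a no-arg constructor plus getters/setters):

// With plain companyName/companyId columns, the bean encoder can map rows by name
Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));
comapanyDs.show();

Alternatively, selecting col("col.*") after the explode flattens every field of the struct in one go.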
