How do I write a Dataset encoder to support mapping a function to a org.apache.spark.sql.Dataset[String] in Scala Spark

Problem description

Moving from Spark 1.6 to Spark 2.2* has brought the error "Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc)" when trying to apply a method to a dataset returned from querying a parquet table. I have oversimplified my code to demonstrate the same error. The code queries a parquet file, returning the datatype org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]. I apply a function to extract a string and an integer, returning a string, which yields the datatype Array[String]. Next, I need to perform extensive manipulations requiring a separate function. In this test function, I try to append a string, producing the same error as my detailed example. I have tried some encoder examples and the use of 'case' but have not come up with a workable solution. Any suggestions/examples would be appreciated.

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

To distill the problem further, I believe this scenario (though I have not tried all possible solutions) can be simplified to the following code:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }

Recommended answer

The first solution does not work on my initial (production) data set; it instead produces the error "org.apache.spark.SparkException: Task not serializable" (interestingly, both are stored as the same data type, org.apache.spark.sql.Dataset[String] = [value: string]), which I believe to be related. I have included yet another solution for my test data set that eliminates the initial encoder error and, as shown, actually works on my toy problem, but it does not ramp to a production data set. I am a bit confused as to exactly why my application is sidelined in the move from Spark 1.6 to 2.3, as I didn't have to make any special accommodations to my application for years and have run it successfully for calculations that most likely count in the trillions. Other explorations have included wrapping my method as Serializable, exploring the @transient keyword, leveraging the org.apache.spark.serializer.KryoSerializer, writing my methods as functions, and changing all vars to vals (following related posts on 'stack').

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 |   val r = s + "hi"
 |   return r
 |   }
f: (s: String)String

scala> var d2 = test.map{s => f(s)}(Encoders.STRING)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
res0: Array[String] = Array(just some wordshi)

scala>
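
As a side note, the explicit Encoders.STRING argument above is only one way to satisfy the encoder requirement: once f is declared with an explicit String return type, the implicit Encoder[String] brought in by import spark.implicits._ can be resolved automatically. The sketch below is a minimal, self-contained illustration of that, combined with one common way to avoid the "Task not serializable" error mentioned above: keeping the mapped function in a small top-level object that extends Serializable instead of capturing surrounding state. The object and method names (StringTransforms, appendHi, EncoderSketch) and the local-mode session settings are illustrative assumptions, not part of the original code.

import org.apache.spark.sql.{Dataset, SparkSession}

// A small top-level object keeps the closure shipped to executors minimal and
// serializable, which is one common way around "Task not serializable" errors.
// (Illustrative name; not from the original question.)
object StringTransforms extends Serializable {
  // Explicit String return type, so an Encoder[String] can be found for the result.
  def appendHi(s: String): String = s + "hi"
}

object EncoderSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a local SparkSession; in spark-shell the `spark` value already exists.
    val spark = SparkSession.builder().master("local[*]").appName("encoder-sketch").getOrCreate()
    import spark.implicits._ // brings the implicit Encoder[String] into scope

    val test: Dataset[String] = (1 to 3).map(_ => "just some words").toDS()

    // No explicit Encoders.STRING needed: appendHi returns String, so the
    // implicit string encoder from spark.implicits._ is resolved automatically.
    val test2: Dataset[String] = test.map(StringTransforms.appendHi)

    test2.show(truncate = false) // prints "just some wordshi" three times

    spark.stop()
  }
}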
