Value type is binary after Spark Dataset mapGroups operation, even though the function returns a String


Problem description

Environment:

Spark version: 2.3.0
Run Mode: Local
Java version: Java 8

The Spark application tries to do the following:

1) Convert the input data into a Dataset[GenericRecord]

2) Group by the key property of the GenericRecord

3) Use mapGroups after the grouping to iterate over the value list and build a result in String format

4) Write the result as a String to a text file

The error happens when writing to the text file. Spark deduced that the Dataset generated in step 3 has a binary column, not a String column, even though the mapGroups function actually returns a String.

Is there a way to do the column data type conversion, or to let Spark know that it is actually a string column and not a binary one?


    val dslSourcePath = args(0)
    val filePath = args(1)
    val targetPath = args(2)
    val df = spark.read.textFile(filePath)

    implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)

    val mapResult = df.flatMap(abc => {
      // placeholder in the original post: a Java library turns the input line into a java.util.List of Avro GenericRecord
      JavaConversions.asScalaBuffer(some how return a list of Avro GenericRecord using a java library).seq
    })

    val groupResult = mapResult.groupByKey(result => String.valueOf(result.get("key")))
      .mapGroups((key, valueList) => {
        val result = StringBuilder.newBuilder.append(key).append(",").append(valueList.count(_=>true))
        result.toString()
      })

    groupResult.printSchema()

    groupResult.write.text(targetPath + "-result-" + System.currentTimeMillis())


And the printed schema shows that the value column is binary:

root
 |-- value: binary (nullable = true)

Spark then throws an error that it can't write binary data as text:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a string column, but you have binary.;
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat.verifySchema(TextFileFormat.scala:55)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat.prepareWrite(TextFileFormat.scala:78)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:595)

Answer

As @user10938362 said, the reason is that the following code encodes all data, including the String returned from mapGroups, to bytes:

implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
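
As a quick illustration (not from the original answer, just a hedged check): a Kryo encoder always serializes its type into a single binary column, so once the blanket implicit is picked up for String, the mapGroups output is reported as binary:

    import org.apache.spark.sql.Encoders

    // Any Kryo encoder exposes a single binary field, regardless of the wrapped type
    println(Encoders.kryo[String].schema)
    // prints a single binary field, e.g. StructType(StructField(value,BinaryType,true))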

Replacing it with the following code enables the Kryo encoding only for GenericRecord:

implicit def kryoEncoder: Encoder[GenericRecord] = Encoders.kryo
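
A minimal sketch of the corrected grouping step (hedged: it assumes `mapResult` is the Dataset[GenericRecord] built in the question's code and that a SparkSession named `spark` is available, so that `spark.implicits._` can supply the built-in String encoder for the keys and the mapGroups output):

    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.{Encoder, Encoders}

    // Kryo is now used only for GenericRecord values; Strings keep Spark's native encoder
    implicit def genericRecordEncoder: Encoder[GenericRecord] = Encoders.kryo

    import spark.implicits._ // provides Encoder[String]

    val groupResult = mapResult
      .groupByKey(record => String.valueOf(record.get("key")))
      .mapGroups((key, values) => key + "," + values.size)

    groupResult.printSchema()
    // root
    //  |-- value: string (nullable = true)

    groupResult.write.text(targetPath + "-result-" + System.currentTimeMillis())

If the blanket Kryo implicit cannot be removed for some reason, another option should be to bypass implicit resolution and pass the output encoder explicitly, e.g. `.mapGroups((key, values) => key + "," + values.size)(Encoders.STRING)`.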
