将类型为数组[字符串]的两列合并为新的数组[字符串]列 [英] Merge two columns of type Array[string] into a new Array[string] column

查看:20
本文介绍了将类型为数组[字符串]的两列合并为新的数组[字符串]列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在Spark SQLDataFrame中有两列,每一列中的每个条目都是一个字符串数组。

val  ngramDataFrame = Seq(
  (Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")

我想合并每一行中的数组,以形成新列中的单个数组。我的代码如下:

def concat_array(firstarray: Array[String], 
                 secondarray: Array[String]) : Array[String] = 
                                     { (firstarray ++ secondarray).toArray }
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))

我可以在两个数组上成功使用concat_array函数。但是,当我运行上面的代码时,我得到以下异常:

异常:作业因阶段故障而中止:阶段16.0中的任务0失败1次,最近的失败:阶段16.0中丢失的任务0.0(TID 12,本地主机):org.apache.spak.SparkException:无法执行用户定义的函数(anonFun$1:(ARRAY,数组)=>数组)在org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown源)在org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)在org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)在scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)在scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)在org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)在组织.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)at org.apache.spark.scheduler.Task.run(Task.scala:86)at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(线程.Java:748)起因:java.lang.ClassCastException:scala.cipltion.muable.Wrapped数组$ofRef不能强制转换为[Ljava.lang.String;在$line80.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:76)...又有13个驱动程序堆栈跟踪:

推荐答案

Arjun您创建的UDF中有一个错误。当您传递数组类型列时。数据类型不是数组[字符串],而是WRAPPEDARY[字符串]。下面我正在粘贴修改后的UDF和输出。

val SparkCtxt = new SparkContext(sparkConf)

val sqlContext = new SQLContext(SparkCtxt)

import sqlContext.implicits

import org.apache.spark.sql.functions._
val temp=SparkCtxt.parallelize(Seq(Row(Array("String1","String2"),Array("String3","String4"))))
val df= sqlContext.createDataFrame(temp,
  StructType(List(
    StructField("Col1",ArrayType(StringType),true),
    StructField("Col2",ArrayType(StringType),true)
  )
  )    )

def concat_array(firstarray: mutable.WrappedArray[String],
                 secondarray: mutable.WrappedArray[String]) : mutable.WrappedArray[String] =
{
 (firstarray ++ secondarray)
}
val concatUDF = udf(concat_array _)
val df2=df.withColumn("udftest",concatUDF(df.col("Col1"), df.col("Col2")))
df2.select("udftest").foreach(each=>{println("***********")
println(each(0))})
df2.show(true)

输出:

+------------------+------------------+--------------------+
|              Col1|              Col2|             udftest|
+------------------+------------------+--------------------+
|[String1, String2]|[String3, String4]|[String1, String2...|
+------------------+------------------+--------------------+

WrapedArray(String1,String2,String3,String4)

这篇关于将类型为数组[字符串]的两列合并为新的数组[字符串]列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆