Add index column to Apache Spark Dataset&lt;Row&gt; using Java
Problem description
The question below has solutions for Scala and PySpark, but the solution provided there does not produce consecutive index values.
Spark Dataframe: How to add an index column (aka distributed data index)
I have an existing Dataset in Apache Spark and I want to select some rows from it based on an index. I plan to add an index column containing unique values starting from 1, and then fetch rows based on that column's values. I found the following method, which adds an index using order by:
df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));
I do not want to use order by. I need the index in the same order the rows appear in the Dataset. Any help?
Recommended answer
From what I gather, you are trying to add an index (with consecutive values) to a dataframe. Unfortunately, there is no built-in function in Spark that does this. You can only add an increasing index (but not necessarily one with consecutive values) with df.withColumn("index", functions.monotonically_increasing_id()).
Nonetheless, the RDD API has a zipWithIndex function that does exactly what you need. We can therefore define a function that converts the dataframe to an RDD, adds the index, and converts the result back into a dataframe.
I'm not an expert in Spark with Java (Scala is much more compact), so it may be possible to do better. Here is how I would do it.
import java.util.ArrayList;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
        Row r = t._1;
        Long index = t._2 + 1;  // start the index at 1 instead of 0
        ArrayList<Object> list = new ArrayList<>();
        for (int i = 0; i < r.length(); i++) {
            list.add(r.get(i));
        }
        list.add(index);
        // RowFactory.create takes varargs, so the list must be unpacked
        return RowFactory.create(list.toArray());
    });
    StructType newSchema = df.schema()
        .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
    return df.sparkSession().createDataFrame(rdd, newSchema);
}
And here is how you would use it. Notice the contrast between what the built-in Spark function produces and what our approach produces.
Dataset<Row> df = spark.range(5)
    .withColumn("index1", functions.monotonically_increasing_id());
Dataset<Row> result = zipWithIndex(df, "good_index");
// df
+---+-----------+
| id| index1|
+---+-----------+
| 0| 0|
| 1| 8589934592|
| 2|17179869184|
| 3|25769803776|
| 4|25769803777|
+---+-----------+
// result
+---+-----------+----------+
| id| index1|good_index|
+---+-----------+----------+
| 0| 0| 1|
| 1| 8589934592| 2|
| 2|17179869184| 3|
| 3|25769803776| 4|
| 4|25769803777| 5|
+---+-----------+----------+
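Once the consecutive index exists, fetching rows by position (the asker's original goal) is a simple filter on that column. Below is a minimal sketch; the class name IndexSelect and the helper selectByIndex are illustrative, not part of Spark's API, and it assumes the index column holds consecutive values starting at 1 as produced by zipWithIndex above.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

public class IndexSelect {
    // Returns the rows whose index column falls in [from, to], inclusive.
    // Assumes indexCol contains consecutive values starting at 1.
    public static Dataset<Row> selectByIndex(Dataset<Row> df, String indexCol,
                                             long from, long to) {
        return df.filter(col(indexCol).between(from, to));
    }
}
```

For example, selectByIndex(result, "good_index", 2, 4) would return the second through fourth rows of the dataframe from the example above. Note that the filter is evaluated lazily and distributed like any other Spark transformation, so no driver-side collection is needed.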