Add index column to Apache Spark Dataset<Row> using Java


Question

The question below has solutions for Scala and PySpark, but the solution provided there does not produce consecutive index values.

Spark Dataframe: How to add an index column (aka distributed data index)

I have an existing Dataset in Apache Spark and I want to select some rows from it based on an index. I am planning to add an index column that contains unique values starting from 1, and based on the values of that column I will fetch rows. I found the method below to add an index, which uses order by:

df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));

I do not want to use order by. I need the index to follow the order in which the rows are present in the Dataset. Any help?

Answer

From what I gather, you are trying to add an index (with consecutive values) to a dataframe. Unfortunately, there is no built-in function in Spark that does that. You can only add an increasing index (but not necessarily one with consecutive values) with df.withColumn("index", monotonicallyIncreasingId).

Nonetheless, there is a zipWithIndex function in the RDD API that does exactly what you need. We can thus define a function that transforms the dataframe into an RDD, adds the index, and transforms it back into a dataframe.

I'm not an expert in Spark's Java API (Scala is much more compact), so it might be possible to do better. Here is how I would do it.

import java.util.ArrayList;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
        Row r = t._1;          // original row
        Long index = t._2 + 1; // 0-based position in the RDD, shifted to start at 1
        ArrayList<Object> list = new ArrayList<>();
        for (int i = 0; i < r.size(); i++) {
            list.add(r.get(i));
        }
        list.add(index);
        // toArray() matters: RowFactory.create takes Object..., so passing
        // the list itself would yield a single-column row.
        return RowFactory.create(list.toArray());
    });
    // Extend the original schema with the new long-typed index column.
    StructType newSchema = df.schema()
            .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
    return df.sparkSession().createDataFrame(rdd, newSchema);
}

And here is how you would use it. Notice what the built-in Spark function produces in contrast with our approach.

Dataset<Row> df = spark.range(5)
    .withColumn("index1", functions.monotonicallyIncreasingId());
Dataset<Row> result = zipWithIndex(df, "good_index");

// df
+---+-----------+
| id|     index1|
+---+-----------+
|  0|          0|
|  1| 8589934592|
|  2|17179869184|
|  3|25769803776|
|  4|25769803777|
+---+-----------+

// result
+---+-----------+----------+
| id|     index1|good_index|
+---+-----------+----------+
|  0|          0|         1|
|  1| 8589934592|         2|
|  2|17179869184|         3|
|  3|25769803776|         4|
|  4|25769803777|         5|
+---+-----------+----------+
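To close the loop on the original goal of fetching rows by their index, here is a minimal sketch (not part of the original answer; it assumes the result dataset built above and an arbitrary index range of 2 through 4):

// Hypothetical follow-up: select the rows whose index is between 2 and 4.
Dataset<Row> slice = result.filter(functions.col("good_index").between(2, 4));
slice.show();

Because good_index is consecutive, a between filter like this selects exactly the rows at those positions, which the gapped index1 column could not guarantee.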
