From the following code how to convert a JavaRDD&lt;Integer&gt; to DataFrame or DataSet


Problem description

public static void main(String[] args) {
    SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
    List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
    Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
    System.out.println(DF.javaRDD().getNumPartitions());
    // Count the elements in each of the 3 partitions, giving a JavaRDD<Integer> of per-partition sizes.
    JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD()
            .mapPartitions(it -> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
}

From the above code, I am unable to convert the JavaRDD (mappartRdd) to a DataFrame in Java Spark. I am using the following to convert the JavaRDD to a DataFrame/Dataset:

sessn.createDataFrame(mappartRdd, beanClass);

I tried multiple options and different overloaded versions of createDataFrame, but I keep running into issues converting it to a DataFrame. What bean class do I need to provide for the code to work?
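
For context, createDataFrame(JavaRDD&lt;T&gt;, Class&lt;T&gt;) infers the columns from the bean's getters, so passing a bare Integer as the bean class gives it nothing to map. Below is a minimal sketch of the bean route, assuming a hypothetical IntWrapper bean that is not part of the original post:

// Hypothetical bean with a single "value" property (must be public static or top-level).
public static class IntWrapper implements java.io.Serializable {
    private Integer value;
    public IntWrapper() {}
    public IntWrapper(Integer value) { this.value = value; }
    public Integer getValue() { return value; }
    public void setValue(Integer value) { this.value = value; }
}

// Wrap each Integer into the bean, then pass the bean class to createDataFrame.
JavaRDD<IntWrapper> beanRdd = mappartRdd.map(IntWrapper::new);
Dataset<Row> beanDf = sessn.createDataFrame(beanRdd, IntWrapper.class);
beanDf.show();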

Unlike Scala, Java has no toDF()-style function to convert an RDD to a DataFrame. Can someone assist in converting it as per my requirement?

Note: I am able to create a Dataset directly by modifying the above code as below.

Dataset<Integer> mappartDS = DF.repartition(3)
        .mapPartitions(it -> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());

But I want to know why my JavaRDD is not getting converted to a DataFrame/Dataset when I use createDataFrame. Any help will be greatly appreciated.

Recommended answer

This seems to be the same as this question.

I think you are at the learning stage of Spark. I would suggest getting familiar with the Java APIs provided: https://spark.apache.org/docs/latest/api/java/index.html

Regarding your question, if you check the createDataFrame API, it is as follows:

 def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
...
}

As you can see, it takes a JavaRDD[Row] and a matching StructType schema as arguments. Hence, to create a DataFrame (which is the same as Dataset<Row>), use the snippet below:

JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD()
        .mapPartitions(it -> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

// Wrap each Integer into a Row and supply an explicit schema.
// "spark" is the SparkSession (called "sessn" in the question's code).
StructType schema = new StructType()
        .add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
Dataset<Row> df = spark.createDataFrame(mappartRdd.map(RowFactory::create), schema);
df.show(false);
df.printSchema();

        /**
         * +-----+
         * |value|
         * +-----+
         * |6    |
         * |8    |
         * |6    |
         * +-----+
         *
         * root
         *  |-- value: integer (nullable = true)
         */
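
As a side note (not part of the original answer), the same JavaRDD<Integer> can also be turned into a DataFrame without building a StructType by hand, by going back through an Encoder; the column name "value" is assumed here:

// Alternative sketch: wrap the JavaRDD back into a Dataset<Integer> via an Encoder,
// then name the single column with toDF. "spark" is the SparkSession.
Dataset<Row> df2 = spark.createDataset(mappartRdd.rdd(), Encoders.INT()).toDF("value");
df2.show(false);
df2.printSchema();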
