From the following code, how to convert a JavaRDD&lt;Integer&gt; to DataFrame or DataSet


Problem description

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;

import scala.collection.JavaConversions;

public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
        List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
        Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
        System.out.println(DF.javaRDD().getNumPartitions());
        JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it -> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
}


From the above code, I am unable to convert the JavaRDD (mappartRdd) to a DataFrame in Java Spark. I am using the call below to convert the JavaRDD to a DataFrame/Dataset.

sessn.createDataFrame(mappartRdd, beanClass);


I tried multiple options and different overloaded versions of createDataFrame, but I keep running into issues converting it to a DataFrame. What beanClass do I need to provide for the code to work?


Unlike Scala, Java has no toDF()-style function to convert an RDD to a DataFrame. Can someone assist in converting it as per my requirement?


Note: I am able to create a Dataset directly by modifying the above code as shown below.

Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());


But I want to know why my JavaRDD is not getting converted to a DataFrame/Dataset when I use createDataFrame. Any help will be greatly appreciated.

Answer


I think you are at a learning stage. I would suggest you go through the Java API provided - https://spark.apache.org/docs/latest/api/java/index.html

Regarding your question, if you check the createDataFrame API, it is as follows:

 def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
...
}
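Before going to the Row-based overload, it is worth noting why `createDataFrame(mappartRdd, beanClass)` fails: the beanClass overload reflects a schema from JavaBean getters, and a plain `Integer` has none. A minimal sketch of that overload, assuming a hypothetical wrapper bean named `CountBean` (the name and class are illustrative, not from the original post):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BeanExample {

    // A minimal JavaBean: serializable, no-arg constructor, getter/setter.
    // The name "CountBean" is purely illustrative.
    public static class CountBean implements Serializable {
        private Integer value;
        public Integer getValue() { return value; }
        public void setValue(Integer value) { this.value = value; }
    }

    public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder()
                .appName("RDD2DF").master("local").getOrCreate();

        List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> ints = sessn.createDataset(lst, Encoders.INT()).javaRDD();

        // Wrap each Integer in a bean so createDataFrame can reflect a schema from it.
        JavaRDD<CountBean> beans = ints.map(i -> {
            CountBean b = new CountBean();
            b.setValue(i);
            return b;
        });

        Dataset<Row> df = sessn.createDataFrame(beans, CountBean.class);
        df.show();   // a single "value" column with the wrapped integers
        sessn.stop();
    }
}
```

The column name ("value" here) comes from the bean's getter, so a bean is only worth the boilerplate when the rows have real structure; for a bare Integer the Row/schema route below is simpler.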

As you can see, it takes a JavaRDD[Row] and a matching StructType schema as arguments. Hence, to create a DataFrame (which equals Dataset&lt;Row&gt;), use the snippet below:

JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it -> Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

// Define a one-column schema, then convert each Integer to a Row
StructType schema = new StructType()
        .add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
Dataset<Row> df = sessn.createDataFrame(mappartRdd.map(RowFactory::create), schema);
df.show(false);
df.printSchema();

        /**
         * +-----+
         * |value|
         * +-----+
         * |6    |
         * |8    |
         * |6    |
         * +-----+
         *
         * root
         *  |-- value: integer (nullable = true)
         */
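As a further alternative (a sketch of another route, not part of the original answer), SparkSession.createDataset also accepts a Scala RDD plus an Encoder, so the JavaRDD can be converted without a bean or an explicit StructType by unwrapping it with JavaRDD.rdd():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RddToDs {
    public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder()
                .appName("RDD2DF").master("local").getOrCreate();

        List<Integer> lst = new ArrayList<>();
        for (int i = 1; i <= 20; i++) lst.add(i);
        Dataset<Integer> ds = sessn.createDataset(lst, Encoders.INT());

        // Count elements per partition with a plain Java loop
        // (avoids the deprecated JavaConversions helper).
        JavaRDD<Integer> mappartRdd = ds.repartition(3).javaRDD().mapPartitions(it -> {
            int n = 0;
            while (it.hasNext()) { it.next(); n++; }
            return Arrays.asList(n).iterator();
        });

        // createDataset(RDD<T>, Encoder<T>) accepts the underlying Scala RDD;
        // toDF("value") then names the single column.
        Dataset<Row> df = sessn.createDataset(mappartRdd.rdd(), Encoders.INT()).toDF("value");
        df.show(false);
        df.printSchema();
        sessn.stop();
    }
}
```

This keeps the whole pipeline typed through Encoders, which is usually the most direct replacement for Scala's toDF() in Java.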

