Programmatically infer schema to prepare Spark dataframe Dataset<Row> from RDD<Row> when some Row objects may contain different numbers of elements

Problem Description

I am fetching neo4j node information into a Spark RDD using the neo4j-spark connector. I can obtain an RDD<Row> by calling the loadNodeRdds() method. But when I try obtaining a dataframe by calling the loadDataframe() method, it throws an exception (skip the stack trace if you find it too long, as the main question might turn out to be different in the end):

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true) AS Condition#4
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

(skipped a lot of rows, as they made the question reach its character limit)

I was not able to get much from the big stack trace above.

So I took the JavaRDD<Row> and tried converting it to a Dataset<Row> by programmatically specifying a StructType schema:

// loadSchema() is the author's own helper that builds the StructType programmatically
StructType schema = loadSchema();
Dataset<Row> df = ss.createDataFrame(neo4jJavaRdd, schema);

This threw a somewhat similar exception.

So what I did was take the individual properties of a single neo4j node, prepare a Row and then a JavaRDD<Row> from it, and then try to create a dataframe from it by programmatically specifying the schema as follows:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

// One hand-built Row mirroring the properties of a single node
Row row1 = RowFactory.create("val1", " val2", "val3", "val4", "val5", "val6", 152214d, "val7", 152206d, 11160d, "val8");

// One StructField per attribute, in the same order as the Row values
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("attr1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr3", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr4", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr5", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr6", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr7", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attrd3", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr8", DataTypes.StringType, true));

This worked.

So I checked all the nodes and realized that not all nodes (that is, not all Rows in the JavaRDD<Row>) have the same number of attributes. This must be what is causing the dataframe preparation to fail. Can I handle it programmatically in some way, without having to create and specify a POJO?

Recommended Answer

Two points to keep in mind when using the neo4j-spark-connector:

  1. In general, if you are going to prepare a dataframe, it is not preferable to return neo4j's object types, specifically nodes and relationships; a node's properties come back as a java.util.Map, which is likely where the UnmodifiableMap in the stack trace above came from. That is, returning the node like below is not preferable:

MATCH(n {id:'xyz'}) RETURN n

Instead, return the properties:

MATCH(n {id:'xyz'}) RETURN properties(n)

  2. If you are not sure that all nodes have the same number of properties, it is better to return them explicitly rather than returning properties(n) and obtaining a JavaRDD<Row>, since that would require us to process the JavaRDD again to add NULLs for the non-existent properties (a sketch of that extra pass is given at the end of this answer). That is, instead of doing this:

    MATCH(n {id:'xyz'}) RETURN properties(n)
    

    return them this way:

    MATCH(n {id:'xyz'}) RETURN n.prop1 AS prop1, n.prop2 AS prop2, ..., n.propN AS propN
    

    Neo4j itself adds NULLs for the non-existent properties, so we don't have to iterate over the rows again. By returning the properties explicitly like this, I was able to obtain the neo4j node information directly with the loadDataframe() method.
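
For reference, below is a minimal sketch of the extra pass that point 2 warns about, i.e. what you would need if you did return properties(n) and ended up with Rows of varying width. The identifiers neo4jJavaRdd, schema, and ss come from the question; the padding logic is an assumption of this sketch and presumes the values arrive in a consistent column order, which is precisely why returning explicit properties from Cypher is the safer route:

import org.apache.spark.api.java.JavaRDD;

// Hypothetical padding pass: widen every Row to the schema's column count,
// leaving the missing trailing positions as null
int width = schema.fields().length;
JavaRDD<Row> padded = neo4jJavaRdd.map(row -> {
    Object[] values = new Object[width];        // every slot starts as null
    for (int i = 0; i < row.size() && i < width; i++) {
        values[i] = row.get(i);                 // copy the values that do exist
    }
    return RowFactory.create(values);
});
Dataset<Row> df = ss.createDataFrame(padded, schema);

This is exactly the extra work that the explicit RETURN n.prop1 AS prop1, ... query avoids.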
