Infinite recursion in createDataFrame for avro types


Problem Description

I'm getting a StackOverflowError from inside the createDataFrame call in this example. It originates in scala code involving java type inferencing which calls itself in an infinite loop.

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
        .map(new Function<String, Event>() {
            public Event call(String line) throws Exception {
                Event event = parser.parse(line);
                log.info("event: " + event.toString());
                return event;
            }
        });
log.info("eventRDD:" + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();

The bottom of the stack trace looks like this:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
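The loop in the trace above is Spark's JavaTypeInference treating the Avro-generated Event as a Java bean and recursing into the return type of every getter. The generated class exposes getSchema(), which returns org.apache.avro.Schema, and Schema's own getters return yet more Schema objects, so the walk never bottoms out. A minimal, stdlib-only sketch of the failure mode (FakeSchema and inferDepth are hypothetical stand-ins, not Spark or Avro code):

```java
import java.lang.reflect.Method;

public class InferSketch {
    // Stand-in for org.apache.avro.Schema: its getters hand back more
    // Schema objects (e.g. getElementType()), so a naive recursive walk
    // over bean properties never reaches a leaf type.
    static class FakeSchema {
        public FakeSchema getElementType() { return this; }
    }

    // Naive bean-style type inference, mirroring the shape of
    // JavaTypeInference.inferDataType: recurse into each getter's return type.
    static int inferDepth(Class<?> cls, int depth, int limit) {
        if (depth >= limit) return depth;          // artificial guard so the sketch terminates
        for (Method m : cls.getDeclaredMethods()) {
            if (m.getName().startsWith("get")) {
                return inferDepth(m.getReturnType(), depth + 1, limit);
            }
        }
        return depth;                              // leaf type: no getters left
    }

    public static void main(String[] args) {
        int depth = inferDepth(FakeSchema.class, 0, 1000);
        System.out.println(depth); // prints 1000: only the guard stops the recursion
    }
}
```

Without the depth guard, this is exactly the StackOverflowError seen inside createDataFrame.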

This looks similar to the bug reported in http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html, but I'm using Spark 1.4.1, which is newer than the release in which that bug was fixed.

The Event class is generated by avro from this avsc. It does contain double and long fields, which have been reported as causing problems, but replacing double with string doesn't change the symptoms.

{
    "namespace": "mynamespace", 
    "type": "record", 
    "name": "Event", 
    "fields": [
        { "name": "ts", "type": "double", "doc": "Timestamp"},
        { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
        { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
        { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
        { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
        { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
        { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
        { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
        { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
        { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
        { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
        { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
        { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
        { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
        { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
        { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
        { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"},
        { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
        { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
    ]
}

Could someone please advise? Thanks!

Recommended Answer

There is work under way in the spark-avro project to address this issue; see https://github.com/databricks/spark-avro/pull/217 and https://github.com/databricks/spark-avro/pull/216.

Once this is merged, there should be a function to convert an RDD of Avro objects into a Dataset (a Dataset of Rows is equivalent to a DataFrame) without the circular-reference issue caused by the getSchema() function in the generated class.
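Until that lands, a common workaround is to bypass bean inference entirely: declare the StructType by hand and map each Event to a Row, then use the createDataFrame(JavaRDD&lt;Row&gt;, StructType) overload, which never reflects on the Avro class. A sketch against the question's own variables (only three of the avsc fields shown; getter names assume Avro's generated accessors, and Avro strings need toString() since they arrive as CharSequence):

    StructType schema = new StructType(new StructField[] {
            DataTypes.createStructField("ts", DataTypes.DoubleType, false),
            DataTypes.createStructField("uid", DataTypes.StringType, false),
            DataTypes.createStructField("proto", DataTypes.StringType, false)
    });

    JavaRDD<Row> rowRDD = eventRDD.map(new Function<Event, Row>() {
        public Row call(Event e) {
            // Pull each field out explicitly; nothing here touches getSchema().
            return RowFactory.create(e.getTs(), e.getUid().toString(), e.getProto().toString());
        }
    });

    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

Extending the StructField list and the RowFactory.create call to the remaining fields follows the same pattern.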
