Infinite recursion in createDataFrame for avro types


Problem description


I'm getting a StackOverflowError from inside the createDataFrame call in this example. It originates in Scala code involving Java type inference, which calls itself in an infinite loop.

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
        .map(new Function<String, Event>()
        {
            public Event call(String line) throws Exception
            {
                Event event = parser.parse(line);
                log.info("event: " + event.toString());
                return event;
            }
        });
log.info("eventRDD:" + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();


The bottom of the stack trace looks like this:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)


This looks similar to the bug reported in http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html, but I'm using Spark 1.4.1, which is later than the release in which that bug was fixed.


The Event class is generated by Avro from the avsc below. It does contain double and long fields, which have been reported as causing problems, but replacing double with string doesn't change the symptoms.

{
    "namespace": "mynamespace", 
    "type": "record", 
    "name": "Event", 
    "fields": [
        { "name": "ts", "type": "double", "doc": "Timestamp"},
        { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
        { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
        { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
        { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
        { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
        { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
        { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
        { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
        { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
        { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
        { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
        { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
        { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
        { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
        { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
        { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"},
        { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
        { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
    ]
}


Could someone please advise? Thanks!

Answer


There is work being done in the spark-avro project to address this issue; see https://github.com/databricks/spark-avro/pull/217 and https://github.com/databricks/spark-avro/pull/216.


Once this is merged, there should be a function to convert an RDD of Avro objects into a DataSet (a DataSet of Rows is equivalent to a DataFrame), without the circular reference issue with the getSchema() function in the generated class.
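
Until those changes are merged, one possible workaround (a minimal sketch, not part of the original answer; the getTs()/getUid()/getIdorigh() getter names are assumed from Avro's standard code generation) is to bypass bean-style schema inference entirely: declare the schema as an explicit StructType and map each Event to a Row, so JavaTypeInference never walks the generated class.

// Requires org.apache.spark.sql.Row, RowFactory, DataFrame,
// org.apache.spark.sql.types.DataTypes and StructType, and java.util.Arrays.
StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("ts", DataTypes.DoubleType, false),
        DataTypes.createStructField("uid", DataTypes.StringType, false),
        DataTypes.createStructField("idorigh", DataTypes.StringType, false)
        // ... declare the remaining fields from the avsc the same way
));

JavaRDD<Row> rowRDD = eventRDD.map(new Function<Event, Row>()
{
    public Row call(Event event) throws Exception
    {
        // Avro string fields are CharSequence (often Utf8), so convert them explicitly.
        return RowFactory.create(
                event.getTs(),
                event.getUid() == null ? null : event.getUid().toString(),
                event.getIdorigh() == null ? null : event.getIdorigh().toString()
                // ... remaining fields, in the same order as the schema
        );
    }
});

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();

The trade-off is that the schema is maintained by hand: the field order and types passed to RowFactory.create must match the StructType exactly, but nothing in the recursion-prone generated class is ever inspected.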
