Spark, Kryo Serialization Issue with ProtoBuf field


Problem Description


I am seeing an error when running my Spark job, relating to serialization of a protobuf field when transforming an RDD.


com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)


The error seems to be created at this point:

val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { tier =>
  (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(b))
    .mapPartitions(it => it.map { ord =>
      (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry))
    }))
}

val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}

val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}


The field is a list specified in the protobuf, which the generated Java code initialises as below:

otherAuthors_ = java.util.Collections.emptyList()
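
For context, Kryo's default collection serializer rebuilds a list during deserialization by instantiating it and calling add() for each element, and the immutable lists used by the generated protobuf code refuse mutation. A minimal illustration (not from the original post):

import java.util.Collections

val empty = Collections.emptyList[String]()
empty.add("x") // throws java.lang.UnsupportedOperationException, the exception in the trace above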


As you can see, the code does not actually use that field from the Book protobuf, although it is still being transmitted over the network.


Has anyone got any advice on this?

Answer


OK, old question, but here is an answer for future generations. The default Kryo serializers don't work well with some collections; in particular, immutable lists such as Collections.emptyList() cannot be rebuilt by the default collection serializer. There is a third-party library that helps with this: kryo-serializers
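
For reference, a build.sbt sketch for pulling the library in (the version shown is an assumption; pick one that matches your Kryo/Spark versions):

// build.sbt (version is an assumption)
libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.45"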


In your case you probably need to provide a custom Kryo registrator when creating the Spark config:

val conf = new SparkConf()
// Kryo must be enabled for the registrator to take effect
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "MyKryoRegistrator") // use the fully qualified class name


With the needed custom registrations in your registrator:

import java.util.Collections
import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer
import de.javakaffee.kryoserializers.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Collections.EMPTY_LIST.getClass, new CollectionsEmptyListSerializer())
    // Probably should use a proto serializer for your proto classes
    kryo.register(classOf[Book], new ProtobufSerializer())
  }
}
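
To sanity-check the registrations outside Spark, here is a hypothetical smoke test (not part of the original answer) that runs the registrator against a bare Kryo instance and round-trips the problematic empty list:

import java.io.ByteArrayOutputStream
import java.util.Collections
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object RegistratorSmokeTest {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    new MyKryoRegistrator().registerClasses(kryo)

    // Without the CollectionsEmptyListSerializer registration, this
    // round-trip is where Kryo used to fail
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeClassAndObject(output, Collections.EMPTY_LIST)
    output.close()

    val copy = kryo.readClassAndObject(new Input(buffer.toByteArray))
    assert(copy == Collections.EMPTY_LIST, "empty list should round-trip")
  }
}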

