How to attach a HashMap to a Configuration object in Flink?

Question

I want to share a HashMap across every node in Flink and allow the nodes to update that HashMap. I have this code so far:

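import java.util
import java.util.HashMap

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction
import org.apache.flink.streaming.api.scala._

object ParallelStreams {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Is there a way to attach a HashMap to this config variable?
    val config = new Configuration()
    config.setClass("HashMap", classOf[CustomGlobal])
    // Register the config as the global job parameters so functions can read it in open()
    env.getConfig.setGlobalJobParameters(config)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
      override def toMap: util.Map[String, String] = {
        new HashMap[String, String]()
      }
    }

    class MyCoMap extends RichCoMapFunction[String, String, String] {
      var users: HashMap[String, String] = null
      //How do I get access to the HashMap I attach to the global config here?
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
        val globalConf = globalParams.asInstanceOf[Configuration]
        // Note: this is Object#getClass, so it returns the runtime class of globalConf, not a stored HashMap
        val hashMap = globalConf.getClass
      }
      //Other functions to override here
    }
}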

I was wondering whether I can attach a custom object to the config variable created with val config = new Configuration() (see the comments in the code above).

I noticed that you can only attach primitive values. I created a custom class that extends ExecutionConfig.GlobalJobParameters and attached it with config.setClass("HashMap", classOf[CustomGlobal]), but I am not sure whether that is the intended way to do it.

Recommended answer

The common way to distribute parameters to operators is to have them as regular member variables in the function class. The function object that is created and assigned during plan construction is serialized and shipped to all workers. So you don't have to pass parameters via a configuration.
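The shipping step can be illustrated without a cluster. The sketch below is plain Scala with no Flink dependency, and LookupFunction and shipCopy are hypothetical names. It uses Java serialization, which the answer names as the mechanism, to give each simulated parallel task its own deserialized copy of a function object whose HashMap is an ordinary member. It only approximates what Flink does internally.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for a Flink function class: the map is a plain constructor member.
final class LookupFunction(val lookupMap: JHashMap[String, String]) extends java.io.Serializable {
  def apply(value: String): String = lookupMap.getOrDefault(value, value)
}

// Serialize and deserialize an object, roughly what happens when a function is shipped to a worker.
def shipCopy[T <: java.io.Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
    // Resolve classes through this code's class loader (helps when run as a script).
    override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
      try Class.forName(desc.getName, false, getClass.getClassLoader)
      catch { case _: ClassNotFoundException => super.resolveClass(desc) }
  }
  try in.readObject().asInstanceOf[T] finally in.close()
}

val userMap = new JHashMap[String, String]()
userMap.put("u1", "Alice")

val planFunction = new LookupFunction(userMap)
// Two simulated parallel tasks, each receiving its own deserialized copy.
val taskCopies = Seq.fill(2)(shipCopy(planFunction))

taskCopies.foreach(f => println(f("u1"))) // prints Alice twice
```

Each copy holds a separate HashMap instance, so the function sees the map's contents as they were when the object was serialized.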

For example:

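import java.util.{HashMap => JHashMap}

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

// The map is an ordinary constructor member of the function object
class MyMapper(lookupMap: JHashMap[String, String]) extends MapFunction[String, String] {
  // Replace each record by its mapped value, or keep it if the map has no entry
  override def map(value: String): String = lookupMap.getOrDefault(value, value)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inStream: DataStream[String] = env.fromElements("a", "b")

val myHashMap = new JHashMap[String, String]()
myHashMap.put("a", "alpha")
val myMapper: MyMapper = new MyMapper(myHashMap)
val mappedStream: DataStream[String] = inStream.map(myMapper)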

The myMapper object is serialized (using Java serialization) and shipped to the workers for execution. Therefore, the type of the map passed to MyMapper must implement the java.io.Serializable interface.
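The requirement applies to the objects actually stored in the map as well as to the map class, because Java serialization walks the whole object graph at runtime. The following plain-Scala sketch (no Flink; UserRecord, UserLookup and isJavaSerializable are hypothetical names) serializes a Serializable holder class whose java.util.HashMap member contains either String values or values of a class that does not implement Serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.{HashMap => JHashMap}

// Hypothetical value type that does NOT implement java.io.Serializable.
final class UserRecord(val name: String)

// Hypothetical function-like class: Serializable itself, holding a map as a member.
final class UserLookup[V](val users: JHashMap[String, V]) extends java.io.Serializable

// True if Java serialization of obj succeeds, false on NotSerializableException.
def isJavaSerializable(obj: AnyRef): Boolean = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try { out.writeObject(obj); true }
  catch { case _: NotSerializableException => false }
  finally out.close()
}

val withStrings = new JHashMap[String, String]()
withStrings.put("u1", "Alice")

val withRecords = new JHashMap[String, UserRecord]()
withRecords.put("u1", new UserRecord("Alice"))

println(isJavaSerializable(new UserLookup(withStrings))) // true
println(isJavaSerializable(new UserLookup(withRecords))) // false: UserRecord is not Serializable
```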

Edit: I missed the part where you want the map to be updatable from all parallel tasks. That is not possible with Flink. You would have to either fully replicate the map and broadcast all updates to every replica, or use an external system (a key-value store) for that.
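Why updates cannot be shared, and what replicating the map with broadcast updates means, can be modeled in plain Scala. In this hypothetical sketch (TaskReplica, Update and broadcast are illustrative names, not Flink APIs), each parallel task owns a private copy of the map, as it would after deserialization. A local update stays local; the replicas agree only if every update is delivered to every replica. In an actual Flink job that delivery would be done by a broadcast stream (for example, Flink's broadcast state pattern), which this sketch does not use.

```scala
import scala.collection.mutable

// Hypothetical model of one parallel task holding its own private replica of the map.
final class TaskReplica(val id: Int) {
  val state: mutable.HashMap[String, String] = mutable.HashMap.empty[String, String]
}

// Hypothetical update message: set key to value.
final case class Update(key: String, value: String)

val tasks: Seq[TaskReplica] = (0 until 3).map(i => new TaskReplica(i))

// 1) A local update changes only the replica of the task that made it.
tasks(0).state.update("u1", "Alice")
println(tasks.map(_.state.get("u1"))) // Some(Alice) for task 0, None for tasks 1 and 2

// 2) Broadcast replication: each update is delivered to every replica and applied there.
def broadcast(update: Update): Unit =
  tasks.foreach(_.state.update(update.key, update.value))

Seq(Update("u2", "Bob"), Update("u2", "Bobby")).foreach(broadcast)
println(tasks.map(_.state.get("u2"))) // Some(Bobby) for every task
```

The sketch applies updates one at a time from a single sender. With several concurrent senders, the replicas only stay identical if each one applies the updates in the same order, which is one reason an external key-value store can be the simpler choice.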