How to attach a HashMap to a Configuration object in Flink?

Question

I want to share a HashMap across every node in Flink and allow the nodes to update that HashMap. I have this code so far:

object ParallelStreams {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Is there a way to attach a HashMap to this config variable?
    val config = new Configuration()
    config.setClass("HashMap", Class[CustomGlobal])
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
      override def toMap: util.Map[String, String] = {
        new HashMap[String, String]()
      }
    }

    class MyCoMap extends RichCoMapFunction[String, String, String] {
      var users: HashMap[String, String] = null
      //How do I get access the HashMap I attach to the global config here?
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
        val globalConf = globalParams[Configuration]
        val hashMap = globalConf.getClass

      }
      //Other functions to override here
    }
}

I was wondering if you can attach a custom object to the config variable created here val config = new Configuration()? (Please see comments in the code above).

I noticed you can only attach primitive values. I created a custom class that extends ExecutionConfig.GlobalJobParameters and attached that class by doing config.setClass("HashMap", Class[CustomGlobal]) but I am not sure if that is how you are supposed to do it?

Recommended answer

The common way to distribute parameters to operators is to have them as regular member variables in the function class. The function object that is created and assigned during plan construction is serialized and shipped to all workers. So you don't have to pass parameters via a configuration.

This would look as follows:

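import java.util.HashMap
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

// The map is passed in through the constructor rather than through a Configuration.
class MyMapper(map: HashMap[String, String]) extends MapFunction[String, String] {
  // class definition
}


val inStream: DataStream[String] = ???

val myHashMap: HashMap[String, String] = ???
val myMapper: MyMapper = new MyMapper(myHashMap)
val mappedStream: DataStream[String] = inStream.map(myMapper)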

The myMapper object is serialized (using Java serialization) and shipped for execution. So the type of map must implement the Java Serializable interface.
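
To make the serialization step concrete, here is a minimal sketch that uses only the JDK and no Flink API. LookupFunction and shipToWorker are hypothetical names introduced for illustration; the round trip through ObjectOutputStream and ObjectInputStream only approximates what happens when a function object is shipped to a worker.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.HashMap

// Hypothetical stand-in for a function object; it only has to be Serializable.
class LookupFunction(val users: HashMap[String, String]) extends Serializable {
  def apply(key: String): String = users.getOrDefault(key, "unknown")
}

object SerializationDemo {
  // Round-trips an object through Java serialization and returns the deserialized copy.
  def shipToWorker[T <: java.io.Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val users = new HashMap[String, String]()
    users.put("alice", "admin")
    val original = new LookupFunction(users)
    val workerCopy = shipToWorker(original)

    // Entries added before shipping are present in the copy.
    println(workerCopy("alice"))
    // An update made on the copy does not reach the original map.
    workerCopy.users.put("bob", "guest")
    println(original.users.containsKey("bob"))
  }
}
```

The copy is independent of the original, which is why a map passed this way works for read-only parameters but cannot act as shared mutable state between parallel tasks.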

I missed the part where you want the map to be updatable from all parallel tasks. That is not possible with Flink. You would have to either fully replicate the map and all updates (by broadcasting) or use an external system (a key-value store) for that.
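
The replication option can be pictured with a small model in plain Scala. This is not Flink code: TaskReplica and BroadcastModel are hypothetical names, and the broadcast is simulated by a loop over the replicas. In a real job the updates would arrive as a broadcast stream (Flink 1.5 and later provide a broadcast state API aimed at this pattern).

```scala
import scala.collection.mutable

// Hypothetical model of one parallel task that holds its own full copy of the map.
class TaskReplica(val id: Int) {
  private val users = mutable.HashMap.empty[String, String]
  def applyUpdate(key: String, value: String): Unit = users.update(key, value)
  def lookup(key: String): Option[String] = users.get(key)
}

object BroadcastModel {
  // Delivers one update to every replica, mimicking a broadcast of the update stream.
  def broadcast(replicas: Seq[TaskReplica], key: String, value: String): Unit =
    replicas.foreach(_.applyUpdate(key, value))

  def main(args: Array[String]): Unit = {
    val replicas = (0 until 3).map(new TaskReplica(_))
    broadcast(replicas, "alice", "admin")
    broadcast(replicas, "alice", "owner") // a later update overwrites the entry on every replica
    replicas.foreach(r => println(s"task ${r.id}: ${r.lookup("alice")}"))
  }
}
```

Every replica ends up with the same content only if it sees the same updates in the same order. If that cannot be guaranteed, the external key-value store is likely the simpler choice.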
