Updating a global variable periodically in Spark

Question

I'm doing something like pattern matching in a Spark Streaming app. I want to update a variable that works like a broadcast variable but is mutable. Is there a way to do that? Any advice?

Edit

Sorry for not being clear. I am doing some CEP (complex event processing) on logs. I need to load the rules from Elasticsearch while the Spark application is running, and I want to apply these rules on the worker side (on each RDD).

Answer

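The idea is to write a wrapper around the broadcast variable that gets refreshed periodically. The catch is to call the refresh function inside transform (or a similar operation that allows RDD-to-RDD operations), because the function passed to transform is evaluated on the driver for every batch, so it can create a new broadcast between batches.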

Code snippet for the BroadcastWrapper class:

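import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    // Let's say we want to refresh every 1 min = 60000 ms.
    private static final long REFRESH_INTERVAL_MS = 60000L;

    private static final BroadcastWrapper obj = new BroadcastWrapper();

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = new Date();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    // Called on the driver: re-broadcasts the reference data when there is no
    // broadcast yet or the current one is older than REFRESH_INTERVAL_MS.
    public synchronized Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        long diff = System.currentTimeMillis() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > REFRESH_INTERVAL_MS) {
            if (broadcastVar != null) {
                // Drop the stale copies cached on the executors.
                broadcastVar.unpersist();
            }
            lastUpdatedAt = new Date();

            // Your logic to refresh, e.g. reload the rules from Elasticsearch.
            // ReferenceData and getRefData() stand for your own type and loader.
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}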
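The time-based check in updateAndGet does not itself depend on Spark. Below is a minimal, Spark-free sketch of the same refresh logic, assuming a hypothetical RefreshingValue class with an injectable clock so the behaviour can be exercised without a cluster. In the real wrapper the loader would be your rule-loading code, and the reload step would also unpersist the old broadcast and create a new one.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Spark-free sketch of a periodically refreshed value: reload when the value
 * has never been loaded or is older than intervalMs.
 * RefreshingValue is a hypothetical name, not a Spark class.
 */
public class RefreshingValue<T> {
    private final Supplier<T> loader;   // e.g. "load the rules from Elasticsearch"
    private final long intervalMs;      // refresh period in milliseconds
    private final LongSupplier clock;   // injectable time source, in ms

    private T value;
    private long lastUpdatedAt;
    private int loadCount;

    public RefreshingValue(Supplier<T> loader, long intervalMs, LongSupplier clock) {
        this.loader = loader;
        this.intervalMs = intervalMs;
        this.clock = clock;
    }

    public synchronized T get() {
        long now = clock.getAsLong();
        if (value == null || now - lastUpdatedAt > intervalMs) {
            // In the Spark wrapper, this is where the old broadcast would be
            // unpersisted and a new one broadcast.
            value = loader.get();
            lastUpdatedAt = now;
            loadCount++;
        }
        return value;
    }

    public synchronized int getLoadCount() {
        return loadCount;
    }

    public static void main(String[] args) {
        long[] now = {0L};   // fake clock, in ms
        int[] version = {0};
        RefreshingValue<String> rules = new RefreshingValue<>(
                () -> "rules-v" + (++version[0]), 60000L, () -> now[0]);

        System.out.println(rules.get());          // rules-v1 (first call loads)
        now[0] = 30000L;
        System.out.println(rules.get());          // rules-v1 (still fresh)
        now[0] = 61000L;
        System.out.println(rules.get());          // rules-v2 (older than 60 s)
        System.out.println(rules.getLoadCount()); // 2
    }
}
```

Injecting the clock as a LongSupplier instead of calling System.currentTimeMillis() directly is only a testing convenience; get() is synchronized because it reads and writes shared fields.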

To use it, call updateAndGet inside transform, something like:

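objectStream.transform(rdd -> {
    // Runs on the driver once per batch, so the broadcast can be refreshed here.
    Broadcast<ReferenceData> refdataBroadcast =
        BroadcastWrapper.getInstance().updateAndGet(rdd.context());

    /* Your code here, e.g. read refdataBroadcast.value() inside rdd.map(...) */

    // transform must return an RDD; return the transformed one in real code.
    return rdd;
});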
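Calling updateAndGet inside transform works because, in Spark Streaming, the function given to transform runs on the driver once per batch, while closures passed to operations such as map or filter run on the executors and only read the broadcast value. The following Spark-free simulation illustrates that split under stated assumptions: each batch is a List<String> of log lines, a static list (ruleStore) stands in for the rules in Elasticsearch, a rule is just a substring to match, and the rules are snapshotted on every batch rather than once per minute. BatchSimulation, ruleStore and processBatch are hypothetical names, not Spark APIs (Java 10+ for List.copyOf).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Spark-free simulation of the transform pattern: a "driver" step runs once
 * per batch and snapshots the current rules (the stand-in for a fresh
 * broadcast), then every record of that batch is matched against the snapshot.
 */
public class BatchSimulation {

    // Stand-in for the external rule source (Elasticsearch in the question).
    static final List<String> ruleStore = new ArrayList<>();

    // Driver side: called once per batch, like the function passed to transform.
    static List<String> processBatch(List<String> batch) {
        // Immutable snapshot, analogous to broadcasting the rules for this batch.
        List<String> rulesSnapshot = List.copyOf(ruleStore);
        // Stand-in for executor-side work: each record only reads the snapshot.
        return batch.stream()
                .filter(line -> rulesSnapshot.stream().anyMatch(line::contains))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ruleStore.add("ERROR");
        System.out.println(processBatch(List.of("ERROR disk", "WARN cpu")));
        // prints [ERROR disk]

        ruleStore.add("WARN"); // rules change while the job is running
        System.out.println(processBatch(List.of("ERROR net", "WARN cpu", "INFO ok")));
        // prints [ERROR net, WARN cpu]
    }
}
```

Records in a batch never see a rule change mid-batch; a change made to the store only takes effect when the next batch takes its snapshot, which mirrors how a re-broadcast in transform affects only later batches.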

For more detail, see my answer on another thread: https://stackoverflow.com/a/41259333/3166245
