定期更新的Flink源 [英] Flink source for periodical update

查看:130
本文介绍了定期更新的Flink源的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试为长时间运行的flink作业实现外部配置.我的想法是创建一个自定义源,该源定期(每5分钟一次)通过http从外部服务中轮询JSON编码的配置.

I'm trying to implement external config for long-running flink job. My idea is to create custom source that periodically (every 5 minutes) polls JSON-encoded config from external service by http.

如何创建每N分钟执行一次操作的源? 如何将该配置重新广播给所有执行者?

How to create source that perform action every N minutes? How can I rebroadcast this config to all executors?

推荐答案

首先,您需要创建一个事件类,该类将定义事件流具有的所有属性,然后创建所有的getter,setter和其他方法.此类的一个示例是

first, you need to make an event class which will define all the attributes that your event stream has and then makes all getters, setters and other methods. An example of this class will be

 public class qrsIntervalStreamEvent {

    public Integer Sensor_id;
    public long time;
    public Integer qrsInterval;


    public qrsIntervalStreamEvent(Integer sensor_id, long time, Integer qrsInterval) {
        Sensor_id = sensor_id;
        this.time = time;
        this.qrsInterval = qrsInterval;
    }


    public Integer getSensor_id() {
        return Sensor_id;
    }

    public void setSensor_id(Integer sensor_id) {
        Sensor_id = sensor_id;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public Integer getQrsInterval() {
        return qrsInterval;
    }

    public void setQrsInterval(Integer qrsInterval) {
        this.qrsInterval = qrsInterval;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof qrsIntervalStreamEvent)) return false;

        qrsIntervalStreamEvent that = (qrsIntervalStreamEvent) o;

        if (getTime() != that.getTime()) return false;
        if (getSensor_id() != null ? !getSensor_id().equals(that.getSensor_id()) : that.getSensor_id() != null)
            return false;
        return getQrsInterval() != null ? getQrsInterval().equals(that.getQrsInterval()) : that.getQrsInterval() == null;
    }

    @Override
    public int hashCode() {
        int result = getSensor_id() != null ? getSensor_id().hashCode() : 0;
        result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
        result = 31 * result + (getQrsInterval() != null ? getQrsInterval().hashCode() : 0);
        return result;
    }


    @Override
    public String toString() {
        return "StreamEvent{" +
                "Sensor_id=" + Sensor_id +
                ", time=" + time +
                ", qrsInterval=" + qrsInterval +
                '}';
    }


} //class

现在,假设您要在5个事件/5秒内发送这些事件,那么您可以编写类似这样的代码

Now let's say you want to send these events at x events/ 5 seconds then you can write code something like this

public class Qrs_interval_Gen extends RichParallelSourceFunction<qrsIntervalStreamEvent> {
@Override
public void run(SourceContext<qrsIntervalStreamEvent> sourceContext) throws Exception {


    int qrsInterval;
    int Sensor_id;
    long currentTime;
    Random random = new Random();

    Integer InputRate = 10;

    Integer Sleeptime = 1000 * 5 / InputRate  ;


    for(int i = 0 ; i <= 100000 ; i++){


        // int randomNum = rand.nextInt((max - min) + 1) + min;
        Sensor_id =  1;

        qrsInterval =  10 + random.nextInt((20-10)+ 1);
       // currentTime = System.currentTimeMillis();
        currentTime = i;

        //System.out.println("qrsInterval = " + qrsInterval + ", Sensor_id = "+ Sensor_id );
        try {
            Thread.sleep(Sleeptime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        qrsIntervalStreamEvent stream = new qrsIntervalStreamEvent(Sensor_id,currentTime,qrsInterval);

            sourceContext.collect(stream);

    } // for loop


}

    @Override
    public void cancel() {

    }
}

这里的整个逻辑是由

如果您想每秒发送x个事件,那么您的睡眠时间将与此相反.例如每秒发送10个事件

if you want to send x events/ second then your sleep time will be inverse of that. For example to send 10 events/second

睡眠时间= 1000/10 = 100毫秒

Sleeptime = 1000 / 10 = 100 milli-seconds

类似地,对于发送10个事件/5秒,睡眠时间将为

Similarly, for sending 10 events/ 5 seconds, the sleep time will be

睡眠时间= 1000 * 5/10 = 500毫秒

Sleeptime = 1000 * 5 / 10 = 500 milli-seconds

希望有帮助,如果您有任何疑问,请告诉我

Hope it helps, let me know if you have any questions

这篇关于定期更新的Flink源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆