定期更新的Flink源 [英] Flink source for periodical update
问题描述
我正在尝试为长时间运行的flink作业实现外部配置.我的想法是创建一个自定义源,该源定期(每5分钟一次)通过http从外部服务中轮询JSON编码的配置.
I'm trying to implement external config for long-running flink job. My idea is to create custom source that periodically (every 5 minutes) polls JSON-encoded config from external service by http.
如何创建每N分钟执行一次操作的源? 如何将该配置重新广播给所有执行者?
How to create source that perform action every N minutes? How can I rebroadcast this config to all executors?
推荐答案
首先,您需要创建一个事件类,该类将定义事件流具有的所有属性,然后创建所有的getter,setter和其他方法.此类的一个示例是
first, you need to make an event class which will define all the attributes that your event stream has and then makes all getters, setters and other methods. An example of this class will be
public class qrsIntervalStreamEvent {
public Integer Sensor_id;
public long time;
public Integer qrsInterval;
public qrsIntervalStreamEvent(Integer sensor_id, long time, Integer qrsInterval) {
Sensor_id = sensor_id;
this.time = time;
this.qrsInterval = qrsInterval;
}
public Integer getSensor_id() {
return Sensor_id;
}
public void setSensor_id(Integer sensor_id) {
Sensor_id = sensor_id;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public Integer getQrsInterval() {
return qrsInterval;
}
public void setQrsInterval(Integer qrsInterval) {
this.qrsInterval = qrsInterval;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof qrsIntervalStreamEvent)) return false;
qrsIntervalStreamEvent that = (qrsIntervalStreamEvent) o;
if (getTime() != that.getTime()) return false;
if (getSensor_id() != null ? !getSensor_id().equals(that.getSensor_id()) : that.getSensor_id() != null)
return false;
return getQrsInterval() != null ? getQrsInterval().equals(that.getQrsInterval()) : that.getQrsInterval() == null;
}
@Override
public int hashCode() {
int result = getSensor_id() != null ? getSensor_id().hashCode() : 0;
result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
result = 31 * result + (getQrsInterval() != null ? getQrsInterval().hashCode() : 0);
return result;
}
@Override
public String toString() {
return "StreamEvent{" +
"Sensor_id=" + Sensor_id +
", time=" + time +
", qrsInterval=" + qrsInterval +
'}';
}
} //class
现在,假设您要在5个事件/5秒内发送这些事件,那么您可以编写类似这样的代码
Now let's say you want to send these events at x events/ 5 seconds then you can write code something like this
public class Qrs_interval_Gen extends RichParallelSourceFunction<qrsIntervalStreamEvent> {
@Override
public void run(SourceContext<qrsIntervalStreamEvent> sourceContext) throws Exception {
int qrsInterval;
int Sensor_id;
long currentTime;
Random random = new Random();
Integer InputRate = 10;
Integer Sleeptime = 1000 * 5 / InputRate ;
for(int i = 0 ; i <= 100000 ; i++){
// int randomNum = rand.nextInt((max - min) + 1) + min;
Sensor_id = 1;
qrsInterval = 10 + random.nextInt((20-10)+ 1);
// currentTime = System.currentTimeMillis();
currentTime = i;
//System.out.println("qrsInterval = " + qrsInterval + ", Sensor_id = "+ Sensor_id );
try {
Thread.sleep(Sleeptime);
} catch (InterruptedException e) {
e.printStackTrace();
}
qrsIntervalStreamEvent stream = new qrsIntervalStreamEvent(Sensor_id,currentTime,qrsInterval);
sourceContext.collect(stream);
} // for loop
}
@Override
public void cancel() {
}
}
这里的整个逻辑是由
如果您想每秒发送x个事件,那么您的睡眠时间将与此相反.例如每秒发送10个事件
if you want to send x events/ second then your sleep time will be inverse of that. For example to send 10 events/second
睡眠时间= 1000/10 = 100毫秒
Sleeptime = 1000 / 10 = 100 milli-seconds
类似地,对于发送10个事件/5秒,睡眠时间将为
Similarly, for sending 10 events/ 5 seconds, the sleep time will be
睡眠时间= 1000 * 5/10 = 500毫秒
Sleeptime = 1000 * 5 / 10 = 500 milli-seconds
希望有帮助,如果您有任何疑问,请告诉我
Hope it helps, let me know if you have any questions
这篇关于定期更新的Flink源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!