Spark Streaming: how not to restart receiver after receiver's failure


Question

We are using a custom Spark receiver that reads streamed data from a provided HTTP link. If the link is incorrect, the receiver fails. The problem is that Spark continuously restarts the receiver, so the application never terminates. How can we tell Spark to terminate the application if the receiver fails?

This is an extract of our custom receiver:

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200) {
        // Non-200 response: stop the receiver and report the error
        val errorMsg = "Error: " + sl.getStatusCode
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
        ...
        store(doc)
      }
    } finally {
      // Assumed cleanup: the original extract ends before the try block is closed
      response.close()
    }
  }

We have a Spark Streaming application that uses this receiver:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

Everything works as expected if the receiver has no errors. If the receiver fails (e.g. with a wrong HTTP link), Spark continuously restarts it and the application never terminates.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

We just want to terminate the whole application if a receiver fails.

Recommended answer

There is a way to control the life cycle of Spark Streaming applications that are based on a custom receiver. Define a job progress listener for your application and keep track of what is happening.

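import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.ui.StreamingJobProgressListener;

// LOG is assumed to be a logger defined elsewhere in the application.
class CustomReceiverListener extends StreamingJobProgressListener {
    // volatile: written by the listener thread, read by the driver's monitor thread
    private volatile boolean receiverStopped = false;

    public CustomReceiverListener(StreamingContext ssc) { super(ssc); }

    public boolean isReceiverStopped() {
        return receiverStopped;
    }

    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped event) {
        LOG.info("Update the flag field");
        this.receiverStopped = true;
    }
}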

Then, in your driver, start a thread that monitors the state of the receiverStopped flag. The driver stops the streaming application once this thread finishes. (A better approach is to have the driver supply a callback method that stops the streaming application.)

CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();
Thread thread = new Thread(() -> {
    while (!listener.isReceiverStopped()) {
        LOG.info("Sleepy head...");
        try {
            Thread.sleep(2 * 1000); /* check again after 2 seconds */
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop monitoring
            Thread.currentThread().interrupt();
            return;
        }
    }
});
thread.start();
thread.join();
LOG.info("Listener asked to die! Going to commit suicide :(");
// stopSparkContext = true, stopGracefully = false
ssc.stop(true, false);
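
The callback-style alternative mentioned above could be sketched roughly as follows. This is only a minimal illustration, not part of Spark: ReceiverStopSignal and its methods are hypothetical names, and the idea is simply to replace the polled flag with a java.util.concurrent.CountDownLatch that the listener trips from its onReceiverStopped override.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Spark class): lets the driver block until a
 * listener reports that the receiver has stopped, without polling a flag.
 */
final class ReceiverStopSignal {
    private final CountDownLatch stopped = new CountDownLatch(1);

    /** Intended to be called from the listener's onReceiverStopped(...) override. */
    void signalStopped() {
        stopped.countDown();
    }

    /** Blocks the calling (driver) thread until signalStopped() has been called. */
    void awaitStopped() throws InterruptedException {
        stopped.await();
    }

    /**
     * Waits up to timeoutMillis. Returns true if the stop was signalled in time,
     * false on timeout or if the waiting thread was interrupted.
     */
    boolean awaitStopped(long timeoutMillis) {
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Non-blocking check of the current state. */
    boolean isStopped() {
        return stopped.getCount() == 0;
    }
}
```

Under these assumptions, the listener would call signalStopped() where it currently sets the flag, and the driver would call awaitStopped() right after ssc.start() and then ssc.stop(true, false), so no separate monitor thread or sleep loop is needed.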

Note: If you run multiple instances of your receiver, change the implementation of CustomReceiverListener so that it verifies that all receiver instances have stopped.
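
One way to approach the multi-receiver case is to record the stream ID of each stopped receiver and only treat the application as finished once every expected receiver has reported. The sketch below is an assumption-laden illustration, not Spark API: StoppedReceiverTracker is a hypothetical helper, the expected receiver count is supplied by the driver, and the stream ID is assumed to be read from the listener event (for example via receiverStopped.receiverInfo().streamId()). It does not account for a receiver that Spark restarts after it has been counted as stopped.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not a Spark class): records which receiver stream IDs
 * have stopped and reports when all expected receivers are down.
 */
final class StoppedReceiverTracker {
    private final int expectedReceivers;
    // Concurrent set: listener callbacks and the driver may run on different threads
    private final Set<Integer> stoppedStreamIds = ConcurrentHashMap.newKeySet();

    StoppedReceiverTracker(int expectedReceivers) {
        if (expectedReceivers <= 0) {
            throw new IllegalArgumentException("expectedReceivers must be positive");
        }
        this.expectedReceivers = expectedReceivers;
    }

    /**
     * Records that the receiver with the given stream ID has stopped.
     * Repeated reports for the same ID are counted once.
     * Returns true if every expected receiver has now stopped.
     */
    boolean markStopped(int streamId) {
        stoppedStreamIds.add(streamId);
        return allStopped();
    }

    /** True once the number of distinct stopped stream IDs reaches the expected count. */
    boolean allStopped() {
        return stoppedStreamIds.size() >= expectedReceivers;
    }
}
```

In the listener, onReceiverStopped would call markStopped(...) with the event's stream ID, and isReceiverStopped() would delegate to allStopped().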
