Java executors:当任务完成时如何在不阻塞的情况下收到通知? [英] Java executors: how to be notified, without blocking, when a task completes?

查看:46
本文介绍了Java executors:当任务完成时如何在不阻塞的情况下收到通知?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有一个队列,里面装满了我需要提交给执行程序服务的任务.我希望他们一次处理一个.我能想到的最简单的方法是:

Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  1. 从队列中取出一个任务
  2. 提交给执行者
  3. 在返回的 Future 上调用 .get 并阻塞直到结果可用
  4. 从队列中取出另一个任务...

但是,我试图避免完全阻塞.如果我有 10,000 个这样的队列,它们需要一次处理一个任务,我将耗尽堆栈空间,因为它们中的大多数将保持阻塞线程.

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

我想要的是提交一个任务并提供一个回调,该回调在任务完成时被调用.我将使用该回调通知作为发送下一个任务的标志.(functionaljava 和 jetlang 显然使用了这种非阻塞算法,但我看不懂他们的代码)

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

如何使用 JDK 的 java.util.concurrent 来做到这一点,而没有编写自己的执行程序服务?

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

(为我提供这些任务的队列本身可能会阻塞,但这是一个稍后要解决的问题)

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

推荐答案

定义一个回调接口来接收你想要在完成通知中传递的任何参数.然后在任务结束时调用它.

Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交给 ExecutorService.或者,请参阅下面的 Java 8 内置机制.

You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

<小时>

使用 CompletableFuture,Java 8 包含了一种更复杂的方法来组合管道,其中进程可以异步和有条件地完成.这是一个人为但完整的通知示例.


With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

这篇关于Java executors:当任务完成时如何在不阻塞的情况下收到通知?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆