How do I know when ExecutorService has finished if items on the ES can resubmit to the ES?


Question

My Java application works on music files within folders and is designed to process multiple folders in parallel and independently. To do this, each folder is processed by an ExecutorService whose maximum pool size matches the number of CPUs in the computer.

For example, on an 8-CPU computer eight folders can (in theory) be processed concurrently, and on a 16-CPU computer 16 folders can be processed concurrently. If we only have 1 CPU then we set the pool size to 3, to allow the CPU to continue doing something if one folder is blocked on I/O.
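A minimal sketch of that sizing rule, assuming the pool is created with Executors.newFixedThreadPool. The names PoolSizing and poolSizeFor are illustrative only and are not taken from the application.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PoolSizing {
    /** One worker per CPU, except on a single-CPU machine, where three workers are used. */
    static int poolSizeFor(int cpus) {
        return cpus == 1 ? 3 : cpus;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical shared pool sized by the rule above.
        ExecutorService pool = Executors.newFixedThreadPool(poolSizeFor(cpus));
        System.out.println("CPUs: " + cpus + ", pool size: " + poolSizeFor(cpus));
        pool.shutdown();
    }
}
```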

However, we don't actually have just one ExecutorService; we have more than one, because each folder can go through a number of stages.

Process1 (uses ExecutorService1) → Process2 (ExecutorService2) → Process3 (ExecutorService3)

Process1, Process2, Process3 etc. all implement Callable and each has its own associated ExecutorService. There is a FileLoader process that we kick off; it loads folders, creates a Process1 callable for each folder and submits it to the Process1 executor. Each Process1 callable does its work and then submits to a different callable, which may be Process2, Process3 et cetera, but we never go backwards, e.g. Process3 will never submit to Process1. We actually have 12 processes, but any particular folder is unlikely to go through all 12.

But I realized that this is flawed, because on a 16-CPU computer each ES can have a pool size of 16, so with three services we actually have 48 threads running, and this will just lead to too much contention.

So what I was going to do was have all processes (Process1, Process2…) use the same ExecutorService, so that we only ever have as many worker threads as CPUs.

However, in my current situation, we have a SongLoader process that has just one task submitted (loading of all folders), and we then call shutdown(). This won't complete until everything has been submitted to Process0; then shutdown() on Process0 won't succeed until everything has been sent to Process1, and so on.

 //Init Services
 services.add(songLoaderService);
 services.add(Process1.getExecutorService());
 services.add(Process2.getExecutorService());
 services.add(Process3.getExecutorService());

 for (ExecutorService service : services) {
     //Request Shutdown
     service.shutdown();

     //Now wait for all submitted tasks to complete
     service.awaitTermination(10, TimeUnit.DAYS);
 }
 //...............
 //Finish Off work

However, if everything was on the same ES and Process1 was submitting to Process2, this would no longer work, because at the time shutdown() was called Process1 would not yet have submitted all of its folders to Process2, so the ES would be shut down prematurely.
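The failure mode can be reproduced in isolation. The sketch below is not the application's code: PrematureShutdownDemo and its method are made-up names, and a latch is used only to force the ordering "shutdown() first, resubmission second". With the default rejection policy of the pools returned by Executors, a task that is already running keeps running after shutdown(), but anything it tries to submit afterwards is rejected with RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

final class PrematureShutdownDemo {
    /** Returns true if a follow-on submission made after shutdown() was rejected. */
    static boolean resubmitRejectedAfterShutdown() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch shutdownRequested = new CountDownLatch(1);

        // Stands in for a Process1 task that wants to hand work on to Process2.
        Future<Boolean> stage1 = pool.submit(() -> {
            shutdownRequested.await();          // wait until main has called shutdown()
            try {
                pool.submit(() -> true);        // the "Process2" follow-on task
                return false;                   // accepted: not expected after shutdown()
            } catch (RejectedExecutionException e) {
                return true;                    // rejected: the pool was shut down too early
            }
        });

        pool.shutdown();                        // already-submitted tasks still run
        shutdownRequested.countDown();
        return stage1.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("follow-on task rejected: " + resubmitRejectedAfterShutdown());
    }
}
```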

So how do I detect when all work has been completed using a single ExecutorService when tasks on that ES can submit to other tasks on the same ES?

Or is there a better approach?

Note: you might wonder why I don't just merge the logic of Process1, 2 & 3 into a single process. The difficulty is that although I initially group songs by folder, sometimes the songs get split into smaller groups that are allocated to separate processes down the line, and not necessarily the same process; there are actually 12 processes in total.

Attempt based on Shloim's idea

Main Thread

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
    for(Future future : futures)
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

With Process code submitting new tasks using this function from MainAnalyserService

public void submit(Callable<Boolean> task) //throws Exception
{
    FixSongsController.getFutures().add(getExecutorService().submit(task));
}

It looked like it was working, but it failed with:

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

I now realize I cannot have one thread iterating over the list and calling future.get() (which waits until done) whilst at the same time other threads are adding to the list.
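The exception comes from the iteration, not from get(): Collections.synchronizedList only synchronizes individual calls, and the for-each loop uses the underlying ArrayList's fail-fast iterator. One possible way around it, sketched here under assumptions and not taken from the application, is to keep the futures in a ConcurrentLinkedQueue and poll it until it is empty. This relies on every task adding the futures of its follow-on tasks before it returns, so that by the time get() returns for a task, its children are already in the queue. FutureQueueDemo, stage and runAll are hypothetical names, and the pool size of 4 is arbitrary.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class FutureQueueDemo {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final Queue<Future<Boolean>> futures = new ConcurrentLinkedQueue<>();
    private final AtomicInteger completed = new AtomicInteger();

    /** Used by the main thread and by running tasks alike. */
    void submit(Callable<Boolean> task) {
        futures.add(pool.submit(task));
    }

    /** A stand-in task that submits two follow-on tasks until depth reaches 0. */
    Callable<Boolean> stage(int depth) {
        return () -> {
            if (depth > 0) {
                submit(stage(depth - 1));
                submit(stage(depth - 1));
            }
            completed.incrementAndGet();
            return true;
        };
    }

    /** Waits for the whole tree of tasks, then shuts the pool down. */
    int runAll(int depth) throws InterruptedException, ExecutionException {
        submit(stage(depth));
        Future<Boolean> next;
        // poll() removes the head safely while workers keep appending to the tail.
        while ((next = futures.poll()) != null) {
            next.get();
        }
        pool.shutdown();   // safe now: nothing is running that could submit more work
        return completed.get();
    }

    public static void main(String[] args) throws Exception {
        // Depth 3 with two children per task gives 1 + 2 + 4 + 8 = 15 tasks.
        System.out.println(new FutureQueueDemo().runAll(3) + " tasks completed");
    }
}
```

The tasks here never block on each other, so a fixed pool cannot deadlock; a task that waited on a child's future from inside the same pool would need separate care.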

Solution

I agree with Shloim that you don't need multiple ExecutorService instances here -- just one (sized to the number of CPUs you have available) is sufficient and actually optimal. Actually, I think you might not need ExecutorService; a simple Executor can do the job if you use an external mechanism of signaling completeness.

I would start by building a class to represent the entirety of a larger work item. If you need to consume the results from each child work item, you could use a queue, but if you just want to know if there is work left to do, you only need a counter.

For example, you could do something like this:

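import java.io.File;
import java.util.concurrent.Executor;

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    // Guarded by the monitor lock on this instance. Starts at 1 so that this
    // FolderWork counts as pending itself; otherwise awaitCompletion() could
    // return before run() has enqueued anything.
    private int pendingItems = 1;

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        try {
            // listFiles() returns null if the folder is not a readable directory
            File[] files = folder.listFiles();
            if (files != null) {
                for (File file : files) {
                    enqueueMoreWork(file);
                }
            }
        } finally {
            // releases the initial count held on behalf of this FolderWork
            markWorkItemCompleted();
        }
    }

    public synchronized void enqueueMoreWork(File file) {
        pendingItems++;
        executor.execute(new FileWork(file, this));
    }

    public synchronized void markWorkItemCompleted() {
        pendingItems--;
        notifyAll();
    }

    public synchronized boolean hasPendingWork() {
        return pendingItems > 0;
    }

    public synchronized void awaitCompletion() throws InterruptedException {
        // loop guards against spurious wakeups
        while (pendingItems > 0) {
            wait();
        }
    }
}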

public class FileWork implements Runnable {
    private final File file;
    private final FolderWork parent;

    public FileWork(File file, FolderWork parent) {
        this.file = file;
        this.parent = parent;
    }

    @Override
    public void run() {
        try {
           // do some work with the file

           if (/* found more work to do */) {
               parent.enqueueMoreWork(...);
           }
        } finally {
            parent.markWorkItemCompleted();
        }
    }
}

If you're worried about synchronization overhead for the pendingItems counter, you can use an AtomicInteger for it instead. Then you need a separate mechanism for notifying a waiting thread that we are done; for example, you can use a CountDownLatch. Here's an example implementation:

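import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    // Starts at 1 so that this FolderWork counts as pending itself. Without it,
    // the counter could reach 0 (and open the latch) while run() is still
    // enqueuing files, and an empty folder would never open the latch at all.
    private final AtomicInteger pendingItems = new AtomicInteger(1);
    private final CountDownLatch latch = new CountDownLatch(1);

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        try {
            // listFiles() returns null if the folder is not a readable directory
            File[] files = folder.listFiles();
            if (files != null) {
                for (File file : files) {
                    enqueueMoreWork(file);
                }
            }
        } finally {
            // releases the initial count held on behalf of this FolderWork
            markWorkItemCompleted();
        }
    }

    public void enqueueMoreWork(File file) {
        if (latch.getCount() == 0) {
            throw new IllegalStateException(
                "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
        }
        pendingItems.incrementAndGet();
        executor.execute(new FileWork(file, this));
    }

    public void markWorkItemCompleted() {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0) {
            latch.countDown();
        }
    }

    public boolean hasPendingWork() {
        return pendingItems.get() > 0;
    }

    public void awaitCompletion() throws InterruptedException {
       latch.await();
    }
}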

You would call this like so:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

This example only shows one level of child work items, but you can use any number of child work items as long as they all use the same pendingItems counter to keep track of how much work is left to do.
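To illustrate several levels sharing one counter, here is a self-contained sketch. It is a simplified stand-in rather than the classes above: PendingWorkDemo, enqueue and demo are made-up names, the "work" is just a counter increment, and each item spawns two children until depth reaches 0. The pool is a fixed pool sized to the number of available processors, which is an assumption of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class PendingWorkDemo {
    private final Executor executor;
    private final AtomicInteger pendingItems = new AtomicInteger();
    private final AtomicInteger processed = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);

    PendingWorkDemo(Executor executor) {
        this.executor = executor;
    }

    /** Counts the item as pending before handing it to the executor. */
    void enqueue(int depth) {
        pendingItems.incrementAndGet();
        executor.execute(() -> {
            try {
                processed.incrementAndGet();   // stand-in for real work
                if (depth > 0) {               // children are counted before the parent finishes
                    enqueue(depth - 1);
                    enqueue(depth - 1);
                }
            } finally {
                if (pendingItems.decrementAndGet() == 0) {
                    done.countDown();          // last item finished: release the waiter
                }
            }
        });
    }

    /** Runs a whole tree of work items on a shared pool and returns how many ran. */
    static int demo(int depth) throws InterruptedException {
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            PendingWorkDemo work = new PendingWorkDemo(pool);
            work.enqueue(depth);               // root item is counted before we start waiting
            work.done.await();
            return work.processed.get();
        } finally {
            pool.shutdown();                   // safe: no item is left that could enqueue more
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Depth 3 with two children per item gives 1 + 2 + 4 + 8 = 15 items.
        System.out.println(demo(3) + " items processed");
    }
}
```

Because a child is always counted before its parent decrements, the counter can only reach zero once every level has finished.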
