Running time of a job sent to an ExecutorService


Question

Good day,

I am writing a program that calls a method for each line read from a text file. Because each call is independent of every other line, I can run the calls in parallel. To maximize CPU usage I use an ExecutorService and submit each run() call to it. The text file has 15 million lines, so I need to stagger the submissions to avoid creating too many jobs at once (OutOfMemory exception). I also want to track how long each submitted job has been running, because I have seen that some never finish. The problem is that when I use the Future.get method with a timeout, the timeout refers to the time since the job entered the ExecutorService queue, not since it started running, if it has started at all. I would like the time since the job started running, not since it was queued.

Here is the code:

ExecutorService executorService = Executors.newFixedThreadPool(ncpu);
line = reader.readLine();
long start = System.currentTimeMillis();
HashMap<MyFut, String> runs = new HashMap<MyFut, String>();
HashMap<Future, MyFut> tasks = new HashMap<Future, MyFut>();
while ((line = reader.readLine()) != null) {

    String s = line.split("\t")[1];
    final String m = line.split("\t")[0];
    MyFut f = new MyFut(s, m);
    tasks.put(executorService.submit(f), f);

    runs.put(f, line);

    while (tasks.size() > ncpu * 100) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        Iterator<Future> i = tasks.keySet().iterator();
        while (i.hasNext()) {
            Future task = i.next();
            if (task.isDone()) {
                i.remove();
            } else {
                MyFut fut = tasks.get(task);
                if (fut.elapsed() > 10000) {
                    System.out.println(line);
                    task.cancel(true);
                    i.remove();
                }
            }
        }
    }
}

private static class MyFut implements Runnable {

    private long start;
    String copy;
    String id2;

    public MyFut(String m, String id) {
        super();

        copy = m;
        id2 = id;
    }

    public long elapsed() {
        return System.currentTimeMillis() - start;
    }

    @Override
    public void run() {
        start = System.currentTimeMillis();
        // do something...
    }

}

As you can see, I try to keep track of how many jobs I have submitted, and once a threshold is passed I wait a bit until some have finished. I also check whether any job is taking too long so that I can cancel it, remember which one failed, and continue. This is not working as I hoped. Ten seconds of execution for one task is far more than it should need (I get 1000 lines done in 70 to 130 s, depending on the machine and the number of CPUs).

What am I doing wrong? Shouldn't the run method of my Runnable be called only when a thread in the ExecutorService is free and starts working on it? I get a lot of results reported as taking more than 10 seconds. Is there a better way to achieve what I am trying to do?

Thanks.

Recommended answer

You are making this harder than it needs to be. Java's concurrency framework provides everything you want; you only have to use it.

Limiting the number of pending work items is done with a bounded queue, but the ExecutorService returned by Executors.newFixedThreadPool() uses an unbounded queue. The policy of waiting once the bounded queue is full can be implemented via a RejectedExecutionHandler. The entire thing looks like this:

import java.util.concurrent.*;

public class BoundedExecutorExample { // class name is illustrative only
  static class WaitingRejectionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
        executor.getQueue().put(r); // block until capacity is available
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // keep the interrupt status visible to callers
        throw new RejectedExecutionException(ex);
      }
    }
  }

  public static void main(String[] args) {
    final int nCPU = Runtime.getRuntime().availableProcessors();
    final int maxPendingJobs = 100;
    ExecutorService executorService = new ThreadPoolExecutor(nCPU, nCPU, 1, TimeUnit.MINUTES,
      new ArrayBlockingQueue<Runnable>(maxPendingJobs), new WaitingRejectionHandler());

    // start flooding the `executorService` with jobs here

    executorService.shutdown(); // queued jobs still run; the threads exit afterwards
  }
}

That's it.
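To see the back-pressure in action, here is a minimal sketch that reuses the same handler idea and submits far more jobs than the queue can hold. The class name `BoundedSubmitDemo`, the `run` helper, and the 1 ms sleep standing in for per-line work are assumptions made for illustration; they are not part of the original answer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitDemo { // hypothetical name, for illustration only
    // Same idea as the handler above: block the submitter until the queue has room.
    static class WaitingRejectionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(ex);
            }
        }
    }

    /** Runs `jobs` short tasks; returns {completed count, largest queue size observed}. */
    static int[] run(int jobs, int threads, int maxPendingJobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(maxPendingJobs), new WaitingRejectionHandler());
        AtomicInteger completed = new AtomicInteger();
        int maxQueued = 0;
        for (int i = 0; i < jobs; i++) {
            // execute() blocks inside the handler whenever the queue is full
            pool.execute(() -> {
                try {
                    Thread.sleep(1); // assumed stand-in for processing one line
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            maxQueued = Math.max(maxQueued, pool.getQueue().size());
        }
        pool.shutdown(); // already queued jobs still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {completed.get(), maxQueued};
    }

    public static void main(String[] args) {
        int[] result = run(500, 4, 20);
        System.out.println("completed=" + result[0] + ", max queued=" + result[1]);
    }
}
```

The reading loop never has to sleep or poll: the submitting thread simply stalls in `execute` until a worker frees a slot, so the number of pending jobs cannot exceed the queue capacity.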

Measuring the elapsed time within a job is quite easy, as it has nothing to do with multi-threading:

long startTime = System.nanoTime();
// do your work here
long elapsedTimeSoFar = System.nanoTime() - startTime;

But you may not need it anymore once you are using the bounded queue.
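If the running time still has to be read from another thread, one detail of the code in the question is worth noting: the `start` field of `MyFut` stays 0 until `run()` begins, so `elapsed()` on a job that is still queued returns the full epoch time in milliseconds, which is always greater than 10000. That would likely explain why so many jobs appear to exceed the limit. A minimal sketch of a wrapper that distinguishes "not started" from "running" follows; the class name `TimedTask` and its methods are hypothetical and not part of the original answer.

```java
import java.util.concurrent.TimeUnit;

public class TimedTask implements Runnable { // hypothetical name, for illustration only
    private final Runnable work;
    private long startNanos;          // written by the worker before `started` is set
    private volatile boolean started; // volatile: publishes startNanos to other threads

    public TimedTask(Runnable work) {
        this.work = work;
    }

    @Override
    public void run() {
        startNanos = System.nanoTime();
        started = true;
        work.run();
    }

    public boolean hasStarted() {
        return started;
    }

    /** Milliseconds since run() began, or -1 while the task is still waiting in the queue. */
    public long elapsedMillis() {
        if (!started) {
            return -1L;
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    public static void main(String[] args) {
        TimedTask task = new TimedTask(() -> { /* stand-in for processing one line */ });
        System.out.println("before run: " + task.elapsedMillis()); // -1, not started yet
        task.run();
        System.out.println("after run: " + task.elapsedMillis() + " ms");
    }
}
```

A monitoring loop would then cancel a job only when `elapsedMillis()` is non-negative and above the limit, leaving queued jobs alone.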

By the way, the timeout of the Future.get method does not refer to the time since the job entered the ExecutorService queue; it is measured from the moment the get method itself is invoked. In other words, it says how long the get method is allowed to wait, nothing more.
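The following sketch illustrates that behaviour under simple assumptions: a single-thread executor and sleeps standing in for real work. The class and method names are made up for this example. The first method calls `get` long after submission with a short timeout and still succeeds because the job has already finished; the second shows `get` timing out on a job that never left the queue.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetTimeoutDemo { // hypothetical name, for illustration only

    /** get(50 ms) succeeds even though the job was submitted about 500 ms earlier. */
    static boolean getSucceedsAfterLateCall() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = pool.submit(() -> {
                Thread.sleep(50); // short job
                return "done";
            });
            Thread.sleep(500); // the job finishes while we wait here
            return "done".equals(f.get(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    /** get(100 ms) times out on a job that is still queued behind a long one. */
    static boolean getTimesOutWhileQueued() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> {
                Thread.sleep(2000); // occupies the only worker thread
                return null;
            });
            Future<String> queued = pool.submit(() -> "queued job");
            queued.get(100, TimeUnit.MILLISECONDS); // waits at most 100 ms from this call
            return false;
        } catch (TimeoutException e) {
            return true; // the timeout says nothing about whether the job ever started
        } catch (InterruptedException | ExecutionException e) {
            return false;
        } finally {
            pool.shutdownNow(); // interrupts the long job so the demo exits promptly
        }
    }

    public static void main(String[] args) {
        System.out.println("late get succeeded: " + getSucceedsAfterLateCall());
        System.out.println("get timed out while queued: " + getTimesOutWhileQueued());
    }
}
```

A TimeoutException from `get` therefore does not show that a job ran too long; it only shows that the caller stopped waiting.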
