Java并发:在任务池中查找任务失败 [英] Java Concurrency: Find task failed in a pool of tasks

查看:170
本文介绍了Java并发:在任务池中查找任务失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

SSCCE 之后显示从服务器提取记录(例如)。 ExecutorService 用于创建一个 ThreadPool 的2个线程,我已经调用所有这些线程与超时为3秒。
故意我做了一些任务失败。



$ b

现在我的问题是,如何获得EmpID失败的任务?

MainClass:

  public class MultiThreadEx {
public static void main(String [] args){

String [] empIDArray = {
100,200,300,400,500,
600,700,800 ,900,1000,
1100,1200,1300,1400,1500

List< ThreadTaskEach> taskList = new ArrayList< ThreadTaskEach>();
try {
for(String empID:empIDArray){
taskList.add(new ThreadTaskEach(empID));
}
} catch(Exception e){
System.out.println(Exception occured:+ e.getMessage());
}

List< Future< Map< String,String>>> futureList = null;
try {
ExecutorService service = Executors.newFixedThreadPool(2);
futureList = service.invokeAll(taskList,3,TimeUnit.SECONDS);
service.shutdown();
} catch(InterruptedException ie){
System.out.println(Exception occured:+ ie.getMessage());
}

for(Future< Map< String,String>> future:futureList){
try {
Map< String,String& resultMap = future.get();

for(String key:resultMap.keySet()){
System.out.println(resultMap.get(key));
}

} catch(ExecutionException ee){
System.out.println(Exception occured:+ ee.getMessage());
} catch(InterruptedException ie){
System.out.println(Exception occured:+ ie.getMessage());
} catch(CancellationException e){
System.out.println(Exception occured:+ e.getMessage());
}
}

}
}

线程类

 类ThreadTaskEach实现Callable< Map< String,String>> {

private String empID;

public ThreadTaskEach(String empID){
this.empID = empID;
}

@Override
public Map< String,String> call()throws Exception {
try {
return prepareMap(empID);
} catch(Exception e){
System.out.println(Exception occured:+ e.getMessage());
throw new Exception(Exception occured:+ e.getMessage());
}
}

private Map< String,String> prepareMap(String empID)throws InterruptedException {
Map< String,String> map = new HashMap< String,String>();

if(Integer.parseInt(empID)%500 == 0){
Thread.sleep(5000);
}

map.put(empID,empID +:+ Thread.currentThread()。getId());

return map;
}
}

在上述代码500中,1000 ..在3秒内完成。

 地图< String,String> resultMap = future.get(); 

所以当我对这些任务说future.get()时,我得到 CancellationException 。但是如何从任务中获取EmpID

解决方案

而不是使用 invokeAll ,您可以逐个提交每个任务并将每个未来存储在地图中:

 未来<?> f = executor.submit(task); 
map.put(f,task.getId());

现在,当您尝试获取时,如果您有异常,可以使用地图返回到id。但是,您需要对每个 future.get()设置超时,这对您的用例可能不实用。



另一种方法是使用 invokeAll 这将保证以与任务相同的顺序返回futures


返回代表任务的Futures列表与由


只要使用列表,迭代顺序就是固定的,你可以匹配以下两个列表:

  for(int i = 0; i  Future< Map< String,String>> future = futureList.get(i)
try {
Map< String,String> resultMap = future.get();

for(String key:resultMap.keySet()){
System.out.println(resultMap.get(key));
}
} catch(ExecutionException ee){
System.out.println(任务中的异常+ taskList.get(i).getId());
}
}

只要确保使用具有稳定迭代顺序(ArrayList很好)。


Following SSCCE shows fetching records from a server(say). ExecutorService is used to create a ThreadPool of 2 threads and I have invoked all these threads with Timeout of 3 seconds. Intentionally I made some tasks to fail.

Now my question is, how to get the EmpID's of the tasks which failed?

MainClass:

public class MultiThreadEx {
    public static void main(String[] args) {

        String[] empIDArray = {
                "100", "200", "300", "400", "500", 
                "600", "700", "800", "900", "1000", 
                "1100", "1200", "1300", "1400", "1500"
            };

        List<ThreadTaskEach> taskList = new ArrayList<ThreadTaskEach>();
        try {
            for(String empID : empIDArray) {
                taskList.add(new ThreadTaskEach(empID));
            }
        } catch(Exception e) {
            System.out.println("Exception occured: " + e.getMessage());
        }

        List<Future<Map<String, String>>> futureList = null;
        try {
            ExecutorService service = Executors.newFixedThreadPool(2);
            futureList = service.invokeAll(taskList, 3, TimeUnit.SECONDS);
            service.shutdown();
        } catch(InterruptedException ie) {
            System.out.println("Exception occured: " + ie.getMessage());
        }

        for(Future<Map<String, String>> future : futureList) {
            try {
                Map<String, String> resultMap = future.get();

                for(String key : resultMap.keySet()) {
                    System.out.println(resultMap.get(key));
                }

            } catch(ExecutionException ee) {
                System.out.println("Exception occured: " + ee.getMessage());
            } catch (InterruptedException ie) {
                System.out.println("Exception occured: " + ie.getMessage());
            } catch(CancellationException e) {
                System.out.println("Exception occured: " + e.getMessage());
            }
        }

    }
}

Thread class

class ThreadTaskEach implements Callable<Map<String, String>>{

    private String empID;

    public ThreadTaskEach(String empID) {
        this.empID = empID;
    }

    @Override
    public Map<String, String> call() throws Exception {
        try {
            return prepareMap(empID);
        } catch(Exception e) {
            System.out.println("Exception occured: " + e.getMessage());
            throw new Exception("Exception occured: " + e.getMessage());
        }
    }

    private Map<String, String> prepareMap(String empID) throws InterruptedException {
        Map<String, String> map = new HashMap<String, String>();

        if(Integer.parseInt(empID) % 500 == 0) {
            Thread.sleep(5000);
        }

        map.put(empID, empID + ": " + Thread.currentThread().getId());

        return map;
    }
}

In the above code 500, 1000 .. fails to complete within 3 seconds.

Map<String, String> resultMap = future.get();

So when I say future.get() for these tasks I am getting CancellationException. But how to get the EmpID from the task?

解决方案

Instead of using invokeAll, you could submit each task one by one and store each future in a map:

Future<?> f = executor.submit(task);
map.put(f, task.getId());

Now when you try to get, if you have an exception, you can use the map to go back to the id. However you would need to put a timeout on each future.get() which is maybe not practical for your use case.

The alternative is to use the specification of invokeAll which guarantees to return the futures in the same order as the tasks that were submitted.

Returns a list of Futures representing the tasks, in the same sequential order as produced by the iterator for the given task list.

As long as you use a List, the iteration order is fixed and you can match the two lists:

    for (int i = 0; i < futureList.size(); i++) {
        Future<Map<String, String>> future = futureList.get(i)
        try {
            Map<String, String> resultMap = future.get();

            for(String key : resultMap.keySet()) {
                System.out.println(resultMap.get(key));
            }
        } catch(ExecutionException ee) {
            System.out.println("Exception in task " + taskList.get(i).getId());
        }
    }

Just make sure that you use a collection with a stable iteration order (ArrayList is fine).

这篇关于Java并发:在任务池中查找任务失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆