ExecutorService - How to set values in a Runnable/Callable and reuse
Problem description
I want to use the ExecutorService to run a series of the same Runnable/Callable tasks. I've searched around for a tutorial or an example, but found nothing that involves actually setting the value of an existing Runnable/Callable object and then using submit() to send that object back into the ExecutorService.
Basically, here's what I want to do:
- Get a list of servers.
- Iterate through the list of servers, calling InetAddress.getByName(host) to grab data on each host.
- Collect that data into Server beans for storage in a database.
So, right now, with 10,000(+) servers, it takes forever. So, my thought was to use the ExecutorService to manage a pool of threads. What I can't seem to figure out is how to detect when one thread is finished so I can grab the data. Then I need to get the next server in the list, place it into the Task and then submit() back to the ExecutorService.
That said, what I've read so far seems to point to the following: ExecutorService, submit(), Callable, Future.
So, as a pseudo-process:
- Get list of servers.
- Set up ExecutorService with numThreads number of threads
- Iterate numThreads and create numThreads WorkerTask() objects.
- Submit() WorkerTask() to ExecutorService for processing.
- Detect when a WorkerTask() has finished, grab the Callable (Future) result.
- Get the next server.
- Set the server value into the WorkerTask() <-- How? This is elusive ...
- Submit() the WorkerTask() (with the new value) back to the ExecutorService.
- Iterate again.
- ExecutorService.shutdown()...
So, a tutorial or example of this would be great, especially an example of placing a new value into a WorkerTask(). Also, I'd appreciate any thoughts on this proposed solution. Is it good or bad? If there is another way, I'm open to it.
02/09/2014 - Edit and Add
So the following is a first cut at this. But to answer some of the questions being posed:
- I've solved the issue of placing new data in a Worker and resubmitting to the ExecutorService...see the code snippet.
- I've also solved the issue of "get the stuff"...I simply cast the Future() results to the Worker class...see the code snippet.
- Finally, while I could just allocate every server to a Worker() and a Future(), I'm concerned that the current 10,000 will grow and memory will become an issue.
That said, here's a first attempt, and this works pretty well. It runs much faster and only uses getNumberThreads() Worker and Future objects:
public List<ServerBean> lookupHostIps ( List<ServerBean> theServerList ) {
    //ServerBean serverDto = null;
    ServerBean ipDto = null;
    List<ServerBean> theResults = new ArrayList<ServerBean>();
    List<HostLookupWorker> theWorkers = new ArrayList<HostLookupWorker>( getNumberThreads() );
    List<Future<HostLookupWorker>> theFutures = new ArrayList<Future<HostLookupWorker>>( getNumberThreads() );
    ExecutorService executor = Executors.newFixedThreadPool ( getNumberThreads() );
    // WORKERS : Create the workers...prime them with a server
    // bean...
    //
    for (int j = 0; j < getNumberThreads(); j++) {
        //for (int j = 0; j < theServerList.size(); j++) {
        theWorkers.add ( new HostLookupWorker( theServerList.get(j) ) );
        Future<HostLookupWorker> theFuture = executor.submit ( theWorkers.get ( j ) );
        theFutures.add ( j, theFuture );
    }
    int lloopItems = getNumberThreads(); /* loops thru all servers */
    //int lloopThreads = 0; /* loops thru threads */
    int lidxThread = 0; /* what thread is ready */
    //int lidxFuture = 0; /* what future is ready */
    boolean lblnNext = false; /* is a thread done/ready */
    int lidxWorkers = 0; /* tracks the futures */
    while ( lloopItems < theServerList.size() ) {
        // READY : Is one of the threads ready for more work?
        if ( lblnNext ) {
            // VALUE : Grab the thread by index and set the next
            // server value.
            theWorkers.get ( lidxThread ).setBean ( theServerList.get(lloopItems) );
            getLog().debug ( "Thread [" + lidxThread + "] Assigned Host ["+theServerList.get(lloopItems).getServerName ()+"] " );
            // FUTURE : Package a new Future<HostLookupWorker>
            // and submit it to the thread pool.
            Future<HostLookupWorker> theFuture = executor.submit ( theWorkers.get ( lidxThread ) );
            theFutures.add ( lidxThread, theFuture );
            lblnNext = false; /* reset to allow for another thread */
            lloopItems++; /* increment the main loop counter */
        }
        while ( !(lblnNext) ) {
            try {
                if ( theFutures.get(lidxWorkers).get() != null ) {
                    // GET THE STUFF : Grab the results from the Future...
                    HostLookupWorker ltheItem = theFutures.get(lidxWorkers).get();
                    if ( ltheItem.getValue () != null ) {
                        if (!ltheItem.getValue ().contains("Cannot find host")){
                            ipDto = new ServerBean ();
                            ipDto.setServerId ( ltheItem.getBean ().getServerId() );
                            ipDto.setServerName ( ltheItem.getBean ().getServerName() );
                            ipDto.setIpAddress ( ltheItem.getValue () );
                            theResults.add(ipDto);
                        }
                        lidxThread = lidxWorkers; /* this thread is ready for more work */
                        lblnNext = true; /* flag the upper condition to assign new work */
                        getLog().debug ( "Thread [" + lidxThread + "] Host ["+ltheItem.getHost ()+"] has IP ["+ltheItem.getValue()+"]" );
                    }
                }
                else {
                    getLog().debug ( "Thread [" + lidxThread + "] NULL" );
                }
                lidxWorkers++; /* next worker/future */
                if ( lidxWorkers >= getNumberThreads() ) {
                    lidxWorkers = 0;
                }
            }
            catch(ExecutionException e){
                getLog().error ( e );
            }
            catch(InterruptedException e){
                getLog().error ( e );
            }
        }
    }
    executor.shutdown ();
    return theResults;
}
Here's the Worker/Thread class :
import java.net.*;
import java.util.concurrent.Callable;
import com.lmig.cdbatch.dto.ServerBean;

public class HostLookupWorker implements Callable {
    private InetAddress node = null;
    private String value = null;
    private boolean busy = false;
    private ServerBean bean = null;

    public HostLookupWorker () {
        this.busy = false;
    }

    // public HostLookupWorker ( String theHost ) {
    // this.busy = false;
    // this.host = theHost;
    // }

    public HostLookupWorker ( ServerBean theItem ) {
        this.busy = false;
        this.bean = theItem;
        //this.host = theItem.getServerName ().trim ();
    }

    public String lookup ( String host ) {
        if ( host != null ) {
            // get the bytes of the IP address
            try {
                this.node = InetAddress.getByName ( host );
            }
            catch ( UnknownHostException ex ) {
                this.value = "Not Found [" + getHost() + "]";
                return "Not Found [" + host + "]";
            }
            if ( isHostname(host) ) {
                getBean().setIpAddress ( node.getHostAddress() );
                return node.getHostAddress();
            }
            else { // this is an IP address
                //return node.getHostName();
                return host;
            }
        }
        return host;
    } // end lookup

    public boolean isHostname(String host) {
        // Is this an IPv6 address?
        if (host.indexOf(':') != -1)
            return false;
        char[] ca = host.toCharArray();
        // if we see a character that is neither a digit nor a period
        // then host is probably a hostname
        for (int i = 0; i < ca.length; i++) {
            if (!Character.isDigit(ca[i])) {
                if (ca[i] != '.')
                    return true;
            }
        }
        // Everything was either a digit or a period
        // so host looks like an IPv4 address in dotted quad format
        return false;
    } // end isHostName

    // public void run() {
    // value = lookup ( getHost() );
    //
    // }

    public Object call() throws Exception {
        Thread.sleep ( 10000 );
        this.busy = true;
        this.value = lookup ( getHost() );
        this.busy = false;
        return this;
    }

    public String getHost() {
        return getBean().getServerName ().trim ();
    }

    public void setHost(String host) {
        getBean().setServerName ( host.trim () );
    }

    public InetAddress getNode() {
        return node;
    }

    public void setNode(InetAddress node) {
        this.node = node;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public boolean isBusy() {
        return busy;
    }

    public void setBusy(boolean busy) {
        this.busy = busy;
    }

    public ServerBean getBean() {
        return bean;
    }

    public void setBean(ServerBean bean) {
        this.bean = bean;
    }
}
So, to summarize:
- The process does work, and works fast.
- I need to fix the code a little, as there are getNumberThreads() - 1 Futures left unprocessed when the larger while () loop finally finishes...
So, what I'm struggling with now is how to detect when a thread is finished...I've seen multiple examples, one testing for Future() != null, others testing for Future() == null. So which one is right?
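On the null tests: submit() always returns a non-null Future, and get() blocks until the task has finished, so neither comparison is what detects completion. For a task submitted as a Runnable, get() returns null once it completes; for a Callable it returns whatever call() returned. One way to learn which task finished first is java.util.concurrent.ExecutorCompletionService, whose take() blocks until some submitted task is done. The sketch below is only an illustration of the pseudo-process above, keeping at most numThreads tasks in flight; the names BoundedSubmitter, processAll and work are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper: keeps at most numThreads tasks in flight and
// submits the next item each time any task completes.
class BoundedSubmitter {

    static <T, R> List<R> processAll(List<T> items, int numThreads, Function<T, R> work)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        CompletionService<R> completion = new ExecutorCompletionService<>(executor);
        List<R> results = new ArrayList<>();
        try {
            Iterator<T> next = items.iterator();
            int inFlight = 0;

            // Prime the pool with up to numThreads tasks.
            while (inFlight < numThreads && next.hasNext()) {
                T item = next.next();
                completion.submit(() -> work.apply(item));
                inFlight++;
            }

            // Each take() blocks until some task has finished, whichever it is.
            while (inFlight > 0) {
                Future<R> done = completion.take();
                inFlight--;
                try {
                    results.add(done.get()); // returns immediately: the task is already done
                } catch (ExecutionException e) {
                    // The task threw an exception; this sketch simply skips it.
                }
                // Refill the slot that just became free.
                if (next.hasNext()) {
                    T item = next.next();
                    completion.submit(() -> work.apply(item));
                    inFlight++;
                }
            }
        } finally {
            executor.shutdown();
        }
        return results; // in completion order, not input order
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAll(Arrays.asList("a", "b", "c"), 2, s -> s.toUpperCase()));
    }
}
```

Because a fresh task is created per item, no worker object has to be mutated and resubmitted, and only about numThreads Future objects exist at any moment, which may address the memory concern raised above.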
Recommended answer
I think the best approach in this case would be to create a task for every server, since they will be executed by the threads in the pool anyway, and then use the Future objects to retrieve the server information returned by each task.
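A minimal sketch of that idea, assuming plain host-name strings instead of the ServerBean class from the question (HostLookup, LookupTask and lookupAll are hypothetical names chosen for illustration): one immutable Callable is created per server, all of them are submitted to a fixed pool, and the Future objects are then read in order.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not taken from the question's code base.
class HostLookup {

    // One task per host; the host is fixed at construction, so nothing is reused or reset.
    static final class LookupTask implements Callable<String> {
        private final String host;

        LookupTask(String host) {
            this.host = host;
        }

        @Override
        public String call() {
            try {
                return InetAddress.getByName(host).getHostAddress();
            } catch (UnknownHostException e) {
                return null; // in this sketch, null marks an unresolvable host
            }
        }
    }

    // Returns host -> IP address for every host that resolved, in input order.
    static Map<String, String> lookupAll(List<String> hosts, int numThreads)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            // Submit everything up front; the pool queues tasks beyond numThreads.
            List<Future<String>> futures = new ArrayList<>();
            for (String host : hosts) {
                futures.add(executor.submit(new LookupTask(host)));
            }

            Map<String, String> results = new LinkedHashMap<>();
            for (int i = 0; i < hosts.size(); i++) {
                try {
                    String ip = futures.get(i).get(); // blocks until task i has finished
                    if (ip != null) {
                        results.put(hosts.get(i), ip);
                    }
                } catch (ExecutionException e) {
                    // The task failed unexpectedly; this sketch skips that host.
                }
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lookupAll(Arrays.asList("127.0.0.1"), 4));
    }
}
```

With this shape there is one small task object and one Future per server. For 10,000 entries that is normally a modest amount of memory, though that is an estimate rather than something measured here.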