在线程之间传递一组对象 [英] Passing a Set of Objects between threads

查看:143
本文介绍了在线程之间传递一组对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在处理的当前项目要求我实现一种有效地将一组连续运行的线程对象传递到主线程的方式。当前的设置类似于以下内容。



我有一个主线程创建一个新的线程。这个新线程连续运行,并调用基于定时器的方法。此方法从在线源中获取一组消息,并将其组织在TreeSet中。



此TreeSet然后需要传回主线程,以便它包含的消息可以独立于循环计时器进行处理。



为了更好的参考,我的代码看起来像以下

  //由主线程启动时调用。 
void StartProcesses()
{
if(this.IsWindowing)
{
return;
}

this._windowTimer = Executors.newSingleThreadScheduledExecutor();

Runnable task = new Runnable(){
public void run(){
WindowCallback();
}
};

this.CancellationToken = false;
_windowTimer.scheduleAtFixedRate(task,
0,this.SQSWindow,TimeUnit.MILLISECONDS);

this.IsWindowing = true;
}

////////////////////////////////// ////////////////////////////

private void WindowCallback()
{
ArrayList< Message> messages = new ArrayList< Message>();

// TODO创建监视器
如果((!CancellationToken))
{
try
{
// TODO修复epochWindowTime
long epochWindowTime = 0;
int numberOfMessages = 0;
映射< String,String>属性;

//设置SQS客户端
AmazonSQS client = new AmazonSQSClient(new
ClasspathPropertiesFileCredentialsProvider());

client.setEndpoint(this.AWSSQSServiceUrl);

//获取NumberOfMessages以优化
//从队列中接收所有消息

GetQueueAttributesRequest attributesRequest =
new GetQueueAttributesRequest() ;
attributesRequest.setQueueUrl(this.QueueUrl);
attributesRequest.withAttributeNames(
ApproximateNumberOfMessages);
attributes = client.getQueueAttributes(attributesRequest)。
getAttributes();

numberOfMessages = Integer.valueOf(attributes.get(
ApproximateNumberOfMessages))。intValue();

//确定是否需要从队列
接收消息if(numberOfMessages> 0)
{

if(numberOfMessages< 10 )
{
//只是做内联比
//旋转线程
ReceiveTask(numberOfMessages);
}
else
{
// TODO为此
创建一个多线程版本ReceiveTask(numberOfMessages);
}
}

如果(!CancellationToken)
{

// TODO测试
_setLock.lock();

迭代器< Message> _setIter = _set.iterator();
// TODO
while(_setIter.hasNext())
{
Message temp = _setIter.next();

Long value = Long.valueOf(temp.getAttributes()。
get(Timestamp));
if(value.longValue()< epochWindowTime)
{
messages.add(temp);
_set.remove(temp);
}
}

_setLock.unlock();

// TODO重复消除消息

// TODO重新排序消息

// TODO用结果
}

if((!CancellationToken)&(message.size()> 0))
{
if(messages.size()< 10)
{
对<整数,整数> range =
new Pair< Integer,Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages,range);
}
else
{
// TODO创建一个将这项工作分配到
之间的方法//多个线程
Pair< Integer,Integer> range =
new Pair< Integer,Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages,range);
}
}
} catch(AmazonServiceException ase){
ase.printStackTrace();
} catch(AmazonClientException ace){
ace.printStackTrace();
}
}
}

一些人可以看到的评论,我目前的首选方法是处理这是通过在定时器线程中创建一个事件,如果有消息。然后主线程将正在侦听此事件并妥善处理。



目前我不熟悉Java如何处理事件,或者如何创建/监听它们。我也不知道是否可以创建事件,并且包含在它们之间的信息在线程之间传递。



有人可以给我一些关于我的方法是否可行的建议/见解?如果是这样,我可以在哪里找到有关如何实施这些信息的信息,因为我目前的搜索尝试并没有证明是富有成果的。



如果没有,我可以提供一些关于我将如何处理的建议,请记住,如果可能的话,我想避免管理套接字。 p>

编辑1:



主线程还将负责发出基于它收到的消息,或发出命令来获取所需的信息。由于这个原因,主线程不能等待接收消息,并且应该以基于事件的方式处理它们。

解决方案

生产者 - 消费者模式:



连续地堆叠队列中的对象(消息)。
另一个线程(消费者)从队列中读取和删除对象。



如果您的问题符合此要求,请尝试BlockingQueue。
http://docs.oracle .com / javase / 7 / docs / api / java / util / concurrent / BlockingQueue.html



这是简单而有效的。 b
$ b

如果队列为空,消费者将被阻止,这意味着线程等待(所以不要使用cpu时间),直到生产者放置一些对象。否则cosumer会连续地消耗对象。
如果队列已满,生产者将被阻止,直到消费者消费一些对象,在队列中创建一个空格,反之亦然。



这里有一个例子:
(一个队列在生产者和消费者中应该是同一个对象)






(Producer thread)

 消息消息= createMessage(); 
queue.put(message);

(消费者线程)

 消息消息= queue.take(); 
handleMessage(message);


The current project I am working on requires that I implement a way to efficiently pass a set of objects from one thread, that runs continuously, to the main thread. The current setup is something like the following.

I have a main thread which creates a new thread. This new thread operates continuously and calls a method based on a timer. This method fetches a group of messages from an online source and organizes them in a TreeSet.

This TreeSet then needs to be passed back to the main thread so that the messages it contains can be handled independent of the recurring timer.

For better reference my code looks like the following

// Called by the main thread on start.  
void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task,
            0, this.SQSWindow, TimeUnit.MILLISECONDS);

    this.IsWindowing = true;
}

/////////////////////////////////////////////////////////////////////////////////

private void WindowCallback()
{
    ArrayList<Message> messages = new ArrayList<Message>();

    //TODO create Monitor
    if((!CancellationToken))
    {
        try
        {
            //TODO fix epochWindowTime
            long epochWindowTime = 0;
            int numberOfMessages = 0;
            Map<String, String> attributes;

            // Setup the SQS client
            AmazonSQS client = new AmazonSQSClient(new 
                    ClasspathPropertiesFileCredentialsProvider());

            client.setEndpoint(this.AWSSQSServiceUrl);

            // get the NumberOfMessages to optimize how to 
            // Receive all of the messages from the queue

            GetQueueAttributesRequest attributesRequest = 
                    new GetQueueAttributesRequest();
            attributesRequest.setQueueUrl(this.QueueUrl);
            attributesRequest.withAttributeNames(
                    "ApproximateNumberOfMessages");
            attributes = client.getQueueAttributes(attributesRequest).
                    getAttributes();

            numberOfMessages = Integer.valueOf(attributes.get(
                    "ApproximateNumberOfMessages")).intValue();

            // determine if we need to Receive messages from the Queue
            if (numberOfMessages > 0)
            {

                if (numberOfMessages < 10)
                {
                    // just do it inline it's less expensive than 
                    //spinning threads
                    ReceiveTask(numberOfMessages);
                }
                else
                {
                    //TODO Create a multithreading version for this
                    ReceiveTask(numberOfMessages);
                }
            }

            if (!CancellationToken)
            {

                //TODO testing
                _setLock.lock();

                Iterator<Message> _setIter = _set.iterator();
                //TODO
                while(_setIter.hasNext())
                {
                    Message temp = _setIter.next();

                    Long value = Long.valueOf(temp.getAttributes().
                            get("Timestamp"));
                    if(value.longValue() < epochWindowTime)
                    {
                        messages.add(temp);
                        _set.remove(temp);
                    }
                }

                _setLock.unlock();

                // TODO deduplicate the messages

                // TODO reorder the messages

                // TODO raise new Event with the results
            }

            if ((!CancellationToken) && (messages.size() > 0))
            {
                if (messages.size() < 10)
                {
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
                else
                {
                    //TODO Create a way to divide this work among 
                    //several threads
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
            }
        }catch (AmazonServiceException ase){
            ase.printStackTrace();
        }catch (AmazonClientException ace) {
            ace.printStackTrace();
        }
    }
}

As can be seen by some of the commenting, my current preferred way to handle this is by creating an event in the timer thread if there are messages. The main thread will then be listening for this event and handle it appropriately.

Presently I am unfamiliar with how Java handles events, or how to create/listen for them. I also do not know if it is possible to create events and have the information contained within them passed between threads.

Can someone please give me some advice/insight on whether or not my methods are possible? If so, where might I find some information on how to implement them as my current searching attempts are not proving fruitful.

If not, can I get some suggestions on how I would go about this, keeping in mind I would like to avoid having to manage sockets if at all possible.

EDIT 1:

The main thread will also be responsible for issuing commands based on the messages it receives, or issuing commands to get required information. For this reason the main thread cannot wait on receiving messages, and should handle them in an event based manner.

解决方案

Producer-Consumer Pattern:

One thread(producer) continuosly stacks objects(messages) in a queue. another thread(consumer) reads and removes objects from the queue.

If your problem fits to this, Try "BlockingQueue". http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

It is easy and effective.

If the queue is empty, consumer will be "block"ed, which means the thread waits(so do not uses cpu time) until producer puts some objects. otherwise cosumer continuosly consumes objects. And if the queue is full, prducer will be blocked until consumer consumes some objects to make a room in the queue, vice versa.

Here's a example: (a queue should be same object in both producer and consumer)


(Producer thread)

Message message = createMessage();
queue.put(message);

(Consumer thread)

Message message = queue.take();
handleMessage(message);

这篇关于在线程之间传递一组对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆