如何使用C ++ 11< thread>设计一个从数据源提取数据的系统 [英] How to use C++11 <thread> designing a system which pulls data from sources

查看:179
本文介绍了如何使用C ++ 11< thread>设计一个从数据源提取数据的系统的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这个问题来自:
C ++ 11线程不能与虚拟成员函数一起使用



正如注释中所建议的,我在上一篇文章中的问题可能不是正确的问题,因此这里是原始问题:



我想创建一个捕获系统,它将以恒定/动态频率查询几个源(因源而异,比如10次/秒) ,并将数据拉到每个队列。而源不是固定的,它们可以在运行期间添加/删除。



并且有一个监视器从常量频率队列中取出并显示数据。 / p>

那么这个问题最好的设计模式或结构是什么。



对于所有的源拉出器,并且每个拉出器保持线程和指定的拉动功能(不知何故拉动功能可以与拉出器相互作用,说如果源是排出的,则它将要求停止在该线程上的拉动过程) / p>

解决方案

除非你查询源代码的操作是阻塞的(或者你有您不需要 使用线程。我们可以从 Producer 开始,它将同步或异步(线程)分派工作:

  template< typename OutputType> 
class Producer
{
std :: list< OutputType>输出;

protected:
int poll_interval; //秒?毫秒?
virtual OutputType query()= 0;

public:
virtual〜Producer();

int next_poll_interval()const {return poll_interval;} }
void poll(){output.push_back(this-> query()); }

std :: size_t size(){return output.size(); }
//任何你需要的队列的访问器在这里:
// pop_front,swap整个列表等
};

现在我们可以从这个生产者只需在每个子类型中实现查询方法。您可以在构造函数中设置 poll_interval ,并单独保留,或者在每次调用 query 时更改它。有一个生成器组件,没有依赖于分派机制。

  template< typename OutputType> 
class ThreadDispatcher
{
Producer< OutputType> *生产者;
bool shutdown;
std :: thread thread;

static void loop(ThreadDispatcher * self)
{
Producer< OutputType> * producer = self-> producer;

while(!self-> shutdown)
{
producer-> poll();
//一些机制将产生的值传递给所有者
auto delay = //为了参数假定毫米
std :: chrono :: milliseconds(producer-> next_poll_interval ));
std :: this_thread :: sleep_for(delay);
}
}

public:
explicit ThreadDispatcher(Producer< OutputType> * p)
: (loop,this)
{
}

〜ThreadDispatcher()
{
shutdown = true;
thread.join();
}

//再次,读取生成的值所需的访问器到这里
// Producer :: output不同步,所以你不能直接暴露它
//调用线程
};

这是一个简单的调度器的简图,它将在线程中运行你的生产者,然后轮询它经常你问它。请注意,系统不会显示传回产生的值给拥有者,因为我不知道您要如何存取这些值。



另外请注意,到关闭标志 - 它应该是原子的,但它可能会隐式同步任何你选择对生成的值。



对于这个组织,还可以容易地编写同步分派器以在单个线程中查询多个生成器,例如从select / poll循环,或者使用类似Boost.Asio和每个生产者的截止时间计时器。


This question comes from: C++11 thread doesn't work with virtual member function

As suggested in a comment, my question in previous post may not the right one to ask, so here is the original question:

I want to make a capturing system, which will query a few sources in a constant/dynamic frequency (varies by sources, say 10 times / sec), and pull data to each's queues. while the sources are not fixed, they may add/remove during run time.

and there is a monitor which pulls from queues at a constant freq and display the data.

So what is the best design pattern or structure for this problem.

I'm trying to make a list for all the sources pullers, and each puller holds a thread, and a specified pulling function (somehow the pulling function may interact with the puller, say if the source is drain, it will ask to stop the pulling process on that thread.)

解决方案

Unless the operation where you query a source is blocking (or you have lots of them), you don't need to use threads for this. We could start with a Producer which will work with either synchronous or asynchronous (threaded) dispatch:

template <typename OutputType>
class Producer
{
    std::list<OutputType> output;

protected:
    int poll_interval; // seconds? milliseconds?
    virtual OutputType query() = 0;

public:
    virtual ~Producer();

    int next_poll_interval() const { return poll_interval; }
    void poll() { output.push_back(this->query()); }

    std::size_t size() { return output.size(); }
    // whatever accessors you need for the queue here:
    // pop_front, swap entire list, etc.
};

Now we can derive from this Producer and just implement the query method in each subtype. You can set poll_interval in the constructor and leave it alone, or change it on every call to query. There's your general producer component, with no dependency on the dispatch mechanism.

template <typename OutputType>
class ThreadDispatcher
{
    Producer<OutputType> *producer;
    bool shutdown;
    std::thread thread;

    static void loop(ThreadDispatcher *self)
    {
        Producer<OutputType> *producer = self->producer;

        while (!self->shutdown)
        {
            producer->poll();
            // some mechanism to pass the produced values back to the owner
            auto delay = // assume millis for sake of argument
                std::chrono::milliseconds(producer->next_poll_interval());
            std::this_thread::sleep_for(delay);
        }
    }

public:
    explicit ThreadDispatcher(Producer<OutputType> *p)
      : producer(p), shutdown(false), thread(loop, this)
    {
    }

    ~ThreadDispatcher()
    {
        shutdown = true;
        thread.join();
    }

    // again, the accessors you need for reading produced values go here
    // Producer::output isn't synchronised, so you can't expose it directly
    // to the calling thread
};

This is a quick sketch of a simple dispatcher that would run your producer in a thread, polling it however often you ask it to. Note that passing produced values back to the owner isn't shown, because I don't know how you want to access them.

Also note I haven't synchronized access to the shutdown flag - it should probably be atomic, but it might be implicitly synchronized by whatever you choose to do with the produced values.

With this organization, it'd also be easy to write a synchronous dispatcher to query multiple producers in a single thread, for example from a select/poll loop, or using something like Boost.Asio and a deadline timer per producer.

这篇关于如何使用C ++ 11&lt; thread&gt;设计一个从数据源提取数据的系统的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆