提高使用者进程中的共享内存和同步队列问题/崩溃 [英] Boost shared memory and synchronized queue issue/crash in consumer process

查看:65
本文介绍了提高使用者进程中的共享内存和同步队列问题/崩溃的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从子进程中使用c ++中的同步队列.我在C ++()( http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-ci.html )

I'm trying to consume from a child process a synchronized queue in c++. I'm using this synchronized queue in C++ () (http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html)

我将队列修改为可以在boost中序列化,并且还替换了使用的 boost :: mutex io_mutex _ 来代替使用进程内互斥锁(感谢@Sehe) boost :: interprocess :: interprocess_mutexio_mutex _ 并且在锁定时我将具有 boost :: mutex :: scoped_lock lock(io_mutex _); 的每一行更改为 scoped_lock< interprocess_mutex>锁定(io_mutex _);

I modified the queue to be serializable in boost and also replaced the used boost::mutex io_mutex_ to use instead an inteprocess mutex (thanks @Sehe) boost::interprocess::interprocess_mutex io_mutex_ And when locking I changed every line that has boost::mutex::scoped_lock lock(io_mutex_); to scoped_lock<interprocess_mutex> lock(io_mutex_);

template<class T>
class SynchronizedQueue
{
    friend class boost::serialization::access;
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & sQueue;
        ar & io_mutex_;
        ar & waitCondition;
    }
    ... // queue implementation (see [http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html][2])

}

在我的测试"应用中,我正在创建同步队列并将该类的100个实例存储在其中:

In my Test app, I'm creating the synchronized queue and storing in it 100 instances of this class:

class gps_position
{
    friend class boost::serialization::access;
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & degrees;
        ar & minutes;
        ar & seconds;
    }
public:
 int degrees;
 int minutes;
 float seconds;

 gps_position() {};
 gps_position(int d, int m, float s) :
 degrees(d), minutes(m), seconds(s)
 {}
};

消费者和生产者之间的通用定义:

Common definitions between Consumer and producer:

 char *SHARED_MEMORY_NAME = "MySharedMemory";
 char *SHARED_QUEUE_NAME  =  "MyQueue";
 typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

生产者过程代码:

    // Remove shared memory if it was created before
    shared_memory_object::remove(SHARED_MEMORY_NAME);
    // Create a new segment with given name and size
    managed_shared_memory mysegment(create_only,SHARED_MEMORY_NAME, 65536);
    MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)();
    //Insert data in the queue
    for(int i = 0; i < 100; ++i)  {
        gps_position position(i, 2, 3);
        myQueue->push(position);
    }
    // Start 1 process (for testing for now)
    STARTUPINFO info1={sizeof(info1)};
    PROCESS_INFORMATION processInfo1;
    ZeroMemory(&info1, sizeof(info1));
    info1.cb = sizeof info1 ; //Only compulsory field
    ZeroMemory(&processInfo1, sizeof(processInfo1));
    // Launch child process
    LPTSTR szCmdline = _tcsdup(TEXT("ClientTest.exe"));
    CreateProcess(NULL, szCmdline, NULL, NULL, TRUE, 0, NULL, NULL, &info1, &processInfo1);
    // Wait a little bit ( 5 seconds) for the started client process to load
    WaitForSingleObject(processInfo1.hProcess, 5000);

    /* THIS TESTING CODE WORK HERE AT PARENT PROCESS BUT NOT IN CLIENT PROCESS
    // Open the managed segment memory
    managed_shared_memory openedSegment(open_only, SHARED_MEMORY_NAME);
    //Find the synchronized queue using it's name
    MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;
    while (true) {
        if (myQueue->pop(position)) {
            std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
            std::cout << "\n";
        }
        else
            break;
    }*/


    // Wait until the queue is empty: has been processed by client(s)
    while(myQueue->sizeOfQueue() > 0) continue;

    // Close process and thread handles. 
    CloseHandle( processInfo1.hThread );

我的消费者代码如下:

    //Open the managed segment memory
    managed_shared_memory segment(open_only, SHARED_MEMORY_NAME);
    //Find the vector using it's name
    MySynchronisedQueue *myQueue = segment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;
    // Pop each position until the queue become empty and output its values
    while (true)
    {
        if (myQueue->pop(position)) { // CRASH HERE
            std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
            std::cout << "\n";
        }
        else
            break;
    }

当我运行创建队列并创建子进程(生产者)的父进程(生产者)时,尝试从队列中弹出"子进程会崩溃.

When I run the parent process (producer) that create the queue and create the child (consumer) process, the child crash when trying to 'pop' from the queue.

我在这里做错了什么?任何想法 ?感谢您的任何见解.这是我使用增强和共享内存创建的第一个应用程序.

What I'm doing wrong here ? Any idea ? Thanks for any insight. This is my first app creating using boost and shared memory.

我的目标是能够从多个进程使用此队列.在上面的示例中,我仅创建一个子进程以确保首先工作,然后再创建其他子进程.这个想法是,队列将预先由项目填充,并且多个创建的过程将从其中弹出"项目,而不会彼此冲突.

My goal is to be able to consume this queue from multiple process. In the example above I'm creating only one child process to make sure first it works before creating other child process. The idea is the queue will be filled in advance by items and multiple created process will 'pop' items from it without clashing on each other.

推荐答案

更新的代码:

  • 如果您要共享队列,则应该使用interprocess_mutex;这意味着需要进行一系列的依赖变更.
  • 您的队列如果要共享队列,应该使用共享内存分配器
  • 必须在互斥对象下提出条件,以便在所有平台上均具有可靠的行为
  • 未能锁定 toString()中.即使您复制了集合,但这还远远不够,因为容器在复制过程中可能会被修改.
  • 队列设计很有意义(返回" empty()"的线程安全"函数的用途是什么?在处理返回值之前,它可能不再为空或只是为空...这些称为 竞赛条件 ,会导致很难跟踪错误
  • Boost序列化与什么有关系?似乎只是在弄乱图片,因为这不是必需的,也不被使用.
  • 同样适用于Boost Any.为什么在 toString()中使用 any ?由于队列的设计,因此 typeid始终总是 gpsposition .
  • boost :: lexical_cast<> 类似(如果您已经拥有字符串流,为什么还要进行字符串连接?)
  • 为什么 empty() toString() sizeOfQueue() 不是 const ?
  • you should be using interprocess_mutex if you're gonna share the queue; This implies a host of dependent changes.
  • your queue should be using a shared-memory allocator if you're gonna share the queue
  • the conditions should be raised under the mutex for reliable behaviour on all platforms
  • you failed to lock inside toString(). Even though you copy the collection, that's not nearly enough because the container may get modified during that copy.
  • The queue design makes much sense (what is the use of a "thread safe" function that returns empty()? It could be no longer empty/just empty before you process the return value... These are called race conditions and lead to really hard to track bugs
  • What has Boost Serialization got to do with anything? It seems just there to muddle the picture, because it's not required and not being used.
  • Likewise for Boost Any. Why is any used in toString()? Due to the design of the queue, the typeid is always gpsposition anyways.
  • Likewise for boost::lexical_cast<> (why are you doing string concatenation if you already have the stringstream anyways?)
  • Why are empty(), toString(), sizeOfQueue() not const?

我强烈建议使用 boost :: interprocess :: message_queue .似乎是您实际要使用的(因为您不知何故

I highly recommend to use boost::interprocess::message_queue. This seems to be what you actually wanted to use (since you were somehow

这是经过修改的版本,可将容器放入共享内存中,并且可以正常工作:

Here's a modified version that puts the container in shared memory and it works:

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/thread/lock_guard.hpp>
#include <sstream>

namespace bip = boost::interprocess;

template <class T> class SynchronizedQueue {

  public:
    typedef bip::allocator<T, bip::managed_shared_memory::segment_manager> allocator_type;
  private:
    bip::deque<T, allocator_type> sQueue;
    mutable bip::interprocess_mutex io_mutex_;
    mutable bip::interprocess_condition waitCondition;
  public:
    SynchronizedQueue(allocator_type alloc) : sQueue(alloc) {} 

    void push(T element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        sQueue.push_back(element);
        waitCondition.notify_one();
    }
    bool empty() const {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        return sQueue.empty();
    }
    bool pop(T &element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

        if (sQueue.empty()) {
            return false;
        }

        element = sQueue.front();
        sQueue.pop_front();

        return true;
    }
    unsigned int sizeOfQueue() const {
        // try to lock the mutex
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        return sQueue.size();
    }
    void waitAndPop(T &element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

        while (sQueue.empty()) {
            waitCondition.wait(lock);
        }

        element = sQueue.front();
        sQueue.pop();
    }

    std::string toString() const {
        bip::deque<T> copy;
        // make a copy of the class queue, to reduce time locked
        {
            boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
            copy.insert(copy.end(), sQueue.begin(), sQueue.end());
        }

        if (copy.empty()) {
            return "Queue is empty";
        } else {
            std::stringstream os;
            int counter = 0;

            os << "Elements in the Synchronized queue are as follows:" << std::endl;
            os << "**************************************************" << std::endl;

            while (!copy.empty()) {
                T object = copy.front();
                copy.pop_front();
                os << "Element at position " << counter << " is: [" << typeid(object).name()  << "]\n";
            }
            return os.str();
        }
    }
};

struct gps_position {
    int degrees;
    int minutes;
    float seconds;

    gps_position(int d=0, int m=0, float s=0) : degrees(d), minutes(m), seconds(s) {}
};

static char const *SHARED_MEMORY_NAME = "MySharedMemory";
static char const *SHARED_QUEUE_NAME  =  "MyQueue";
typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

#include <boost/interprocess/shared_memory_object.hpp>
#include <iostream>

void consumer()
{
    bip::managed_shared_memory openedSegment(bip::open_only, SHARED_MEMORY_NAME);

    MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;

    while (openedQueue->pop(position)) {
        std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
        std::cout << "\n";
    }
}

void producer() {
    bip::shared_memory_object::remove(SHARED_MEMORY_NAME);

    bip::managed_shared_memory mysegment(bip::create_only,SHARED_MEMORY_NAME, 65536);

    MySynchronisedQueue::allocator_type alloc(mysegment.get_segment_manager());
    MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)(alloc);

    for(int i = 0; i < 100; ++i)          
        myQueue->push(gps_position(i, 2, 3));

    // Wait until the queue is empty: has been processed by client(s)
    while(myQueue->sizeOfQueue() > 0) 
        continue;
}

int main() {
    producer();
    // or enable the consumer code for client:
    // consumer();
}

这篇关于提高使用者进程中的共享内存和同步队列问题/崩溃的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆