提升1.49条件变量问题 [英] Boost 1.49 Condition Variable problem

查看:61
本文介绍了提升1.49条件变量问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

你好,



我试图在我的应用程序中使用Boost Conditional变量同步两个不同的线程,如下所示:



主线程将创建一个名为MIH-User的TCP服务器和对象实例,并向event_handler注册一个回调。



Main.cpp

  / *   * 
*默认MIH事件处理程序。
*
* @param msg已收到消息。
* @param ec错误代码。
* /

void event_handler(odtone :: mih :: message& msg, const boost :: system :: error_code& ec)
{
if (ec)
{
log _( 0 ,__ FUNCTION __, 错误:,ec.message());
return ;
}

switch (msg.mid())
{
// 源服务器收到HO完整消息
case odtone :: mih :: indication :: n2n_ho_complete:
{
if (ec)
{
log _( 0 ,__ FUNCTION __, 错误:,ec 。信息());
return ;
}

mih :: id mobile_id; // 移动节点MIHF ID TLV
mih :: link_tuple_id source_id; // 源链接ID TLV
mih :: link_tuple_id target_id; // 目标链接ID TLV
mih :: ho_result ho_res; // 切换结果TLV

// 反序列化收到的MIH消息N2N切换完成指示
msg>> mih :: indication()
& mih :: tlv_mobile_node_mihf_id(mobile_id)
& mih :: tlv_link_identifier(source_id)
& mih :: tlv_new_link_identifier(target_id)
& MIH :: tlv_ho_result(ho_res);

log _( 0 已收到N2N_HO_Complete.Indication,其中HO-Result =,ho_res.get(),
来自,msg.source()。to_string(), ,用于Mobile-IP =,mobile_id.to_string());

// 查找与此指标对应的源交易
src_transaction_ptr t;
tpool-> find(msg.source(),mobile_id.to_string(),t);
{
boost :: lock_guard< boost :: mutex>锁(叔> MUT);
t-> response_received = true ;
t-> ho_complete_result = ho_res;
t-> tid = msg.tid();
}
t-> cond.notify_one();
}
break ;

}
}

int main( int argc, char ** argv)
{
odtone :: setup_crash_handler();
boost :: asio :: io_service ios;

sap :: user usr(cfg,ios,boost :: bind(& event_handler,_1,_2));
mMihf =& usr;

// 使用本地MIHF注册MIH-Usr
register_mih_user(cfg);

// 与同行mihfs挂起的交易池
ho_transaction_pool池(IOS);
tpool =& pool;

// io_service对象提供I / O服务,例如套接字,
// 服务器对象将使用。
tcp_server server(ios) ,cfg.get< ushort>(kConf_Server_Port));
}





TCP服务器将侦听新的传入连接,并在接收到新连接后将创建一个新连接对应于源事务处理机器的线程也将它添加到公共事务池中,如下所示:



TCP服务器

  void  handle_request(std :: string arg1,std :: string arg2)
{
src_transaction_ptr t( new src_transaction(arg1,arg2));
tpool-> add(t);
t-> run();
}

void handle_read( const boost :: system: :error_code& error, size_t bytes_transferred)
{
if (!error )
{
// 拆分收到的消息定义;作为分隔符
std :: vector< std :: string>可疑交易报告;
boost :: split(strs,mMessage,boost :: is_any_of( ) );
log _( 0 收到的消息来自TCP客户端:,mMessage);

// 第一个值是HO命令启动消息
if ((strs.at( 0 )。compare( INIT)== 0 )&&(strs.size( )== 3 ))
{
// 第二个值是MIHF ID,第三个是IP地址
// 如果我们收到Init-Message,则启动Source事务
boost :: thread thrd(& tcp_connection :: handle_request,< span class =code-keyword> this ,strs.at( 1 ),strs.at( 2 ));
}
else if ((strs.at( 0 )。compare( TEST)== 0 )&&(strs.size()== 3 ))
{
int max_iterations = atoi(strs.at( 2 )。c_str());
for int i = 1 ; i< = max_iterations; i ++)
{
boost :: thread thrd(& tcp_connection :: handle_request,
this ,strs.at( 1 ),boost :: lexical_cast< std :: string>(i ));
}
}
else
log _( 0 错误:无法识别的消息。);

memset(& mMessage [ 0 ], 0 ,max_length);

mSocket.async_read_some(boost :: asio :: buffer(mMessage,max_length),
boost :: bind(& tcp_connection :: handle_read,shared_from_this(),
boost :: asio :: placeholders :: error,
boost :: asio :: placeholders :: bytes_transferred));

}
}





源事务处理机器将在不同的状态之间移动它必须冻结执行状态,直到它通过主线程收到一个指示,此时为n2n_ho_complete,它会将response_received设置为如下:



来源交易机



<前lang =c ++> / * *
*运行源状态机事务。
* /

void src_transaction :: run()
{
// Previuos状态。

wait_ho_complete_indication_state:
{
log _( 1 在SRC_WAIT_HO_COMPLETE_INDICATION状态为移动IP =,ip_address);
mState = SRC_WAIT_HO_COMPLETE_INDICATION;

boost :: unique_lock< boost :: mutex>锁(MUT);
while (!response_received)
{
cond.wait(lock);
}

response_received = false ;

// 做一些事情
}

// 其他州

返回;
}





response_received是一个公共变量,类的每个实例都有自己的变量。当通过主线程接收到指示时,它将查找与该指示匹配的源事务并将其response_received设置为true。



所以我的问题是:每当我尝试执行代码,整个程序挂起在wait_ho_complete_indication_state上,程序没有响应任何东西。例如,如果我请求为源事务创建10个线程。程序将创建所有这些程序并且它们开始同时工作,直到其中一个到达wait_ho_complete_indication_state,然后一切都冻结。即使主线程完全没有响应,即使它通过event_handler接收到指示。



那么我的代码是否正确使用条件变量?



请帮忙解决这个问题。



非常感谢。

解决方案

Hello,

I am trying to use Boost Conditional variable in my application to synchronize two different threads as following:

The main thread, will create a TCP server and instance of object called MIH-User and register a callback to an event_handler.

Main.cpp

/**
* Default MIH event handler.
*
* @param msg Received message.
* @param ec Error code.
*/
void event_handler(odtone::mih::message &msg, const boost::system::error_code &ec)
{
if (ec)
{
    log_(0, __FUNCTION__, " error: ", ec.message());
    return;
}

switch (msg.mid())
{
    // Source Server received HO Complete Message
    case odtone::mih::indication::n2n_ho_complete:
    {
        if (ec)
        {
            log_(0, __FUNCTION__, " error: ", ec.message());
            return;
        }

        mih::id             mobile_id;          // Mobile node MIHF ID TLV
        mih::link_tuple_id  source_id;          // Source Link ID TLV
        mih::link_tuple_id  target_id;          // Target Link ID TLV
        mih::ho_result      ho_res;             // Handover Result TLV

        // Deserialize received MIH message "N2N Handover Complete Indication"
        msg >> mih::indication()
            & mih::tlv_mobile_node_mihf_id(mobile_id)
            & mih::tlv_link_identifier(source_id)
            & mih::tlv_new_link_identifier(target_id)
            & mih::tlv_ho_result(ho_res);

        log_(0, "has received a N2N_HO_Complete.Indication with HO-Result=", ho_res.get(), 
            " from ", msg.source().to_string(), ", for Mobile-IP=", mobile_id.to_string());

        // Find the source transaction which corresponds to this Indication
        src_transaction_ptr t;
        tpool->find(msg.source(), mobile_id.to_string(), t);
        {
            boost::lock_guard<boost::mutex> lock(t->mut);
            t->response_received = true;
            t->ho_complete_result = ho_res;
            t->tid = msg.tid();
        }
        t->cond.notify_one();
    }
    break;

}
}

int main(int argc, char **argv)
{
        odtone::setup_crash_handler();
        boost::asio::io_service ios;

    sap::user usr(cfg, ios, boost::bind(&event_handler, _1, _2));
    mMihf = &usr;

    // Register the MIH-Usr with the local MIHF
    register_mih_user(cfg);

    // Pool of pending transactions with peer mihfs
    ho_transaction_pool pool(ios);
    tpool = &pool;

    // The io_service object provides I/O services, such as sockets, 
    // that the server object will use.
    tcp_server server(ios, cfg.get<ushort>(kConf_Server_Port));
}



The TCP server will listen for new incoming connections and upon the reception of a new connection it will create a new thread corresponding to a source transaction machine also it will add it to a common transaction pool as following:

TCP Server

void handle_request(std::string arg1,std::string arg2)
{
    src_transaction_ptr t(new src_transaction(arg1, arg2));
    tpool->add(t);
    t->run();
}

void handle_read(const boost::system::error_code &error, size_t bytes_transferred)
{
    if (!error)
    {
        // Split received message defining ";" as a delimiter
        std::vector<std::string> strs;
        boost::split(strs, mMessage, boost::is_any_of(":"));
        log_(0, "Received Message from TCP Client: ", mMessage);

        // The first value is the HO Command Initiation message
        if ((strs.at(0).compare("INIT") == 0) && (strs.size() == 3))
        {
            // The second value is the MIHF ID and the third is the IP address
            // Start Source transaction if we receive "Init-Message"
            boost::thread thrd(&tcp_connection::handle_request, this, strs.at(1), strs.at(2));
        }
        else if ((strs.at(0).compare("TEST") == 0) && (strs.size() == 3))
        {
            int max_iterations = atoi(strs.at(2).c_str());
            for (int i = 1; i <= max_iterations; i++)
            {
                boost::thread thrd(&tcp_connection::handle_request,
                    this, strs.at(1), boost::lexical_cast<std::string>(i));
            }
        }
        else
            log_(0, "Error: Unrecognized message.");

        memset(&mMessage[0], 0, max_length);

        mSocket.async_read_some(boost::asio::buffer(mMessage, max_length),
            boost::bind(&tcp_connection::handle_read, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    }
}



The source transaction machine will move between different states and in one of the states it will have to freeze the execution until it receives an indication through the main thread which is the "n2n_ho_complete" at this time, it will set the response_received to ture as following:

Source Transaction Machine

/**
* Run Source State Machine transaction.
*/
void src_transaction::run()
{        
// Previuos states.

wait_ho_complete_indication_state:
{
    log_(1, "is in SRC_WAIT_HO_COMPLETE_INDICATION State for Mobile IP=", ip_address);
    mState = SRC_WAIT_HO_COMPLETE_INDICATION;

    boost::unique_lock<boost::mutex> lock(mut);
    while (!response_received)
    {
        cond.wait(lock);
    }

    response_received = false;

    // Do some stuff
}

    // Other states

return;
}



The response_received is a public variable and each instance of the class has its own variable. When an indication is received through the main thread, it will look for the source transaction that matches that indication and sets its response_received to true.

So my problem is: whenever I try to execute the code, the whole program hangs on the wait_ho_complete_indication_state ,and the program doesn't respond to anything. And for example if I request the creation of a 10 threads for a source transaction. The program will create all of them and they start to work concurrently, until one of them reaches the wait_ho_complete_indication_state , then everything freezes. Even the main thread doesn't respond at all, even if it received an indication throught the event_handler.

So is my code correct for using the conditional variable?

Please help with this issue.

Thanks a lot.

解决方案

这篇关于提升1.49条件变量问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆