提高:: ASIO ::插座线程安全 [英] boost::asio::socket thread safety

查看:162
本文介绍了提高:: ASIO ::插座线程安全的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

(这是我原来的问题的一个简化版本)

( This is a simplified version of my original question )

我有几个线程写入提振ASIO插座。这似乎工作得很好,没有任何问题。

I have several threads that write to a boost asio socket. This seems to work very well, with no problems.

该文件说一个共享的插座不是线程安全的(<一个href=\"http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/reference/ip__tcp/socket.html\">here,一路下跌的底部),所以我想知道我是否应该与互斥体,或者说保护插座。

The documentation says a shared socket is not thread safe( here, way down at the bottom ) so I am wondering if I should protect the socket with mutex, or something.

这<一个href=\"http://stackoverflow.com/questions/6941626/can-i-share-boostasiotcpsocket-object-between-2-threads-that-perform-read-a\">question坚持保护是必要的,但对如何做到这一点没有任何意见。

This question insists that protection is necessary, but gives no advice on how to do so.

所有的答案,我原来的问题还坚持认为我在做什么危险,而且最催促我async_writes甚至更复杂的东西来代替我写。但是,我不愿意这样做,因为这将复杂化code,它已经工作并没有回答者的说服我,他们知道他们洁具谈论 - 他们似乎已经阅读相同的文件,因为我并猜测,就像我了。

All the answers to my original question also insisted that what I was doing dangerous, and most urged me to replace my writes with async_writes or even more complicated things. However, I am reluctant to do this, since it would complicate code that is already working and none of the answerers convinced me they knew what they ware talking about - they seemed to have read the same documentation as I and were guessing, just as I was.

所以,我写了一个简单的程序来测试写入两个线程强调到一个共享的插座。

So, I wrote a simple program to stress test writing to a shared socket from two threads.

下面是服务器,它只是写出无论从客户端接收

Here is the server, which simply writes out whatever it receives from the client

int main()
{
    boost::asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));

    tcp::socket socket(io_service);
    acceptor.accept(socket);

    for (;;)
    {
        char mybuffer[1256];
        int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
        mybuffer[len] = '\0';
        std::cout << mybuffer;
        std::cout.flush();

    }

  return 0;
}

下面是客户端,它创建写入共享的插座一样快两个线程,因为他们可以

Here is the client, which creates two threads that write to a shared socket as fast as they can

boost::asio::ip::tcp::socket * psocket;

void speaker1()
{
    string msg("speaker1: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}
void speaker2()
{
    string msg("speaker2: hello, server, how are you running?\n");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}

int main(int argc, char* argv[])
{

    boost::asio::io_service io_service;

  // connect to server

    tcp::resolver resolver(io_service);
    tcp::resolver::query query("localhost", "3001");
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::resolver::iterator end;
    psocket = new tcp::socket(io_service);
    boost::system::error_code error = boost::asio::error::host_not_found;
    while (error && endpoint_iterator != end)
    {
        psocket->close();
        psocket->connect(*endpoint_iterator++, error);
    }


    boost::thread t1( speaker1 );
    boost::thread t2( speaker2 );

    Sleep(50000);

}

这个作品!完美的,据我可以告诉。客户端不会崩溃。消息到达服务器没有garbles。他们通常交替到达,从每个线程。有时候,一个线程获得两个或三个消息之前其他,但我不认为这是一个问题,只要没有garbles所有的邮件到达。

This works! Perfectly, as far as I can tell. The client does not crash. The messages arrive at the server without garbles. They usually arrive alternately, one from each thread. Sometimes one thread get two or three messages in before the other, but I do not think this is a problem so long as there are no garbles and all the messages arrive.

我的结论:插座可能不是线程在某些理论意义的安全,但它是如此难以使其失败,我不会担心

My conclusion: the socket may not be thread safe in some theoretical sense, but it is so hard to make it fail that I am not going to worry about it.

推荐答案

重新研究了code为async_write后,我现在相信,任何写操作是线程安全的,当且仅当数据包大小比

After restudying the code for async_write I am now convinced that any write operation is thread safe if and only if the packet size is smaller than

default_max_transfer_size = 65536;

什么情况是,只要一个async_write被称为async_write_some被称为在同一个线程。在游泳池调用某种形式的io_service对象::运行的线程将继续呼吁该写操作async_write_some,直到它完成。

What happens is that as soon as an async_write is called an async_write_some is called in the same thread. Any threads in the pool calling some form of io_service::run will keep on calling async_write_some for that write operation until it completes.

这些async_write_some调用可以并且将交织,如果它已被调用一次以上(分组是大于65536大)

These async_write_some calls can and will interleave if it has to be called more than once (the packets are larger than 65536).

ASIO不排队写入一个插座,你会想到,一个精后对方。为了保证双方的主题交错安全写入考虑下面的一段code的:

ASIO does not queue writes to a socket as you would expect, one finishing after the other. In order to ensure both thread and interleave safe writes consider the following piece of code:

    void my_connection::async_serialized_write(
            boost::shared_ptr<transmission> outpacket) {
        m_tx_mutex.lock();
        bool in_progress = !m_pending_transmissions.empty();
        m_pending_transmissions.push(outpacket);
        if (!in_progress) {
            if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                        boost::asio::transfer_all(),
            boost::bind(&my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                                       boost::asio::placeholders::bytes_transferred));
            } else { // Send single buffer
                boost::asio::async_write(m_socket,
                                    boost::asio::buffer(
                                           m_pending_transmissions.front()->buffer_references.front(),                          m_pending_transmissions.front()->num_bytes_left),
                boost::asio::transfer_all(),
                boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }
        }
        m_tx_mutex.unlock();
    }

    void my_connection::handle_async_serialized_write(
    const boost::system::error_code& e, size_t bytes_transferred) {
        if (!e) {
            boost::shared_ptr<transmission> transmission;
            m_tx_mutex.lock();
            transmission = m_pending_transmissions.front();
            m_pending_transmissions.pop();
            if (!m_pending_transmissions.empty()) {
                if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
            boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                    boost::asio::transfer_exactly(
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::bind(
                            &chreosis_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                } else { // Send single buffer
                    boost::asio::async_write(m_socket,
                    boost::asio::buffer(
                            m_pending_transmissions.front()->buffer_references.front(),
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::asio::transfer_all(),
                    boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                }
            }
            m_tx_mutex.unlock();
            transmission->handler(e, bytes_transferred, transmission);
        } else {
            MYLOG_ERROR(
            m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
            stop(connection_stop_reasons::stop_async_handler_error);
        }
    }

这基本上使得一个队列一次发送一个数据包。 async_write只调用后的第一次写入成功,然后要求在第一次写入原来的处理程序。

This basically makes a queue for sending one packet at a time. async_write is called only after the first write succeeds which then calls the original handler for the first write.

如果ASIO取得写入队列每个插槽/流自动这本来是容易得多。

It would have been easier if asio made write queues automatic per socket/stream.

这篇关于提高:: ASIO ::插座线程安全的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆