How to avoid data race with `asio::ip::tcp::iostream`?

Problem Description

How do I avoid a data race when using two threads to send and receive over an asio::ip::tcp::iostream?

I am writing a program that uses an asio::ip::tcp::iostream for input and output. The program accepts commands from the (remote) user over port 5555 and sends messages over that same TCP connection to the user. Because these events (commands received from the user or messages sent to the user) occur asynchronously, I have separate transmit and receive threads.

In this toy version, the commands are "one", "two" and "quit". Of course "quit" quits the program. The other commands do nothing, and any unrecognized command causes the server to close the TCP connection.

The transmitted messages are simple serial-numbered messages that are sent once per second.

In both this toy version and the real code I'm trying to write, the transmit and receive processes are both using blocking IO, so there doesn't appear to be a good way to use a std::mutex or other synchronization mechanism. (In my attempts, one process would grab the mutex and then block, which isn't going to work for this.)
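For illustration, here is a minimal sketch of that failed approach (names hypothetical, with std::cin/std::cout standing in for the TCP stream): whichever thread takes the lock then sits inside a blocking stream call, locking the other thread out indefinitely.

#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

std::mutex stream_mutex;

// Reader: acquires the lock, then blocks inside operator>> until input
// arrives, holding the lock the entire time.
void readerLoop(std::istream &in) {
    std::lock_guard<std::mutex> lock(stream_mutex);
    std::string command;
    in >> command; // blocks while the mutex is held
}

// Writer: cannot send anything until the reader's blocking read returns.
void writerLoop(std::ostream &out) {
    std::lock_guard<std::mutex> lock(stream_mutex);
    out << "message\n";
}

int main() {
    std::thread rx(readerLoop, std::ref(std::cin));
    std::thread tx(writerLoop, std::ref(std::cout));
    rx.join();
    tx.join();
}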

To build and test this, I'm using gcc version 7.2.1 and valgrind 3.13 on a 64-bit Linux machine. Build:

g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread

To test, I run the server with this command:

valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 

Then I use telnet 127.0.0.1 5555 in another window to create a connection to the server. What helgrind correctly points out is that there is a data race because both runTx and runRx are trying to access the same stream asynchronously:

==16188== Possible data race during read of size 1 at 0x1FFEFFF1CC by thread #1

==16188== Locks held: none

... more lines removed

concurrent.cpp

#include <asio.hpp>
#include <algorithm>
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two", 
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        out->flush();
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios, 
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

Recommended Answer

Yeah, you're sharing the socket that underlies the stream, without synchronization.

Sidenote: the same goes for the boolean flags, which can easily be "fixed" by changing them to:

std::atomic_bool want_quit;
std::atomic_bool want_reset;

How To Solve

To be honest, I don't think there is a good solution. You said it yourself: the operations are asynchronous, so you'll be in trouble if you try to do them synchronously.

You could try to think of hacks. What if we created a separate stream object based on the same underlying socket (file descriptor)? It's not going to be very easy, as such a stream is not part of Asio.

But we could hack one up using Boost Iostreams:

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

// .... later:

    // HACK: procure a _separate `ostream` to prevent the race, using the same fd
    namespace bio = boost::iostreams;
    bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
    bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

    con.run(stream, hack_ostream);

Indeed this runs without the race (simultaneous reads and writes on the same socket are fine, as long as you don't share the non-threadsafe Asio object(s) wrapping them).

Don't do that. It's a kludge. You're complicating things, apparently in an attempt to avoid using asynchronous code. I'd bite the bullet.

It's not too much work to factor the IO mechanics out from the service logic. You'll end up being free from random limitations (you could consider dealing with multiple clients, you could do without any threading at all etc.).

If you would like to learn about some middle ground, look at stackful coroutines (http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html).
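Not from the original answer, but to give a flavor of that coroutine style, here is a minimal single-threaded sketch using boost::asio::spawn (the message loop and error handling are illustrative). Both coroutines run on the one thread driving io_service::run(), so they can share the socket without any locking:

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <memory>
#include <string>

namespace ba = boost::asio;
using ba::ip::tcp;

int main() {
    ba::io_service ios;

    ba::spawn(ios, [&ios](ba::yield_context yield) {
        tcp::acceptor acceptor(ios, tcp::endpoint{tcp::v4(), 5555});
        auto socket = std::make_shared<tcp::socket>(ios);
        acceptor.async_accept(*socket, yield); // suspends; doesn't block the thread

        // Reader coroutine: shares the socket via shared_ptr; safe because
        // both coroutines are serviced by the same thread.
        ba::spawn(ios, [socket](ba::yield_context yield) {
            ba::streambuf buf;
            boost::system::error_code ec;
            for (;;) {
                ba::async_read_until(*socket, buf, '\n', yield[ec]);
                if (ec) return; // EOF or connection reset
                std::istream is(&buf);
                std::string command;
                std::getline(is, command);
                std::cout << command << '\n';
            }
        });

        // Writer: one message per second, paced by an async timer instead
        // of a blocking sleep.
        ba::deadline_timer timer(ios);
        for (int i = 0;; ++i) {
            std::string msg = "This is message number " + std::to_string(i) + "\n";
            boost::system::error_code ec;
            ba::async_write(*socket, ba::buffer(msg), yield[ec]);
            if (ec) break; // peer went away
            timer.expires_from_now(boost::posix_time::seconds(1));
            timer.async_wait(yield);
        }
    });

    ios.run(); // single thread: reads and writes interleave, never race
}

Linking would look something like g++ -std=c++14 coro.cpp -lboost_coroutine -lboost_context -lboost_system -lpthread.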

For reference:

Note I refactored to remove the need for pointers. You're not transferring ownership, so a reference will do. In case you didn't know how to pass the reference to a bind/std::thread constructor, the trick is in the std::ref you'll see.
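As a minimal standalone illustration of that std::ref trick (a hypothetical toy, separate from the demo below):

#include <functional>
#include <iostream>
#include <thread>

static void worker(std::ostream &out) { out << "hello from worker\n"; }

int main() {
    // std::thread copies its arguments by default; std::ref wraps the
    // stream so the thread receives a reference instead of attempting a
    // (non-copyable, ill-formed) copy of the ostream.
    std::thread t(worker, std::ref(std::cout));
    t.join();
}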

[For stress testing I have greatly reduced the delays.]

Live On Coliru

#include <boost/asio.hpp>
#include <algorithm>
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream &in, std::ostream &out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream &in);
    int runRx(std::ostream &out);
    std::atomic_bool want_quit;
    std::atomic_bool want_reset;
};

int Console::runTx(std::istream &in) {
    static const std::array<std::string, 3> cmds{
        {"quit", "one", "two"}, 
    };
    std::string command;
    while (!want_quit && !want_reset && in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream &out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        out << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        out.flush();
    }
    return 0;
}

int Console::run(std::istream &in, std::ostream &out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, std::ref(out)};
    int status = runTx(in);
    t1.join();
    return status;
}

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

int main()
{
    Console con;
    boost::asio::io_service ios;

    // IPv4 address, port 5555
    boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});

    while (!con.getQuitValue()) {
        boost::asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());

        {
            // HACK: procure a _separate `ostream` to prevent the race, using the same fd
            namespace bio = boost::iostreams;
            bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
            bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

            con.run(stream, hack_ostream);
        }

        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

Testing:

netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2

And:

commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands[@]}))]}; done | (while netcat localhost 5555; do sleep 1; done)

runs indefinitely, occasionally resetting the connection (when command "three" has been sent).
