Boost ASIO:向所有连接的客户端发送消息 [英] Boost ASIO: Send message to all connected clients

查看:205
本文介绍了Boost ASIO:向所有连接的客户端发送消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在处理一个涉及boost::beast websocket/http混合服务器的项目,该服务器在boost::asio之上运行.我的项目很大程度上基于 advanced_server.cpp 示例源.

I'm working on a project that involves a boost::beast websocket/http mixed server, which runs on top of boost::asio. I've heavily based my project off the advanced_server.cpp example source.

它工作正常,但是现在我正尝试添加一项功能,该功能要求向所有个连接的客户端发送消息.

It works fine, but right now I'm attempting to add a feature that requires the sending of a message to all connected clients.

我对boost::asio不太熟悉,但是现在我看不到有什么办法可以进行广播"事件(即使这是正确的术语).

I'm not very familiar with boost::asio, but right now I can't see any way to have something like "broadcast" events (if that's even the correct term).

我幼稚的方法是看我是否可以将websocket_session()的构造附加到类似事件侦听器的功能,而析构函数则分离侦听器.到那时,我可以触发该事件,并让所有当前有效的websocket会话(websocket_session()的生存期限定在此范围内)执行回调.

My naive approach would be to see if I can have the construction of websocket_session() attach something like an event listener, and the destructor detatch the listener. At that point, I could just fire the event, and have all the currently valid websocket sessions (to which the lifetime of websocket_session() is scoped) execute a callback.

https://stackoverflow.com/a/17029022/268006 ,它或多或少地起到了我的作用想要通过(ab)使用boost::asio::steady_timer进行操作,但这似乎是一种骇人听闻的技巧,无法完成应该很简单的事情.

There is https://stackoverflow.com/a/17029022/268006, which does more or less what I want by (ab)using a boost::asio::steady_timer, but that seems like a kind of horrible hack to accomplish something that should be pretty straightforward.

基本上,给定有状态的boost::asio服务器,如何对多个连接进行操作?

Basically, given a stateful boost::asio server, how can I do an operation on multiple connections?

推荐答案

首先,您可以广播UDP,但不能广播到已连接的客户端.那只是... UDP.

First off: You can broadcast UDP, but that's not to connected clients. That's just... UDP.

第二,该链接显示了如何在Asio中具有类似于条件变量(事件)的接口.那只是您问题的一小部分.您忘记了全局:您需要了解一组打开的连接,一种或另一种方式:

Secondly, that link shows how to have a condition-variable (event)-like interface in Asio. That's only a tiny part of your problem. You forgot about the big picture: you need to know about the set of open connections, one way or the other:

  1. 例如保留一个指向每个连接的会话指针(weak_ptr)容器
  2. 每个连接都订阅一个信号插槽(例如增强信号).
  3. li>
  1. e.g. keeping a container of session pointers (weak_ptr) to each connection
  2. each connection subscribing to a signal slot (e.g. Boost Signals).

选项1.的性能很好,选项2.的灵活性更好(将事件源与订阅者分离,从而可以拥有异构订阅者,例如,不具有来自连接的订阅者).

Option 1. is great for performance, option 2. is better for flexibility (decoupling the event source from subscribers, making it possible to have heterogenous subscribers, e.g. not from connections).

因为我认为选项1比线程简单得多,所以更好.效率(例如,您可以从一个缓冲区为所有客户端提供服务而无需复制),并且您可能不需要双重解耦信号/插槽,让我参考一个答案,其中我已经针对纯Asio(不包含Beast)展示了很多:

Because I think Option 1. is much simpler w.r.t to threading, better w.r.t. efficiency (you can e.g. serve all clients from one buffer without copying) and you probably don't need to doubly decouple the signal/slots, let me refer to an answer where I already showed as much for pure Asio (without Beast):

它显示了连接池"的概念-本质上是具有某些垃圾回收逻辑的weak_ptr<connection>对象的线程安全容器.

It shows the concept of a "connection pool" - which is essentially a thread-safe container of weak_ptr<connection> objects with some garbage collection logic.

聊点事情之后,我想花点时间来实际演示这两种方法,所以完全清楚我在说什么.

After chatting about things I wanted to take the time to actually demonstrate the two approaches, so it's completely clear what I'm talking about.

首先让我们展示一个简单的,运行正常的异步TCP服务器,带有

First let's present a simple, run-of-the mill asynchronous TCP server with

  • 具有多个并发连接
  • 每个连接的会话逐行从客户端读取,并将其回显给客户端
  • 3秒后停止接受,并在最后一个客户端断开连接后退出

github上的master分支

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.bind({{}, 6767});
        _acc.set_option(tcp::acceptor::reuse_address());
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

  private:
    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             session->start();
             if (!ec)
                 accept_loop();
        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(3s);
    s.stop(); // active connections will continue

    th.join();
}

方法1.添加广播消息

因此,让我们添加同时发送到所有活动连接的广播消息".我们添加两个:

Approach 1. Adding Broadcast Messages

So, let's add "broadcast messages" that get sent to all active connections simultaneously. We add two:

  • 每次建立新连接时都会说一次(说玩家##已进入游戏")
  • 一个模拟全局服务器事件"的服务器,如您在问题中所述.它是从main内部触发的:

  • one at each new connection (saying "Player ## has entered the game")
  • one that emulates a global "server event", like you described in the question). It gets triggered from within main:

std::this_thread::sleep_for(1s);

auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";

请注意,我们如何通过为每个接受的连接注册一个弱指针并对其进行操作来实现此目的:

Note how we do this by registering a weak pointer to each accepted connection and operating on each:

    _acc.async_accept(session->_s, [this,session](error_code ec) {
         auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
         std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

         if (!ec) {
             auto n = reg_connection(session);

             session->start();
             accept_loop();

             broadcast("player #" + std::to_string(n) + " has entered the game\n");
         }

    });

broadcast也可以直接从main中使用,并且很简单:

broadcast is also used directly from main and is simply:

size_t broadcast(std::string const& msg) {
    return for_each_active([msg](connection& c) { c.send(msg, true); });
}

using-asio-post分支github

using-asio-post branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.bind({{}, 6767});
        _acc.set_option(tcp::acceptor::reuse_address());
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

    size_t broadcast(std::string const& msg) {
        return for_each_active([msg](connection& c) { c.send(msg, true); });
    }

  private:
    using connptr = std::shared_ptr<connection>;
    using weakptr = std::weak_ptr<connection>;

    std::mutex _mx;
    std::vector<weakptr> _registered;

    size_t reg_connection(weakptr wp) {
        std::lock_guard<std::mutex> lk(_mx);
        _registered.push_back(wp);
        return _registered.size();
    }

    template <typename F>
    size_t for_each_active(F f) {
        std::vector<connptr> active;
        {
            std::lock_guard<std::mutex> lk(_mx);
            for (auto& w : _registered)
                if (auto c = w.lock())
                    active.push_back(c);
        }

        for (auto& c : active) {
            std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
            f(*c);
        }

        return active.size();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 auto n = reg_connection(session);

                 session->start();
                 accept_loop();

                 broadcast("player #" + std::to_string(n) + " has entered the game\n");
             }

        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}

方法2:广播但带有增强信号的那些人

信号方法是依赖反转的一个很好的例子.

Approach 2: Those Broadcast But With Boost Signals2

The Signals approach is a fine example of Dependency Inversion.

最重要的注意事项:

  • 信号插槽在调用它的线程上被调用(引发事件")
  • scoped_connection在那里,因此connection被破坏时 *自动删除订阅
  • 控制台消息的措辞上的细微差别从已到达#个活动的连接"到已到达#个活动的订户".
  • signal slots get invoked on the thread invoking it ("raising the event")
  • the scoped_connection is there so subscriptions are *automatically removed when the connection is destructed
  • there's subtle difference in the wording of the console message from "reached # active connections" to "reached # active subscribers".

区别是理解增加的灵活性的关键:信号所有者/调用者不了解有关订户的任何信息.那就是我们正在谈论的去耦/依赖倒置

using-signals2 github上的分支

using-signals2 branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;

    boost::signals2::scoped_connection _subscription;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.bind({{}, 6767});
        _acc.set_option(tcp::acceptor::reuse_address());
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

    size_t broadcast(std::string const& msg) {
        _broadcast_event(msg);
        return _broadcast_event.num_slots();
    }

  private:
    boost::signals2::signal<void(std::string const& msg)> _broadcast_event;

    size_t reg_connection(connection& c) {
        c._subscription = _broadcast_event.connect(
                [&c](std::string msg){ c.send(msg, true); }
            );

        return _broadcast_event.num_slots();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 auto n = reg_connection(*session);

                 session->start();
                 accept_loop();

                 broadcast("player #" + std::to_string(n) + " has entered the game\n");
             }

        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active subscribers\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}

请参见方法1和方法2之间的区别: 针对具有以下条件的3个并发客户端运行时的输出示例:

A sample of the output when run against 3 concurrent clients with:

(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)

这篇关于Boost ASIO:向所有连接的客户端发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆