Missing the first characters using boost::asio::streambuf


Question

I am writing an asynchronous chat server in C++ using the Boost library. Almost everything works fine.

There are two ways for a client to disconnect:


  • by pressing Ctrl + C (killing the client process)

  • by entering "exit".

The former works. The latter has a problem: if a client disconnects with "exit", the next message sent by another client arrives without its first several characters. Messages after that one are fine.

For example: several clients are chatting and one of them disconnects with "exit". After that, another client sends "0123456789abcdefghijk" and all clients receive only "abcdefghijk". I don't know where the problem is; I guess it has something to do with streambuf. I found a similar (almost identical) problem, but in C#.

Here's the code:

#include<algorithm>
#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<string>
#include<vector>
#include<cstdlib>
#include<ctime>

#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>

using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;

typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;

const string waitingMsg("Waiting for clients...\n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;

io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;

void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
        std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);


int main(int argc, char* argv[])
{
    try
    {
        vector<boost::shared_ptr<boost::thread> > threads;

        ready();
        for (int i = 0; i < THREADS; i++)
        {
            boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
            threads.push_back(t);
        }
        for (int i = 0; i < THREADS; i++)
        {
            threads[i]->join();
        }
    }
    catch (std::exception& error)
    {
        cerr << error.what() << endl;
    }
    return 0;
}

void ready()
{
    cout << waitingMsg;
    accepting();
}

void accepting()
{
    socket_ptr clientSock(new tcp::socket(service));
    acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}

void askForName(socket_ptr sock, const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on accepting: " << error.message() << endl;
    }
    boost::asio::async_write(*sock, buffer("Please, enter your name:\n"),
            boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

    accepting();
}

void receiveName(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if (error)
    {
        cerr << errorWritingMsg << error.message() << endl;
    }
    boost::asio::async_read_until(*sock, buff, '\n',
            boost::bind(&identify, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

void identify(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        return;
    }

    string_ptr name(new string(""));
    if (!extract(name, bytes_transferred))
    {
        return;
    }

    if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
    {
        boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:\n"),
                    boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
        return;
    }

    nameList->emplace_back(*name);

    accepted(sock, name);
}

void accepted(socket_ptr sock, string_ptr name)
{
    mtx.lock();
    clientList->emplace_back(sock);
    mtx.unlock();
    notification(sock, name, "New client: ", " joined. ");

    receiveMessage(sock, name);
}

void receiveMessage(socket_ptr sock, string_ptr name)

{
    boost::asio::async_read_until(*sock, buff, '\n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred));
}

void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        disconnectClient(sock, name, error);
        return;
    }

    if(!clientList->empty())
    {
        //mtx.lock();

        string_ptr message(new string(""));

        if(!extract(message, bytes_transferred))
        {
            //mtx.unlock();
            disconnectClient(sock, name, error);
            return;
        }

        *message = *name + ": " + *message + "\n";
        cout << "ChatLog: " << *message << endl;

        writeMessage(sock, message);
        receiveMessage(sock, name);

        //mtx.unlock();
    }
}

bool extract(string_ptr message, std::size_t bytes_transferred)
{
    mtx.lock();
    buff.commit(bytes_transferred);
    std::istream istrm(&buff);
    //mtx.unlock();

    std::getline(istrm, *message);
    buff.consume(buff.size());

    string_ptr msgEndl(new string(*message + "\n"));
    mtx.unlock();
    if(clientSentExit(msgEndl))
    {
        return false;
    }
    return true;
}

bool clientSentExit(string_ptr message)
{
    return message->compare(0, 5, "exit\n") == 0;
}

void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
    boost::system::error_code ec = error;
    auto position = find(clientList->begin(), clientList->end(), sock);
    auto namePos = find(nameList->begin(), nameList->end(), *name);

    sock->shutdown(tcp::socket::shutdown_both, ec);
    if (ec)
    {
        cerr << "Error on shutting: " << ec.message() << endl;
    }
    sock->close(ec);
    if(ec)
    {
        cerr << "Error on closing: " << ec.message() << endl;
    }
    clientList->erase(position);
    nameList->erase(namePos);

    notification(sock, name, "", " disconnected. ");
}

void writeMessage(socket_ptr sock, string_ptr message)
{
    for(auto& cliSock : *clientList)
    {
        if (cliSock->is_open() && cliSock != sock)
        {
            boost::asio::async_write(*cliSock, buffer(*message),
                    boost::bind(&responseSent, boost::asio::placeholders::error));
        }
    }
}

void responseSent(const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on writing: " << error.message() << endl;
    }
}

void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
    string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
    cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;

    *serviceMsg = *serviceMsg + "\n";
    writeMessage(sock, serviceMsg);

    cout << waitingMsg;
}

Interestingly, I have a similar synchronous server that uses streambuf in the same way, and it has no such problem.

Recommended answer

boost::asio::async_read_until() may read any number of characters past the '\n' into the streambuf. The bytes_transferred value it passes to your handler is the number of characters in the first line, up to and including the delimiter, which is not necessarily the number of characters that were read into the buffer. See the documentation: http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/async_read_until/overload1.html

As long as you leave the buffer variable intact, the next boost::asio::async_read_until() call will first use the characters already in the buffer and only then read from the socket.

It seems that you read a line from the buffer using getline(), which is correct. After that, you call

buff.consume(buff.size());

which clears the buffer and discards any partial lines you may already have received. The first complete line has already been removed from the buffer by getline(), so the consume() call is unnecessary in any case.

Just removing the consume() call would not solve your problem, because you seem to use a single buffer shared between all clients, so you would not know which client a given piece of partial data came from. A possible solution is to create a list of buffers (one per client), just as you have a list of sockets. Then boost::asio::async_read_until() and getline() would take care of the partial data, and you would not have to think about it.
