缺少第一个字符使用boost :: asio :: streambuf [英] Missing the first characters using boost::asio::streambuf

查看:112
本文介绍了缺少第一个字符使用boost :: asio :: streambuf的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用boost库在C ++中创建一个异步聊天服务器。



客户端断开连接的方法有两种:




  • 通过按Ctrl + C(停止客户端进程)

  • 输入exit。



前者是好的。然而,后者有一个问题。如果客户端与exit断开连接,则由另一个客户端发送的下一个消息将显示为不包含前几个字符。之后它就OK了。



例如:几个客户端聊天。其中一个与退出断开连接。之后,另一个客户端发送0123456789abcdefghijk,所有客户端只接收:abcdefghijk。我不知道在哪里的问题,我想这是关于streambuf的东西。我发现类似的问题(几乎相同),但在C#。



这里的代码:

 #include< iostream> 
#include< list>
#include< map>
#include< queue>
#include< vector>
#include< cstdlib>
#include< ctime>

#include< boost / thread.hpp>
#include< boost / bind.hpp>
#include< boost / asio.hpp>
#include< boost / asio / ip / tcp.hpp>

using namespace std;
using namespace boost :: asio;
using namespace boost :: asio :: ip;

typedef boost :: shared_ptr< tcp :: socket> socket_ptr;
typedef boost :: shared_ptr< string> string_ptr;
typedef boost :: shared_ptr< list< socket_ptr> > clientList_ptr;
typedef boost :: shared_ptr< list< string> > nameList_ptr;

const string waitingMsg(Waiting for clients ... \\\
);
const string totalClientsMsg(客户总数:);
const string errorReadingMsg(Error on reading:);
const string errorWritingMsg(写错误:);
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;

io_service服务;
tcp :: acceptor acceptor(service,tcp :: endpoint(tcp :: v4(),30001));
boost :: mutex mtx;
clientList_ptr clientList(new list< socket_ptr>);
nameList_ptr nameList(new list< string>);
boost :: asio :: streambuf buff;
time_t timer;

void ready();
void accepting();
void askForName(socket_ptr clientSock,const boost :: system :: error_code& error);
void receiveName(socket_ptr clientSock,const boost :: system :: error_code& error,
std :: size_t bytes_transferred);
void identify(socket_ptr clientSock,const boost :: system :: error_code& error,std :: size_t bytes_transferred);
void accepted(socket_ptr clientSock,string_ptr name);
void receiveMessage(socket_ptr clientSock,string_ptr name);
void received(socket_ptr clientSock,string_ptr name,const boost :: system :: error_code& error,
std :: size_t bytes_transferred);
bool extract(string_ptr message,std :: size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock,string_ptr name,const boost :: system :: error_code& error);
void writeMessage(socket_ptr clientSock,string_ptr message);
void responseSent(const boost :: system :: error_code& error);
void notification(socket_ptr sock,string_ptr name,const string headOfMsg,const string tailOfMsg);


int main(int argc,char * argv [])
{
try
{
vector< boost :: shared_ptr< boost :: thread> >线程;

ready();
for(int i = 0; i {
boost :: shared_ptr< boost :: thread> t(new boost :: thread(boost :: bind(& io_service :: run,& service)));
threads.push_back(t);
}
for(int i = 0; i {
threads [i] - > join
}
}
catch(std :: exception& error)
{
cerr< error.what()<< endl;
}
return 0;
}

void ready()
{
cout<等待
accepting();
}

void accepting()
{
socket_ptr clientSock(new tcp :: socket(service));
acceptor.async_accept(* clientSock,boost :: bind(& askForName,clientSock,boost :: asio :: placeholder :: error));
}

void askForName(socket_ptr sock,const boost :: system :: error_code& error)
{
if(error)
{
cerr<< 接受时出错:<< error.message()<< endl;
}
boost :: asio :: async_write(* sock,buffer(请输入您的名字:\\\
),
boost :: bind(& receiveName,sock, boost :: asio :: placeholder :: error,
boost :: asio :: placeholders :: bytes_transferred));

accepting();
}

void receiveName(socket_ptr sock,const boost :: system :: error_code& error,
std :: size_t bytes_transferred)
{
if (error)
{
cerr<< errorWritingMsg<< error.message()<< endl;
}
boost :: asio :: async_read_until(* sock,buff,'\\\
',
boost :: bind(& identify,sock,boost :: asio :: placeholders :: error,
boost :: asio :: placeholders :: bytes_transferred));
}

void identify(socket_ptr sock,const boost :: system :: error_code& error,
std :: size_t bytes_transferred)
{
if (错误)
{
if(error.value()!= EOF_ERROR_CODE)
{
cerr< errorReadingMsg<< error.message()<< endl;
}
return;
}

string_ptr name(new string());
if(!extract(name,bytes_transferred))
{
return;
}

if(find(nameList-> begin(),nameList-> end(),* name)!= nameList-> end())
{
boost :: asio :: async_write(* sock,buffer(这个名字已被使用!请选择另一个名称:\\\
),
boost :: bind(& receiveName ,sock,boost :: asio :: placeholders :: error,
boost :: asio :: placeholders :: bytes_transferred));
return;
}

nameList-> emplace_back(* name);

accepted(sock,name);
}

void accept(socket_ptr sock,string_ptr name)
{
mtx.lock();
clientList-> emplace_back(sock);
mtx.unlock();
通知(sock,name,New client:,joined);

receiveMessage(sock,name);
}

void receiveMessage(socket_ptr sock,string_ptr name)

{
boost :: asio :: async_read_until(* sock,buff,'\\ \\ n',boost :: bind(& received,sock,name,boost :: asio :: placeholders :: error,
boost :: asio :: placeholders :: bytes_transferred));
}

void receive(socket_ptr sock,string_ptr name,const boost :: system :: error_code& error,
std :: size_t bytes_transferred)
{
if(error)
{
if(error.value()!= EOF_ERROR_CODE)
{
cerr< errorReadingMsg<< error.message()<< endl;
}
disconnectClient(sock,name,error);
return;
}

if(!clientList-> empty())
{
//mtx.lock();

string_ptr message(new string());

if(!extract(message,bytes_transferred))
{
//mtx.unlock();
disconnectClient(sock,name,error);
return;
}

* message = * name +:+ * message +\\\
;
cout<< ChatLog:< * message<< endl;

writeMessage(sock,message);
receiveMessage(sock,name);

//mtx.unlock();
}
}

bool extract(string_ptr message,std :: size_t bytes_transferred)
{
mtx.lock();
buff.commit(bytes_transferred);
std :: istream istrm(& buff);
//mtx.unlock();

std :: getline(istrm,* message);
buff.consume(buff.size());

string_ptr msgEndl(new string(* message +\\\
));
mtx.unlock();
if(clientSentExit(msgEndl))
{
return false;
}
return true;
}

bool clientSentExit(string_ptr message)
{
return message-> compare(0,5,exit\\\
)== 0;
}

void disconnectClient(socket_ptr sock,string_ptr name,const boost :: system :: error_code& error)
{
boost :: system :: error_code ec = error;
auto position = find(clientList-> begin(),clientList-> end(),sock);
auto namePos = find(nameList-> begin(),nameList-> end(),* name);

sock-> shutdown(tcp :: socket :: shutdown_both,ec);
if(ec)
{
cerr<< 关闭时出错:< ec.message()<< endl;
}
sock-> close(ec);
if(ec)
{
cerr<< 关闭时出错:< ec.message()<< endl;
}
clientList-> erase(position);
nameList-> erase(namePos);

通知(sock,name,,disconnected。
}

void writeMessage(socket_ptr sock,string_ptr message)
{
for(auto& cliSock:* clientList)
{
if (cliSock-> is_open()&& cliSock!= sock)
{
boost :: asio :: async_write(* cliSock,buffer(* message),
boost :: bind(& responseSent,boost :: asio :: placeholders :: error));
}
}
}

void responseSent(const boost :: system :: error_code& error)
{
if
{
cerr<< 写错误:<< error.message()<< endl;
}
}

void notification(socket_ptr sock,string_ptr name,const string headOfMsg,const string tailOfMsg)
{
string_ptr serviceMsg headOfMsg + * name + tailOfMsg));
cout<< * serviceMsg<< totalClientsMsg < clientList-> size()<< endl;

* serviceMsg = * serviceMsg +\\\
;
writeMessage(sock,serviceMsg);

cout<<等待
}

很有趣的是,我有类似的同步服务器使用streambuf ,但没有这样的问题。

解决方案

boost :: asio :: async_read_until()可以读取任何字符数量streambuf后\\\
。然后它给出bytes_transferred,它是第一行中的字符计数(不一定是读入缓冲区的字符数)。请参见文档。 p>

只要你保持缓冲区变量不变,下一个boost :: asio :: async_read_until()将首先从缓冲区读取字符,然后从套接字读取字符。



在我看来,你使用getline()从缓冲区读取一行,这是正确的。之后,您呼叫

  buff.consume(buff.size()); 

清除缓冲区,删除您可能收到的部分行的所有信息。您读取的第一个完整行已通过getline()从缓冲区中删除,因此在任何情况下,consume()调用都是不必要的。



()调用不会解决你的问题,因为你似乎使用一个缓冲区,所有客户端之间共享,你不知道什么部分数据是从哪个客户端。一个可能的解决方案可能是创建一个缓冲区列表(每个客户端一个),就像你有一个套接字列表。然后boost :: asio :: async_read_until()和getline()将处理部分数据,你不必考虑这一点。


I make an asynchronous chat server in C++ using the boost library. Almost everything works fine.

There are two ways for a client to disconnect:

  • by pressing Ctrl + C (killing the client process)
  • by entering "exit".

The former is OK. However, the latter has a problem. If a client disconnects with "exit", the next message, sent by another client, appears without the first several characters. After that it's OK.

For example: Several clients chat. One of them disconnects with "exit". After that, another client sends "0123456789abcdefghijk" and all clients receive only: "abcdefghijk". I don't know where's the problem, I guess it's something about streambuf. I found similar problem (almost the same) but in C#.

Here's the code:

#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>

#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>

using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;

typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;

const string waitingMsg("Waiting for clients...\n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
const int EOF_ERROR_CODE = 2;
const int THREADS = 1;

io_service service;
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
boost::mutex mtx;
clientList_ptr clientList(new list<socket_ptr>);
nameList_ptr nameList(new list<string>);
boost::asio::streambuf buff;
time_t timer;

void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
        std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);


int main(int argc, char* argv[])
{
    try
    {
        vector<boost::shared_ptr<boost::thread> > threads;

        ready();
        for (int i = 0; i < THREADS; i++)
        {
            boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
            threads.push_back(t);
        }
        for (int i = 0; i < THREADS; i++)
        {
            threads[i]->join();
        }
    }
    catch (std::exception& error)
    {
        cerr << error.what() << endl;
    }
    return 0;
}

void ready()
{
    cout << waitingMsg;
    accepting();
}

void accepting()
{
    socket_ptr clientSock(new tcp::socket(service));
    acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}

void askForName(socket_ptr sock, const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on accepting: " << error.message() << endl;
    }
    boost::asio::async_write(*sock, buffer("Please, enter your name:\n"),
            boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

    accepting();
}

void receiveName(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if (error)
    {
        cerr << errorWritingMsg << error.message() << endl;
    }
    boost::asio::async_read_until(*sock, buff, '\n',
            boost::bind(&identify, sock, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

void identify(socket_ptr sock, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        return;
    }

    string_ptr name(new string(""));
    if (!extract(name, bytes_transferred))
    {
        return;
    }

    if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
    {
        boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:\n"),
                    boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
        return;
    }

    nameList->emplace_back(*name);

    accepted(sock, name);
}

void accepted(socket_ptr sock, string_ptr name)
{
    mtx.lock();
    clientList->emplace_back(sock);
    mtx.unlock();
    notification(sock, name, "New client: ", " joined. ");

    receiveMessage(sock, name);
}

void receiveMessage(socket_ptr sock, string_ptr name)

{
    boost::asio::async_read_until(*sock, buff, '\n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred));
}

void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
        std::size_t bytes_transferred)
{
    if(error)
    {
        if (error.value() != EOF_ERROR_CODE)
        {
            cerr << errorReadingMsg << error.message() << endl;
        }
        disconnectClient(sock, name, error);
        return;
    }

    if(!clientList->empty())
    {
        //mtx.lock();

        string_ptr message(new string(""));

        if(!extract(message, bytes_transferred))
        {
            //mtx.unlock();
            disconnectClient(sock, name, error);
            return;
        }

        *message = *name + ": " + *message + "\n";
        cout << "ChatLog: " << *message << endl;

        writeMessage(sock, message);
        receiveMessage(sock, name);

        //mtx.unlock();
    }
}

bool extract(string_ptr message, std::size_t bytes_transferred)
{
    mtx.lock();
    buff.commit(bytes_transferred);
    std::istream istrm(&buff);
    //mtx.unlock();

    std::getline(istrm, *message);
    buff.consume(buff.size());

    string_ptr msgEndl(new string(*message + "\n"));
    mtx.unlock();
    if(clientSentExit(msgEndl))
    {
        return false;
    }
    return true;
}

bool clientSentExit(string_ptr message)
{
    return message->compare(0, 5, "exit\n") == 0;
}

void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
    boost::system::error_code ec = error;
    auto position = find(clientList->begin(), clientList->end(), sock);
    auto namePos = find(nameList->begin(), nameList->end(), *name);

    sock->shutdown(tcp::socket::shutdown_both, ec);
    if (ec)
    {
        cerr << "Error on shutting: " << ec.message() << endl;
    }
    sock->close(ec);
    if(ec)
    {
        cerr << "Error on closing: " << ec.message() << endl;
    }
    clientList->erase(position);
    nameList->erase(namePos);

    notification(sock, name, "", " disconnected. ");
}

void writeMessage(socket_ptr sock, string_ptr message)
{
    for(auto& cliSock : *clientList)
    {
        if (cliSock->is_open() && cliSock != sock)
        {
            boost::asio::async_write(*cliSock, buffer(*message),
                    boost::bind(&responseSent, boost::asio::placeholders::error));
        }
    }
}

void responseSent(const boost::system::error_code& error)
{
    if (error)
    {
        cerr << "Error on writing: " << error.message() << endl;
    }
}

void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
    string_ptr serviceMsg (new string (headOfMsg + *name + tailOfMsg));
    cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;

    *serviceMsg = *serviceMsg + "\n";
    writeMessage(sock, serviceMsg);

    cout << waitingMsg;
}

It's interesting that I have similar synchronous server with the same way of using of streambuf, but there are no such problems.

解决方案

boost::asio::async_read_until() can read any amount of characters to streambuf after \n. It then gives you bytes_transferred, which is count of characters in the first line (not necessarily the count of characters that were read to the buffer). See documentation.

As long as you keep your buffer variable intact, next boost::asio::async_read_until() will read characters first from the buffer and then from the socket.

It seems to me that you read a line from the buffer using getline(), which is correct. After that, you call

buff.consume(buff.size());

which clears the buffer, removing all information about the partial lines you may have received. The first complete line that you read has already been removed from the buffer by getline(), so the consume() call is unnecessary in any case.

Just removing the consume() call would not solve your problem, because you seem to use one buffer that is shared between all clients, and you would not know what partial data was from which client. A possible solution could be creating a list of buffers (one for each client), just like you have a list of sockets. Then boost::asio::async_read_until() and getline() would take care of handling the partial data, and you wouldn't have to think about that.

这篇关于缺少第一个字符使用boost :: asio :: streambuf的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆