多个背靠背的boost :: ASIO async_send_to调用会导致缓冲区OVERUN [英] multiple back to back boost::asio async_send_to calls cause buffer overun

查看:331
本文介绍了多个背靠背的boost :: ASIO async_send_to调用会导致缓冲区OVERUN的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有发送多个背靠背使用boost ASIO独立的UDP缓冲区的一个问题。我有1秒的定时器ASIO触发该通过UDP传输2个独立的UDP数据报结构的回调。每个这些消息的结构通过的std ::分配的unique_ptr,因此他们不应该由异步CADaemon :: handle_send回调被调用的时候走出去的范围。

 无效
CADaemon :: heartBeatTimer(
    常量毫秒&安培; rHeartBeatMs)
{
    mpStatusTimer-> expires_from_now(rHeartBeatMs);
    mpStatusTimer-> async_wait(的boost ::绑定(
        &安培; CADaemon :: heartBeatTimer,
        此,rHeartBeatMs));
    如果(mpALBFSocket&安培;&安培; mpALBFEndpoint){
        mpALBFSocket-> async_send_to(
            缓冲液(mpStatusMessage.get(),
                的sizeof(MemberSystemStatusMessage))
            * mpALBFEndpoint,
            提高::绑定(安培; CADaemon :: handle_send,为此,
                提高:: ASIO ::占位符::错误
                提高:: ASIO ::占位符:: bytes_transferred));
        //必须插入延迟以prevent缓冲覆写
        的std :: this_thread :: sleep_for(性病::时辰::毫秒(10);        //心跳消息也被发送到这个插座/端点
        mpALBFSocket-> async_send_to(
            缓冲液(mpHeartbeatMessage.get(),
                的sizeof(CAServiceHeartbeatMessage))
            * mpALBFEndpoint,
            提高::绑定(安培; CADaemon :: handle_send,为此,
                提高:: ASIO ::占位符::错误
                提高:: ASIO ::占位符:: bytes_transferred));
    }
}

接收应用程序的工作,如果我把一个小的延迟,发送的第一条消息和第二条消息,但是如果我给他们,因为他们之间,似乎第二个缓冲区覆盖第一个通过它到达的时间接收应用程序。

我在做什么错误?

我也试图与下面的code发送多个缓冲区,但是,由于它会合并这两个数据报一个长的数据报表现糟糕。

 无效
CADaemon :: heartBeatTimer(
    常量毫秒&安培; rHeartBeatMs)
{
    mpStatusTimer-> expires_from_now(rHeartBeatMs);
    mpStatusTimer-> async_wait(的boost ::绑定(
        &安培; CADaemon :: heartBeatTimer,
        此,rHeartBeatMs));
    如果(mpALBFSocket&安培;&安培; mpALBFEndpoint){
        的std ::矢量<提高:: ASIO :: const_buffer> transmitBuffers;
        transmitBuffers.push_back(缓冲(
            mpStatusMessage.get(),
            的sizeof(MemberSystemStatusMessage)));
        //transmitBuffers.push_back(缓冲(
        // mpHeartbeatMessage.get(),
        // sizeof的(CAServiceHeartbeatMessage)));
        mpALBFSocket-> async_send_to(
            transmitBuffers,* mpALBFEndpoint,
            提高::绑定(安培; CADaemon :: handle_send,为此,
                提高:: ASIO ::占位符::错误
                提高:: ASIO ::占位符:: bytes_transferred));
    }
}

下面是所涉及的ASIO从相关的头文件类的成员。

  //这个消息被传@ 1HZ
的std ::的unique_ptr< MemberSystemStatusMessage> mpStatusMessage;
//这个消息被传@ 1HZ
的std ::的unique_ptr< CAServiceHeartbeatMessage> mpHeartbeatMessage;
//收到此消息@ 1HZ
的std ::的unique_ptr< WOperationalSupportMessage> mpOpSupportMessage;
//收到此消息@ 1HZ时有效
的std ::的unique_ptr< MaintenanceOTPMessage> mpOTPMessage;的std :: shared_ptr的<提高:: ASIO :: io_service对象> mpIOService;
的std ::的unique_ptr<提高:: ASIO ::知识产权:: UDP ::插座> mpALBFSocket;
的std ::的unique_ptr<的boost ::支持ASIO ::知识产权:: UDP ::端点> mpALBFEndpoint;
的std ::的unique_ptr<提高:: ASIO ::知识产权:: UDP ::插座> mpServerSocket;
的std ::的unique_ptr<的boost ::支持ASIO ::知识产权:: UDP ::端点> mpServerEndpoint;
的std ::的unique_ptr<提高:: ASIO :: steady_timer> mpStatusTimer;
的std ::的unique_ptr< uint8_t有[]>米preceiveBuffer;

这是回调处理程序

 无效
CADaemon :: handle_send(
    常量的boost ::系统::错误_ code&放;错误,
    的std ::为size_t bytes_transferred)
{
    静态自动和放大器; gEvtLog = gpLogger-> getLoggerRef(
        记录仪:: LogDest ::事件日志);
    如果(!错误||(错误==提高:: ASIO ::错误:: message_size)){
        //临界区 - 独占写
        提高:: unique_lock<提高:: shared_mutex> uniqueLock(gRWMutexGuard);
        LOG_EVT_INFO(gEvtLog)LT;< * mpStatusMessage;
        LOG_EVT_INFO(gEvtLog)LT;< * mpHeartbeatMessage;
        LOG_EVT_INFO(gEvtLog)LT;< 已发送<< bytes_transferred<< 字节;
        mpStatusMessage-> incrementSequenceCounter();
    }其他{
        LOG_EVT_ERROR(gEvtLog)LT;< handle_send:ASIO错误code [
            << error.value()&所述;&下; ];
    }
}

编辑:添加了缓冲器遭到破坏接收JAVA应用code

下面的$ C $ 3c示出在接收的Java应用程序中的code,注意到,sizeof的所接收的数据报从不损坏,只是内容,大小似乎永远是,较长的数据报。希望这是有用的帮助追踪问题。

  @覆盖
    保护任务<无效>的CreateTask(){
        返回新任务<无效>(){
            @覆盖
            保护无效()调用抛出异常{
                updateMessage(运行...);
                尝试{
                    DatagramSocket类的ServerSocket =新的DatagramSocket(mPortNum);
                    //用于接收数据报分配空间
                    字节[]字节=新的字节[1024];
                    DatagramPacket类包=新的DatagramPacket(字节,bytes.length);
                    而(!isCancelled()){
                        serverSocket.receive(包);
                        INT =与BytesReceived packet.getLength();
                        MemberSystemStatusMessage statusMessage =
                            新MemberSystemStatusMessage();
                        INT statusMessageSize = statusMessage.size();
                        CAServiceHeartbeatMessage heartbeatMessage =
                            新CAServiceHeartbeatMessage();
                        INT heartbeatMessageSize = heartbeatMessage.size();
                        如果(Platform.isFxApplicationThread()){
                            如果(与BytesReceived == statusMessage.size()){
                                statusMessage.setByteBuffer(ByteBuffer.wrap(字节),0);
                                setMemberSystemMessage(statusMessage);
                            }否则如果(与BytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(字节),0);
                                setHeartbeatMessage(heartbeatMessage);
                            }其他{
                                的System.out.println(意外的数据报);
                            }
                        }其他{//更新后的FxApplicationThread
                            如果(与BytesReceived == statusMessage.size()){
                                statusMessage.setByteBuffer(ByteBuffer.wrap(字节),0);
                                Platform.runLater(() - > setMemberSystemMessage(statusMessage));
                            }否则如果(与BytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(字节),0);
                                Platform.runLater(() - > setHeartbeatMessage(heartbeatMessage));
                            }其他{
                                的System.out.println(意外的数据报);
                            }
                        }
                    }
                }赶上(例外前){
                    的System.out.println(ex.getMessage());
                }
                updateMessage(取消);
                返回null;
            }
        };
    }
}


解决方案

在code看起来不错,只要缓冲区的大小是正确的,直到处理程序要求的缓冲区底层的内存仍然有效。一种可以安全地启动多个非组成异步操作,如<一个href=\"http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/basic_raw_socket/async_send_to/overload2.html\"相对=nofollow> async_send_to() ,对于给定的I / O对象。虽然,这是未具体说明以中,这些操作将执行该命令。

该接收器应用程序有一个单一的共享字节数组,数据报读。如果接收到两个数据包,并出现两种读操作,那么缓冲区将包含最后一次读取数据包的内容。基于在presented code,这可能是由于的Runnable S中的将在未来的一个不确定的时间调用创建一个竞争状态。例如,考虑当两个数据包被发送的情况下,第一容纳一个系统消息和含有心跳消息的第二个。在下面的code:

字节[]字节=新的字节[1024];
DatagramPacket类包=新的DatagramPacket(字节,bytes.length);
而(...)
{
  serverSocket.receive(包);
  INT =与BytesReceived packet.getLength();
  MemberSystemStatusMessage statusMessage = ...;
  CAServiceHeartbeatMessage heartbeatMessage = ...;
  如果(与BytesReceived == statusMessage.size())
  {
    statusMessage.setByteBuffer(ByteBuffer.wrap(字节),0);
    Platform.runLater(() - &GT; setMemberSystemMessage(statusMessage));
  }
  ...
}

在while循环的第一次迭代之后,字节包含状态消息和 statusMessage 对象是指在字节缓冲区。可运行已计划在未来的一个未指定时间运行。在阅读第二个数据包,则字节缓冲区包含心跳消息。可运行现在运行,通过了 statusMessage 对象 setMemberSystemMessage();然而,它的底层缓冲区现在包含一个心跳消息。要解决此问题,可以考虑深复制字节数组时,延迟执行需要发生:

 如果(==与BytesReceived statusMessage.size())
{
  字节[] = bytes_copy Arrays.copyOf(字节,与BytesReceived);
  statusMessage.setByteBuffer(ByteBuffer.wrap(bytes_copy),0);
  Platform.runLater(() - &GT; setMemberSystemMessage(statusMessage));
}

替代地,人们可以使用一个新的缓冲区的每个读操作

有也可以是与底层协议的期望的问题。 UDP被称为不可靠的协议,因为它不提供通知给发送者,以数据报传递。每个 async_send_to()操作将导致长达一个数据报传输。在完成处理程序的状态表示,如果数据被写入,并暗示在如果如果数据报已被接收任何状态。这个拥有真实的,即使是通过<所提供的多个缓冲区href=\"http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.buffers_and_scatter_gather_i_o\"相对=nofollow>分散收集I / O 。这样,场景中的问题,其中,两个 async_send_to()操作发起的,但接收者只接收单个数据报,由协议允许所述。应用协议应考虑这种行为。例如,而不是报告单心跳截止日期后的错误已经错过,收件人可一旦错过心跳期限的连续数已超出阈值报告错误。添加写入之间有一个小的延迟提供不保证该协议的行为。

I am having a problem with sending multiple back to back separate UDP buffers using boost asio. I have a 1 second asio timer that fires a callback that transmits a 2 separate UDP datagram structures over udp. Each of the these messages structures is allocated via std::unique_ptr, so they should not go out of scope by the time the async CADaemon::handle_send callback is called.

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    if (mpALBFSocket && mpALBFEndpoint) {
        mpALBFSocket->async_send_to(
            buffer(mpStatusMessage.get(),
                sizeof(MemberSystemStatusMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));


        // must insert delay to prevent buffer overwrites
        std::this_thread::sleep_for(std::chrono::milliseconds(10);

        // heartbeat messages are also sent to this socket/endpoint
        mpALBFSocket->async_send_to(
            buffer(mpHeartbeatMessage.get(),
                sizeof(CAServiceHeartbeatMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

The receiving application works if I put a small delay in between sending the first message and the second message, however, if I send them as they are, it appears that the second buffer overwrites the first by the time it arrives in the receiving application.

What am I doing incorrectly?

I also tried sending multiple buffers with the code below, however that behaves worse as it coalesces both datagrams as a single long datagram.

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    if (mpALBFSocket && mpALBFEndpoint) {
        std::vector<boost::asio::const_buffer> transmitBuffers;
        transmitBuffers.push_back(buffer(
            mpStatusMessage.get(), 
            sizeof(MemberSystemStatusMessage)));
        //transmitBuffers.push_back(buffer(
        //    mpHeartbeatMessage.get(), 
        //    sizeof(CAServiceHeartbeatMessage)));
        mpALBFSocket->async_send_to(
            transmitBuffers, *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

Here are the Members of the class that are involved in the ASIO from the associated header file.

// this message is transmitted @1HZ
std::unique_ptr<MemberSystemStatusMessage> mpStatusMessage;
// this message is transmitted @1HZ
std::unique_ptr<CAServiceHeartbeatMessage> mpHeartbeatMessage;
// this message is received @1HZ
std::unique_ptr<WOperationalSupportMessage> mpOpSupportMessage;
// this message is received @1HZ when valid
std::unique_ptr<MaintenanceOTPMessage> mpOTPMessage;

std::shared_ptr<boost::asio::io_service> mpIOService;
std::unique_ptr<boost::asio::ip::udp::socket> mpALBFSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpALBFEndpoint;
std::unique_ptr<boost::asio::ip::udp::socket> mpServerSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
std::unique_ptr<uint8_t[]> mpReceiveBuffer;

This is the callback handler

void
CADaemon::handle_send(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);
    if (!error || (error == boost::asio::error::message_size)) {
        // Critical Section - exclusive write
        boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
        LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
        LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
        LOG_EVT_INFO(gEvtLog) << "Sent " << bytes_transferred << " bytes";
        mpStatusMessage->incrementSequenceCounter();
    } else {
        LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
            << error.value() << "]";
    }
}

EDIT:ADDED THE RECEIVING JAVA APPLICATION CODE WITH THE BUFFER CORRUPTION

The code below shows the code in the receiving java application, noted that the sizeof the received datagram is never corrupted, just the contents, the size seems to always be that of the longer datagram. Hope this is useful to help track down the problem.

    @Override
    protected Task<Void> createTask() {
        return new Task<Void>() {
            @Override
            protected Void call() throws Exception {
                updateMessage("Running...");
                try {
                    DatagramSocket serverSocket = new DatagramSocket(mPortNum);
                    // allocate space for received datagrams
                    byte[] bytes = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);                    
                    while (!isCancelled()) {                    
                        serverSocket.receive(packet);
                        int bytesReceived = packet.getLength();
                        MemberSystemStatusMessage statusMessage = 
                            new MemberSystemStatusMessage();
                        int statusMessageSize = statusMessage.size();
                        CAServiceHeartbeatMessage heartbeatMessage = 
                            new CAServiceHeartbeatMessage();
                        int heartbeatMessageSize = heartbeatMessage.size();
                        if (Platform.isFxApplicationThread()) {
                            if (bytesReceived == statusMessage.size()) {
                                statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                setMemberSystemMessage(statusMessage);
                            } else if (bytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                setHeartbeatMessage(heartbeatMessage);
                            } else {
                                System.out.println("unexpected datagram");
                            }
                        } else { // update later in FxApplicationThread
                            if (bytesReceived == statusMessage.size()) {
                                statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                Platform.runLater(() -> setMemberSystemMessage(statusMessage));
                            } else if (bytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                Platform.runLater(() -> setHeartbeatMessage(heartbeatMessage));
                            } else {
                                System.out.println("unexpected datagram");
                            }
                        }
                    }
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                }
                updateMessage("Cancelled");
                return null;
            } 
        };
    }
}

解决方案

The code looks fine as long as the size of the buffers are correct and the underlying memory for the buffers remains valid until the handler is called. One can safely initiate multiple non-composed asynchronous operations, such as async_send_to(), for a given I/O object. Albeit, it is unspecified as to the order in which these operations will execute.

The receiver application has a single shared byte array into which datagrams are read. If two datagrams are received, and two read operations occur, then the buffer will contain the contents of the last read datagram. Based on the presented code, this can create a race condition due to the Runnables that will be invoked at an unspecified time in the future. For example, consider the scenario where two datagrams are sent, the first containing a system message and the second containing a heartbeat message. In the following code:

byte[] bytes = new byte[1024];
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
while (...)
{
  serverSocket.receive(packet);
  int bytesReceived = packet.getLength();
  MemberSystemStatusMessage statusMessage =  ...;
  CAServiceHeartbeatMessage heartbeatMessage =  ...;
  if (bytesReceived == statusMessage.size())
  {
    statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
    Platform.runLater(() -> setMemberSystemMessage(statusMessage));
  }
  ...
}

After the first iteration of the while-loop, bytes contains a status message and the statusMessage object refers to the bytes buffer. A Runnable has been scheduled to run at an unspecified time in the future. Upon reading the second datagram, the bytes buffer contains a heartbeat message. The Runnable now runs, passing the statusMessage object to setMemberSystemMessage(); however, its underlying buffer now contains a heartbeat message. To resolve this problem, consider deep-copying the byte array when deferred execution needs to occur:

if (bytesReceived == statusMessage.size())
{
  byte[] bytes_copy = Arrays.copyOf(bytes, bytesReceived);
  statusMessage.setByteBuffer(ByteBuffer.wrap(bytes_copy), 0);
  Platform.runLater(() -> setMemberSystemMessage(statusMessage));
}

Alternatively, one could use a new buffer for each read operation.

There may also be problems with expectations of the underlying protocol. UDP is referred to as an unreliable protocol as it does not provide notifications to the sender as to the delivery of the datagram. Each async_send_to() operation will result in up to one datagram being transmitted. The completion handler's status indicates if the data was written, and implies no status on if if the datagram has been received. This holds true even if multiple buffers are provided via scatter-gather I/O. As such, the scenario described in the question, where two async_send_to() operations are initiated, but the recipient only receives a single datagram, is permitted by the protocol. The application protocol should account for this behavior. For instance, instead of reporting an error after a single heartbeat deadline has been missed, the recipient may report an error once a consecutive number of missed heartbeat deadlines have exceeded a threshold. Adding a small delay between writes provides no guarantees as to the behavior of the protocol.

这篇关于多个背靠背的boost :: ASIO async_send_to调用会导致缓冲区OVERUN的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆