当多个期货使用相同的基础套接字时,为什么不能将它们唤醒? [英] Why do I not get a wakeup for multiple futures when they use the same underlying socket?

查看:115
本文介绍了当多个期货使用相同的基础套接字时,为什么不能将它们唤醒?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有使用相同的本地UdpSocket将数据发送到多个UDP端点的代码:

I have code which sends data to multiple UDP endpoints using same local UdpSocket:

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::{
    future::Future,
    net::{Ipv4Addr, SocketAddr},
    pin::Pin,
    task::{Context, Poll},
};
use tokio::net::UdpSocket;

#[tokio::main]
async fn main() {
    let server_0: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12000).into();
    let server_2: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12002).into();
    let server_1: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12001).into();

    tokio::spawn(start_server(server_0));
    tokio::spawn(start_server(server_1));
    tokio::spawn(start_server(server_2));

    let client_addr: SocketAddr = (Ipv4Addr::UNSPECIFIED, 12004).into();
    let socket = UdpSocket::bind(client_addr).await.unwrap();

    let mut futs = FuturesUnordered::new();
    futs.push(Task::new(0, &socket, &server_0));
    futs.push(Task::new(1, &socket, &server_1));
    futs.push(Task::new(2, &socket, &server_2));

    while let Some(n) = futs.next().await {
        println!("Done: {:?}", n)
    }
}

async fn start_server(addr: SocketAddr) {
    let mut socket = UdpSocket::bind(addr).await.unwrap();
    let mut buf = [0; 512];
    loop {
        println!("{:?}", socket.recv_from(&mut buf).await);
    }
}

struct Task<'a> {
    value: u32,
    socket: &'a UdpSocket,
    addr: &'a SocketAddr,
}

impl<'a> Task<'a> {
    fn new(value: u32, socket: &'a UdpSocket, addr: &'a SocketAddr) -> Self {
        Self {
            value,
            socket,
            addr,
        }
    }
}

impl Future for Task<'_> {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("Polling for {}", self.value);
        let buf = &self.value.to_be_bytes();

        match self.socket.poll_send_to(cx, buf, self.addr) {
            Poll::Ready(Ok(_)) => {
                println!("Got Ok for {}", self.value);
                Poll::Ready(Some(self.value))
            }
            Poll::Ready(Err(_)) => {
                println!("Got err for {}", self.value);
                Poll::Ready(None)
            }
            Poll::Pending => {
                println!("Got pending for {}", self.value);
                Poll::Pending
            }
        }
    }
}

有时候,仅写入其中一个数据并打印后,它就会卡住:

Sometimes it gets stuck after writing only one of the data, printing:

Polling for 0
Got pending for 0
Polling for 1
Got pending for 1
Polling for 2
Got pending for 2
Polling for 2
Got Ok for 2
Done: Some(2)
Ok((4, V4(127.0.0.1:12004)))

在这种情况下,永远不会唤醒值为0和1的任务.我如何可靠地通知他们将其唤醒?

The tasks with value 0 and 1 are never woken up in this case. How do I reliably signal them to wake them up?

我尝试在接收到 Poll :: Ready 后调用 cx.waker().wake_by_ref(),因为我认为这也可能会唤醒其他人,但事实并非如此

I tried calling cx.waker().wake_by_ref() on receiving Poll::Ready as I thought that may wake up other too, but that's not the case.

推荐答案

poll_send_to 返回 Poll :: Pending 时,它会确保唤醒<对上下文中提供的code> Waker 进行了轮询.但是,只需要唤醒被轮询的最后一个唤醒器即可.这意味着,由于您要从多个任务在同一套接字上调用 poll_send_to ,因此套接字仅承诺将唤醒上一次对其进行轮询的主机.

When poll_send_to returns Poll::Pending, it guarantees to emit a wake-up to the Waker provided in the context is was polled with. However it is only required to emit a wake-up to the last Waker it was polled with. This means that since you are calling poll_send_to on the same socket from multiple tasks, the socket has only promised to emit a wake-up to the one that polled it last.

这也解释了它为什么起作用:

This also explains why this works:

let mut futs = Vec::new();
futs.push(Task::new(0, &socket, &server_0));
futs.push(Task::new(1, &socket, &server_1));
futs.push(Task::new(2, &socket, &server_2));

for n in join_all(futs).await {
    println!("Done: {:?}", n)
}

FuturesUnordered 不同, join_all 组合器每次轮询时都会轮询每个内部的未来,但是 FuturesUnordered 会跟踪哪个潜在的未来.唤醒来自.

Unlike FuturesUnordered, the join_all combinator will poll every internal future every time it is polled, but FuturesUnordered keeps track of which underlying future the wake-up came from.

另请参见 查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆