返回`NotReady`后,为什么不反复调用`Future :: poll`? [英] Why is `Future::poll` not called repeatedly after returning `NotReady`?

查看:89
本文介绍了返回`NotReady`后,为什么不反复调用`Future :: poll`?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑以下代码

extern crate futures; // v0.1 (old)

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}

生成的线程设置一个标志,将来等待. 我们还休眠产生的线程,因此从.wait()进行的初始.poll()调用是在设置标志之前.这导致.wait()无限期(看似)阻塞.如果我们取消注释另一个thread::sleep_ms,则.wait()返回,并打印出结果(()).

The spawned thread sets a flag, which the future waits for. We also sleep the spawned thread, so the initial .poll() call from .wait() is before the flag is set. This causes .wait() to block (seemingly) indefinitely. If we uncomment the other thread::sleep_ms, .wait() returns, and prints out the result (()).

我希望当前线程尝试多次调用poll来解决未来问题,因为我们正在阻塞当前线程.但是,这没有发生.

I would expect the current thread to try to resolve the future by calling poll multiple times, since we're blocking the current thread. However, this is not happening.

我尝试阅读一些文档,这似乎是问题所在是第一次从poll获取NotReady之后,线程被park了.但是,我不清楚这是为什么的原因,或者如何解决该问题.

I have tried to read some docs, and it seems like the problem is that the thread is parked after getting NotReady from the poll the first time. However, it is not clear to me why this is, or how it is possible to work around this.

我想念什么?

推荐答案

为什么您需要停放一个等待的将来而不是反复轮询它?答案很明显,恕我直言.因为归根结底,它更快,更高效!

Why would you need to park a waiting future instead of polling it repeatedly? The answer is rather obvious, IMHO. Because at the end of the day it's faster and more efficient!

要反复轮询一个未来(可能被称为"繁忙等待"),图书馆将不得不决定是经常还是很少这样做,没有一个答案令人满意.经常这样做,这会浪费CPU周期,很少这样做,并且代码反应很慢.

To repeatedly poll a future (which might be dubbed "busy-waiting") the library would have to decide whether to do it often or seldom and neither answer is satisfactory. Do it often and you're wasting the CPU cycles, do it seldom and the code is slow to react.

所以,是的,您需要在等待某事时将其停放,然后在等待完成后将其取消停放.像这样:

So yeah, you need to park the task when you're waiting for something and then unpark it when you've done waiting. Like this:

#![allow(deprecated)]

extern crate futures;

use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};

struct Status {
    ready: bool,
    task: Option<Task>,
}

#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        let mut status = self.0.lock().expect("!lock");
        if status.ready {
            Ok(Async::Ready(()))
        } else {
            status.task = Some(park());
            Ok(Async::NotReady)
        }
    }
}

#[test]
fn test() {
    let flag = Arc::new(Mutex::new(Status {
                                       ready: false,
                                       task: None,
                                   }));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        let mut status = flag.lock().expect("!lock");
        status.ready = true;
        if let Some(ref task) = status.task {
            task.unpark()
        }
    });
    let result = future.wait();
    println!("result: {:?}", result);
}

请注意,Future::poll在这里正在做几件事情:它正在检查外部条件并且正在暂挂任务,因此可能有种族,例如:

Note that Future::poll is doing several things here: it's checking for an external condition and it's parking the task, so it's possible to have a race, like when:

  1. poll检查变量并将其找到为false;
  2. 外部代码将变量设置为true
  3. 外部代码检查任务是否已停放,并发现没有停放;
  4. poll停放任务,但是繁荣!为时已晚,没有人会再将其停放.
  1. the poll checks the variable and finds it to be false;
  2. the outer code sets the variable to true;
  3. the outer code checks if the task is parked and finds that it's not;
  4. the poll parks the task, but boom! it is too late, nobody is going to unpark it any longer.

为了避免种族冲突,我使用了Mutex来同步这些交互.

In order to avoid any races, I've used a Mutex to synchronize these interactions.

P.S.如果您只需要将线程结果包装到Future中,则考虑使用 oneshot 频道:它具有 Receiver 已经实现了Future接口.

P.S. If all you need is to wrap a thread result into a Future then consider using the oneshot channel: it has the Receiver that implements the Future interface already.

这篇关于返回`NotReady`后,为什么不反复调用`Future :: poll`?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆