为什么我的Future实施在被轮询一次并且未就绪后被阻止? [英] Why is my Future implementation blocked after it is polled once and NotReady?

查看:113
本文介绍了为什么我的Future实施在被轮询一次并且未就绪后被阻止?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我实现了未来并提出了要求,但是它阻止了我的curl,并且日志显示poll仅被调用一次.

I implemented the future and made a request of it, but it blocked my curl and the log shows that poll was only invoked once.

我实施了任何错误的操作吗?

Did I implement anything wrong?

use failure::{format_err, Error};
use futures::{future, Async};
use hyper::rt::Future;
use hyper::service::{service_fn, service_fn_ok};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use std::{
    sync::{Arc, Mutex},
    task::Waker,
    thread,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    resp: String,
}

impl Future for TimerFuture {
    type Item = Response<Body>;
    type Error = hyper::Error;
    fn poll(&mut self) -> futures::Poll<Response<Body>, hyper::Error> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            return Ok(Async::Ready(Response::new(Body::from(
                shared_state.resp.clone(),
            ))));
        } else {
            return Ok(Async::NotReady);
        }
    }
}

impl TimerFuture {
    pub fn new(instance: String) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            resp: String::new(),
        }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            let res = match request_health(instance) {
                Ok(status) => status.clone(),
                Err(err) => {
                    error!("{:?}", err);
                    format!("{}", err)
                }
            };
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            shared_state.resp = res;
        });

        TimerFuture { shared_state }
    }
}

fn request_health(instance_name: String) -> Result<String, Error> {
    std::thread::sleep(std::time::Duration::from_secs(1));
    Ok("health".to_string())
}

type BoxFut = Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send>;
fn serve_health(req: Request<Body>) -> BoxFut {
    let mut response = Response::new(Body::empty());
    let path = req.uri().path().to_owned();
    match (req.method(), path) {
        (&Method::GET, path) => {
            return Box::new(TimerFuture::new(path.clone()));
        }
        _ => *response.status_mut() = StatusCode::NOT_FOUND,
    }
    Box::new(future::ok(response))
}

fn main() {
    let endpoint_addr = "0.0.0.0:8080";
    match std::thread::spawn(move || {
        let addr = endpoint_addr.parse().unwrap();
        info!("Server is running on {}", addr);
        hyper::rt::run(
            Server::bind(&addr)
                .serve(move || service_fn(serve_health))
                .map_err(|e| eprintln!("server error: {}", e)),
        );
    })
    .join()
    {
        Ok(e) => e,
        Err(e) => println!("{:?}", e),
    }
}

编译并运行此代码后,正在运行端口8080的服务器.用curl调用服务器,它将阻止:

After compile and run this code, a server with port 8080 is running. Call the server with curl and it will block:

curl 127.0.0.1:8080/my-health-scope

推荐答案

我实施了任何错误的操作吗?

Did I implement anything wrong?

是的,您没有阅读并遵循您要实现的方法的文档(重点是我的):

Yes, you did not read and follow the documentation for the method you are implementing (emphasis mine):

当将来还没有准备好时,将返回Async::NotReady值.在这种情况下,未来也将对当前任务产生的价值产生兴趣.这是通过调用task::park 来检索当前Task的句柄来完成的.当将来准备好进步时(例如,应该再次对其进行轮询),在Task 上调用 unpark方法.

When a future is not ready yet, the Async::NotReady value will be returned. In this situation the future will also register interest of the current task in the value being produced. This is done by calling task::park to retrieve a handle to the current Task. When the future is then ready to make progress (e.g. it should be polled again) the unpark method is called on the Task.

作为最小的,可复制的示例,让我们使用它:

use futures::{future::Future, Async};
use std::{
    mem,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

pub struct Timer {
    data: Arc<Mutex<String>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let data = Arc::new(Mutex::new(String::new()));

        thread::spawn({
            let data = data.clone();
            move || {
                thread::sleep(Duration::from_secs(1));
                *data.lock().unwrap() = instance;
            }
        });

        Timer { data }
    }
}

impl Future for Timer {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let mut data = self.data.lock().unwrap();

        eprintln!("poll was called");

        if data.is_empty() {
            Ok(Async::NotReady)
        } else {
            let data = mem::replace(&mut *data, String::new());
            Ok(Async::Ready(data))
        }
    }
}

fn main() {
    let v = Timer::new("Some text".into()).wait();
    println!("{:?}", v);
}

它只打印一次被叫投票".

It only prints out "poll was called" once.

您可以在实现Future::poll的过程中调用task::current(以前是task::park),保存结果值,然后在将来可能再次被轮询时将值与Task::notify(以前是Task::unpark)一起使用:

You can call task::current (previously task::park) in the implementation of Future::poll, save the resulting value, then use the value with Task::notify (previously Task::unpark) whenever the future may be polled again:

use futures::{
    future::Future,
    task::{self, Task},
    Async,
};
use std::{
    mem,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

pub struct Timer {
    data: Arc<Mutex<(String, Option<Task>)>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let data = Arc::new(Mutex::new((String::new(), None)));
        let me = Timer { data };

        thread::spawn({
            let data = me.data.clone();
            move || {
                thread::sleep(Duration::from_secs(1));
                let mut data = data.lock().unwrap();

                data.0 = instance;
                if let Some(task) = data.1.take() {
                    task.notify();
                }
            }
        });

        me
    }
}

impl Future for Timer {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let mut data = self.data.lock().unwrap();

        eprintln!("poll was called");

        if data.0.is_empty() {
            let v = task::current();
            data.1 = Some(v);
            Ok(Async::NotReady)
        } else {
            let data = mem::replace(&mut data.0, String::new());
            Ok(Async::Ready(data))
        }
    }
}

fn main() {
    let v = Timer::new("Some text".into()).wait();
    println!("{:?}", v);
}

另请参阅:

  • Why does Future::select choose the future with a longer sleep period first?
  • Why is `Future::poll` not called repeatedly after returning `NotReady`?
  • What is the best approach to encapsulate blocking I/O in future-rs?

这篇关于为什么我的Future实施在被轮询一次并且未就绪后被阻止?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆