Why is my Future implementation blocked after it is polled once and NotReady?
Question
I implemented a future and made a request to it, but my `curl` call blocked, and the log shows that `poll` was invoked only once.
Did I implement anything wrong?
use failure::{format_err, Error};
use futures::{future, Async};
use hyper::rt::Future;
use hyper::service::{service_fn, service_fn_ok};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use std::{
    sync::{Arc, Mutex},
    task::Waker,
    thread,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    resp: String,
}

impl Future for TimerFuture {
    type Item = Response<Body>;
    type Error = hyper::Error;

    fn poll(&mut self) -> futures::Poll<Response<Body>, hyper::Error> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            return Ok(Async::Ready(Response::new(Body::from(
                shared_state.resp.clone(),
            ))));
        } else {
            return Ok(Async::NotReady);
        }
    }
}

impl TimerFuture {
    pub fn new(instance: String) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            resp: String::new(),
        }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            let res = match request_health(instance) {
                Ok(status) => status.clone(),
                Err(err) => {
                    error!("{:?}", err);
                    format!("{}", err)
                }
            };
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            shared_state.resp = res;
        });
        TimerFuture { shared_state }
    }
}

fn request_health(instance_name: String) -> Result<String, Error> {
    std::thread::sleep(std::time::Duration::from_secs(1));
    Ok("health".to_string())
}

type BoxFut = Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send>;

fn serve_health(req: Request<Body>) -> BoxFut {
    let mut response = Response::new(Body::empty());
    let path = req.uri().path().to_owned();
    match (req.method(), path) {
        (&Method::GET, path) => {
            return Box::new(TimerFuture::new(path.clone()));
        }
        _ => *response.status_mut() = StatusCode::NOT_FOUND,
    }
    Box::new(future::ok(response))
}

fn main() {
    let endpoint_addr = "0.0.0.0:8080";
    match std::thread::spawn(move || {
        let addr = endpoint_addr.parse().unwrap();
        info!("Server is running on {}", addr);
        hyper::rt::run(
            Server::bind(&addr)
                .serve(move || service_fn(serve_health))
                .map_err(|e| eprintln!("server error: {}", e)),
        );
    })
    .join()
    {
        Ok(e) => e,
        Err(e) => println!("{:?}", e),
    }
}
After compiling and running this code, a server is listening on port 8080. Calling the server with `curl` blocks:
curl 127.0.0.1:8080/my-health-scope
Recommended answer
Did I implement anything wrong?
Yes, you did not read and follow the documentation for the method you are implementing (emphasis mine):
When a future is not ready yet, the `Async::NotReady` value will be returned. In this situation the future will also register interest of the current task in the value being produced. This is done by calling `task::park` to retrieve a handle to the current `Task`. When the future is then ready to make progress (e.g. it should be polled again) the `unpark` method is called on the `Task`.
As a minimal, reproducible example, let's use this:
use futures::{future::Future, Async};
use std::{
    mem,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

pub struct Timer {
    data: Arc<Mutex<String>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let data = Arc::new(Mutex::new(String::new()));
        thread::spawn({
            let data = data.clone();
            move || {
                thread::sleep(Duration::from_secs(1));
                *data.lock().unwrap() = instance;
            }
        });
        Timer { data }
    }
}

impl Future for Timer {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let mut data = self.data.lock().unwrap();
        eprintln!("poll was called");
        if data.is_empty() {
            Ok(Async::NotReady)
        } else {
            let data = mem::replace(&mut *data, String::new());
            Ok(Async::Ready(data))
        }
    }
}

fn main() {
    let v = Timer::new("Some text".into()).wait();
    println!("{:?}", v);
}
It only prints out "poll was called" once.
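One way to picture why `poll` is never called again is to look at what an executor does after it receives `NotReady`: it goes to sleep until the task is notified. The following is a toy model of that loop using only the standard library, not the real futures 0.1 machinery. `ToyTask`, `Shared`, and `run` are hypothetical names invented for this sketch, and the timeout exists only so the demonstration terminates; a real executor would wait indefinitely.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a task handle: sending on it "notifies" the executor.
type ToyTask = Sender<()>;

// State shared between the toy future and its worker thread.
struct Shared {
    value: Option<String>,
    task: Option<ToyTask>,
}

// Returns the produced value (if any) and how many times `poll` ran.
fn run(register_interest: bool) -> (Option<String>, usize) {
    let shared = Arc::new(Mutex::new(Shared { value: None, task: None }));

    // Worker: produce the value after a delay, then notify whoever registered.
    let worker_shared = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        let mut s = worker_shared.lock().unwrap();
        s.value = Some("Some text".to_string());
        if let Some(task) = s.task.take() {
            let _ = task.send(());
        }
    });

    // Toy `poll`: Some(value) means ready, None means not ready.
    let poll = |current: &ToyTask| -> Option<String> {
        let mut s = shared.lock().unwrap();
        match s.value.take() {
            Some(v) => Some(v),
            None => {
                if register_interest {
                    // This is the step the question's code skips.
                    s.task = Some(current.clone());
                }
                None
            }
        }
    };

    // Toy executor: poll once, then sleep until a notification arrives.
    let (task, notifications) = channel();
    let mut polls = 0;
    loop {
        polls += 1;
        if let Some(v) = poll(&task) {
            return (Some(v), polls);
        }
        // A real executor would block forever here; the timeout ends the demo.
        if notifications.recv_timeout(Duration::from_millis(500)).is_err() {
            return (None, polls);
        }
    }
}

fn main() {
    println!("without registering: {:?}", run(false));
    println!("with registering:    {:?}", run(true));
}
```

When the future never registers a task, the value is produced but nobody wakes the executor, so the loop gives up after a single poll. When it does register, the worker's notification triggers a second poll that returns the value.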
You can call `task::current` (previously `task::park`) in the implementation of `Future::poll`, save the resulting value, then use the value with `Task::notify` (previously `Task::unpark`) whenever the future may be polled again:
use futures::{
    future::Future,
    task::{self, Task},
    Async,
};
use std::{
    mem,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

pub struct Timer {
    // The produced value plus the task to notify once it is available.
    data: Arc<Mutex<(String, Option<Task>)>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let data = Arc::new(Mutex::new((String::new(), None)));
        let me = Timer { data };
        thread::spawn({
            let data = me.data.clone();
            move || {
                thread::sleep(Duration::from_secs(1));
                let mut data = data.lock().unwrap();
                data.0 = instance;
                // Wake the task that polled us, if one has registered.
                if let Some(task) = data.1.take() {
                    task.notify();
                }
            }
        });
        me
    }
}

impl Future for Timer {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let mut data = self.data.lock().unwrap();
        eprintln!("poll was called");
        if data.0.is_empty() {
            // Register interest: save the current task before returning NotReady.
            let v = task::current();
            data.1 = Some(v);
            Ok(Async::NotReady)
        } else {
            let data = mem::replace(&mut data.0, String::new());
            Ok(Async::Ready(data))
        }
    }
}

fn main() {
    let v = Timer::new("Some text".into()).wait();
    println!("{:?}", v);
}
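The same contract carries over to the standard library's `std::future::Future`, where the handle is a `Waker` obtained from the `Context` passed to `poll`. The following is a minimal sketch of the timer in that style, assuming a recent stable toolchain that provides `std::task::Wake`. It is not the futures 0.1 API used above. `State`, `ThreadWaker`, and `block_on` are hypothetical names written for this example (a real program would normally use an executor from a runtime crate), and the delay is shortened to 100 ms.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

// The produced value plus the waker to call once it is available.
struct State {
    value: Option<String>,
    waker: Option<Waker>,
}

pub struct Timer {
    state: Arc<Mutex<State>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let state = Arc::new(Mutex::new(State { value: None, waker: None }));
        thread::spawn({
            let state = Arc::clone(&state);
            move || {
                thread::sleep(Duration::from_millis(100));
                let mut state = state.lock().unwrap();
                state.value = Some(instance);
                // Tell the executor that polling again will make progress.
                if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        });
        Timer { state }
    }
}

impl Future for Timer {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let mut state = self.state.lock().unwrap();
        match state.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Register interest before reporting that we are not ready.
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Hypothetical minimal executor: waking unparks the thread that is blocked on the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            // Sleep until woken; a spurious wakeup only causes an extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let v = block_on(Timer::new("Some text".into()));
    println!("{:?}", v);
}
```

If the line that stores the waker is removed, `block_on` parks after the first poll and never resumes, which mirrors the hang described in the question.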
See also:
- Why does Future::select choose the future with a longer sleep period first?
- Why is `Future::poll` not called repeatedly after returning `NotReady`?
- What is the best approach to encapsulate blocking I/O in future-rs?