使用tokio 0.1.x生成具有非静态生存期的任务 [英] Spawning tasks with non-static lifetimes with tokio 0.1.x

查看:71
本文介绍了使用tokio 0.1.x生成具有非静态生存期的任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个tokio核心,其主要任务是运行websocket(客户端).当我从服务器收到一些消息时,我想执行一个新任务来更新一些数据.下面是一个最小的失败示例:

I have a tokio core whose main task is running a websocket (client). When I receive some messages from the server, I want to execute a new task that will update some data. Below is a minimal failing example:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}

哪个会产生此错误:

error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
  --> src/main.rs:13:21                                                                                                                                                                
   |                                                                                                                                                                                   
13 |         self.handle.spawn(future::ok(()).and_then(|x| {                                                                                                                           
   |                     ^^^^^                                                                                                                                                         
   |                                                                                                                                                                                   
   = note: type must satisfy the static lifetime      

问题是通过手柄产生的新任务必须是静态的.这里.遗憾的是,我不清楚如何解决此问题.甚至尝试使用 Arc Mutex (单线程应用程序实际上并不需要),我也没有成功.

The problem is that new tasks that are spawned through a handle need to be static. The same issue is described here. Sadly it is unclear to me how I can fix the issue. Even some attempts with and Arc and a Mutex (which really shouldn't be needed for a single-threaded application), I was unsuccessful.

由于东京市的发展相当迅速,所以我想知道目前最好的解决方案是什么.你有什么建议吗?

Since developments occur rather quickly in the tokio landscape, I am wondering what the current best solution is. Do you have any suggestions?

Peter Hall 的解决方案适用于以上示例.可悲的是,当我构建失败的示例时,我更换了tokio反应堆,以为它们会相似.使用 tokio::runtime::current_thread

The solution by Peter Hall works for the example above. Sadly when I built the failing example I changed tokio reactor, thinking they would be similar. Using tokio::runtime::current_thread

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        let mut data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

fn main() {
    // let mut runtime = Core::new().unwrap();

    let mut runtime = Builder::new().build().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: Rc::new(Cell::new(1)),
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.block_on(task).unwrap();
}

我获得:

error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21                                                         
|                                                                            
17 |         self.handle.spawn(future::ok(()).and_then(move |_x| {              
|                     ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|                                                                            
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`

所以在这种情况下,即使整个代码都是单线程的,我似乎也需要一个 Arc 和一个 Mutex ?

So it does seem like in this case I need an Arc and a Mutex even though the entire code is single-threaded?

推荐答案

在单线程程序中,您无需使用 Arc Rc 就足够了:

In a single-threaded program, you don't need to use Arc; Rc is sufficient:

use std::{rc::Rc, cell::Cell};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

重点是您不必担心生存期,因为 Rc 的每个克隆的行为就像是拥有数据一样,而不是通过引用 self .需要内部的 Cell (或对于非 Copy 类型的 RefCell ),因为不能取消引用 Rc 可变的,因为它已被克隆.

The point is that you no longer have to worry about the lifetime because each clone of the Rc acts as if it owns the data, rather than accessing it via a reference to self. The inner Cell (or RefCell for non-Copy types) is needed because the Rc can't be dereferenced mutably, since it has been cloned.

tokio :: runtime :: current_thread :: Handle 的 spawn 方法要求将来是 Send 问题更新中的问题.此Tokio Github问题<.

The spawn method of tokio::runtime::current_thread::Handle requires that the future is Send, which is what is causing the problem in the update to your question. There is an explanation (of sorts) for why this is the case in this Tokio Github issue.

您可以使用 tokio :: runtime :: current_thread :: spawn 代替 Handle 的方法,该方法将始终在当前线程中运行将来的代码,并且要求将来是 Send .您可以在上面的代码中替换 self.handle.spawn ,它会正常工作.

You can use tokio::runtime::current_thread::spawn instead of the method of Handle, which will always run the future in the current thread, and does not require that the future is Send. You can replace self.handle.spawn in the code above and it will work just fine.

如果需要在 Handle 上使用该方法,则还需要诉诸 Arc Mutex (或 RwLock )来满足发送要求:

If you need to use the method on Handle then you will also need to resort to Arc and Mutex (or RwLock) in order to satisfy the Send requirement:

use std::sync::{Mutex, Arc};

struct Client {
    handle: Handle,
    data: Arc<Mutex<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Arc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            *data.lock().unwrap() += 1;
            future::ok(())
        }));
    }
}

如果你的数据真的是usize,你也可以用AtomicUsize代替Mutex,但我个人觉得它只是太笨拙了.

If your data is really a usize, you could also use AtomicUsize instead of Mutex<usize>, but I personally find it just as unwieldy to work with.

这篇关于使用tokio 0.1.x生成具有非静态生存期的任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆