如何实现对`异步fn(& mut self)`进行轮询的`Future`/`Stream`? [英] How to implement a `Future` / `Stream` that polls `async fn(&mut self)`?

查看:121
本文介绍了如何实现对`异步fn(& mut self)`进行轮询的`Future`/`Stream`?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下结构

struct Test;

impl Test {
    async fn function(&mut self) {}
}

我想在Test上实现一个std::future::Future(嗯,实际上是futures::Stream,但是基本上是相同的),这会轮询function.我的第一次尝试看起来像这样

I want to implement an std::future::Future (well, actually futures::Stream, but it's basically the same) on Test, that would poll the function. My first try looked something like this

impl Future for Test {
    type Output = ();
    fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.function() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

显然那是行不通的.我知道function必须被调用一次,返回的Future必须存储在结构中的某个位置,然后必须对保存的将来进行轮询.所以,我已经尝试过了

Obviously that didn't work. I have understood that function must be called once, returned Future must be stored somewhere in the struct, and then saved future must be polled. So, I have tried this

struct Test(Option<Box<Pin<dyn Future<Output = ()>>>);
impl Test {
    async fn function(&mut self) {}
    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(Box::pin(s.function()));
        s
    }
}

好吧,这并不奇怪

问题是,在我调用function()之后-我已获取Test&mut引用,因此我无法更改Test变量,因此-无法存储返回的Test内的Future.

The problem is, after I call function() - I have taken a &mut reference of Test, because of that I can't change the Test variable, and therefore - can't store the returned Future inside the Test.

更新1:

我已经找到了我能想到的最被诅咒和最不安全的解决方案(灵感来自

I have arrived at the most cursed and unsafe solution that I can think of (inspired by this)

struct Test<'a>(Option<BoxFuture<'a, ()>>);

impl Test<'_> {
    async fn function(&mut self) {
        println!("I'm alive!");
    }

    fn new() -> Self {
        let mut s = Self(None);
        // fuck the police
        s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
        s
    }
}

impl Future for Test<'_> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().unwrap().poll_unpin(cx)
    }
}

我真的希望还有另一种方式

I REALLY hope that there is another way

推荐答案

尽管有时您可能想做一些与您要在此处完成的事情类似的事情,但这很罕见.因此,大多数阅读此文档的人,甚至可能是OP,都可能希望进行重组,以使用于单个异步执行的结构状态和数据是不同的对象.

Though there are times when you may want to do things similar to what you're trying to accomplish here, they are a rarity. So most people reading this, maybe even OP, may wish to restructure such that struct state and data used for a single async execution are different objects.

但是要回答您的问题,是的,这在某种程度上是可能的.除非您要绝对求助于不安全的代码,否则将需要使用 Arc .您希望在async fn function内部进行操作的所有字段都必须包装在Mutex内部,并且函数本身将接受Arc<Self>.有关此示例,请参见下面的代码块.

But to answer your question, yes it is somewhat possible. Unless you want to absolutely resort to unsafe code you will need to use Mutex and Arc. All fields you wish to manipulate inside async fn function will have to be wrapped inside a Mutex and the function itself will accept an Arc<Self>. See the code block below for an example of this.

但是,我必须强调,这不是一个很好的解决方案,您可能不想这样做.根据您的具体情况,您的解决方案可能会有所不同,但我对OP在使用Stream时试图完成的目标的猜测可以通过类似于我编写的以下要点来更好地解决:

I must stress, however, that this is not a beautiful solution and you probably don't want to do this. Depending on your specific case your solution may vary, but my guess of what OP is trying to accomplish while using Streams would be better solved by something similar to this gist that I wrote as well: https://gist.github.com/TimLuq/83a35453405f4c6e0f63fb2a0caa9f6e.

use core::future::Future;
use core::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;

struct Test {
    state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // if available use your async library's Mutex to `.await` locks on `buffer` instead
    buffer: Mutex<Vec<u8>>,
}
impl Test {
    async fn function(self: Arc<Self>) {
        for i in 0..16u8 {
            let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
            let mut buflock = self.buffer.lock().unwrap();
            buflock.extend_from_slice(&data);
        }
    }
    pub fn new() -> Arc<Self> {
        let s = Arc::new(Self {
            state: Default::default(),
            buffer: Default::default(),
        });

        {
            // start by trying to aquire a lock to the Mutex of the Box
            let mut lock = s.state.lock().unwrap();
            // create boxed future
            let b = Box::pin(s.clone().function());
            // insert value into the mutex
            *lock = Some(b);
        } // block causes the lock to be released

        s
    }
}
impl Future for Test {
    type Output = ();
    fn poll(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> std::task::Poll<<Self as std::future::Future>::Output> {
        let mut lock = self.state.lock().unwrap();
        let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
        Future::poll(fut.as_mut(), ctx)
    }
}

这篇关于如何实现对`异步fn(&amp; mut self)`进行轮询的`Future`/`Stream`?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆