如何实现对异步fn进行轮询的Future或Stream? [英] How to implement a Future or Stream that polls an async fn?

查看:84
本文介绍了如何实现对异步fn进行轮询的Future或Stream?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个结构 Test ,我想实现 std :: future :: Future ,该类将轮询 function :

I have a struct Test I want to implement std::future::Future that would poll function:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct Test;

impl Test {
    async fn function(&mut self) {}
}

impl Future for Test {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.function() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

那行不通:

error[E0308]: mismatched types
  --> src/lib.rs:17:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
17 |             Poll::Pending => Poll::Pending,
   |             ^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

error[E0308]: mismatched types
  --> src/lib.rs:18:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
18 |             Poll::Ready(_) => Poll::Ready(()),
   |             ^^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

我知道 function 必须被调用一次,返回的 Future 必须存储在结构中的某个位置,然后必须对保存的Future进行轮询.我试过了:

I understand that function must be called once, the returned Future must be stored somewhere in the struct, and then the saved future must be polled. I tried this:

struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);

impl Test {
    async fn function(&mut self) {}
    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(Box::pin(s.function()));
        s
    }
}

那也不起作用:

error[E0277]: the size for values of type `(dyn Future<Output = ()> + 'static)` cannot be known at compilation time
   --> src/lib.rs:7:13
    |
7   | struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
    |
    = help: the trait `Sized` is not implemented for `(dyn Future<Output = ()> + 'static)`

在我调用 function()之后,我采用了 Test & mut 引用,因为我无法更改 Test 变量,因此无法将返回的 Future 存储在 Test 中.

After I call function() I have taken a &mut reference of Test, because of that I can't change the Test variable, and therefore can't store the returned Future inside the Test.

我确实得到了一个不安全的解决方案(灵感来自)

I did get an unsafe solution (inspired by this)

struct Test<'a>(Option<BoxFuture<'a, ()>>);

impl Test<'_> {
    async fn function(&mut self) {
        println!("I'm alive!");
    }

    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
        s
    }
}

impl Future for Test<'_> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().unwrap().poll_unpin(cx)
    }
}

我希望还有另一种方式.

I hope that there is another way.

推荐答案

尽管有时您可能想做一些与您要在此处完成的事情类似的事情,但这很罕见.因此,大多数阅读此文档的人,甚至可能是OP,都可能希望进行重组,以使用于单个异步执行的结构状态和数据是不同的对象.

Though there are times when you may want to do things similar to what you're trying to accomplish here, they are a rarity. So most people reading this, maybe even OP, may wish to restructure such that struct state and data used for a single async execution are different objects.

要回答您的问题,是的,这在某种程度上是可能的.除非您要绝对求助于不安全的代码,否则将需要使用 Mutex Arc .您希望在 async fn 内部处理的所有字段都必须包装在 Mutex 内部,并且函数本身将接受 Arc< Self>

To answer your question, yes it is somewhat possible. Unless you want to absolutely resort to unsafe code you will need to use Mutex and Arc. All fields you wish to manipulate inside the async fn will have to be wrapped inside a Mutex and the function itself will accept an Arc<Self>.

但是,我必须强调,这不是一个很好的解决方案,您可能不想这样做.根据您的具体情况,您的解决方案可能会有所不同,但是我对使用 Stream 时OP试图完成的操作的猜测可以通过类似于

I must stress, however, that this is not a beautiful solution and you probably don't want to do this. Depending on your specific case your solution may vary, but my guess of what OP is trying to accomplish while using Streams would be better solved by something similar to this gist that I wrote.

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
};

struct Test {
    state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // if available use your async library's Mutex to `.await` locks on `buffer` instead
    buffer: Mutex<Vec<u8>>,
}

impl Test {
    async fn function(self: Arc<Self>) {
        for i in 0..16u8 {
            let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
            let mut buflock = self.buffer.lock().unwrap();
            buflock.extend_from_slice(&data);
        }
    }
    pub fn new() -> Arc<Self> {
        let s = Arc::new(Self {
            state: Default::default(),
            buffer: Default::default(),
        });

        {
            // start by trying to aquire a lock to the Mutex of the Box
            let mut lock = s.state.lock().unwrap();
            // create boxed future
            let b = Box::pin(s.clone().function());
            // insert value into the mutex
            *lock = Some(b);
        } // block causes the lock to be released

        s
    }
}

impl Future for Test {
    type Output = ();
    fn poll(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<<Self as std::future::Future>::Output> {
        let mut lock = self.state.lock().unwrap();
        let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
        Future::poll(fut.as_mut(), ctx)
    }
}

这篇关于如何实现对异步fn进行轮询的Future或Stream?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆