try_lock on futures::lock::Mutex 在异步之外? [英] try_lock on futures::lock::Mutex outside of async?

查看:55
本文介绍了try_lock on futures::lock::Mutex 在异步之外?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试为具有 futures::lock::Mutex 的结构实现 Async 读取:

pub struct SmolSocket<'a>{堆栈:Arc>、}impl'a>用于 SmolSocket<'a> 的 AsyncRead{fn poll_read(自我:Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut tokio::io::ReadBuf<'_>) ->投票>{block_on(self.stack).read(...)}}

问题在于,由于 poll_read 不是异步的,我无法调用 await.但我也不想,因为它会阻止.我可以调用 try_lock 来尝试,如果没有,我会注册一个 Waker 以便将来被 SmolSocket 调用.

因为我不能这样做,因为它不是 async,是否有一个版本的 block_ontry_lock 对于 的作用相同>futures::lock::Mutexasync 之外?

解决方案

您可能想轮询 MutexLockFuture,例如可以使用 core::task::ready!,如下所示:

let num = match fut.poll(cx) {投票::就绪(t) =>吨,投票::待定 =>返回投票::待定,};

要轮询未来,您还需要固定它(确保它不会被移动).这可以通过 tokio::pin! 在堆栈上完成.,或者 Pin::new 如果类型已经是 Unpin(MutexLockFuture 是),或者通过移动到带有 Box::pin 的堆.

下面是一个可运行的例子.

⚠️ 请继续阅读以了解您为什么不想这样做!

#![feature(ready_macro)]使用核心::{未来::未来,针::针,任务::{准备好,上下文,投票},};使用 std::sync::Arc;使用 tokio::io::{AsyncRead, AsyncReadExt};pub struct SmolStackWithDevice<'a>{计数器:使用,数据:&'a [u8],}impl'a>用于 SmolStackWithDevice<'a> 的 AsyncRead{fn poll_read(mut self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut tokio::io::ReadBuf<'_>,) ->轮询>{如果 self.counter % 2 == 0 {self.counter += 1;cx.waker().wake_by_ref();println!(什么都不读");返回投票::待定;}buf.put_slice(&[self.data[self.counter/2]]);self.counter += 1;println!(读点东西");民意调查::就绪(好的(()))}}pub struct SmolSocket<'a>{堆栈:Arc>、}impl'a>用于 SmolSocket<'a> 的 AsyncRead{fn poll_read(自我:Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut tokio::io::ReadBuf<'_>,) ->投票>{让 mut lock_fut = self.stack.lock();让 pinned_lock_fut = Pin::new(&mut lock_fut);让 mut 守卫 = 准备好了!(pinned_lock_fut.poll(cx));println!(获得锁");让 pinned_inner = Pin::new(&mut *guard);pinned_inner.poll_read(cx, buf)}}#[tokio::main(flavor = "current_thread")]异步 fn main() {让数据= bHORSE";让 mut buf = [0;5];让 mut s = SmolSocket {堆栈:弧::新(SmolStackWithDevice {计数器:0,数据:&data[..],}.进入(),),};s.read_exact(&mut buf).await.unwrap();println!("{}", String::from_utf8_lossy(&buf));}

看看吧去吧!(在 Rust Playground)

⚠️ 请继续阅读以了解您为什么不想这样做!

那么,问题是什么?好吧,正如你从输出中看到的,每当我们成功获取锁,但底层源没有准备好读取,或者只给了我们一个小的读取时,我们丢弃锁,在下一次轮询时我们将不得不再次获得它.

请记住,Mutexasync 风格只推荐在 stdparking_lot 上当预期成功锁定的 Guard 将跨 await 保存,或显式存储在 Future 数据结构中.>

我们在这里没有这样做,我们只是在使用与 Mutex::try_lock,因为每当锁不是立即可用时,我们就会丢弃 MutexLockFuture 而不是等待被唤醒再次轮询.

但是,将锁存储在数据结构中会很容易意外死锁.所以一个好的设计可能是创建一个难以存储(借用)的 AsyncRead 适配器来包装锁:

pub struct SmolSocket<'a>{堆栈:Arc>、}impl'a>SmolSocket<'a>{fn read(&'a self) ->阅读器{读者::锁定(self.stack.lock())}}pub enum Reader<'a>{锁定(futures::lock::MutexLockFuture<'a, SmolStackWithDevice<'a>),锁定(futures::lock::MutexGuard<'a, SmolStackWithDevice<'a>>),}impl'a>AsyncRead for Reader<'a>{fn poll_read(自我:Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut tokio::io::ReadBuf<'_>,) ->投票>{让 this = self.get_mut();匹配这个{阅读器::锁定(f) =>{*this = Reader::Locked(ready!(Pin::new(f).poll(cx)));println!(获得锁");Pin::new(this).poll_read(cx, buf)}读者::锁定(l) =>Pin::new(&mut **l).poll_read(cx, buf),}}}#[tokio::main(flavor = "current_thread")]异步 fn main() {让数据= bHORSE";让 mut buf = [0;5];让 s = SmolSocket {堆栈:弧::新(SmolStackWithDevice {计数器:0,数据:&data[..],}.进入(),),};s.read().read_exact(&mut buf).await.unwrap();println!("{}", String::from_utf8_lossy(&buf));}

看看吧去吧!(可执行的 Playground 链接)

这是可行的,因为 LockFuture 和我们的 SmolStackWithDevice 都是 Unpin(非自引用),所以我们不必须保证我们不会移动它们.

在一般情况下,例如如果您的 SmolStackWithDevice 不是 Unpin,您必须像这样投影 Pin:

unsafe {让 this = self.get_unchecked_mut();匹配这个{阅读器::锁定(f) =>{*this = Reader::Locked(ready!(Pin::new_unchecked(f).poll(cx)));println!(获得锁");Pin::new_unchecked(this).poll_read(cx, buf)}读者::锁定(l) =>Pin::new_unchecked(&mut **l).poll_read(cx, buf),}}

不确定如何封装 unsafety,pin_project 在这里还不够,因为我们还需要取消对守卫的引用.

但这只会获取一次锁,并在 Reader 被删除时删除它,所以,非常成功.

你也可以看到如果你这样做了它不会死锁

 let mut r1 = s.read();让 mut r2 = s.read();r1.read_exact(&mut buf[..3]).await.unwrap();下降(r1);r2.read_exact(&mut buf[3..]).await.unwrap();println!("{}", String::from_utf8_lossy(&buf));

这是唯一可能的,因为我们将锁定推迟到轮询.

I'm trying to implement Async read for a struct that has a futures::lock::Mutex:

pub struct SmolSocket<'a> {
    stack: Arc<futures::lock::Mutex<SmolStackWithDevice<'a>>>,
}

impl<'a> AsyncRead for SmolSocket<'a>  {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        block_on(self.stack).read(...)
    }
}

The problem is that, since poll_read is not async, I cannot call await. But I also don't want to, as it'd block. I could call try_lock to try and if not, I'd register a Waker to be called by SmolSocket in the future.

Since I cannot do that either because it's not async, is there a version of block_on that does the same as try_lock for futures::lock::Mutex outside of async?

解决方案

You probably mean to poll the MutexLockFuture instead, this can for example be done with the core::task::ready! macro, which desugars as following:

let num = match fut.poll(cx) {
    Poll::Ready(t) => t,
    Poll::Pending => return Poll::Pending,
};

To poll a future, you also need to pin it (ensure it doesn't get moved). This can be done on the stack with tokio::pin!, or Pin::new if the type is already Unpin (MutexLockFuture is), or by moving onto the heap with Box::pin.

Below is a runnable example.

⚠️ KEEP READING TO SEE WHY YOU DON'T WANT TO DO THIS!

#![feature(ready_macro)]
use core::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt};
pub struct SmolStackWithDevice<'a> {
    counter: usize,
    data: &'a [u8],
}
impl<'a> AsyncRead for SmolStackWithDevice<'a> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        if self.counter % 2 == 0 {
            self.counter += 1;
            cx.waker().wake_by_ref();
            println!("read nothing");
            return Poll::Pending;
        }
        buf.put_slice(&[self.data[self.counter / 2]]);
        self.counter += 1;
        println!("read something");
        Poll::Ready(Ok(()))
    }
}
pub struct SmolSocket<'a> {
    stack: Arc<futures::lock::Mutex<SmolStackWithDevice<'a>>>,
}
impl<'a> AsyncRead for SmolSocket<'a> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut lock_fut = self.stack.lock();
        let pinned_lock_fut = Pin::new(&mut lock_fut);
        let mut guard = ready!(pinned_lock_fut.poll(cx));
        println!("acquired lock");
        let pinned_inner = Pin::new(&mut *guard);
        pinned_inner.poll_read(cx, buf)
    }
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let data = b"HORSE";
    let mut buf = [0; 5];
    let mut s = SmolSocket {
        stack: Arc::new(
            SmolStackWithDevice {
                counter: 0,
                data: &data[..],
            }
            .into(),
        ),
    };
    s.read_exact(&mut buf).await.unwrap();
    println!("{}", String::from_utf8_lossy(&buf));
}

Look at it go! (in Rust Playground)

⚠️ KEEP READING TO SEE WHY YOU DON'T WANT TO DO THIS!

So, what is the problem? Well, as you can see from the output, whenever we succeed in acquiring the lock, but the underlying source is not ready to read, or only gives us a small read, we drop the lock, and on the next poll we will have to acquire it again.

This is a good point to remember that async flavors of Mutex are only recommended over std or parking_lot when it is expected that the Guard from a successful locking will be held across an await, or explicitly stored in a Future data structure.

We are not doing that here, we are only ever exercising the fast path equivalent to Mutex::try_lock, because whenever the lock is not immediately available, we drop the MutexLockFuture instead of waiting to be waked to poll it again.

However, storing the lock in the data structure would make it easy to accidentally deadlock. So a good design might be creating an awkward-to-store(borrowing) AsyncRead adapter that wraps the lock:

pub struct SmolSocket<'a> {
    stack: Arc<futures::lock::Mutex<SmolStackWithDevice<'a>>>,
}
impl<'a> SmolSocket<'a> {
    fn read(&'a self) -> Reader<'a> {
        Reader::Locking(self.stack.lock())
    }
}
pub enum Reader<'a> {
    Locking(futures::lock::MutexLockFuture<'a, SmolStackWithDevice<'a>>),
    Locked(futures::lock::MutexGuard<'a, SmolStackWithDevice<'a>>),
}
impl<'a> AsyncRead for Reader<'a> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        match this {
            Reader::Locking(f) => {
                *this = Reader::Locked(ready!(Pin::new(f).poll(cx)));
                println!("acquired lock");
                Pin::new(this).poll_read(cx, buf)
            }
            Reader::Locked(l) => Pin::new(&mut **l).poll_read(cx, buf),
        }
    }
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let data = b"HORSE";
    let mut buf = [0; 5];
    let s = SmolSocket {
        stack: Arc::new(
            SmolStackWithDevice {
                counter: 0,
                data: &data[..],
            }
            .into(),
        ),
    };
    s.read().read_exact(&mut buf).await.unwrap();
    println!("{}", String::from_utf8_lossy(&buf));
}

Look at it go! (executable Playground link)

This works out, because both the LockFuture and our SmolStackWithDevice are Unpin (non-self-referential) and so we don't have to guarantee we aren't moving them.

In a general case, for example if your SmolStackWithDevice is not Unpin, you'd have to project the Pin like this:

unsafe {
    let this = self.get_unchecked_mut();
    match this {
        Reader::Locking(f) => {
            *this = Reader::Locked(ready!(Pin::new_unchecked(f).poll(cx)));
            println!("acquired lock");
            Pin::new_unchecked(this).poll_read(cx, buf)
        }
        Reader::Locked(l) => Pin::new_unchecked(&mut **l).poll_read(cx, buf),
    }
}

Not sure how to encapsulate the unsafety, pin_project isn't enough here, as we also need to dereference the guard.

But this only acquires the lock once, and drops it when the Reader is dropped, so, great success.

You can also see that it doesn't deadlock if you do

    let mut r1 = s.read();
    let mut r2 = s.read();
    r1.read_exact(&mut buf[..3]).await.unwrap();
    drop(r1);
    r2.read_exact(&mut buf[3..]).await.unwrap();
    println!("{}", String::from_utf8_lossy(&buf));

This is only possible because we deferred locking until polling.

这篇关于try_lock on futures::lock::Mutex 在异步之外?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆