如何将函数发送到另一个线程? [英] How can I send a function to another thread?

查看:75
本文介绍了如何将函数发送到另一个线程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试为我的Rust项目编写一个更简单的单元测试运行器.我已经创建了我的测试装置结构将实现的TestFixture特性,类似于从其他测试框架中的单元测试基类继承的特性.特质很简单.这是我的测试装置

I am attempting to write a simpler unit test runner for my Rust project. I have created a TestFixture trait that my test fixture structs will implement, similar to inheriting from the unit test base class in other testing frameworks. The trait is fairly simple. This is my test fixture

pub trait TestFixture {
    fn setup(&mut self) -> () {}
    fn teardown(&mut self) -> () {}
    fn before_each(&mut self) -> () {}
    fn after_each(&mut self) -> () {}
    fn tests(&mut self) -> Vec<Box<Fn(&mut Self)>>
        where Self: Sized {
        Vec::new()
    }
}

我的测试运行功能如下

pub fn test_fixture_runner<T: TestFixture>(fixture: &mut T) {
    fixture.setup();

    let _r = fixture.tests().iter().map(|t| {
        let handle = thread::spawn(move || {
            fixture.before_each();
            t(fixture);
            fixture.after_each();
        });

        if let Err(_) = handle.join() {
            println!("Test failed!")
        } 
    });

    fixture.teardown();
}

我得到了错误

src/tests.rs:73:22: 73:35 error: the trait `core::marker::Send` is not implemented for the type `T` [E0277]
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
src/tests.rs:69:41: 84:6 note: expansion site
src/tests.rs:73:22: 73:35 note: `T` cannot be sent between threads safely
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
src/tests.rs:69:41: 84:6 note: expansion site
src/tests.rs:73:22: 73:35 error: the trait `core::marker::Sync` is not implemented for the type `for<'r> core::ops::Fn(&'r mut T)` [E0277]
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion
src/tests.rs:69:41: 84:6 note: expansion site
src/tests.rs:73:22: 73:35 note: `for<'r> core::ops::Fn(&'r mut T)` cannot be shared between threads safely
src/tests.rs:73         let handle = thread::spawn(move || {
                                     ^~~~~~~~~~~~~
note: in expansion of closure expansion

我尝试在发送到线程的类型周围添加圆弧,没有骰子,同样的错误.

I have tried adding Arcs around the types being sent to the thread, no dice, same error.

pub fn test_fixture_runner<T: TestFixture>(fixture: &mut T) {
    fixture.setup();

    let fix_arc = Arc::new(Mutex::new(fixture));
    let _r = fixture.tests().iter().map(|t| {
        let test_arc = Arc::new(Mutex::new(t));
        let fix_arc_clone = fix_arc.clone();
        let test_arc_clone = test_arc.clone();
        let handle = thread::spawn(move || {
            let thread_test = test_arc_clone.lock().unwrap();
            let thread_fix = fix_arc_clone.lock().unwrap();
            (*thread_fix).before_each();
            (*thread_test)(*thread_fix);
            (*thread_fix).after_each();
        });

        if let Err(_) = handle.join() {
            println!("Test failed!")
        } 
    });

    fixture.teardown();
}

样品测试夹具将是类似的

A sample test fixture would be something like

struct BuiltinTests {
    pwd: PathBuf
}

impl TestFixture for BuiltinTests {
    fn setup(&mut self) {
        let mut pwd = env::temp_dir();
        pwd.push("pwd");

        fs::create_dir(&pwd);
        self.pwd = pwd;
    }

    fn teardown(&mut self) {
        fs::remove_dir(&self.pwd);
    }

    fn tests(&mut self) -> Vec<Box<Fn(&mut BuiltinTests)>> {
        vec![Box::new(BuiltinTests::cd_with_no_args)]
    }
}

impl BuiltinTests {
    fn new() -> BuiltinTests {
        BuiltinTests {
            pwd: PathBuf::new()
        }
    }
}

fn cd_with_no_args(&mut self) {
    let home = String::from("/");
    env::set_var("HOME", &home);

    let mut cd = Cd::new();
    cd.run(&[]);

    assert_eq!(env::var("PWD"), Ok(home));
}

#[test]
fn cd_tests() {
    let mut builtin_tests = BuiltinTests::new();
    test_fixture_runner(&mut builtin_tests);
}

我使用线程的全部意图是与测试运行程序隔离.如果测试未通过断言,则会引起恐慌,从而杀死跑步者.感谢您的任何见解,如果能够解决紧急问题,我愿意更改设计.

My whole intention of using threads is isolation from the test runner. If a test fails an assertion it causes a panic which kills the runner. Thanks for any insight, I'm willing to change my design if that will fix the panic problem.

推荐答案

您的代码有几个问题,我将向您展示如何一步一步地解决它们.

There are several problems with your code, I'll show you how to fix them one by one.

第一个问题是您正在使用map()迭代迭代器.由于map()是惰性的,它将无法正常工作-除非您使用了迭代器,否则传递给它的闭包将不会运行.正确的方法是使用for循环:

The first problem is that you're using map() to iterate over an iterator. It won't work correctly because map() is lazy - unless you consume the iterator, the closure you passed to it won't run. The correct way is to use for loop:

for t in fixture().tests().iter() {

第二,您要通过引用来迭代闭包向量:

Second, you're iterating the vector of closures by reference:

fixture.tests().iter().map(|t| {

Vec<T>上的

iter()返回一个迭代器,该迭代器产生类型为&T的项,因此您的t将为类型&Box<Fn(&mut Self)>.但是,默认情况下,Box<Fn(&mut T)>不会实现 Sync (这是一个trait对象,除了您明确指定外,它没有有关基础类型的信息),因此&Box<Fn(&mut T)>不能在多个线程中使用.这就是您看到的第二个错误.

iter() on a Vec<T> returns an iterator yielding items of type &T, so your t will be of type &Box<Fn(&mut Self)>. However, Box<Fn(&mut T)> does not implement Sync by default (it is a trait object which have no information about the underlying type except that you specified explicitly), so &Box<Fn(&mut T)> can't be used across multiple threads. That's what the second error you see is about.

您很可能不想通过引用使用这些闭包;您可能希望将它们完全移到生成的线程中.为此,您需要使用into_iter()而不是iter():

Most likely you don't want to use these closures by reference; you probably want to move them to the spawned thread entirely. For this you need to use into_iter() instead of iter():

for t in fixture.tests().into_iter() {

现在t将是Box<Fn(&mut T)>类型.但是,它仍然不能跨线程发送.同样,它是一个特征对象,并且编译器不知道其中包含的类型是否为Send.为此,您需要添加Send绑定到闭包的类型:

Now t will be of type Box<Fn(&mut T)>. However, it still can't be sent across threads. Again, it is a trait object, and the compiler does not know if the type contained inside is Send. For this you need to add Send bound to the type of the closure:

fn tests(&mut self) -> Vec<Box<Fn(&mut Self)+Send>>

现在有关Fn的错误消失了.

Now the error about Fn is gone.

最后一个错误是关于T未实现的Send.我们需要在T上添加一个Send绑定:

The last error is about Send not being implemented for T. We need to add a Send bound on T:

pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) {

现在错误变得更容易理解:

And now the error becomes more comprehensible:

test.rs:18:22: 18:35 error: captured variable `fixture` does not outlive the enclosing closure
test.rs:18         let handle = thread::spawn(move || {
                                ^~~~~~~~~~~~~
note: in expansion of closure expansion
test.rs:18:5: 28:6 note: expansion site
test.rs:15:66: 31:2 note: captured variable is valid for the anonymous lifetime #1 defined on the block at 15:65
test.rs:15 pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) {
test.rs:16     fixture.setup();
test.rs:17
test.rs:18     for t in fixture.tests().into_iter() {
test.rs:19         let handle = thread::spawn(move || {
test.rs:20             fixture.before_each();
           ...
note: closure is valid for the static lifetime

发生此错误是因为您试图在spawn() ed线程中使用引用. spawn()要求其闭包参数具有'static绑定,也就是说,其捕获的环境不得包含具有非'static生存期的引用.但这正是这里发生的-&mut T不是'static. spawn()设计并不禁止避免加入,因此显式地编写它是为了禁止将非'static引用传递给生成的线程.

This error happens because you're trying to use a reference in a spawn()ed thread. spawn() requires its closure argument to have 'static bound, that is, its captured environment must not contain references with non-'static lifetimes. But that's exactly what happens here - &mut T is not 'static. spawn() design does not prohibit avoiding joining, so it is explicitly written to disallow passing non-'static references to the spawned thread.

请注意,在使用&mut T时,即使将&mut T放在Arc中,该错误也是不可避免的,因为这样&mut T的生存期将被存储"在Arc中,依此类推Arc<Mutex<&mut T>>也不会是'static.

Note that while you're using &mut T, this error is unavoidable, even if you put &mut T in Arc, because then the lifetime of &mut T would be "stored" in Arc and so Arc<Mutex<&mut T>> also won't be 'static.

有两种方法可以做您想要的事情.

There are two ways to do what you want.

首先,您可以使用不稳定的 thread::scoped() API.之所以不稳定,是因为它已显示以允许安全代码中的内存不安全,并且计划是在将来提供某种替代品.但是,您可以在每晚的Rust中使用它(仅在经过特殊设计的情况下,它本身不会导致内存不安全):

First, you can use the unstable thread::scoped() API. It is unstable because it is shown to allow memory unsafety in safe code, and the plan is to provide some kind of replacement for it in the future. However, you can use it in nightly Rust (it won't cause memory unsafety by itself, only in specifically crafted situations):

pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) {
    fixture.setup();

    let tests = fixture.lock().unwrap().tests();
    for t in tests.into_iter() {
        let f = &mut *fixture;

        let handle = thread::scoped(move || {
            f.before_each();
            t(f);
            f.after_each();
        });

        handle.join();
    }

    fixture.teardown();
}

之所以编译该代码,是因为scoped()的编写方式可以保证(在大多数情况下)线程不会超过所有捕获的引用.我不得不重新借用fixture,因为否则(因为&mut引用不可复制),它将被移入线程,并且将禁止fixture.teardown().另外,我还必须提取tests变量,因为否则,互斥锁将在for循环期间被主线程锁定,这自然会禁止将其锁定在子线程中.

This code compiles because scoped() is written in such a way that it guarantees (in most cases) that the thread won't outlive all captured references. I had to reborrow fixture because otherwise (because &mut references aren't copyable) it would be moved into the thread and fixture.teardown() would be prohibited. Also I had to extract tests variable because otherwise the mutex will be locked by the main thread for the duration of the for loop which would naturally disallow locking it in the child threads.

但是,使用scoped()不能隔离子线程中的紧急情况.如果子线程出现紧急情况,则该紧急情况将从join()调用中重新引发.一般来说,这可能是问题,也可能不是问题,但是我认为这对您的代码来说是一个问题.

However, with scoped() you can't isolate the panic in the child thread. If the child thread panics, this panic will be rethrown from join() call. This may or may not be a problem in general, but I think it is a problem for your code.

另一种方法是重构代码,从一开始就将固定装置保存在Arc<Mutex<..>>中:

Another way is to refactor your code to hold the fixture in Arc<Mutex<..>> from the beginning:

pub fn test_fixture_runner<T: TestFixture + Send + 'static>(fixture: Arc<Mutex<T>>) {
    fixture.lock().unwrap().setup();

    for t in fixture.lock().unwrap().tests().into_iter() {
        let fixture = fixture.clone();

        let handle = thread::spawn(move || {
            let mut fixture = fixture.lock().unwrap();

            fixture.before_each();
            t(&mut *fixture);
            fixture.after_each();
        });

        if let Err(_) = handle.join() {
            println!("Test failed!")
        } 
    }

    fixture.lock().unwrap().teardown();
}

请注意,现在T也必须也是'static,因为否则它不能与thread::spawn()一起使用,因为它需要'static.内部封口中的fixture不是&mut T而是MutexGuard<T>,因此必须将其显式转换为&mut T才能将其传递给t.

Note that now T has also to be 'static, again, because otherwise it couldn't be used with thread::spawn() as it requires 'static. fixture inside the inner closure is not &mut T but a MutexGuard<T>, and so it has to be explicitly converted to &mut T in order to pass it to t.

这似乎过于复杂和不必要,但是,这种编程语言设计确实可以防止您在多线程编程中犯许多错误.我们已经看到上述每个错误都是有效的-如果忽略每个错误,则可能是导致内存不安全或数据争用的潜在原因.

This may seem overly and unnecessarily complex, however, such design of a programming language does prevent you from making many errors in multithreaded programming. Each of the above errors we have seen is valid - each of them would be a potential cause of memory unsafety or data races if it was ignored.

这篇关于如何将函数发送到另一个线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆