使用互斥锁从多个线程并发访问向量 [英] Concurrent access to vector from multiple threads using a mutex lock

查看:45
本文介绍了使用互斥锁从多个线程并发访问向量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Tokio 库提供的示例,并尝试获取所有当前活动 TCP 连接的向量.最终,我希望能够向每个活动连接广播一条消息,方法是遍历它们并将消息写入套接字.

I'm using an example provided by the Tokio library and attempting to have a vector of all the currently active TCP connections. Ultimately, I would like to be able to broadcast a message to each of the active connections, by looping through them and writing a message to the socket.

首先,我试图打印出一个线程中的当前连接数,同时接受另一个线程中的连接.

To start with, I am trying to print out the current number of connections in one thread whilst accepting connections in another.

为此,我尝试使用共享向量.当它们断开连接时,我还没有实现从向量中删除连接.

To do this, I'm trying to use a shared vector. I've not yet implemented the removal of connections from the vector as and when they disconnect.

// A tiny async echo server with tokio-core
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::{Future, Stream};
use tokio_io::{io, AsyncRead};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use std::thread;
use std::sync::{Arc, Mutex};
use std::io::stdout;
use std::io::Write;

fn main() {
    // Create the event loop that will drive this server
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

    let mut connections = Arc::new((Mutex::new(Vec::new())));

    thread::spawn(move || {
        //Every 10 seconds print out the current number of connections
        let mut i;
        loop {              
          i = connections.lock().unwrap().len();
          println!("There are {} connections", i);
          stdout().flush();
          thread::sleep_ms(10000);
        }
    });



    // Iterate incoming connections
    let server = tcp.incoming().for_each(|(tcp, _)| {

        connections.lock().unwrap().push(tcp);
        // Split up the read and write halves
        let (reader, writer) = tcp.split();

        // Future of the copy
        let bytes_copied = io::copy(reader, writer);

        // ... after which we'll print what happened
        let handle_conn = bytes_copied.map(|(n, _, _)| {
            println!("wrote {} bytes", n)
        }).map_err(|err| {
            println!("IO error {:?}", err)
        });

        // Spawn the future as a concurrent task
        handle.spawn(handle_conn);

        Ok(())
    });

    // Spin up the server on the event loop
    core.run(server).unwrap();

}

目前无法构建并出现以下错误:

At the moment this is failing to build with the following errors:

error[E0382]: capture of moved value: `connections`
  --> src/main.rs:36:42
   |
26 |     thread::spawn(move || {
   |                   ------- value moved (into closure) here
...
36 |     let server = tcp.incoming().for_each(|(tcp, _)| {
   |                                          ^^^^^^^^^^ value captured here after move
   |
   = note: move occurs because `connections` has type `std::sync::Arc<std::sync::Mutex<std::vec::Vec<tokio_core::net::TcpStream>>>`, which does not implement the `Copy` trait

error[E0382]: use of moved value: `tcp`
  --> src/main.rs:40:32
   |
38 |         connections.lock().unwrap().push(tcp);
   |                                          --- value moved here
39 |         // Split up the read and write halves
40 |         let (reader, writer) = tcp.split();
   |                                ^^^ value used here after move
   |
   = note: move occurs because `tcp` has type `tokio_core::net::TcpStream`, which does not implement the `Copy` trait

是否可以在不编写任何不安全代码的情况下实现这一目标?

Is it possible to achieve this without writing any unsafe code?

推荐答案

由于移动闭包,您得到第一个错误:

You get the first error because of the move closure:

let mut connections = Arc::new((Mutex::new(Vec::new())));
thread::spawn(move || {
    let mut i = connections.lock().unwrap().len();
    ....
}

这实际上移动了整个Arc,而您只想移动它的一部分"(也就是说,以引用计数递增的方式移动它,并且两个线程可以使用).

This actually moves the whole Arc, while you only want to move "a part" of it (that is, move it in such a way that the reference count is incremented, and that both threads can use it).

为此,我们可以使用 Arc::clone:

To do this, we can use Arc::clone:

let mut connections = Arc::new((Mutex::new(Vec::new())));
let conn = connections.clone();
thread::spawn(move || {
    let mut i = conn.lock().unwrap().len();
    ....
}

这样,克隆的Arcconn,被移到了闭包中,而原来的Arcconnections,不是,因此仍然可用.

This way, the cloned Arc, conn, is moved into the closure, and the original Arc, connections, is not, and hence still usable.

我不确定您对第二个错误究竟做了什么,但为了简单地计算连接数,您不需要push整个事情.

I'm not sure exactly what you are doing with your second error, but for the sake of simply counting the connections you do not need to push the entire thing.

这篇关于使用互斥锁从多个线程并发访问向量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆