使用消息传递在线程间转移数据

使用消息传递机制来保证并发安全正在越来越流行。正如Go语言中通过通信来共享内存的思路一样。 Rust中也提供了通道的编程概念。将通过分为发送者(transmitter)和接收者(receiver)两部分。

代码通过调用发送者发送数据,也可以调用接收者获取数据。

丢弃了发送者和接收者的任何一端,都会让通道关闭。

来看一个简单的示例:

  • 创建一个通道:多生产者,单消费者模型。一个通道可以有多个发送端,但只有一个接收端。
// mpsc,multiple producer, single consumer。
use std::sync::mpsc;
 
fn main() {
    let (tx, rx) = mpsc::channel();
}
  • 将发送端移动到spawn线程,并发送数据,主线程用接收端接收数据
use std::sync::mpsc;
use std::thread;
 
 
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
 
    let recv = rx.recv().unwrap();
    println!("Got: {}", recv);
}

接收端的recv阻塞当前线程,直到有数据读入包裹在Result<T,E>中返回,如果发送端关闭了,那么就返回一个错误。 还有一个try_recv,这个返回类型也是Result<T,E>,但是不会阻塞,而是读取到值就返回Ok,读取不到就返回错误,可以编写一个不阻塞的程序。

通道和所有权转移

调用发送端的send函数,会消耗数据,也就是说,会将所有权转移给接收者,这样可以避免意外使用已发送的值。 如下示例就会报错:

use std::sync::mpsc;
use std::thread;
 
 
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val); // error: 使用已经move的值
    });
 
    let recv = rx.recv().unwrap();
    println!("Got: {}", recv);
}

发送多个值时接收者的等待过程

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
 
    thread::spawn(move || {
        let v = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in v {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
 
    for received in rx {
        println!("Got: {}", received);
    }
}

接收端可以视作迭代器,迭代中的代码会打印出每个接收到的值,并在通道关闭时退出循环。 在上面的代码中,spawn线程结束后,发送端就会被释放,通道就会被关闭。