最简单的并发
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
// 阻塞,否则主线层退出后,并发直接被结束
handle.join().unwrap();
}
进程通信
use std::thread;
// mpsc是英文“multiple producer, single consumer”(多个生产者,单个消费者)的缩写
use std::sync::mpsc;
fn main() {
// tx:Sender,rx:receiver
let (tx, rx) = mpsc::channel();
// move 关键词把 tx 移动到闭包里面
thread::spawn(move || {
let val = String::from("hi");
// 子进程发送信息
tx.send(val).unwrap();
// val 的所有权被移动到 channel 中
});
// 主进程接收信息
let received = rx.recv().unwrap();
// recv 阻塞子进程,
// 如果用 try_recv 就不阻塞,如果 channel 中没有信息,就返回 Err
println!("Got {}", received);
}
演示:一个一个地发送多条信息
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
// 要点1:一个一个发送多条信息
let vals = vec![String::from("hi1"), String::from("hi2"), String::from("hi3")];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 要点2:另开一个线程,发送信息
thread::spawn(move || {
let vals = vec![String::from("hello1"), String::from("hello2"), String::from("hello3")];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got {}", received);
}
}
得到
Got hello1
Got hi1
Got hello2
Got hi2
Got hello3
Got hi3
用循环开启多条子线程
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 开 5 个sender
let mut sender_vec = Vec::new();
for _ in 0..5 {
sender_vec.push(mpsc::Sender::clone(&tx))
}
for tx_tmp in sender_vec {
thread::spawn(move || {
let vals = vec![String::from("1"), String::from("2"), String::from("3")];
for val in vals {
tx_tmp.send(format!("Worker {:?} data = {}", thread::current().id(), val)).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
}
for received in rx {
println!("Got {}", received);
}
}
Mutex
Mutex:https://weread.qq.com/web/reader/d733256071eeeed9d7322fdkc9e32940268c9e1074f5bc6
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
有锁并发
好处:代码写起来十分方便,只需要按照单线程的版本写完,然后给数据结构加上锁即可。
坏处:当线程数量比较大的时候,大部分的时间都被用在了等待锁上。死锁的问题。
短命线程
// crossbeam="*"
use std::time::Duration;
use chrono::Utc;
use std::thread;
// 被并发的函数
fn myfunc() -> i32 {
println!("Worker {:?} start at {}", thread::current().id(), Utc::now());
thread::sleep(Duration::from_millis(500));
println!("Worker {:?} end at {}", thread::current().id(), Utc::now());
return 1;
}
fn main() {
let res = crossbeam::scope(|s| {
// let mut res = 0;
let mut res = Vec::new();
let mut thread_vec = Vec::new();
// 开启线程
for _ in 0..20 {
thread_vec.push(s.spawn(|_| myfunc()));
}
// 收集结果
for _ in 0..20 {
// 这个用remove,而不是 thread_vec[i],是因为
// rust 的vec 不允许数据提取出来的同时,还同时在列表里面
let res1 = thread_vec.remove(0).join().unwrap();
res.push(res1);
}
res
}).unwrap();
println!("{:?}", res);
}
例子:
// crossbeam = "*"
fn main() {
let arr = &[1, 25, -4, 10,11,19,80,39];
let max = find_max(arr);
println!("{}==25", max.unwrap());
}
fn find_max(arr: &[i32]) -> Option<i32> {
const THRESHOLD: usize = 2;
println!("{:?}",arr);
if arr.len() <= THRESHOLD {
return arr.iter().cloned().max();
}
let mid = arr.len() / 2;
let (left, right) = arr.split_at(mid);
crossbeam::scope(|s| {
let thread_l = s.spawn(|_| find_max(left));
let thread_r = s.spawn(|_| find_max(right));
let max_l = thread_l.join().unwrap()?;
let max_r = thread_r.join().unwrap()?;
Some(max_l.max(max_r))
}).unwrap()
}
crossbeam_channel
use crossbeam_channel::{bounded, unbounded};
// 创建一个容量是5的channel
let (s, r) = bounded(5);
// 5条消息之内都不会阻塞
for i in 0..5 {
s.send(i).unwrap();
}
// 超过5条就会阻塞了
// s.send(5).unwrap();
// 创建一个无限容量的channel,很多也不会阻塞
let (snd, rev) = unbounded();
使用
// 被并发的函数
fn myfunc(s: &Sender<i32>, r: &Receiver<i32>) -> i32 {
// 发送一个消息然后接受一个消息
let num_snd = rand::thread_rng().gen_range(0..100);
s.send(num_snd).unwrap();
println!("Worker {:?} start {} at {}", thread::current().id(), num_snd, Utc::now());
thread::sleep(Duration::from_millis(500));
let num_rec = r.recv().unwrap();
println!("Worker {:?} end {} at {}", thread::current().id(), num_rec, Utc::now());
return 1;
}
并行维护全局变量
use error_chain::error_chain;
use lazy_static::lazy_static;
use std::sync::Mutex;
error_chain!{ }
lazy_static! {
static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}
fn insert(fruit: &str) -> Result<()> {
let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
db.push(fruit.to_string());
Ok(())
}
fn main() -> Result<()> {
insert("apple")?;
insert("orange")?;
insert("peach")?;
{
let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
}
insert("grape")?;
Ok(())
}
好用的并行实现
use rayon::prelude::*;
fn main() {
let mut arr = [0, 7, 9, 11];
// 经过测试,这是并行的
arr.par_iter_mut().for_each(|p| *p -= 1);
println!("{:?}", arr);
}
很多其它类似的操作,find_any
// 返回任意一个符合条件的值 Option
let f1 = arr.par_iter().find_any(|&&x| x > 9);
find_any(返回bool的函数)
:返回Option,任意一个符合格式的数据filter(返回bool的函数)
map
sum, count
统计reduce
案例:
use rayon::prelude::*;
struct Person {
age: u32,
}
fn main() {
let v: Vec<Person> = vec![
Person { age: 23 },
Person { age: 19 },
Person { age: 42 },
Person { age: 17 },
Person { age: 17 },
Person { age: 31 },
Person { age: 30 },
];
let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;
let sum_over_30 = v.par_iter()
.map(|x| x.age)
.filter(|&x| x > 30)
.reduce(|| 0, |x, y| x + y);
let alt_sum_30: u32 = v.par_iter()
.map(|x| x.age)
.filter(|&x| x > 30)
.sum();
let avg_over_30 = sum_over_30 as f32 / num_over_30;
let alt_avg_over_30 = alt_sum_30 as f32/ num_over_30;
assert!((avg_over_30 - alt_avg_over_30).abs() < std::f32::EPSILON);
println!("The average age of people older than 30 is {}", avg_over_30);
}