让我们从 async/await 说起。和其他语言类似,async/await 不过是一种语法糖。承接这个的概念,在 C# 中是 Task,在 JavaScript 中是 Promise,在 Rust 中则是 Future。
pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } pub enum Poll<T> { Ready(T), Pending, }
这个 trait 中,值得讲的就是 Poll 和 Pin 两个概念。
Poll
Rust async/await 这类 stackless coroutine 的本质上是一个状态机。对于一个 async 函数,编译器会以
await 的调用处为切割点,生成多个内部状态(比如初始态、某个 await 的挂起态、完成态等)。executor 每次调用一次 poll,future 就尝试从当前内部状态向前推进:如果当前还不能继续,就返回 Poll::Pending;如果已经完成,就返回 Poll::Ready。需要注意的是,Pending 不是让 executor 忙等自旋;future 在返回 Pending 后应在“可继续推进”时通过 waker 通知 executor,executor 被唤醒后再继续 poll。假设我们有这样一段代码
async fn foo() -> Result<String, String> { Ok("foo".to_string()) } async fn main() { let x = foo().await.unwrap(); println!("{}", x); }
Rust MIR 的表示如下 godbolt
// WARNING: This output format is intended for human consumers only // and is subject to change without notice. Knock yourself out. // HINT: See also -Z dump-mir for MIR at specific points during compilation. fn foo() -> {async fn body of foo()} { let mut _0: {async fn body of foo()}; bb0: { _0 = {coroutine@/app/example.rs:1:42: 3:2 (#0)}; return; } } fn foo::{closure#0}(_1: Pin<&mut {async fn body of foo()}>, _2: &mut Context<'_>) -> Poll<Result<String, String>> { debug _task_context => _2; let mut _0: std::task::Poll<std::result::Result<std::string::String, std::string::String>>; let mut _3: std::string::String; let mut _4: &str; let mut _5: std::result::Result<std::string::String, std::string::String>; let mut _6: u32; let mut _7: &mut {async fn body of foo()}; bb0: { _7 = copy (_1.0: &mut {async fn body of foo()}); _6 = discriminant((*_7)); switchInt(move _6) -> [0: bb1, 1: bb5, 2: bb4, otherwise: bb6]; } bb1: { _4 = const "foo"; _3 = <str as ToString>::to_string(move _4) -> [return: bb2, unwind: bb3]; } bb2: { _5 = Result::<String, String>::Ok(move _3); _0 = Poll::<Result<String, String>>::Ready(move _5); discriminant((*_7)) = 1; return; } bb3 (cleanup): { discriminant((*_7)) = 2; resume; } bb4: { assert(const false, "`async fn` resumed after panicking") -> [success: bb4, unwind continue]; } bb5: { assert(const false, "`async fn` resumed after completion") -> [success: bb5, unwind continue]; } bb6: { unreachable; } } alloc1 (size: 3, align: 1) { 66 6f 6f │ foo } fn main() -> {async fn body of main()} { let mut _0: {async fn body of main()}; bb0: { _0 = {coroutine@/app/example.rs:5:17: 8:2 (#0)}; return; } } fn main::{closure#0}(_1: Pin<&mut {async fn body of main()}>, _2: &mut Context<'_>) -> Poll<()> { debug _task_context => _2; let mut _0: std::task::Poll<()>; let _3: std::string::String; let mut _4: {async fn body of foo()}; let mut _5: {async fn body of foo()}; let mut _6: std::task::Poll<std::result::Result<std::string::String, std::string::String>>; let mut _7: std::pin::Pin<&mut {async fn body of foo()}>; let mut _8: &mut {async fn body of foo()}; let mut _9: &mut std::task::Context<'_>; let mut _10: isize; let mut _12: &mut std::task::Context<'_>; let mut _13: u32; let mut _14: &mut {async fn body of main()}; scope 1 { debug __awaitee => (((*_14) as variant#3).0: {async fn body of foo()}); let _11: std::result::Result<std::string::String, std::string::String>; scope 2 { debug result => _11; } } bb0: { _14 = copy (_1.0: &mut {async fn body of main()}); _13 = discriminant((*_14)); switchInt(move _13) -> [0: bb1, 1: bb17, 2: bb16, 3: bb15, otherwise: bb7]; } bb1: { _5 = foo() -> [return: bb2, unwind: bb14]; } bb2: { _4 = <{async fn body of foo()} as IntoFuture>::into_future(move _5) -> [return: bb3, unwind: bb14]; } bb3: { (((*_14) as variant#3).0: {async fn body of foo()}) = move _4; goto -> bb4; } bb4: { _8 = &mut (((*_14) as variant#3).0: {async fn body of foo()}); _7 = Pin::<&mut {async fn body of foo()}>::new_unchecked(copy _8) -> [return: bb5, unwind: bb13]; } bb5: { _9 = copy _2; _6 = <{async fn body of foo()} as Future>::poll(move _7, copy _9) -> [return: bb6, unwind: bb13]; } bb6: { _10 = discriminant(_6); switchInt(move _10) -> [0: bb9, 1: bb8, otherwise: bb7]; } bb7: { unreachable; } bb8: { _0 = Poll::<()>::Pending; discriminant((*_14)) = 3; return; } bb9: { _11 = move ((_6 as Ready).0: std::result::Result<std::string::String, std::string::String>); drop((((*_14) as variant#3).0: {async fn body of foo()})) -> [return: bb10, unwind: bb14]; } bb10: { _3 = Result::<String, String>::unwrap(copy _11) -> [return: bb11, unwind: bb14]; } bb11: { drop(_3) -> [return: bb12, unwind: bb14]; } bb12: { _0 = Poll::<()>::Ready(const ()); discriminant((*_14)) = 1; return; } bb13 (cleanup): { drop((((*_14) as variant#3).0: {async fn body of foo()})) -> [return: bb14, unwind terminate(cleanup)]; } bb14 (cleanup): { discriminant((*_14)) = 2; resume; } bb15: { _12 = move _2; _2 = move _12; goto -> bb4; } bb16: { assert(const false, "`async fn` resumed after panicking") -> [success: bb16, unwind continue]; } bb17: { assert(const false, "`async fn` resumed after completion") -> [success: bb17, unwind continue]; } }
我们可以对其简化一下,这样描述(不严谨,大概理解一下即可)
fn main() { state_of_main = ... match state_of_main { init => init(), foo_pending => foo_pending(), foo_ready => foo_ready(), // ... 如果有更多的 await,那么这里就有相应增加(N+1关系)。 } fn init() { future_of_foo = foo() match poll(future_of_foo) { pending => { set_state(state_of_main, foo_pending) return } ready(result_of_foo) => { set_state(state_of_main, foo_ready) goto foo_ready } } } fn foo_pending() { match poll(future_of_foo) { pending => { return } ready(result_of_foo) => { set_state(state_of_main, foo_ready) goto foo_ready } } } fn foo_ready() { s = result_of_foo.unwrap() println!("{}", s) } }
Pin & Unpin
自引用
在讲到 Pin 之前我们需要先了解自引用,才能知道 Pin 解决了什么问题。那么,什么是自引用?可以参考这个例子
#[derive(Debug)] struct A { value: [u32; 3], ptr: *mut u32, } fn _move(a: A) { println!("a.value: {:p}, a.ptr: {:p}", &a.value, a.ptr); } fn main() { let mut a = A { value: [1, 2, 3], ptr: std::ptr::null_mut(), }; a.ptr = a.value.as_ptr() as *mut u32; println!("a.value: {:p}, a.ptr: {:p}", &a.value, a.ptr); _move(a); } // a.value: 0x16b221d48, a.ptr: 0x16b221d48 // a.value: 0x16b221de8, a.ptr: 0x16b221d48
这个例子中,因为发生了 move,导致了地址发生了变更,但是
ptr 没有被修正,后续代码访问这个指针会出现 UB。async 代码中同样存在这个问题:编译器生成的状态机需要跨多次 poll 调用保存局部变量,如果某个 await 点之前创建的变量在后续状态中仍被引用(例如一个借用跨越了 await 点),状态机结构体就会形成自引用。一旦这样的 future 在 pin 之后被 move,内部指针同样会失效。需要注意的是:future 在被 pin 之前是可以 move 的;但一旦被 pin(例如被 executor 持有并轮询)以后,对于 !Unpin future 就不能再被 move,否则同样会破坏这些内部引用关系。Box
其实在内存分配上来讲,如果要解决上边的自引用问题,那么通过在堆上分配内存即可解决问题。比如上边的例子这样修改以后:
fn _move_boxed(a: Box<A>) { println!("a.value: {:p}, a.ptr: {:p}", &a.value, a.ptr); } fn main() { let mut a = Box::new(A { value: [1, 2, 3], ptr: std::ptr::null_mut(), }); a.ptr = a.value.as_ptr() as *mut u32; println!("a.value: {:p}, a.ptr: {:p}", &a.value, a.ptr); _move_boxed(a); } // a.value: 0x147e05e18, a.ptr: 0x147e05e18 // a.value: 0x147e05e18, a.ptr: 0x147e05e18
但是这样还不够——
Box<T> 会通过 DerefMut 暴露出 &mut T,safe code 仍然可以用 std::mem::swap 或 std::mem::replace 把堆上的值 move 走,同样会导致 ptr 失效。为了解决这个问题,Rust 引入了 Pin<Ptr>,它在类型系统层面约束:通过这个指针不能把底层值 move 走。Unpin 则是配套的标记 trait,用来描述哪些类型即使被 pin 也允许按普通方式 move。Pin
#[derive(Copy, Clone)] pub struct Pin<Ptr> { pointer: Ptr, }
可以看到,Pin 其实只是一个 newtype,因此它是零成本抽象。它本身并不会改变底层数据,而是通过类型系统收紧可用 API:对于
T: !Unpin,safe code 不能通过 pinned 指针把 T move 走。use std::pin::Pin; #[derive(Debug)] struct A { value: [u32; 3], ptr: *mut u32, _phantom: std::marker::PhantomPinned, } fn _move_pinned(a: Pin<Box<A>>) { println!("a.value: {:p}, a.ptr: {:p}", &a.value, a.ptr); } fn main() { let a: Pin<Box<A>> = { let mut a = Box::new(A { value: [1, 2, 3], ptr: std::ptr::null_mut(), _phantom: std::marker::PhantomPinned, }); a.ptr = a.value.as_ptr() as *mut u32; a.into() }; println!("a.value: {:p}, a.ptr: {:p}", &a.value, a.ptr); _move_pinned(a); } // a.value: 0x147e05e18, a.ptr: 0x147e05e18 // a.value: 0x147e05e18, a.ptr: 0x147e05e18
Pin 的保护作用——对于
!Unpin 类型,safe code 不能通过 pinned 指针把底层值 move 走,也无法直接拿到普通 &mut T。下面这个例子展示的是“不能直接改字段”这类受限访问:// 编译错误示例 let mut a: Pin<Box<A>> = { /* ... */ }; a.as_mut().ptr = a.value.as_ptr() as *mut u32; // error[E0594]: cannot assign to data in dereference of `Pin<&mut A>`
这并不等于字段绝对不可变:像Cell/RefCell这类 interior mutability 仍可修改内部状态。
有了这个约束后,再看
Unpin 就更好理解了,Unpin 本质上是在说:这个类型即使被 pin,也可以按普通可移动类型来处理。Unpin
Unpin 是一个 trait,它描述的是数据类型 move 后也是安全的。Rust 中的基本数据类型都是 Unpin 的。
pub auto trait Unpin {}
auto trait 的作用是,如果一个类型的所有字段都实现了一个 trait,那么这个数据类型就实现了这个 trait。
!Unpin 则表示取反,也就是 move 不一定是安全的。PhantomPinned 实现了 !Unpin。上边的例子中 struct A 因为加上了一个 _phantom: std::marker::PhantomPinned 导致它从 Unpin 变成了 !Unpin。#[derive(Debug)] struct A { value: [u32; 3], ptr: *mut u32, _phantom: std::marker::PhantomPinned, } fn unpin_only(a: impl Unpin) { let _ = a; } fn main() { let a = A { value: [1, 2, 3], ptr: std::ptr::null_mut(), _phantom: std::marker::PhantomPinned, }; // unpin_only(a); // 编译错误:A 是 !Unpin unpin_only(Box::pin(a)); // 可以:Box<T> 始终实现 Unpin(移动指针不移动堆数据), // 因此 Pin<Box<A>> 也实现了 Unpin }
总结
Poll:executor 驱动 future 推进的返回值,Pending表示未完成且需要后续通过waker再次调度,Ready表示完成。Pin<Ptr>:包裹指针,在类型系统层面阻止通过该指针将底层!Unpin值 move 走。Unpin:标记 trait,实现了它的类型即使被 pin 也可以安全 move;!Unpin则相反。