400 - 비동기 Rust에서의 취소 안전성 다루기 / RFD

ko생성일: 2025. 9. 10.갱신일: 2025. 9. 11.

비동기 Rust에서 취소 안전성과 취소 정합성을 실용적으로 다루는 방법을 안내한다. 취소의 개요와 주요 발생 원인, 라이브러리 작성 시 취소-안전 코드를 설계·표기하는 법, 애플리케이션 소비자 입장에서 취소-비안전 future를 다루는 기법을 사례와 함께 설명한다.

소개

컨트롤 플레인을 개발하면서, 실무에서 큰 문제 중 하나가 비동기 취소 안전성(cancel safety)과 관련된 취소 정합성(cancel correctness)이었다: 즉, future가 drop될 때 비동기 Rust 시스템이 상대적으로 예측 불가능하게 동작하는 현상이다 [rfd397]. 취소는 일종의 “원격에서 일어나는 으스스한 작용(spooky action at a distance)”이며, 이를 항상 고려해야 한다는 점은 비동기 코드를 작성·리뷰하는 사람들에게 상당한 인지적 부담을 준다. 이 두 가지는 Rust의 일반적인 특성과는 거리가 멀다.

이 문서는 Rust에서 취소 안전성을 다루는 실용적인 가이드를 제시하며, 네 가지 주요 주제를 다룬다:

  1. 비동기 취소에 대한 전반적 개요는 무엇인가?

  2. 비동기 Rust에서 취소의 주요 근원은 무엇인가?

  3. 비동기 라이브러리 코드 작성자로서, 어떻게 취소-안전 코드를 작성하는가? 그리고 어떤 API가 취소-비안전임을 소비자에게 명시하는가?

  4. 비동기 코드를 소비하는 애플리케이션으로서, 취소-비안전 future를 어떻게 다루는가?

이 문서의 대상은 Omicron을 포함해 일반적인 맥락에서 비동기 Rust 코드를 작성하거나 리뷰하는 모든 사람이다.

수동 구현 future vs async 블록

Rust의 future는 근본적으로 지연 계산을 위한 상태 기계(state machine)다. Rust에서는 future를 두 가지 방식으로 작성할 수 있다.

  1. 수동으로 구현한 future는 std::future::Future를 직접 구현한 값이다. 예를 들어, 즉시 값을 반환하는 수동 구현 future는 다음과 같다.

use std::future::Future;use std::pin::Pin;use std::task::{Context, Poll};struct MyFuture { value: Option<String>,}impl Future for MyFuture { type Output = String; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { return Poll::Ready(self.value.take().expect("future polled after completion")); }} 2. async 블록은 async { }로 감싸거나 async fn 함수로 작성한 코드 덩어리다. 예를 들어, 즉시 값을 반환하는 async 블록은 다음과 같다:

async { let output: String = "future-output".to_owned(); output}

두 방식 모두 취소 안전성 문제에 취약하지만, async 블록은 future가 어떻게 동작하는지의 기계적 면을 일부 감춘다. 실무에서는 취소와 관련된 숨어 있는 놀라움의 상당수가 수동 구현 future보다는 async 블록에서 발생한다.

취소 개요

future가 drop되면 그 future는 “취소(cancelled)”된다. 이는 해당 future의 상태 기계가 drop된 시점 이후로는 더 이상 실행을 이어가지 않음을 의미한다. (future는 첫 poll 전에 drop될 수도 있으며, 이 경우 전혀 실행되지 않는다.)

이 점은 수동 구현 future에서 가장 보기 쉽다. poll을 몇 번 호출해 Poll::Pending을 반환하다가, Poll::Ready를 반환하기 전에 future를 drop하는 것과 같다.

취소가 발생하면, future가 소유한 값들은 drop된다. 여기에는 future가 소유한 다른 future들도 포함될 수 있다. 즉, 부모 future로부터 자식 future로 취소가 전파된다.

let semaphore = Semaphore::new(2);async { let permit = semaphore.acquire().await.unwrap(); println!("permit acquired"); tokio::time::sleep(Duration::from_secs(5)).await;}

  • async 블록이 퍼밋을 획득하기 “전”에 drop되면, semaphore.acquire()가 반환한 future가 drop된다.

  • async 블록이 퍼밋을 획득한 “후” 슬립 중에 drop되면, permit이 drop되고 그 Drop 구현이 호출된다.

두 경우 모두 async 블록이 semaphore의 소유권을 가져가지 않았기 때문에(async move가 아님) semaphore는 drop되지 않는다.

또한, future의 협력적(cooperative) 멀티태스킹 모델 때문에 취소는 await 지점에서만 발생할 수 있다. 예를 들어 다음 Rust 코드를 보자:

use std::time::Duration;async fn my_func() { print!("1"); print!(", 2"); tokio::time::sleep(Duration::from_secs(5)).await; print!(", 3"); print!(", 4");}

이 코드는 다음 중 하나를 출력할 수 있다:

  • 1, 2, 3, 4: my_func()가 끝까지 실행된 경우.

  • 1, 2: my_func()가 await 지점까지 실행되었다가 그 이후에 취소된 경우. (예: tokio::time::timeout(Duration::from_secs(1), my_func()))

  • 아무것도 출력되지 않음: my_func()가 await되기 전에 취소된 경우.

my_func()1만 출력하거나 1, 2, 3까지 출력하는 등 중간 상태를 출력할 수 없다.

이 또한 수동 구현 future에서 보기 쉽다. poll이 호출되는 “중간”에 future를 drop할 수는 없지만, poll 호출과 호출 사이에서는 drop할 수 있다.

또한 취소는 동기 코드만 호출할 수 있다. 이는 Drop 함수가 async가 아니기 때문이다. AsyncDrop을 추가하려는 계획이 있지만, 2023-06 시점에는 아직 갈 길이 멀다.

그리고 취소는 종종 버그가 아니라 기능이다. 많은 취소는 원하고 바람직하다. 예를 들어 tokio::sync::SemaphorePermit을 기다리는 중 그 future를 drop하면, 대기열에서 자신의 순서를 잃는다. 이는 더 이상 존재하지 않는 취득자를 위해 자원을 낭비하지 않게 해 주므로 좋다.

대안으로, 보다 명시적인 취소 모델은 이에 대응하는 자체적인 방식들이 있다[1].

취소 안전성(cancel safety)

future가 완료되기 전에 drop되더라도 시스템의 나머지 부분에 영향을 주지 않는다면 그 future는 “취소-안전(cancel-safe)”하다고 한다.

예를 들어, Tokio의 MPSC 채널에는 메시지를 비동기적으로 수신하는 Receiver::recv 메서드가 있다. 이 메서드는 취소-안전하다: 완료되어 메시지를 반환하거나, drop되어 어떤 메시지도 수신하지 않는다. Receiver::recv에는 “큐에서 메시지를 꺼냈지만 호출자에게는 아직 전달하지 않은” 중간 상태가 없다.

주목할 점은, Receiver::recv의 취소 안전성은 신중한 API 설계와 구현의 결과라는 것이다. 기다리는 동안 메시지를 잡고서 await 지점을 넘기는 구현(데이터 손실을 유발할 수 있음)도 만들 수 있다. 하지만 현재의 recv는 그렇게 동작하지 않는다. 앞으로도 그렇게 동작한다는 점은 API 문서(그리고 아마도 Tokio의 테스트)가 보장한다.

여기서의 정의에서, 취소 안전성은 future의 “지역적(local)” 속성이다.

취소 정합성(cancel correctness)

우리는 “취소 정합성”을 “전역적(global)” 속성으로 정의한다: 시스템 내부의 future들이 취소될 수 있다는 점을 고려해도 시스템이 올바르게 동작할 수 있는 능력이다.

취소 정합성은 아래 세 가지 조건이 모두 만족될 때 침해된다:

  1. 시스템 어딘가에 취소-안전하지 않은 future가 있다.

  2. 그 future가 실제로 취소된다.

  3. 그 future를 취소하는 것이 시스템의 어떤 속성을 위반한다.

각각을 풀어 보자:

시스템 어딘가에 취소-안전하지 않은 future가 있다. 취소-안전한 future만으로 이루어진 시스템은 취소 정합성 버그가 없다. [making_cancel_safe]에서는 취소-비안전 future를 취소-안전하게 만드는 방법을 다룬다.

그 future가 취소된다. 취소 정합성 문제는 실제로 future가 취소되는 경우에만 발생한다. 언뜻 사소해 보일 수 있지만, 보통 future가 취소되지 않도록 구성하는 방법들이 존재하기 때문에 기억해 둘 가치가 있다.

그 future를 취소하면 시스템 속성이 위반된다. 정합성에 필요한 속성은 다음에 국한되지는 않지만, 예를 들어 다음이 있다:

  • 데이터 손실 금지. 스트림을 읽거나 쓰는 future가 취소되더라도, 전송 중인 데이터가 유실되어서는 안 된다. 사례 연구: Oxide 시리얼 콘솔. 필요하다면, 취소된 지점에서 읽기/쓰기를 재개할 새 future를 만들 수 있어야 한다.

  • 불변식 유지. future가 취소되더라도, 그것이 다루던 공유 상태는 유효하지 않은 상태로 남지 않아야 한다.

Rust 코드에서, 일반적으로 &mut 참조를 보유하는 동안 일시적으로 상태가 무효가 되는 건 괜찮다. 단, &mut 참조가 해제될 때까지 상태를 다시 유효하게 만들면 된다. 이는 적어도 동기 코드에 관해 Rust가 “두려움 없는 동시성”을 주장하는 중요한 부분이다. 비동기 코드에서 이것에 상응하는 원칙은, 취소-안전한 future에서는 공유 상태가 await 지점 사이를 넘어 무효 상태로 남아 있어서는 안 된다는 것이다. 사례 연구: wicketd의 installinator 진행 상황 추적.

이 문제를 바라보는 한 가지 관점은 future의 “취소 폭발 반경(cancellation blast radius)”이다: 어떤 future가 취소되었을 때 영향을 받는 시스템의 다른 부분들이다.

예를 들어, tokio::time::sleep은 완전히 독립적이다. 이것을 취소해도 시스템의 나머지에는 영향이 없다.

내부 가변 상태를 갖되 공유 상태는 읽기 전용으로만 접근하는 코드에서의 취소도 괜찮다. 내부 상태가 무효가 되더라도 즉시 폐기되기 때문이다.

공유 가변 상태를 무효로 만드는 취소는 문제를 야기한다. 이는 특히 Tokio의 mutex에서 흔하다. 그래서 이 가이드는 Tokio mutex 사용을 피할 것을 권한다.

프로세스 외부의 상태에 영향을 주는 취소는 특히 위험하다. 예시는 이 사례 연구를 보라.

  • 정리(cleanup). 실행 말미에 정리 작업을 수행하는 future들을 설정하는 시스템은, 그 일부 또는 전부가 취소되는 상황에도 견고해야 한다. 데이터베이스 커넥션 풀의 경우, 트랜잭션 “끝”에서의 정리뿐 아니라, 트랜잭션 “시작” 시 커넥션이 깨끗한 상태인지 추가 확인해야 할 수 있다. 사례 연구: async_bb8_diesel.

  • 공정성. 어떤 future들(예: tokio::sync::Mutex::lock)은 drop될 때 대기열에서 “자신의 순서”를 잃는다. 이런 future를 취소하는 시스템은 공정성의 결여에 견딜 수 있어야 한다.

공정성은 특히 경합이 적은 큐에서 다른 이슈들보다 덜 중요할 때가 많다. 공정성만 빼면 취소-안전한 future를 “대체로 취소-안전(mostly cancel-safe)”하다고 표현할 수 있다.

어떤 시스템 속성을 유지해야 하는지도 시간에 따라 바뀔 수 있다. 예를 들어, HTTP URL을 가져와 디스크에 쓰는 curl 유사 프로그램을 생각해 보자. 디스크 쓰기 작업이 future로 모델링되어 있다면, 이를 취소하는 것은 보통 정합성 버그로 이어질 것이다. 하지만 사용자가 Ctrl-C로 프로그램을 종료한다면, 이 future를 취소하는 것이 허용될 수 있다.

취소의 근원

future는 drop될 때마다 취소되지만, 특정 코드 패턴들은 future가 자주 취소되도록 만든다. 이 절에서는 future 취소가 얽힌 흔한 패턴들을 나열한다(완전한 목록은 아니다).

select! 매크로

아마도 가장 흔한 비동기 취소 버그의 근원은 여러 future를 동시에 poll하여 가장 먼저 완료되는 것에 반응하는 tokio::select!일 것이다.

다음 예를 보자:

use std::time::Duration;let (sender, receiver) = tokio::sync::mpsc::channel(8);spawn_send_task(sender); // give sender off to a task to send values fromlet sleep = tokio::time::sleep(Duration::from_secs(5));tokio::select! { value = receiver.recv() => { // ... process value } _ = sleep => { println!("sleep completed"); }}

  • sleepreceiver.recv()보다 먼저 완료되면 receiver.recv() future가 취소된다. 그러나 이는 괜찮다. Receiver::recv는 문서로 취소-안전함이 보장되기 때문이다.

  • receiver.recv()sleep보다 먼저 완료되면 sleep future가 취소된다. sleep future는 개념적으로 상태가 없다[2]. 이를 취소해도 시스템에는 영향이 없다.

select! 루프

select!는 가끔 일회성으로 쓰이지만, 현실에서는 대부분 루프와 함께 쓰인다. 위의 예를 이어서, spawn_send_task 함수를 생각해 보자:

fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) { let strings: Vec<String> = vec![ "foo".to_owned(), "bar".to_owned(), "baz".to_owned(), ]; tokio::task::spawn(async move { for value in strings { sender.send(value).await; } });}

겉보기엔 관용적인 Rust 코드이고 문제 없어 보인다. 그런데 실무에서 채널로 값을 보내는 동작이 느려져 원인을 파악하기 위해 로깅을 넣고 싶다고 해 보자. 첫 시도로, sender.send(value) future 주변에 간단한 select!를 두어 주기적으로 로깅하기로 한다:

fn spawn_send_task(sender: tokio::sync::mpsc::Sender<usize>) { let strings: Vec<String> = vec![ "foo".to_owned(), "bar".to_owned(), "baz".to_owned(), ]; tokio::task::spawn(async move { // interval.tick() completes execution at 0s, 1s, 2s... let mut interval = tokio::time::interval(Duration::from_secs(1)); for value in strings { tokio::select! { res = sender.send(value) => { if res.is_err() { println!("receiver closed"); break; } } _ = interval.tick() => { println!("interval tick"); } } } });}

위 코드는 다소 비자명한 방식으로 잘못되었다. 그 이유는 sender.send(value)취소-안전하지 않기 때문이다.

sender.send는 취소-안전하지 않을까? interval.tick()sender.send(value)보다 먼저 완료하는 상황을 생각해 보자. 그 경우 value는 전송되지 않는다. Rust의 단일 소유권 모델에 따라, 그 값은 drop된다—즉 공중으로 사라진다.

참고

Sender::send는 이 RFD가 작성되고 저자가 Tokio 프로젝트에 PR을 보낸 뒤인 Tokio 1.33에서야 문서에 취소-비안전함이 명시되었다. 이전에는 시그니처로 추론해야 했다: send가 값을 소유하므로, future가 완료되기 전에 drop되면 그 값은 소실된다.

이 문제를 해결하는 편리한 방법이 있으며, [spawn_send_task_2]에서 다룬다.

타임아웃

취소 문제의 또 다른 흔한 근원은 타임아웃이다. 다음은 tokio::time::timeout을 사용하는 코드다:

use std::time::Duration;let (sender, receiver) = tokio::sync::mpsc::channel();tokio::time::timeout(Duration::from_secs(5), sender.send(20)).await;

이는 sender.send()tokio::time::sleepselect!로 감싼 것과 동일하며, 마찬가지로 타임아웃이 발생하면 값이 유실된다. [complex_ops]에서 소개하는 해법이 이 문제도 해결한다.

일반적으로, select!와 본질적으로 동등한 연산은 select!와 같은 문제를 겪는다.

try-join

Rust 비동기 생태계에는 여러 “try-join” 구현이 있다:

이 구현들은 복수의 실패 가능 future를 await하고, 모든 future가 Ok를 반환하거나 어떤 future든 Err를 반환하면 완료한다. 공통점은 모두 **조기 취소(early cancellation)**를 수행한다는 것이다: 하나가 실패하는 즉시 나머지를 모두 취소한다.

이는 종종 올바른 선택이다. 아래 코드는 두 개의 HTTP GET을 실행하고, 하나가 실패하면 다른 하나를 취소한다(Rust playground).

use hyper::{client::HttpConnector, Uri}; // 0.14.27struct Users { /* ... */ }struct Posts { /* ... */ }async fn fetch_users_and_posts(, client: &hyper::Client<HttpConnector>,) -> hyper::Result<(Users, Posts)> { let users_fut = client.get(Uri::from_static("https://my.domain/api/users")); let posts_fut = client.get(Uri::from_static("https://my.domain/api/posts")); let (users_response, posts_response) = tokio::try_join!( users_fut, posts_fut )?; // -- process users_response and posts_response here let users = Users { /* ... */ }; let posts = Posts { /* ... */ }; Ok((users, posts))}

그러나 부작용이 있는 연산에서는 try_join이 버그를 일으킬 수 있다. 예를 들어, 두 개의 tokio::io::AsyncWrite 구현에 flush를 수행하는 코드를 보자:

use tokio::io::{AsyncWrite, AsyncWriteExt};let write1 = /* AsyncWrite impl, eg tokio::fs::File */;let write2 = /* another AsyncWrite impl */;let result = tokio::try_join!(write1.flush(), write2.flush());result?;

이 경우 하나의 flush가 실패하면, 다른 flush는 즉시 취소된다. 이는 대부분 부적절하다: 두 flush 작업 모두 가능한 한 많은 진전을 하게 해야 한다.

해결책은 [then_try_adapters]에서 다룬다.

태스크 중단(abort)

“future”는 소유자가 없으면 존재할 수 없지만, “태스크(task)”는 실행기(일반적으로 Tokio)가 소유한다. 태스크는 tokio::task::spawn으로 생성한다. 이 메서드는 생성한 태스크를 중단할 수 있는 JoinHandle을 반환한다. 태스크는 다음 await 지점에서 취소된다. 그 안에서 실행 중이던 future들도 함께 취소된다.

예를 들어 다음 태스크를 보자:

let join_handle = tokio::task::spawn(async move { do_long_running_operation().await;});// some time later...join_handle.abort();

abort() 호출로 인해 do_long_running_operation()이 반환한 future는 “모든 await 지점에서” 취소될 수 있다. 임의의 비동기 Rust 코드가 제어 흐름의 어느 곳에서든 발생하는 취소에 탄탄할 가능성은 낮다[3].

이는 취소 문제를 select! 같은 좁은 범위가 아닌 전역 문제로 만든다. 이 때문에 태스크 중단은 전적으로 피해야 한다고 저자는 믿는다. 특히 대부분의 경우 태스크 중단을 정상 제어 흐름의 일부로 취급하지 말아야 한다. 마치 패닉이 정상 제어 흐름이 아닌 것처럼.

장시간 실행 태스크를 취소하는 실질적 해법은 [explicit_cancellation]을 보라.

참고

개별 future와 달리, Tokio 태스크의 JoinHandle을 drop해도 해당 태스크는 취소되지 않는다. 이는 구현상의 선택이며—다른 실행기들은 drop 시 태스크를 취소하기도 한다—태스크가 실행기에 의해 소유되기 때문에 가능한 선택이다. 개별 future는 drop 시 반드시 취소되어야 한다.

런타임 종료

Tokio 런타임이 종료되면, 그 안에서 실행 중인 모든 태스크는 다음 await 지점에서 취소된다. 이는 기대된 동작이지만, 예상치 못한 결과를 끌고 오기도 한다.

흔한 패턴인 #[tokio::main]에서는 프로세스 종료 시 런타임이 종료된다. 하지만 (예: 테스트 내, 혹은 수동으로 런타임을 만든 경우처럼) 다른 때도 종료될 수 있다. 특히 패닉으로 언와인드되면서 비정상적인 시점에 런타임이 종료될 수 있다.

그 결과:

  1. 태스크를 만들자마자 JoinHandle을 drop했다고 하자. 그러면 그 태스크가 영원히 실행될 것이라 기대할 수 있다. 하지만 그렇지 않다: 런타임이 패닉으로 종료되면 태스크는 취소된다. 이 태스크와 상호작용하는 코드는 태스크가 취소된 경우를 처리해야 한다.

  2. 동일한 런타임에서 태스크 A와 B가 실행 중이고 런타임이 종료되면, 두 태스크는 임의의 순서로 취소된다. 이는 비결정적 패닉으로 이어질 수 있다. 예시는 Dropshot 이슈 #709를 보라.

비동기 API 작성 시의 취소 안전성

비동기 API를 작성하거나 future를 반환한다면, 사용자가 취소 안전성을 어떻게 다룰지 생각해야 한다. 일반 전략은 두 가지 접근을 조합하는 것이다:

  1. 가능한 한 코드를 취소-안전하게 만들기.

  2. 나머지 부분은 취소-비안전함을 명확히 표기하기.

코드를 취소-안전하게 만들기

안타깝게도 코드를 취소-안전하게 만드는 만능 열쇠는 없다. 이 절은 작성자가 사용할 수 있고 리뷰어가 찾아볼 수 있는 여러 설계와 사례를 나열한다. 대다수 취소 안전성 이슈가 이 패턴들로 해결되기를 기대한다.

복잡한 연산을 분해하기

[select_loops]에서 Tokio의 mpsc::Sender::send가 취소-안전하지 않아 데이터 유실을 일으킬 수 있음을 보았다.

이에 대한 해법은 Sender::send의 소스 코드를 보면 알 수 있다. Sender는 값을 보내기 위해 사용할 수 있는 퍼밋을 반환하는 reserve 메서드도 제공한다. reserve는 대부분 취소-안전하다: drop되면 줄에서 순서를 잃지만, 그 외에는 안전하다.

예: 수정된 spawn_send_task

[select_loops]에서 버그가 있다고 지적한 spawn_send_task 예제를 보았다. Sender::reserve를 사용하면 이 코드를 올바르게 만들 수 있다.

이 버전은 조금 더 복잡하다. 요지는 다음과 같다:

코드 예시

fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) { let strings: Vec<String> = vec![ "foo".to_owned(), "bar".to_owned(), "baz".to_owned(), ]; tokio::task::spawn(async move { // interval.tick() completes execution at 0s, 1s, 2s... let mut interval = tokio::time::interval(Duration::from_secs(1)); let mut strings = strings.into_iter(); // std::vec::IntoIter implements ExactSizeIterator, which has a len() // method which tells us if the iterator is non-empty. If your // iterator doesn't implement ExactSizeIterator, you can instead // use iterator.peekable(), and peek() to see if any values remain. while strings.len() > 0 { tokio::select! { permit = sender.reserve() => { match permit { Ok(permit) => { let value = strings.next().unwrap(); // send() is synchronous because the async part // (waiting for a slot to be available) has // already been completed by the time the permit is // acquired. permit.send(value); } Err(_) => { println!("receiver closed"); break; } } } _ = interval.tick() => { println!("interval tick"); } } } });}

또한, 줄에서 순서를 잃지 않게 하면서도 이 문제를 해결하는 방법이 있다. 이는 [spawn_send_task_3]에서 다룬다.

reserve 및 유사 패턴

mpsc::Sender::reserve는 흔한 패턴의 한 예다. 일반적인 비동기 연산 순서는 다음과 같다:

  1. 최소한 대부분 취소-안전한 초기 비동기 연산.

  2. 동기 연산 및/또는 그 뒤를 잇는(하지만 취소-안전하지 않은) 비동기 연산[4].

Sender::send 같은 편의 메서드는 두 단계를 결합한다. 이런 API는 때로 사용하기 쉽다. 그러나, [select_loops]에서 본 것처럼 이런 편의 API는 사용자가 무심코 버그를 도입하게 만들 수 있다.

이런 API를 작성 중이라면, Sender::send 같은 편의 메서드를 제공하지 않고, 대신 문서에서 취소 안전성을 언급하는 것을 고려하라. 처음엔 놀라울 수 있다(“send도 못 하는 sender?”). 하지만 편의 메서드를 제공하지 않으면 사용자들이 올바른 코드를 작성하도록 훈련하기 쉬워진다. 좋은 생각인지 여부는 상황에 따라 다르다.

이제 복잡한 연산을 실제로 어떻게 분해하는지 보여 주는 사례 연구를 살펴보자.

사례 연구: Oxide 시리얼 콘솔

Oxide 컨트롤 플레인은 웹/CLI 클라이언트를 위한 시리얼 콘솔을 제공한다. Nexus에는 Propolis와 클라이언트 사이에서 데이터를 양방향 프록시하는 proxy_instance_serial_ws 함수가 있다. omicron#3356에서, proxy_instance_serial_wstokio::select!를 취소-비안전하게 사용하고 있음이 드러났다.

구체적으로, 이 함수는 네 개의 서로 다른 future를 대상으로 select했다:

  1. StreamExt::next future

  2. InstanceSerialConsoleHelper::recv이 반환하는 future

  3. SinkExt::send 인스턴스

  4. InstanceSerialConsoleHelper::sendSinkExt::send의 래퍼

이 네 future 중 취소-안전한 것은 1번뿐이었다.

  • 2번은, 스트림에서 값을 읽어온 뒤 중간에 추가 await로 처리했기 때문에 취소-안전하지 않았다. 중간에 취소되면, 스트림에서 값을 꺼냈지만 처리하지 못해 유실되는 문제가 생길 수 있다.

  • 3번은, mpsc::sender::Send가 취소-안전하지 않은 것과 같은 이유로 SinkExt::send가 취소-안전하지 않았다.

  • 4번은 3번의 래퍼였기 때문에 취소-안전하지 않았다.

참고

StreamExt::next취소-안전함이 문서에 명시되어 있지 않다. 그 취소 안전성은 내부 Stream의 동작에 달려 있다. 그러나, 값을 생성하는 동안의 중단을 다루지 못하는 Stream은 명백히 버그일 것이다. 어쨌든 Stream의 목적 자체가, 바로 다음 아이템이 즉시 가용하지 않을 수 있는 상황에서 값을 순회하는 것이기 때문이다[5].

SinkExt::send에 관해서는, 내부 Sink가 아무리 잘 작성되었더라도 연산 자체가 취소-안전하지 않다. 즉, 취소 비안전성은 send 연산에 내재한다.

이 문제들은 두 가지 방식으로 복잡한 연산을 분해하여 개별적으로 해결했다.

InstanceSerialConsoleHelper::recv를 취소-안전하게 만들기

설명: 복잡한 연산 분해

  1. 메서드는 먼저 StreamExt::next로 입력 Stream에서 다음 메시지를 가져왔다.

  2. 그 다음, recv는 가져온 메시지를 처리했다.

    • 대부분의 경우, 메시지를 그대로 호출자에게 반환했다.

    • 다만, 메시지가 “다른 서버로 마이그레이션하라”는 인밴드 제어 지시일 경우, recv는 실제로 재연결(추가로 두 번의 await)을 수행하고, 이후 메시지를 반환했다.

1단계는 StreamExt::next에 불과하므로 취소-안전하다. 반면 2단계의 마이그레이션은 취소-안전하지 않다. recv future가 중간에 취소되면, 진행 중인 마이그레이션이 완료되지 않을 수 있고 메시지가 유실될 수 있기 때문이다. 이 취소 안전성 문제를 다음과 같이 해결했다:

  • recv를 분리하여 1단계와 2단계를 나눴다. 1단계는 recv()에서 수행하게 하고, 메시지가 아니라 “future”를 반환하도록 했다.

  • recv가 반환하는 future(이름: InstanceSerialConsoleMessage)가 2단계를 수행하고 메시지를 반환하도록 했다[6].

  • 1단계가 반환하는 future는 취소-안전하지 않으니 시스템 정합성을 위해 반드시 await해야 한다고 문서화했다. (개선으로, InstanceSerialConsoleHelper에 플래그를 둬 이를 명시적으로 확인할 수 있지만, 메시지 사용의 자연스러운 흐름으로도 정합성이 보장되어 지금까지는 필요하지 않았다.)

도우미를 사용하는 코드는 다음과 같다:

let mut helper: InstanceSerialConsoleHelper = /* ... */;tokio::select! { res = helper.recv() => { let message = res?.await?; // operate on message } // ... other select branches}

부모 함수가 취소된다면?

[overview]에서 다뤘듯이, 취소는 부모 future에서 자식 future로 전파된다. 자연히 떠오르는 질문: let message = res?.await?;에 있는 await 지점에서 부모 함수가 취소되면 어떻게 되는가?

이에 대한 답은, 부모 함수가 클라이언트/서버 스트림의 소유 및 책임을 함께 진다는 점에 있다. 부모 함수가 취소되면 스트림은 어차피 닫힐 것이므로, 무효가 되는 상태는 곧 파기될 운명이다.

대안적 해법

다음 해법들은 recv 전체를 취소-안전하게 만드는 것을 목표로 하며, recv를 취소-안전/비안전 부분으로 나누지 않는다. 신중한 상태 관리로 가능하다.

하나는 백그라운드 태스크를 생성하여 마이그레이션을 수행하는 것이다. propolis#438은 이 접근을 구현한다.

다른 하나는, self에 진행 중인 마이그레이션 메시지를 저장하여 부분 진행을 재개하는 것이다:

  • 마이그레이션 메시지를 받으면, 그것을 처리하기 전에 self의 필드(예: self.migration_message, 타입은 Option<T>)에 복사본을 저장한다.

  • 마이그레이션을 완료하면 self.migration_messageNone으로 설정한다.

  • recv() 시작 시, self.migration_messageSome이면, 입력 Stream에서 읽는 대신 self.migration_message를 처리한다.

둘 다 괜찮은 해결책이었겠지만, 도메인 특화 이유로 선택하지 않았다.

SinkExt::send를 취소-안전하게 만들기

설명: reserve 패턴 사용

SinkExt::send 메서드는 다음 세 동작을 순서대로 수행한다:

  1. Sink::poll_ready를 완료될 때까지 호출한다.

  2. (동기인) Sink::start_send를 호출한다.

  3. 성공하면, Sink::poll_flush를 완료될 때까지 호출한다.

이는 mpsc::Sender::send와 같은 이유로 취소-안전하지 않다: 값이 전송되지 못하고 유실될 수 있다. 하지만 여기서도 reserve 기반 해법이 mpsc::Sender::send 때처럼 잘 동작한다!

업스트림 futures 크레이트는 reserve 패턴 API를 제공하지 않아서, 우리는 직접 작성했다(문서). 이 API는 위 1단계가 완료되었음을 나타내는 Permit을 반환한다. 이 Permit을 사용해 2·3단계를 수행하며 메시지를 보낸다.

use cancel_safe_futures::prelude::*;let mut sink = /* ... */;tokio::select! { res = sink.reserve() => { let permit = res?; res.send(value).await?; // operate on message } // ... other select branches}

Permit은 sink에 대한 가변 참조를 붙잡고 있으므로, 퍼밋이 활성인 동안에는 그 sink를 다른 용도로 사용할 수 없다[7].

InstanceSerialConsoleHelper::send를 취소-안전하게 만들기

InstanceSerialConsoleHelper::send는 아무 작업도 추가하지 않는 SinkExt::send의 래퍼였다. 이 메서드의 취소 안전성 문제를 해결하기 위해, 이를 제거하고 InstanceSerialConsoleHelperSink를 구현하도록 했다. 그러면 위에서 구현한 reserve 메서드를 이 select 분기에서도 사용할 수 있다.

부분 진행 상태에서 재개하기

여러 비동기 단계를 연달아 수행하는 async 함수는, select! 루프에서 취소 이슈에 견고하도록 “부분 진행 상태”를 저장해 두고 재개하도록 만들 수 있다.

핵심 아이디어는, 일부 진행이 일어났다는 사실을 다음 중 한 방식으로 저장하는 것이다:

  • 내부 필드에 저장하거나,

  • 외부의 &mut 매개변수를 통해 저장한다.

그 다음 함수가 다시 호출될 때 그 지점부터 재개한다.

부분 진행을 내부적으로 저장하는 예시는 [serial_console_recv]의 대안 해법을 보라.

외부에서 부분 상태를 추적하는 예시는 아래 [case_study_write_all_buf]를 보라.

사례 연구: AsyncWriteExt::write_all_buf

동기 코드에서, 표준 라이브러리의 std::io::Write에는 버퍼 전체를 라이터에 쓰는 write_all 메서드가 있다. 이 메서드는 단순한 경우에는 잘 작동하지만, 부분 진행 상황이 보고되지 않기 때문에 오류에서 복구하거나—얼마나 썼는지조차—알 수 없다. 이 때문에 많은 사용자가 직접 write_all을 구현해야 하고, 이는 실수하기 쉽다[8].

이 메서드를 비동기 코드로 옮기면, 사용자는 오류뿐 아니라 select!의 다른 분기가 발화하여 쓰기 작업이 중단되는 상황도 고려해야 한다. 이 문제는 Tokio의 AsyncWriteExt가 영리한 API 설계로 해결했다.

먼저, 부분 진행을 기록하기 위해 bytes::Buf라는 트레이트를 설계했다. 이 트레이트는 읽기 전용 바이트 버퍼를 추상적으로 표현하며, 세 가지 중요한 메서드가 있다:

  1. fn remaining(&self) → usize: 남은 바이트 수를 보고한다. (has_remaining(&self) → bool도 있다.)

  2. fn chunk(&self) → &[u8]: 다음 바이트 청크를 반환한다.

  3. fn advance(&mut self, cnt: usize): 버퍼를 특정 바이트 수만큼 전진시킨다.

Cursor는 어떤 연속 메모리 버퍼 위에서도 bytes::Buf를 구현한다[9].

그 다음, AsyncWriteExt::write_all_buf&mut B where B: bytes::Buf를 받는다. 이 메서드는 성공적으로 기록된 바이트 수만큼 advance를 호출하여 Buf 구현자에게 부분 진행을 보고한다.

이 말은, select! 루프 안에서 write_all_buf를 호출하는 코드는 올바르다는 뜻이다:

async fn write(writer: &mut W, data: &[u8]) -> std::io::Result<()>where W: AsyncWrite + Unpin,{ // Cursor<&[u8]> implements the bytes::Buf trait. let mut cursor = Cursor::new(data); while cursor.has_remaining() { tokio::select! { res = writer.write_all_buf(&mut cursor) => { res?; } // ... some other branch } } Ok(())}

협력적(명시적) 취소 사용

[task_aborts]에서 보았듯이, Tokio 태스크 중단은 태스크가 취소-비안전 연산 중간에서도 취소될 수 있어 문제가 된다. 하지만 도메인 요구로 태스크를 취소해야 할 때가 자주 있다. 이를 해결하는 방법 중 하나는 협력적(c ooperative), 즉 명시적 취소 채널을 사용하는 것이다.

Oxide가 유지하는 cancel-safe-futures 라이브러리에는 다수의 취소자와 하나의 수신자(팬인) 모델로 협력적 취소를 구현한 coop_cancel 모듈이 있다.

자세한 내용과 예시는 coop_cancel 문서를 보라.

tokio::sync::Mutex 지양

Tokio는 비동기 문맥을 위해 특별히 설계된 Mutex를 제공한다. 공식 권장사항은 await를 넘기지 않는 잠금에는 std::sync::Mutex를 선호하고, await를 넘기는 경우에만 tokio::sync::Mutex를 쓰라는 것이다.

하지만 다음 사실을 고려하면:

  • 거의 모든 mutex 사용은 임계 구역 내에서 일시적으로 코드 불변식을 깨뜨리되, 연산 종료 시 복원하기 위함이다.

  • mutex는 거의 항상 future들 사이에서 공유된다(그렇지 않다면 mutex를 쓸 이유가 거의 없다). 즉, mutex를 들고 있는 future가 취소될 때의 “취소 폭발 반경”은 다른 future들까지 확장된다.

  • std::sync::Mutex는 임계 구역 내에서 패닉이 발생하면 락 포이즈닝을 제공한다. Tokio의 mutex에는 취소나 패닉에 대한 포이즈닝이 없다.

결론적으로, mutex를 보유한 future가 취소되면 mutex가 보호하는 상태는 유효하지 않게 남을 가능성이 높다. 이는 심각한 문제이므로, 이 가이드는 tokio::sync::Mutex 사용을 피할 것을 권장한다.

Tokio mutex 대안](https://rfd.shared.oxide.computer/rfd/400#tokio_mutex_alternatives)

Tokio mutex의 권장 대안은 메시지 패싱 설계(액터 모델)다. 메시지 패싱 설계는 이 Tokio 튜토리얼에 설명되어 있다. 공유 상태에 여러 future가 접근하는 대신, 단일 관리자 태스크가 그 상태를 단독(비공유) 소유한다. 이 태스크는 MPSC 채널로 순차적으로 요청을 받고, (보통) oneshot 채널로 응답을 보낸다.

또 다른 선택은 std::sync::Mutex로 바꾸고, await 지점을 넘기지 않도록 잠금을 유지하지 않는 것이다. 다만, 잠금을 기다리는 동안 워커 스레드가 완전히 점유된다는 점에 유의하라. 잠금 경합이 없고 짧은 시간만 잠금이 유지된다면 문제되지 않는다.

정말 Tokio mutex를 써야 한다면

Tokio mutex를 다루는 올바른 코드를 작성하는 것이 “불가능”한 것은 아니지만, 매우 어렵다. 취할 수 있는 접근은:

  1. 코드가 취소-안전하지 않음을 명시한다.

  2. await 지점 사이마다 불변식이 항상 복원되도록 보장한다. await 사이에 상태를 정상으로 되돌린다면 괜찮다[10].

  3. 상태가 일시적으로 무효가 되도록 허용하되, 임계 구역(즉, 잠금을 획득할 때)마다 시작 시 정리한다.

이 모든 선택은 신중한 분석이 필요하다.

취소-비안전 연산을 수행할 백그라운드 태스크 생성

“future”는 소유자 외부에 존재하지 않지만, “태스크”는 실행기(일반적으로 Tokio)가 소유한다. 즉, 어떤 상황에서는 백그라운드 태스크를 사용해 취소-비안전 future를 감싸는 취소-안전 인터페이스를 만들 수 있다.

예를 들어, 취소-비안전한 일을 하는 next 메서드를 생각해 보자:

struct MyHandler { inner: /* some stream of data */,}impl MyHandler { // This is cancel-unsafe because message will be dropped if // the future is cancelled in the middle of the operation. async fn next(&mut self) -> MyMessage { let message = self.inner.next().await; process_message(&message).await; message }}

[marking_unsafe]에서 설명했듯이, next처럼 이름 붙은 메서드는 취소-안전해야 한다. 이를 달성하는 한 방법은 메시지를 처리할 백그라운드 태스크를 띄우는 것이다:

코드 예시

struct MyHandler { inner: /* some stream of data */ join_handle: Option<MyJoinHandle>,}impl MyHandler { async fn next(&mut self) -> MyMessage { // If an existing background task exists, wait for // that to complete. if let Some(join_handle) = &mut self.join_handle { let message = join_handle.await_completion().await; self.join_handle = None; return message; } let message = self.inner.next().await; let join_handle = MyJoinHandle::new(message); // Set self.join_handle to Some before the next await point. // This enables resumption if this future gets cancelled. self.join_handle = Some(join_handle); let join_handle = self.join_handle.as_mut().unwrap(); message = join_handle.await_completion().await; self.join_handle = None; message }}struct MyJoinHandle { handle: JoinHandle<MyMessage>,}impl MyJoinHandle { fn new(message: MyMessage) -> Self { let handle = tokio::task::spawn(async move { process_message(&message).await; message }); Self { handle } } async fn await_completion(&mut self) -> MyMessage { // A simple self.handle.awaitresults in "cannot move out of //self.handlewhich is behind a mutable reference". Need to // use&mut self.handleexplicitly to guide the type checker. let handle = &mut self.handle; // Can also return an error rather than expecting. handle.await.expect("task panicked") // After this method completes, the handle should never be // polled again. If it is polled again, then it will panic. // That is managed inMyHandle::nextby settingjoin_handle // to None immediately after returning from this method. }}

이 접근은 잘 동작하지만 단점도 있다:

  1. 조심스레 조인 핸들을 설정하는 추가 의식이 필요하다. 실수하기 쉽다. (이를 쉽게 만드는 추상화가 있을까?)

  2. 어떤 상황에서는 실제로 취소가 필요하다면, 태스크를 수동으로 취소해야 한다(중단(abort)하거나 명시적 취소 채널을 사용).

  3. tokio::task::spawn'static 바운드를 요구하므로, self나 그 외에서 데이터를 빌릴 수 없다. 이것이 괜찮은지, 불편한지, 혹은 치명적인지는 상황에 따라 다르다[11].

MPSC 채널의 대체 모드 사용

Tokio는 유한 버퍼(bound) MPSC 채널을 제공한다. 유한 채널은 시스템에 역압(backpressure)을 도입하므로 좋은 기본값이지만, 채널이 가득 차면 send 메서드가 비동기(대기)라는 문제가 따라온다.

여러 실제 사례에서, 시스템 일부에서 “유일한” 비동기성의 원인이 유한 MPSC 채널인 것으로 관찰되었다. 이 비동기성의 원인을 제거하면, 해당 부분의 설계가 훨씬 단순해질 수 있다.

다음과 같은 동기 채널 모드들을 고려하라:

  1. 유한 채널의 try_send: 유한 채널의 Sender는 채널이 가득 차면 에러를 반환하는 동기 try_send를 제공한다. 이 메서드로 역압을 클라이언트로 외부화할 수 있다. 예를 들어, 채널이 가득 찼을 때 HTTP 429 Too Many Requests를 반환한다.

  2. watch 채널: 채널에 보낸 “마지막 값”만 중요하다면 Tokio watch 채널을 고려하라. watch 채널은 단일 생산자, 다중 소비자 채널로 하나의 값만 저장하고, 새 값이 오면 기존 값을 폐기한다. 역압 문제가 사라진다.

  3. 무한(unbounded) 채널: 권장되지는 않지만 사용할 수 있는 선택지로 무한 MPSC 채널이 있다. 무한 채널에는 용량 제한이 없고, send가 동기다. 그러나 무한 채널 사용에는 큰 단점이 있다. 역압이 없다는 점이다. 그 결과, 무한 채널이 비워지지 않으면 메모리 사용량이 폭증할 수 있다. 이 우려와 취소 안전성 이슈의 절충은 상황에 따라 다르다.

실제 사례 연구는 [case_study_wicketd]를 보라.

사례 연구: wicketd의 installinator 진행 상황 추적기

Omicron에서 테크니션 포트에서 실행되는 서비스는 wicketd다. 이 서비스는 복구 및 오프라인 업데이트를 관리하며, 그 일환으로 다른 컴포넌트(installinator)가 보내는 진행 보고를 기록한다 [rfd345].

과거에, 리포트 트래커의 report_progress 메서드는 유한 채널로 보고를 보냈다. 그 결과:

report_progress는 취소-안전하지 않았다. 보고가 중단되면 업데이트가 Invalid 상태에 걸릴 수 있었기 때문이다. (이는 tokio::Mutex 사용을 피하라는 권장으로 이어진 바로 그 문제다.)

해결책

Omicron PR #3950에서 watch 채널을 사용하도록 바꾸었다. 진행 보고가 누적적이며, 트래커에 중요한 것은 “마지막 진행 상태”라는 점을 활용했다.

이는 일련의 긍정적 파급 효과를 낳았다:

이전에는…​

Omicron #3579에서, 먼저 이 코드를 취소-안전하게 만들기 위한 몇 가지시도를 했다. 그러나 둘 다 취소와 관련한 미묘한 결함이 있었다. 결국 무한 채널로 전환하는 결론에 도달했다.

당시에는 취소 안전성의 이점이 역압 문제를 능가했지만, 나중에는 watch 채널이 역압 관련 문제 없이 동일한 이점을 제공함을 깨달았다.

또 다른 대안은 이 코드를 메시지 패싱 스타일로 다시 작성하는 것이었지만, 이 접근이 더 신속하게 취소 안전성을 달성하는 길이었다.

정리 작업을 별도로 수행

데이터베이스 커넥션 풀에서, 데이터베이스 트랜잭션 컨텍스트 안에서 future를 실행하는 메서드를 생각해 보자:

use std::future::Future;struct TransactionContext { // ...}struct ConnectionPool { // ...}impl ConnectionPool { fn execute_transaction<F, Fut>(future_fn: F) -> Result<T, E> where F: FnOnce(TransactionContext) -> Fut, Fut: Future<Output = Result<R, E>>, { let cx: TransactionContext = /* ... */; // ... begin transaction match future_fn(cx).await { Ok(value) => { // ... commit transaction Ok(value) } Err(error) => { // ... rollback transaction Err(error) } } }}

execute_transaction이 실행 중간에 취소되면, 트랜잭션 컨텍스트가 나타내는 데이터베이스 커넥션이 불일치 상태로 남을 수 있다. 예를 들어, 트랜잭션이 시작되었지만 끝나지 않은 상태일 수 있다.

취소 정합성을 위해, ConnectionPool은 커넥션을 풀로 되돌릴 때 또는 새 커넥션을 할당할 때 정리를 수행하는 것이 중요하다. 예컨대, 커넥션이 트랜잭션 도중이라면 롤백해야 한다.

API를 취소-비안전으로 표시

많은 async 함수는 쉽게 바꾸기 어려운 취소-비안전일 가능성이 높다. 취소-안전하지 않은 API를 소비자가 주의해 사용하도록 어떻게 표기할까? 명명과 문서를 병행하면 된다.

이름과 함수 시그니처

취소-비안전 API는 select!에서 사용하는 것이 “자연스럽지 않게 느껴지도록” 이름과 시그니처를 잡아야 한다. 이런 기호론(semiotics)을 잘 맞추는 일은 경험과 판단의 영역이지만, 일반 지침은 다음과 같다:

  • 취소-비안전 메서드에 next(), recv() 같은 이름을 붙이지 말라. 이 이름들은 select! 루프에서 쓰기 자연스럽다.

  • reserve(), acquire()라는 이름은 대부분 취소-안전한 경우에만 쓰라. 이는 유사한 Tokio 메서드들과의 패턴 매칭을 유도한다.

  • “되돌릴 수 없는, 반복해선 안 되는 행동”을 암시하는 이름을 쓰라. 예를 들어, self.http_post_data() 같은 이름은 HTTP POST를 연상시키며, select! 루프에서 재생성해서는 안 될 동작임을 분명히 한다.

  • 가능하다면 타입 시스템을 활용하라. 예를 들어 fn execute(self)처럼 self가 복제 불가능하다면, select! 루프에서 사용할 수 없다. 대신 사용자는 그 메서드가 반환하는 future를 select 루프 밖에서 만들어야 한다.

    • 단, execute(self) 같은 메서드는 select! 루프에서는 쓸 수 없지만, 한 번만 호출하고 취소(예: 타임아웃으로)하는 것은 가능하다. 종종 그 경우 전체 연산을 포기하는 것이므로 괜찮다.

문서화

API의 취소 안전성을 합당한 수준으로 문서화하면, 사용자가 future가 취소될 때 어떤 문제가 생길 수 있는지 이해하는 데 도움이 된다. 각 메서드에 “취소 안전성” 섹션을 두는 것을 권장한다.

모든 async API에 대해 취소 안전성을 문서화하는 일은 벅찰 수 있지만, 최소한 가장 흔히 쓰이는 메서드에 대해서는 해 두는 것이 좋다.

비동기 코드를 소비할 때의 취소 안전성

이름, 시그니처, 문서에 주의

API를 취소-비안전으로 표시의 반대편에서, 라이브러리 작성자가 전달하려는 신호를 주의 깊게 살펴보는 것이 중요하다. 예를 들어, 메서드가 HTTP POST를 수행하거나 이름이 execute()라면, select! 루프에서 사용할 수 없을 가능성이 높다.

“문서를 읽어라”는 답은 여러 이유로 만족스럽지 않지만, 많은 경우 우리가 가진 최선의 수단이다.

select! 루프에서 future를 재생성하지 말고 재개하기

취소-비안전 future를 select! 루프에서 사용한다면, 각 반복에서 future를 “다시 만들면” 안 된다. 대신 select! 루프 밖에서 그 future를 “한 번만” 만들고, 루프 안에서는 그 future에 대한 &mut 참조를 대상으로 select!하라. 즉, future를 처음부터 다시 만드는 대신 “재개(resume)”하라.

다음은 주기적 틱과 함께 HTTP 요청을 수행하는 예다.

코드 예시

use hyper::{client::HttpConnector, Body, Request, Response};use std::time::Duration;struct MyHttpClient { client: hyper::Client<HttpConnector>,}impl MyHttpClient { async fn request_with_ticks( &self, req: Request<Body>, ) -> Result<Response<Body>, hyper::Error> { let mut interval = tokio::time::interval(Duration::from_millis(100)); let mut response_fut = std::pin::pin!(self.client.request(req)); loop { tokio::select! { response = &mut response_fut => { // The initial response has arrived. Exit the loop // and return the response. return response; } _ = interval.tick() => { println!("interval tick"); } } } }}

예: spawn_send_task 재등장

[spawn_send_task_2]에서, tokio::mpsc::Sender::reserve를 사용해 Sender::send의 데이터 유실 문제를 해결하는 방법을 보았다. 하지만 Sender::reserve가 반환하는 future를 drop하면 줄에서 순서를 잃는다는 점에서 여전히 공정성 문제가 남는다고 했다.

이번 예에서는 future를 재생성하는 대신 “재개”하도록 spawn_send_task를 수정한다. 이렇게 하면 future가 취소되더라도 줄에서 순서를 잃지 않는다.

이 예제에서는 루프 중간에 reserve_fut의 값을 교체하므로 std::pin::pin!을 쓸 수 없다. 대신 Box::pin을 호출해, sender.reserve()가 반환하는 future를 힙으로 이동하고 pin한다.

코드 예시

fn spawn_send_task(sender: tokio::sync::mpsc::Sender<String>) { let strings: Vec<String> = vec![ "foo".to_owned(), "bar".to_owned(), "baz".to_owned(), ]; tokio::task::spawn(async move { // interval.tick() completes execution at 0s, 1s, 2s... let mut interval = tokio::time::interval(Duration::from_secs(1)); let mut strings = strings.into_iter(); let mut reserve_fut = if strings.len() > 0 { // Create a future to obtain a permit to send the first value. Box::pin(sender.reserve()) } else { // No values to send -- exit the loop. return; }; loop { tokio::select! { res = &mut reserve_fut => { if res.is_err() { println!("receiver closed"); break; } match res { Ok(permit) => { permit.send(strings.next().unwrap()); reserve_fut = if strings.len() > 0 { // Create a future to send the next value. Box::pin(sender.reserve()) } else { // All values sent -- exit the loop. break; } } Err(_) => { println!("receiver closed"); break; } } } _ = interval.tick() => { println!("interval tick"); } } } });}

이 코드는 작성하기 다소 번거롭고, 연산이 충분히 복잡하면 아예 이런 방식으로 표현이 불가능할 때도 있다.

연습: spawn_send_taskBox::pin(sender.reserve()) 대신 Box::pin(sender.send(value)) future를 만들도록 다시 작성해 보라.

  1. send()의 장단점은 무엇인가? spawn_send_task가 해결하려는 문제 설정하에서, send()로 여전히 데이터 유실이 가능할까?

  2. 여전히 reserve()send()보다 더 일반적이다. 문제 설정을 어떻게 바꾸면, “재개”하더라도 send()에서는(즉, send()만으로는) 데이터 유실이 발생하지만 reserve()에서는 발생하지 않는 상황을 만들 수 있을까? (힌트: write_all_buf와 유사한 시나리오를 구성하라.)

then_try 어댑터 사용

[try_joins]에서 논의했듯이, try_ 어댑터 사용은 조기 취소로 인해 취소되면 안 되는 future들이 취소되는 문제가 있다.

[try_joins]의 예제는 다음처럼 고쳐 쓰면 올바르다:

let (result1, result2) = tokio::join!(write1.flush(), write2.flush());result1?;result2?;

tokio::join!Result 타입을 모른다. 따라서 조기 취소를 수행하지 않는다. 그러나 복잡한 try-join이나 다른 어댑터들에 대해서는 다소 번잡하다.

이 문제를 해결하기 위해, Oxide의 cancel-safe-futures 라이브러리Result 타입을 인지하지만 조기 취소를 하지 않는 then_try 어댑터들을 제공한다.

예를 들어, join_then_try 매크로는 먼저 전달된 모든 future를 끝까지 실행한 다음, 그중 하나라도 실패했다면 에러를 반환한다[13]

예:

let result = cancel_safe_futures::join_then_try!( write1.flush(), write2.flush(),);result?;

이는 위의 tokio::join! 예와 정확히 동일하게 동작한다.

참고

cancel-safe-futures 라이브러리는 아직 완전하지 않다. then_try 어댑터들은 필요에 따라 추가되고 있다. 필요한 어댑터가 없다면, 자유롭게 추가해서 PR을 보내 달라!

사례 연구: sled-agent 존 삭제

이 코드는 미묘하게 잘못되어 있었다. 한 존 삭제가 실패하면 다른 모든 존 삭제 작업이 취소되어, 프로세스 외부의 리소스인 존들이 불일치 상태로 남을 수 있었기 때문이다.

Omicron PR #3758에서, 코드는 cancel-safe-futures가 제공하는 for_each_concurrent_then_try 어댑터를 사용하도록 변경되었다. 이 어댑터를 사용하면, 이제 모든 존 삭제를 가능한 한 멀리까지 진행하려 시도한다.

백그라운드 태스크 사용

[spawn_tasks]에서 설명했듯이, 태스크는 그것을 생성한 코드가 아니라 실행기가 소유한다. 거기서 소개한 전략을 애플리케이션 코드에도 적용해, 취소-비안전 future를 백그라운드 태스크로 옮길 수 있다.

취소 안전성의 실제 문제를 백그라운드 태스크로 해결한 예시는 [case_study_dropshot]을 보라.

태스크 중단 피하기

[task_aborts]에서 개괄했듯이, 임의의 await 지점에서 취소-안전한 코드를 작성하기는 매우 어렵다. 태스크 중단을 피하고, 필요한 경우 명시적 취소 채널을 선호하라.

사례 연구: Dropshot과 클라이언트 연결 끊김

Dropshot은 Omicron 전반에서 사용되는 REST HTTP 서버다. Dropshot은 비동기 스타일로 작성되었으며, 사용자는 비동기 요청 핸들러를 등록하고 Dropshot은 이를 실행한다.

클라이언트가 HTTP 요청을 시작한 뒤 중간에 연결을 끊으면 어떻게 될까? 이전에는 연결 해제 시 해당 future가 취소되었다. Omicron의 많은 코드가(Tokio mutex 사용 등으로) 취소 안전성을 염두에 두지 않고 작성되었기 때문에, 이는 많은 잠재적 취소 안전성 문제를 낳았다. 더 나쁜 점은, 이러한 문제를 “클라이언트가 유발”할 수 있었다는 것이다.

취소-비안전한 Omicron 코드를 체계적으로 다루기 위해, Dropshot을 PR #701#702에서 다음 모드를 추가하도록 변경했다:

  1. 각 요청을 자체 태스크에서 실행한다.

  2. 클라이언트가 연결을 끊어도 해당 태스크를 더 이상 중단(abort)하지 않는다. 대신 백그라운드에서 계속 실행한다.

  3. 서버가 종료될 때, 보류 중인 태스크는 완료될 때까지 실행한다.

새 모드가 문제를 일으키지 않음을 확인한 뒤, Dropshot의 기본값으로 만들었다. 이제 select! 루프 등에서의 취소는 여전히 고려 사항이지만, 클라이언트 주도의 취소는 더 이상 불가능하다.

향후 과제

업스트림과 협업해 취소 안전성 명확화

이 문서가 비동기 취소의 위험을 보여 주는 임무를 잘 수행했기를 바란다. 실천 과제로, 업스트림과 협업하여 그들의 API에 취소 안전성을 명확히 하고, 필요 시 문제를 수정해야 한다.

체계적 해결책

이 RFD는 주로 취소 정합성에 대한 임기응변적 해결책에 초점을 맞췄다. Rust 1.89 시점에서, 언어는 아직 취소 정합성을 더 체계적으로 다룰 수 있을 만큼 준비되어 있지 않다. 이를 해결하기 위해 Rust 커뮤니티 구성원들이 여러 제안을 내놓았다. 그중 몇 가지는:

  • 비동기 drop: drop 시 일부 비동기 코드를 실행하는 future([async_drop]).

  • 잊혀질 수 없는(unforgettable) 타입: std::mem::forget 같은 함수를 호출할 수 없는 future로, 빌린 데이터에 대한 댕글링 참조가 존재하지 않음을 보장.

  • drop할 수 없는(undroppable) 타입: 취소 불가능으로 표시된 future로, 끝까지 실행됨을 보장.

이 제안들이 위에서 논의한 문제들을 어떻게 해결할 수 있는지에 대한 논의는 [asynchronous_cleanup]을 보라.