Rust의 비동기 스트림을 for await 스타일로 순회할 때 futures의 buffered/Futures[Un]Ordered가 만들어내는 '버퍼드 스트림 문제'를 사례로 설명하고, 스폰, for_each 기반 내부 반복 등 대안과 더 신뢰할 수 있는 비동기 프리미티브 설계 방향을 논의한다.
지난주, Niko Matsakis가 for await과 우리가 “버퍼드 스트림 문제(buffered streams problem)”라고 부르게 된 것 사이의 연결고리를 내게 지적했다. 이는 썩 편치 않았는데, 몇 년 뒤 내가 바라는 Async Rust의 모습에 관한 다른 블로그 글을 쓰던 중이었고, 그 지적이 내 몇 가지 가정을 흔들었기 때문이다.
이 글에서의 for await은 오늘날 비동기 스트림을 순회하는 가장 흔한 방식을 가리키는 약칭일 뿐이다. 스트림을 폴링해 아이템을 하나씩 꺼내고, 그때마다 for 루프의 본문을 실행한다. 아래 두 루프는 동등하다:
for await item in stream { /* process item */ }
while let Some(item) = stream.next().await { /* process item */ }
알고 보니, 이는 futures 크레이트에 정의된 다른 스트림 프리미티브의 몇 가지 성질과 나쁘게 상호작용할 수 있다.
문제를 보려면, 간단한 async 코드로 시작해 보자. 데이터베이스에 작업 아이템을 질의하고, 각 아이템에 대해 추가 질의를 한 뒤, 결과를 업로드하는 단순 배치 작업을 작성한다고 가정하자.
async fn batch_job(db: &Database) {
let work = run_query(db, FIND_WORK_QUERY).await;
for item in work {
let result = run_query(db, work_query(item)).await;
upload_result(result).await;
}
}
async fn run_query(db: &Database, query: Query) -> Vec<Row> {
let conn = db.connect().await;
conn.select_query(query).await
}
어느 순간 이 잡이 개별 데이터베이스 쿼리를 기다리느라 시간을 많이 쓰고 있음을 알게 된다. 하지만 데이터베이스는 쿼리를 병렬로 처리해 더 높은 부하를 감당할 수 있다.
buffered 콤비네이터를 써서 일부를 동시에 발행해 보기로 한다. 이는 퓨처들의 스트림을 받아 한 번에 최대 N개를 실행하고, 그 결과의 스트림을 반환한다. upload_result는 용량이 더 적은 다른 서비스와 통신하므로, 그 단계는 일단 직렬화한다.
async fn batch_job(db: &Database) {
let work = run_query(db, FIND_WORK_QUERY).await;
let work_queries = stream::iter(work)
.map(|item| run_query(db, work_query(item)))
.buffered(5);
for await result in work_queries {
upload_result(result).await;
}
}
보기에도 좋고, 우리가 하려는 일을 정확히 표현한다. 이렇게 동작을 구성해 내는 것은 표현력 있는 고수준 동시성 API가 해야 할 바로 그 일이다.
하지만 글의 앞부분을 읽었다면, 어딘가에 문제가 도사리고 있음을 이미 알고 있을 것이다.
프로덕션에서 잡을 즐겁게 돌리기 시작한다. 그러던 어느 날, run_query 내부에서 타임아웃이 발생했다는 보고를 본다. 보고에 따르면, 연결이 생성된 뒤 쿼리가 전송되기 전 사이에 타임아웃이 났다.
엥? run_query 내부에선 별다른 일이 없다:
async fn run_query(db: &Database, query: Query) -> Vec<Row> {
let conn = db.connect().await;
conn.select_query(query).await
}
아주 큰 잡도 있다는 건 알고 있고, 그런 경우 upload_result에 20–30초가 걸린다. 하지만 이건 문제가 되지 않아야 한다. 이 함수는 스레드를 막지 않기 때문이다. 그런데도…
이 모든 걸 알고도, 이 코드에서 문제가 어디인지 찾을 수 있겠는가?
내 생각에 여전히 많은 사람이 놓칠 것이다.
문제는 루프를 통한 실행 흐름에 있다. 차근차근 따라가 보자.
Rust의 비동기 실행은 폴링에 의해 구동되므로, 아이템을 요구하기 전까지 work_queries 내부에서는 아무 일도 일어나지 않는다. 루프의 시작에서 우리는 첫 결과를 기다리며, 그 쿼리들을 힘차게 돌리기 시작한다:
async fn batch_job(db: &Database) {
let work = run_query(db, FIND_WORK_QUERY).await;
let work_queries = stream::iter(work)
.map(|item| run_query(db, work_query(item)))
.buffered(5); // <-- 활성 5개; 폴링 중
for await result in work_queries { // <-- 여기에 있음
upload_result(result).await;
}
}
든든한 buffered 콤비네이터가 run_query 호출 다섯 개를 띄우고, 우리는 여기서 기다린다 — 첫 번째가 끝날 때까지. 그러면 실행은 루프 본문으로 넘어간다.
async fn batch_job(db: &Database) {
let work = run_query(db, FIND_WORK_QUERY).await;
let work_queries = stream::iter(work)
.map(|item| run_query(db, work_query(item)))
.buffered(5); // <-- 활성 4개; 일시정지
for await result in work_queries {
upload_result(result).await; // <-- 여기에 있음
}
}
버퍼링된 run_query 작업 큐에 무슨 일이 벌어졌는지 보라: 남은 활성 쿼리 작업 4개의 폴링을 멈췄다. 즉, 루프 본문이 끝날 때까지 — 가장 느린 upload_result 호출이라면 20–30초가 될 수도 있다 — 그 작업들을 다시 폴링할 수 없다. 서버가 연결을 수락한 뒤 곧바로 폴링되지 않으면, 연결이 타임아웃되기 쉽다.
비동기 프로그래밍은 동시적 제어 흐름을 관리하는 일이며, 이 코드는 언어와 라이브러리 프리미티브의 혼합으로 이를 우아하게 해내는 듯 보인다. 그러나 결과는 미묘하고 놀라운 버그였다. 이런 경험은 Rust가 사용자에게 제공하려는 바와 다르다.
문제는 동시적 제어 흐름이 한 무리의 하위 작업(쿼리)과 다른 무리(업로드) 사이를 번갈아 가며 오가게 된다는 데 있다. 마치 큰 크랭크 두 개 사이를 뛰어다니며, 하나를 미친 듯이 돌리다 멈추기 전에 다른 크랭크를 돌리러 달려가는 꼴이다.
우리가 진짜 원하는 것은, 지금 하고 있는 모든 일을 돌리는 크랭크가 하나뿐인 상태다. 이것이 실행기(executor)의 역할이다. 실행기는 스폰된 임의 개수의 태스크와 그 하위 작업을 받아 동시적으로 구동한다. 종종 쿼리를 직접 스폰하는 방식으로 이런 문제를 풀 수 있다.
이번 경우(실화에 기반함: https://github.com/rust-lang/futures-rs/issues/2387)에는, 스폰된 태스크는 반드시 'static이어야 하고, run_query는 데이터베이스 핸들을 참조로 받기 때문에, 실제로 스폰이 어렵다. 일단 지금은 수정이 간단했다고 가정하자.
그렇다면, 이 버그를 야기한 책임은 어느 코드에 있을까? 책임은
run_query가 호출자가 충분히 자주 폴링해 줄 것에 의존한 데 있을까,수정: run_query 안에서 직접 태스크를 스폰한다.
batch_job이 콤비네이터들과 await 지점의 상호작용을 알아채지 못한 데 있을까,수정: 각 쿼리를 batch_job 안에서 스폰한다.
buffered가 이런 패턴이 예기치 않은 동작으로 이어질 수 있음을 경고하지 않은 데 있을까,수정: 문서를 업데이트한다.
혹은 위의 조합일까? 어떤 답도 만족스럽지 않다.
(1)은 가장 안전하다. 하지만 함수가 직접 await될 때 비효율적이고, 빌리기(라이프타임 기반 차용)를 지원하지 않으며, 임베디드 환경에서는 전혀 동작하지 않는다.
노련한 Async Rust 전문가라면 (2)라고 답할지 모른다. 하지만 초심자에게 이 상황을 피하라고 어떤 조언을 해야 할지 상상이 가지 않는다. 어쩌면 “가능하면 자주 spawn을 호출하라”일 텐데, 라이프타임 제약을 생각하면 그건 일종의 땜질일 뿐이고 좋은 조언도 아니다.
두 개의 크랭크가 번갈아 돌아간다는 정신적 모델을 세울 수는 있다. 하지만 미묘하고 놓치기 쉽다. 이런 건 바로 Rust가 우리 대신 점검해 줘야 할 종류처럼 느껴진다.
또 다른 선택지는 (3), 콤비네이터를 탓하는 것이다. 약간 일리가 있다.
buffered 내부에는 이 문제적 동작을 가능케 하는 FuturesOrdered가 있다. FuturesOrdered와 그 자매격인 FuturesUnordered는 많은 스트림 콤비네이터에서 나타난다. 이들은 마치 함수 안에 내장된 실행기처럼 행동한다. 원하는 만큼 퓨처를 추가하고, 동시에 돌린 다음, 준비되는 대로 결과를 처리하게 해 준다.
문제는 일반적인 실행기와는 달리, “실행하기”와 “결과를 처리하기”가 서로 다른 연산이며, 호출자가 전적으로 따로 처리해야 한다는 점이다. 만약 이 퓨처들이 실행기에 태스크로 스폰되었다면, 현재 태스크의 동료로서 그것과 나란히 동시 실행되었을 것이다.
이 차이 때문에 나는 이런 타입들을 “의사 실행기(pseudo-executor)”라 부른다. 이것이 우리 for await 루프의 자연스러운 번갈아 실행을, 원래는 동시 실행되어야 할 두 연산 사이의 번갈아 실행으로 바꿔 버린다.
이 동작에는 적어도 두 가지 장점이 있다:
Unordered 변형은 출력을 저장할 공간을 할당할 필요가 없다. 호출자가 직접 구동하므로, 결과가 준비되는 즉시 다음 결과를 호출자에게 반환할 수 있다.내게 물으면, 이런 장점은 그들이 만들어내는 발목 잡는 함정에 비하면 값어치가 없다.
콤비네이터를 탓한다면, 무엇으로 대체할지라는 질문이 따라온다. buffered는 의심할 바 없이 매우 유용한 도구다. 하나의 선택지는 buffered와, Futures[Un]Ordered를 사용하는 다른 콤비네이터들을, 하위 작업을 실제 실행기에 스폰하는 콤비네이터들로 바꾸는 것이다.
이는 생태계 전반에 표준화된 어떤 형태의 Spawn 트레이트가 있다면 가능할 것이다. 특히 이 API가 빌리기를 허용(동시에 병렬성은 금지)한다면 더욱 도움이 될 텐데, moro의 scoped_spawn처럼 말이다. 여기서처럼 폴링해야 할 퓨처의 타입과 최대 개수를 정적으로 알고 있다면, 할당자 없이 임베디드 애플리케이션에서도 작동하도록 만들 가능성도 있다.
이 접근을 어디서든 작동하게 만들 수 있다면 좋은 선택지 같다. 다만 임베디드 환경에서는 동작하지 않을 가능성도 있고, Spawn 트레이트 같은 많은 부품에 의존한다. 다른 길이 있을까?
for_each, 우리가 마땅히 가져야 할 영웅?또 다른 접근은 내부 반복을 쓰는 것이다:
async fn batch_job(db: &Database) {
let work = run_query(db, FIND_WORK_QUERY).await;
let work_queries = stream::iter(work)
.map(|item| run_query(db, work_query(item)))
.buffered(5);
work_queries.for_each(async |result| {
upload_result(result).await;
}).await;
}
for_each가 StreamExt가 아니라 Stream(또는 AsyncIterator)의 메서드였다면, buffered가 반환하는 타입에서 이를 오버라이드해, 스트림을 순회하는 동안 모든 태스크가 백그라운드에서 폴링되도록 보장할 수 있을 것이다.
하지만 정말로 신뢰성을 얻으려면, 이것이 스트림을 순회하는 유일한 방법이어야 한다. for await도, stream.next().await도 없어야 한다.
Async Rust는 폴링 기반 모델을 가지고 있으며, 이런 미세한 상호작용은 그 결과로 나타나는 현상 중 하나다. 이 특정 사례는 스트림에서만 드러나는데, 스트림(동기 반복자와 마찬가지)은 본질적으로 다음 요소를 계산하는 일과 그 요소를 처리하는 일을 번갈아 수행하기 마련이기 때문이다.
폴링 모델과 관련해 나타나는 또 다른 큰 문제는(대부분 퓨처에서) 예상치 못한 취소에 취약하다는 점이다. Yosh Wuyts가 설명했듯이(https://blog.yoshuawuyts.com/futures-concurrency-3/), 이는 select 콤비네이터의 동작으로 더욱 악화된다.
이 문제들이 Async Rust의 폴링 모델이 본질적으로 나쁘다는 뜻은 아니다. 우리가 필요한 것은, 그 모델이 함의하는 바와 잘 상호작용하면서도 예측 가능하게 동작하는 프리미티브들의 라이브러리다. 이를 위해서는 현실 세계에서 더 많은 실험이 필요하다.
그동안에는 이런 실험을 가능하게 하는 언어 기능을 먼저 제공해야 한다. 이에 대해서는 다음 글에서 더 이야기하겠다.