Rust의 비동기 스트림 버퍼링 어댑터가 for await 루프와 상호작용할 때 루프 본문이 비동기이면 버퍼링이 진행되지 않는 문제를 시각화하고 분석한다. 내부 반복(for_each)으로의 회귀 없이 AsyncIterator에 poll_progress 메서드를 추가해 루프 본문이 대기 중일 때도 스트림이 진행되도록 for await의 디슈거링을 제안하고, 백프레셔를 유지하면서 문제를 해결하는 방법을 설명한다.
2023년 12월 12일
지난주 Tyler Mandry가 Rust 프로젝트에서 “Barbara battles buffered streams(바버라가 버퍼링된 스트림과 싸우다)”라고 부르는 문제에 관한 흥미로운 글을 올렸다. Tyler는 이 이슈를 잘 설명하고 있는데, 간단히 말하면 futures 라이브러리의 버퍼링 어댑터(Buffered와 BufferUnordered)가 루프 본문에서 비동기 처리를 할 때(즉, await 표현식을 하나라도 포함할 때) for await와 잘 상호작용하지 않는다는 것이다.
이 문제를 시각적으로 살펴보면 더 잘 이해할 수 있다고 생각한다. 먼저, 사용자가 일반적인, 비비동기 Iterator를 for 루프로 처리할 때 발생하는 제어 흐름을 생각해 보자:
┌── SOME ────────────────┐
╔═══════════════╗ ╔═══════▼═══════╗
║ ║▐▌ ║ ║▐▌
──────▶ NEXT ║▐▌ ║ LOOP BODY ║▐▌
║ ║▐▌ ║ ║▐▌
╚════════════▲══╝▐▌ ╚═══════════════╝▐▌
▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘ ▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▘
│ └───────────────────┘
└── NONE ──────────────────────────────▶
for 루프는 먼저 이터레이터의 next 메서드를 호출하고, 그러고 나서 결과 아이템(있다면)을 루프 본문에 전달한다. 더 이상 아이템이 없으면 루프를 빠져나온다.
지금까지 구상된 AsyncIterator와 그에 상응하는 for await 루프도 제어 흐름이 거의 같다. 먼저 poll_next가 반복적으로 호출되고, 이 태스크는 Pending일 때 제어를 양보하며, Ready를 반환할 때까지 계속된다. 그런 다음 제어가 루프 본문으로 넘어가고, 본문이 끝나면 다시 비동기 이터레이터로 돌아온다. AsyncIterator의 유일한 추가적 편의는, 이터레이터가 할 일이 없을 때 이 태스크가 제어를 양보할 수 있다는 점이다.
버퍼링 인터페이스의 문제는 이 문맥에서 그 제어 흐름을 시각화해 보면 드러난다. 여기서 나는 동시 실행을, 다른 박스 안에 포함된 박스로 표현할 것이다. 이 단위들은 비결정적인 순서로 수행되므로, 동시에, 혹은 덜 정확하게 말해 “같은 시간에” 일어난다고 볼 수 있다. 이 예시에서는 이 버퍼링 AsyncIterator가 최대 3개의 future를 동시에 실행한다고 가정해 보겠다:
┌── SOME ─────────────┐
╔═══════════════╗ │
║ POLL_NEXT ║▐▌ │
║ ╔══════════╗ ║▐▌ │
║ ║ FUTURE ║▐▌║▐▌ │
║ ╚══════════╝▐▌║▐▌ ╔═══════▼═══════╗
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ║▐▌
──────▶ ╔══════════╗ ║▐▌ ║ LOOP BODY ║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ║▐▌
║ ╚══════════╝▐▌║▐▌ ╚═══════════════╝▐▌
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▘
║ ╔══════════╗ ║▐▌ │
║ ║ FUTURE ║▐▌║▐▌ │
║ ╚══════════╝▐▌║▐▌ │
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ │
╚════════════▲══╝▐▌ │
▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘ │
│ └────────────────┘
└── NONE ───────────────────────────────────▶
보면 버퍼링된 비동기 이터레이터의 세 자식 future가 모두 동시에 실행되지만, 이는 오직 poll_next를 호출하는 동안에만 그렇다. 루프 본문을 처리하는 동안에는 그 자식 future들이 전혀 실행되지 않는다. 루프 본문이 즉시 반환된다면(예: await 지점이 없거나 모두 즉시 준비되는 경우) 괜찮지만, 루프 본문 자체가 비동기라면 그것이 완료될 때까지 비동기 이터레이터의 버퍼링된 future는 준비되어 있더라도 진행되지 않는다.
이것은 사용자에게 직관적이지 않다. 버퍼링 인터페이스가 루프 본문이 실행되는 동안에는 작업을 버퍼링할 수 없다는 사실을 눈치채지 못하는 것이다. 그리고 이는 전적으로 불필요하기도 하다. for await는 비동기 이터레이터의 소유권을 가져가며, 루프 본문이 그것을 수정할 방법은 없다(이런 것을 “이터레이터 무효화”라고 하는데, Rust의 소유권/대여 규칙이 이런 종류의 버그를 방지한다는 점이 큰 장점이다). 원칙적으로 루프 본문과 버퍼링 스트림을 동시에 실행하지 못할 이유가 없지만, 현재 API는 그런 편의를 제공하지 않는다.
Tyler Mandry는 대안 설계로 외부 반복에서 내부 반복으로 되돌아가자고 제안한다. 즉, 스트림이 poll_next 대신 for_each를 노출하게 하자는 것이다. 그 논지는, 버퍼링 스트림 어댑터가 for_each에 전달되는 클로저를 버퍼링과 동시에 실행하도록 이 메서드를 오버라이드할 수 있어 이 문제를 피할 수 있다는 것이다. 내부 반복은 그런 편의를 제공하는 한 가지 방법이긴 하지만, 심각한 단점도 여럿 딸려온다.
나는 이전에 Rust에서 반복과 동시성의 얽힌 역사에 관해 글을 썼다. 그 글에서 Rust가 한때 내부 반복을 시도했지만, 여러 단점을 극복하기 위해 외부 반복으로 전환했음을 기록했다. 이 단점들은 비동기 코드에 내부 반복을 적용하려는 시도에도 똑같이 해당된다. 특히 눈에 띄는 점은 다음과 같다:
return이나 ? 같은 일반적인 제어 흐름을 사용할 수 없다. 루프 처리가 클로저로 작성되고, 클로저는 return이나 ? 같은 제어 흐름 연산자에 대해 투명하지 않기 때문이다. 이 문제를 우회하려면 복잡한 조합자 세트가 필요하다.zip 같은 “interleaving(교차 실행)” 연산자를 제로 비용으로 정의하기가 어렵거나 불가능할 수 있다.내 생각에 내부 반복은 Rust에선 시작조차 어려운 선택지다. Rust는 Iterator에 대해 스택 없는 상태 기계를 택했고, Future로 동시성에서도 그 접근을 더욱 공고히 했는데, 이는 앞선 글에서 열거한 매우 타당한 이유들 때문이다. 다행히 이 문제를 해결하는 다른 방법이 있다. 바로 poll_progress의 도입이다:
trait AsyncIterator {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<>;
}
poll_progress의 아이디어는 AsyncIterator에 poll_next를 호출하지 않고도 진행 상황을 만들 수 있게 하는 추가 메서드를 더하는 것이다. AsyncIterator의 구현자는, 더 이상 진행하려면 poll_next를 호출하는 수밖에 없게 되는 순간 즉시 Ready를 반환하도록 poll_progress를 구현해야 한다. 대부분의 비동기 이터레이터에선 항상 그런 상태일 것이다. 하지만 Buffered나 BufferUnordered 같은 버퍼링 스트림의 경우 poll_progress는 최대 동시 개수만큼 자식 future가 버퍼될 때까지 계속해서 그 자식 future들을 poll할 것이다.
이 새로운 편의를 활용하기 위해, for await 루프는 루프 본문이 Pending일 때 비동기 이터레이터에 대해 진행을 poll하도록 디슈거링되어, 사실상 루프 본문과 이터레이터를 동시에 실행하게 된다:
┌── SOME ─────────────────┐
│ ╔══════════▼══════════╗
│ ║ ╔═══════════╗ ║▐▌
│ ║ ║ LOOP BODY ║▐▌ ║▐▌
╔═══════════════╗ ║ ╚═══════════╝▐▌ ║▐▌
║ ║▐▌ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▘ ║▐▌
──────▶ POLL_NEXT ║▐▌ ║ ╔═══════════════╗ ║▐▌
║ ║▐▌ ║ ║ POLL_PROGRESS ║▐▌║▐▌
╚════════════▲══╝▐▌ ║ ╚═══════════════╝▐▌║▐▌
▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▀▘ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌
│ │ ╚═════════════════════╝▐▌
│ │ ▀▀▀▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▀▀▘
│ └─────────────────────┘
└── NONE ───────────────────────────────────▶
앞에서 썼듯, 대부분의 비동기 이터레이터에 대해선 이 동작이 사실상 아무 일도 하지 않을 것이다. 하지만 비동기 이터레이터가 poll_next 호출에 앞서 작업을 버퍼링할 수 있다면, 그렇게 할 것이다. Buffered 같은 경우 이 흐름도는 다음과 같아진다:
┌── SOME ────────────────────────┐
│ ╔═════════════════▼═════════════════╗
╔═══════════════╗ ║ ╔═══════════════╗ ║▐▌
║ ║▐▌ ║ ║ LOOP BODY ║▐▌ ║▐▌
║ POLL_NEXT ║▐▌ ║ ╚═══════════════╝▐▌ ║▐▌
║ ║▐▌ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘ ║▐▌
║ ╔══════════╗ ║▐▌ ║ ║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ╔══════════════════════════════╗ ║▐▌
║ ╚══════════╝▐▌║▐▌ ║ ║ POLL_PROGRESS ║▐▌║▐▌
───────▶ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ║ ╔══════════╗ ╔══════════╗ ║▐▌║▐▌
║ ╔══════════╗ ║▐▌ ║ ║ ║ FUTURE ║▐▌ ║ FUTURE ║▐▌║▐▌║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ║ ╚══════════╝▐▌ ╚══════════╝▐▌║▐▌║▐▌
║ ╚══════════╝▐▌║▐▌ ║ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▘ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌║▐▌
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ║ ╔══════════╗ ║▐▌║▐▌
║ ╔══════════╗ ║▐▌ ║ ║ ║ FUTURE ║▐▌ ║▐▌║▐▌
║ ║ FUTURE ║▐▌║▐▌ ║ ║ ╚══════════╝▐▌ ║▐▌║▐▌
║ ╚══════════╝▐▌║▐▌ ║ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▘ ║▐▌║▐▌
║ ▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌ ║ ╚══════════════════════════════╝▐▌║▐▌
╚═════════════▲═╝▐▌ ║ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘║▐▌
▀▀▀│▀▀▀▀▀▀▀▀▀│▀▀▀▘ ╚═══════════════════════════════════╝▐▌
│ │ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀│▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▘
│ └──────────────────────────┘
└── NONE ────────────────────────────────────────────────▶
이제 루프 실행의 모든 지점에서 모든 자식 future가 poll되므로, “버퍼링된 스트림과의 전투” 문제는 완전히 해소된다. 최대 개수의 자식 태스크가 루프 본문보다 먼저 완료되면, poll_progress는 Ready를 반환한다. 이는 큐에서 poll_next 호출로 하나를 꺼내기 전까지 더 이상 자식 태스크를 poll할 수 없음을 나타낸다. 이렇게 하면 백프레셔가 계속 적용된다(무한 버퍼링 스트림을 선택하면 백프레셔는 적용되지 않는다. 버퍼링 스트림에는 언제나 어떤 형태로든 상한을 두는 것이 좋다).
이 모든 데 필요한 것은 AsyncIterator에 메서드 하나를 더하고, for await의 디슈거링을 조금 더 복잡하게 만드는 것뿐이다. 또한, 내부적으로 for await를 사용하지 않는 AsyncIterator의 모든 소모형(consuming) 조합자도 poll_progress를 활용하도록 작성해야 한다.