3DS의 Horizon 운영 체제, 동기화 객체, IPC, 타이머, 스레드 풀, 락을 활용해 Rust 비동기 실행기를 구현하는 과정을 살펴봅니다.
2026년 5월 26일
첫 번째 글에서는 몇 가지 future를 구동하는 최소한의 실행기를 만드는 방법을 배웠습니다. 이제 그걸 3DS에서 해 봅시다!
3DS는 Horizon 이라는 마이크로커널 기반 운영 체제를 실행합니다. 주변기기용 드라이버는 약간 특별한 프로세스 에 불과합니다. 주변기기와 상호작용하려면, 프로세스 간 통신(IPC)을 사용해 이 프로세스들에 메시지를 보냅니다.
추가로, 3DS는 일련의 동기화 객체를 제공합니다. 즉, 스레드가 대기 할 수 있는 것들입니다.
이 두 가지가 바로 asyncio 실행기를 만드는 데 필요한 기본 부품입니다!
그럼 시작해 봅시다:
우리는 아주 최소한의 실행기를 만들고 있습니다. 이 실행기의 역할은 future를 spawn하고, 그다음 그것들을 poll하라는 신호를 받는 것뿐입니다.
처음에는 여기에 IPC를 사용했습니다. Rust future 객체를 IPC로 보내는 것까지 포함해서요. 꽤 재미있는 도전이었지만, 큰 문제가 있습니다. 현재 실행 중인 바로 그 스레드로 IPC 요청을 보내면 심각하게 망가집니다. future는 이상적으로 서로를 깨울 수 있어야 하므로, 이 방식에서는 lockup이 자주 발생했습니다.
대신, 우리의 실행기는 세 가지 구성 요소를 갖습니다:
다음은 큐입니다:
pub static TASK_QUEUE: OnceLock<SyncQueue<BoxFuture<'static, ()>>> = OnceLock::new();
pub static WAKE_QUEUE: OnceLock<SyncQueue<TaskToken>> = OnceLock::new();
이 큐들은 3DS 내장 세마포어 원시 타입을 사용해서, 다른 작업이 우리를 깨우라고 신호를 보낼 수 있게 합니다! 우리는 여러 세마포어를 한 번에 기다릴 수 있고, 그중 하나라도 신호를 받으면 깨어날 수 있습니다:
let handles = [self.task_semaphore, self.wake_semaphore];
loop {
let mut idx = 0;
unsafe { svcWaitSynchronizationN(&mut idx, handles.as_ptr(), 2, false, i64::MAX) };
if idx == 1 { // this is the wake semaphore!
while let Some(task) = WAKE_QUEUE.get().unwrap().remove() {
self.wake(task);
}
} else if idx == 0 { // this is the spawn semaphore!
while let Some(task) = TASK_QUEUE.get().unwrap().remove() {
self.spawn(task);
}
}
}
여담: waker 이전 글에서는 future에 넘겨주던 Waker가 Arc로 감싼 객체였고, 그 안에는 현재 작업의 id 그리고 실행기와 연결된 sender 채널이 함께 들어 있었습니다. 이제 이걸 훨씬 더 줄일 수 있습니다!
Arc가 아닌 waker를 만들려면, RawWaker 타입을 사용할 수 있습니다. 이 타입은 (a) 어떤 데이터에 대한 포인터와, (b) 그 데이터를 인자로 받아 작업을 깨우는 함수 포인터 집합으로 구성됩니다.
보통 이 포인터는 그냥 Arc나 비슷한 타입을 가리킵니다. 하지만 포인터는 결국 정수이고, 우리는 이미 정수로 작업 ID를 쓰고 있죠... 그러니 작업 id 자체를 "데이터 포인터" 로 넘기고, 다시 u32로 캐스팅하면 됩니다!
// snip: <impls for clone_waker, wake_by_ref, drop>
pub struct TaskToken(pub u32);
unsafe fn wake(data: *const ()) {
WAKE_QUEUE.get().unwrap().add(TaskToken(data as u32));
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop);
pub fn make_waker(task: TaskToken) -> Waker {
unsafe { Waker::from_raw(RawWaker::new(task.0 as *const (), &VTABLE)) }
}
이제 실행기가 생겼습니다! 잠들기 같은 유용한 일을 시켜 봅시다.
이를 구현하려면 다음을 수행하는 스레드를 실행하고 싶습니다:
이 둘을 동시에 수행하려면 재미있는 트릭을 쓸 수 있습니다. 우리가 표준 채널 대신 3DS의 프로세스 간 통신(IPC) 시스템을 사용하고 있다는 걸 기억하시죠?
알고 보니 IPC 요청을 기다리는 그 동일한 시스템 호출이 또한 ... 3DS 운영 체제에서 대기 가능한 거의 모든 것을 함께 기다릴 수 있습니다. 이벤트, 뮤텍스, 타이머까지 전부 가능합니다.
구현은 이렇습니다(적당한 추상화를 곁들여서):
struct ReactorHandler {
wait_tokens: LiteMap<OSHandle, Vec<TaskToken>>,
}
impl IPCServerHandler<ReactorCmd, ReactorReply> for ReactorHandler {
// handles IPC requests
fn handle_request(
&mut self,
request: ReactorCmd,
server: &mut IPCServer<ReactorCmd, ReactorReply>,
) -> ReactorReply {
match request {
ReactorCmd::AddHandle(task, handle) => {
self.wait_tokens.entry(handle).or_default().push(task);
server.add_handle_to_list(handle);
ReactorReply::Ok
}
ReactorCmd::PopHandle => todo!(),
}
}
// handles non-IPC (any other synchronization object)
fn handle_additional_oshandle(
&mut self,
handle: OSHandle,
_server: &mut IPCServer<ReactorCmd, ReactorReply>,
) {
for token in self.wait_tokens.remove(&handle).unwrap() {
WAKE_QUEUE.get().unwrap().add(token);
}
}
}
파일과 소켓 쪽부터는 조금 더 복잡해집니다. 이 둘의 연산은 모두 IPC 호출로 구현되며, 항상 동기식입니다. 진행 중인 연산을 다른 것들과 함께 기다릴 수 있는 동기화 객체로 바꿀 수는 없습니다. 일반 함수 호출에 더 가깝습니다.
대신 스레드 풀을 사용합니다. 파일 연산은 준비되지 않았을 때 스레드를 양보하게 만들기 때문에, CPU를 독점하지 않고도 사용할 수 있습니다.
그래서 파일 연산을 하려면, 우리 작업의 id와 함께 그 연산을 스레드에 제출합니다. 그러면 워커가 그것을 집어 들고, 연산이 끝났을 때 우리를 깨우기 위해 작업 id를 사용합니다!
그럼 구현해 봅시다. 좀 더 정확히 말하면, 파일의 운영 체제 핸들(유닉스 쪽 사람들에게는 파일 디스크립터), 우리 작업의 id, 워커가 연산 결과를 기록할 몇 개의 포인터, 그리고 실제 데이터가 저장될 버퍼에 대한 포인터를 담은 메시지를 보냅니다.
#[derive(IPCMessage)]
#[repr(u32)]
pub(crate) enum AsyncFsMsg {
Write(#[flatten] FileIoOperation) = 0xA,
Read(#[flatten] FileIoOperation) = 0xB,
Flush(#[flatten] FileControlOperation) = 0xC,
Close(#[flatten] FileControlOperation) = 0xD,
}
#[derive(IPCSerializable)]
pub(crate) struct FileIoOperation {
#[normal]
pub file: u32,
#[normal]
pub task: TaskToken,
#[normal]
pub state: u32, // atomici32 ptr (result code),
#[normal]
pub result: u32, // atomicu32 ptr (u32::MAX if not done, else bytes read)
#[normal]
pub offset: u32,
#[normal]
pub len: u32,
#[normal]
pub data_ptr: u32,
}
impl FileIoOperation {
fn view<'a>(&'a self) -> FileIoOperationView<'a> {
FileIoOperationView {
file: ManuallyDrop::new(unsafe { FileHandle::from_raw(self.file) }),
task: self.task,
state: unsafe { AtomicI32::from_ptr(self.state as *mut i32) },
result: unsafe { AtomicU32::from_ptr(self.result as *mut u32) },
offset: self.offset,
data: unsafe {
std::slice::from_raw_parts(self.data_ptr as *const u8, self.len as usize)
},
}
}
}
워커 쪽에서는 이렇게 됩니다:
match task {
AsyncFsMsg::Write(op) => {
info!("io.worker.{} write fd:{:x}", self.id, op.file);
let mut op = op.view();
let res: BunnyResult<usize> =
op.file
.write(op.offset as u64, op.data, super::WriteOptions::empty());
op.resolve(res);
self.executor.wake(op.task).unwrap();
},
_ => ...
}
그리고 future 쪽은 이렇습니다:
impl<'a> Future for Write<'a> {
type Output = BunnyResult<usize>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let bytes_read = self.bytes_written.load(Ordering::Acquire);
if bytes_read == u32::MAX && !self.registered {
if let Err(e) = self.client.request(&AsyncFsMsg::Write(FileIoOperation {
file: self.file.inner.session,
task: TaskToken::from_waker(cx.waker()),
state: self.state.as_ptr() as u32,
result: self.bytes_written.as_ptr() as u32,
offset: self.offset,
len: self.data.len() as u32,
data_ptr: self.data.as_ptr() as u32,
})) {
Poll::Ready(Err(e.into()))
} else {
self.registered = true;
Poll::Pending
}
} else if self.registered && bytes_read != u32::MAX {
state_resolve(&self.state, bytes_read as usize)
} else {
Poll::Pending
}
}
}
비동기 전용 락이나 mutex를 구현하고 싶다면, 그건 쉽습니다. 락을 기다리는 작업들의 목록을 유지하면 됩니다. 락을 걸 때는 자신을 목록에 추가하고, 락을 풀 때는 목록에 다음 작업이 있으면 그 작업을 깨우면 됩니다.
동기 그리고 비동기 코드 모두와 연결될 수 있는 락을 만드는 일은 조금 더 까다롭습니다. 제가 택한 방법은 libctru의 LightLock을 수정하는 것이었는데, 이 구현은 AddressArbitrator라는 가벼운 동기화 원시 타입을 사용해 스레드에 신호를 보냅니다.
전체 코드는 여기에 싣기엔 조금 길지만, 핵심 부분은 이렇습니다. 전체 구현이 궁금하다면 여기에 있습니다.
impl DsLock {
// if we want to use this from a non-async contest, we use TaskToken(u32::MAX) to mark that
fn try_lock_and_register(&self, token: TaskToken) -> bool {
let state = self.arb.0.load(Ordering::Acquire);
if state > 0 {
self.arb.0.store(-state, Ordering::Release);
true
} else {
self.arb.0.store(state - 1, Ordering::Release);
self.waiters.lock().push(token);
false
}
}
unsafe fn unlock(&self) {
let lock_state = self.arb.0.load(Ordering::Acquire);
if lock_state < 0 {
self.arb.0.store(-lock_state, Ordering::Release);
let mut waiters = self.waiters.lock();
if waiters.is_empty() {
return;
}
// get the next task to wake up...
let task = waiters.remove(0);
drop(waiters);
if task == TaskToken(u32::MAX) {
// it's a non-async one, signal it using the AddressArbitrator
let _ = self.arb.signal_one();
} else {
// it's an async task, wake it up using the executor
executor::WAKE_QUEUE.get().unwrap().add(task);
}
}
}
}
이제 작은 예제를 보겠습니다. egui_citro3d를 사용합니다(github.com/LexiBigCheese가 원작자이고, 제가 약간 수정했습니다).
이 예제는 비동기 TCP 소켓과 블로킹 UI 작업을 결합해서, 화면에 전송된 메시지를 표시합니다!
// full example: https://github.com/kore-signet/bunnyds-egui-example
let server = AsyncTcpSocket::bind("0.0.0.0:3027")?;
let client = server.accept().await?;
let messages: Arc<DSMutex<Vec<String>>> = Arc::new(DSMutex::new(Vec::new()));
let messages_tx: Arc<DSMutex<Vec<String>>> = Arc::clone(&messages);
let message_task = bunnyds::spawn(async move {
let mut rd_buf = [0u8; 1024];
while let Ok(bytes_read) = client.recv(&mut rd_buf).await {
if bytes_read == 0 {
return;
}
messages_tx
.lock()
.await
.push(String::from_utf8_lossy(&rd_buf[..bytes_read]).into());
}
});
let ui_task = bunnyds::spawn_blocking(Some(0x18), Some(2), move || {
let mut hid = Hid::new().unwrap();
let apt = Apt::new().unwrap();
let gfx = Gfx::new().unwrap();
let mut egui_ctx = EguiRenderer::new(&mut hid, &gfx, &apt, &key_mapping);
while apt.main_loop() {
gfx.wait_for_vblank();
egui_ctx.render_frame(
|ui| {
egui::CentralPanel::default().show_inside(ui, |ui| {
let messages = messages.lock_sync();
for message in messages.iter() {
ui.label(message.trim());
}
});
},
|_ui| {},
);
}
});
futures::future::join(ui_task, message_task).await;
이건 정말 재미있는 작은 모험이었습니다! 이 글이나 3DS에서 Rust 코드를 작성하는 일에 대해 질문이 있다면, kore at cat-girl.gay 로 메일 주세요.
프로젝트 전체 코드는 github.com/kore-signet/bunnyds에서 볼 수 있습니다.
이 과정 전체에서 추가적인 사이드 프로젝트 하나는, 3DS IPC 시스템을 Rust 원시 타입으로 추상화하는 라이브러리를 만드는 것이었습니다. 그 코드는 여기에 있습니다. 그런 글도 재미있을 것 같다면 알려 주세요!