Io.Dispatch.sleep: add cancelation support

This commit is contained in:
Jacob Young 2026-02-14 05:42:15 -05:00
parent b7f93695f9
commit 771047ae93

View file

@ -650,7 +650,8 @@ const SwitchMessage = struct {
mutex_wait: *Mutex.Waiter,
futex_wait: *Futex.Waiter,
futex_wake: *Futex.Waker,
sleep: c.dispatch.time_t,
sleep_wait: *SleepWaiter,
after: c.dispatch.time_t,
destroy,
exit,
};
@ -698,7 +699,17 @@ const SwitchMessage = struct {
.init(ev.queue, @alignCast(@fieldParentPtr("context", message.contexts.old)));
waker.futex.queue.async(waker, &Futex.Waker.remove);
},
.sleep => |when| {
.sleep_wait => |waiter| {
waiter.sleeper =
.init(ev.queue, @alignCast(@fieldParentPtr("context", message.contexts.old)));
const queue = waiter.cancelable.queue;
switch (waiter.sleeper.fiber.cancel_protection.check()) {
.unblocked => {},
.blocked => waiter.cancelable = .blocked,
}
queue.async(waiter, &SleepWaiter.start);
},
.after => |when| {
const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
when.after(ev.queue, fiber, &Fiber.@"resume");
},
@ -1563,7 +1574,7 @@ const Futex = struct {
.FOREVER => {},
else => |timeout| {
const timer = c.dispatch.source_create(.TIMER, 0, .none, futex.queue) orelse {
log.warn("unable to create timer for futex timeout", .{});
log.warn("failed to create timer for futex timeout", .{});
return error.CancelRequested;
};
timer.as_object().set_context(waiter);
@ -4738,9 +4749,69 @@ fn clockResolution(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.ResolutionEr
};
}
const SleepWaiter = struct {
sleeper: Sleeper = undefined,
cancelable: Cancelable,
timer: c.dispatch.source_t,
started: bool = false,
fn start(context: ?*anyopaque) callconv(.c) void {
const waiter: *SleepWaiter = @ptrCast(@alignCast(context));
waiter.cancelable.enter(waiter.sleeper.fiber) catch |err| switch (err) {
error.CancelRequested => waiter.timer.cancel(),
};
waiter.timer.as_object().activate();
}
fn timedOut(context: ?*anyopaque) callconv(.c) void {
const waiter: *SleepWaiter = @ptrCast(@alignCast(context));
waiter.cancelable.leave(waiter.sleeper.fiber) catch |err| switch (err) {
error.CancelRequested => return,
};
waiter.timer.cancel();
}
fn canceled(context: ?*anyopaque) callconv(.c) void {
const cancelable: *Cancelable = @ptrCast(@alignCast(context));
const waiter: *SleepWaiter = @fieldParentPtr("cancelable", cancelable);
cancelable.requested(waiter.sleeper.fiber);
waiter.timer.cancel();
}
fn wake(context: ?*anyopaque) callconv(.c) void {
const waiter: *SleepWaiter = @ptrCast(@alignCast(context));
var sleeper = waiter.sleeper;
waiter.* = undefined;
Sleeper.wake(&sleeper);
}
};
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.Cancelable!void {
const ev: *Evented = @ptrCast(@alignCast(userdata));
ev.yield(.{ .sleep = ev.timeFromTimeout(timeout) });
const queue = c.dispatch.queue_create_with_target(
"org.ziglang.std.Io.Dispatch.sleep",
.SERIAL(),
ev.queue,
) orelse {
log.warn("failed to create serial queue for sleep", .{});
return ev.yield(.{ .after = ev.timeFromTimeout(timeout) });
};
defer queue.as_object().release();
const timer = c.dispatch.source_create(.TIMER, 0, .none, queue) orelse {
log.warn("failed to create timer for sleep", .{});
return ev.yield(.{ .after = ev.timeFromTimeout(timeout) });
};
var waiter: SleepWaiter = .{
.cancelable = .{ .queue = queue, .cancel = &Futex.Waiter.canceled },
.timer = timer,
};
timer.as_object().set_context(&waiter);
timer.set_event_handler(&SleepWaiter.timedOut);
timer.set_cancel_handler(&SleepWaiter.wake);
timer.set_timer(ev.timeFromTimeout(timeout), c.dispatch.TIME_FOREVER, ev.leeway);
ev.yield(.{ .sleep_wait = &waiter });
timer.as_object().release();
try waiter.cancelable.acknowledge(waiter.sleeper.fiber);
}
fn timeFromTimeout(ev: *Evented, timeout: Io.Timeout) c.dispatch.time_t {