diff --git a/lib/std/Io/Dispatch.zig b/lib/std/Io/Dispatch.zig index 81b9d207f3..959cb86421 100644 --- a/lib/std/Io/Dispatch.zig +++ b/lib/std/Io/Dispatch.zig @@ -650,7 +650,8 @@ const SwitchMessage = struct { mutex_wait: *Mutex.Waiter, futex_wait: *Futex.Waiter, futex_wake: *Futex.Waker, - sleep: c.dispatch.time_t, + sleep_wait: *SleepWaiter, + after: c.dispatch.time_t, destroy, exit, }; @@ -698,7 +699,17 @@ const SwitchMessage = struct { .init(ev.queue, @alignCast(@fieldParentPtr("context", message.contexts.old))); waker.futex.queue.async(waker, &Futex.Waker.remove); }, - .sleep => |when| { + .sleep_wait => |waiter| { + waiter.sleeper = + .init(ev.queue, @alignCast(@fieldParentPtr("context", message.contexts.old))); + const queue = waiter.cancelable.queue; + switch (waiter.sleeper.fiber.cancel_protection.check()) { + .unblocked => {}, + .blocked => waiter.cancelable = .blocked, + } + queue.async(waiter, &SleepWaiter.start); + }, + .after => |when| { const fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); when.after(ev.queue, fiber, &Fiber.@"resume"); }, @@ -1563,7 +1574,7 @@ const Futex = struct { .FOREVER => {}, else => |timeout| { const timer = c.dispatch.source_create(.TIMER, 0, .none, futex.queue) orelse { - log.warn("unable to create timer for futex timeout", .{}); + log.warn("failed to create timer for futex timeout", .{}); return error.CancelRequested; }; timer.as_object().set_context(waiter); @@ -4738,9 +4749,69 @@ fn clockResolution(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.ResolutionEr }; } +const SleepWaiter = struct { + sleeper: Sleeper = undefined, + cancelable: Cancelable, + timer: c.dispatch.source_t, + started: bool = false, + + fn start(context: ?*anyopaque) callconv(.c) void { + const waiter: *SleepWaiter = @ptrCast(@alignCast(context)); + waiter.cancelable.enter(waiter.sleeper.fiber) catch |err| switch (err) { + error.CancelRequested => waiter.timer.cancel(), + }; + waiter.timer.as_object().activate(); + } + + fn timedOut(context: ?*anyopaque) callconv(.c) void { + const waiter: *SleepWaiter = @ptrCast(@alignCast(context)); + waiter.cancelable.leave(waiter.sleeper.fiber) catch |err| switch (err) { + error.CancelRequested => return, + }; + waiter.timer.cancel(); + } + + fn canceled(context: ?*anyopaque) callconv(.c) void { + const cancelable: *Cancelable = @ptrCast(@alignCast(context)); + const waiter: *SleepWaiter = @fieldParentPtr("cancelable", cancelable); + cancelable.requested(waiter.sleeper.fiber); + waiter.timer.cancel(); + } + + fn wake(context: ?*anyopaque) callconv(.c) void { + const waiter: *SleepWaiter = @ptrCast(@alignCast(context)); + var sleeper = waiter.sleeper; + waiter.* = undefined; + Sleeper.wake(&sleeper); + } +}; + fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.Cancelable!void { const ev: *Evented = @ptrCast(@alignCast(userdata)); - ev.yield(.{ .sleep = ev.timeFromTimeout(timeout) }); + const queue = c.dispatch.queue_create_with_target( + "org.ziglang.std.Io.Dispatch.sleep", + .SERIAL(), + ev.queue, + ) orelse { + log.warn("failed to create serial queue for sleep", .{}); + return ev.yield(.{ .after = ev.timeFromTimeout(timeout) }); + }; + defer queue.as_object().release(); + const timer = c.dispatch.source_create(.TIMER, 0, .none, queue) orelse { + log.warn("failed to create timer for sleep", .{}); + return ev.yield(.{ .after = ev.timeFromTimeout(timeout) }); + }; + var waiter: SleepWaiter = .{ + .cancelable = .{ .queue = queue, .cancel = &Futex.Waiter.canceled }, + .timer = timer, + }; + timer.as_object().set_context(&waiter); + timer.set_event_handler(&SleepWaiter.timedOut); + timer.set_cancel_handler(&SleepWaiter.wake); + timer.set_timer(ev.timeFromTimeout(timeout), c.dispatch.TIME_FOREVER, ev.leeway); + ev.yield(.{ .sleep_wait = &waiter }); + timer.as_object().release(); + try waiter.cancelable.acknowledge(waiter.sleeper.fiber); } fn timeFromTimeout(ev: *Evented, timeout: Io.Timeout) c.dispatch.time_t {