From 6d6532dd9eb862dfd6e59ceed5c762342d2cc0d5 Mon Sep 17 00:00:00 2001 From: Matthew Lugg Date: Tue, 3 Feb 2026 22:14:42 +0000 Subject: [PATCH] Io.Threaded: add ParkingMutex, and deal with spurious unparks on NetBSD We can't use Io.Mutex in parking_futex; instead, we need a simple parking-based mutex implementation. That's fairly simple to do. Also deal with spurious unparks on NetBSD, where they *can* happen (as opposed to Windows, where they cannot). --- lib/std/Io/Threaded.zig | 284 +++++++++++++++++++++++++++++++--------- 1 file changed, 219 insertions(+), 65 deletions(-) diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 558263a010..59d816f62c 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -2662,7 +2662,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout while (b.pending.head != .none and b.completions.head == .none) { var delay_interval: windows.LARGE_INTEGER = interval: { const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER); - break :interval t.deadlineToWindowsInterval(d); + break :interval timeoutToWindowsInterval(.{ .deadline = d }).?; }; const alertable_syscall = try AlertableSyscall.start(); const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval); @@ -4339,7 +4339,10 @@ fn dirCreateFileWindows( // kernel bug with retry attempts. syscall.finish(); if (max_attempts - attempt == 0) return error.FileBusy; - try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), + .clock = .awake, + } }); attempt += 1; syscall = try .start(); continue; @@ -4352,7 +4355,10 @@ fn dirCreateFileWindows( // fixed by sleeping and retrying until the error goes away. 
syscall.finish(); if (max_attempts - attempt == 0) return error.FileBusy; - try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), + .clock = .awake, + } }); attempt += 1; syscall = try .start(); continue; @@ -7382,7 +7388,10 @@ fn dirReadLinkWindows(dir: Dir, sub_path: []const u8, buffer: []u8) Dir.ReadLink // kernel bug with retry attempts. syscall.finish(); if (max_attempts - attempt == 0) return error.FileBusy; - try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), + .clock = .awake, + } }); attempt += 1; syscall = try .start(); continue; @@ -7395,7 +7404,10 @@ fn dirReadLinkWindows(dir: Dir, sub_path: []const u8, buffer: []u8) Dir.ReadLink // fixed by sleeping and retrying until the error goes away. syscall.finish(); if (max_attempts - attempt == 0) return error.FileBusy; - try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), + .clock = .awake, + } }); attempt += 1; syscall = try .start(); continue; @@ -10956,7 +10968,7 @@ fn nowWasi(clock: Io.Clock) Io.Timestamp { fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (timeout == .none) return; - if (use_parking_sleep) return parking_sleep.sleep(timeout.toTimestamp(ioBasic(t))); + if (use_parking_sleep) return parking_sleep.sleep(timeout); if (native_os == .wasi) return sleepWasi(t, timeout); if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout); return sleepNanosleep(t, timeout); @@ -14363,7 +14375,7 @@ const Wsa = struct { fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void { const wsa = &t.wsa; - try mutexLock(&wsa.mutex); + mutexLock(&wsa.mutex); 
defer mutexUnlock(&wsa.mutex); switch (wsa.status) { .uninitialized => { @@ -16943,7 +16955,7 @@ const parking_futex = struct { /// avoid a race. num_waiters: std.atomic.Value(u32), /// Protects `waiters`. - mutex: Io.Mutex, + mutex: ParkingMutex, waiters: std.DoublyLinkedList, /// Prevent false sharing between buckets. @@ -17007,8 +17019,8 @@ const parking_futex = struct { var status_buf: std.atomic.Value(Thread.Status) = undefined; { - mutexLock(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); + bucket.mutex.lock(); + defer bucket.mutex.unlock(); _ = bucket.num_waiters.fetchAdd(1, .acquire); @@ -17059,7 +17071,7 @@ const parking_futex = struct { bucket.waiters.append(&waiter.node); } - if (park(timeout, ptr)) { + if (park(timeout, ptr, waiter.thread_status)) { // We were unparked by either `wake` or cancelation, so our current status is either // `.none` or `.canceling`. In either case, they've already removed `waiter` from // `bucket`, so we have nothing more to do! @@ -17084,7 +17096,7 @@ const parking_futex = struct { // to unpark us. Whoever did that will remove us from `bucket`. Wait for // that (and drop the unpark request in doing so). // New status is `.none` or `.canceling` respectively. - park(.none, ptr) catch |e| switch (e) { + park(.none, ptr, waiter.thread_status) catch |e| switch (e) { error.Timeout => unreachable, }; }, @@ -17114,8 +17126,8 @@ const parking_futex = struct { // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`. 
var waking_head: ?*std.DoublyLinkedList.Node = null; { - mutexLock(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); + bucket.mutex.lock(); + defer bucket.mutex.unlock(); var num_removed: u32 = 0; var it = bucket.waiters.first; @@ -17171,8 +17183,8 @@ const parking_futex = struct { fn removeCanceledWaiter(waiter: *Waiter) void { const bucket = bucketForAddress(waiter.address); - mutexLock(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); + bucket.mutex.lock(); + defer bucket.mutex.unlock(); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); } @@ -17206,7 +17218,7 @@ const parking_sleep = struct { .blocked_canceling => unreachable, } } - if (park(timeout, null)) { + if (park(timeout, null, &thread.status)) { // The only reason this could possibly happen is cancelation. const old_status = thread.status.load(.monotonic); assert(old_status.cancelation == .canceling); @@ -17229,7 +17241,7 @@ const parking_sleep = struct { // us for a cancelation. Whoever did that will have called `unpark`, so // drop that unpark request by waiting for it. // Status is still `.canceling`. - park(.none, null) catch |e| switch (e) { + park(.none, null, &thread.status) catch |e| switch (e) { error.Timeout => unreachable, }; return; @@ -17245,32 +17257,183 @@ const parking_sleep = struct { } } // Uncancelable sleep; we expect not to be manually unparked. - if (park(timeout, null)) { + var dummy_status: std.atomic.Value(Thread.Status) = .init(.{ .cancelation = .parked, .awaitable = .null }); + if (park(timeout, null, &dummy_status)) { unreachable; // unexpected unpark } else |err| switch (err) { error.Timeout => return, } } }; +const ParkingMutex = struct { + state: std.atomic.Value(State), -/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. 
-fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { + const init: ParkingMutex = .{ .state = .init(.unlocked) }; + + comptime { + assert(use_parking_futex); + } + + const State = enum(usize) { + unlocked = 1, + /// This value is intentionally 0 so that `waiter` returns `null`. + locked_once = 0, + /// Contended; value is a `*Waiter`. + _, + /// Returns the head of the waiter list. Illegal to call if `s == .unlocked`. + fn waiter(s: State) ?*Waiter { + return @ptrFromInt(@intFromEnum(s)); + } + /// Returns a locked state where `w` is contending the lock. + /// If `w` is `null`, returns `.locked_once`. + fn fromWaiter(w: ?*Waiter) State { + return @enumFromInt(@intFromPtr(w)); + } + }; + const Waiter = struct { + status: std.atomic.Value(Thread.Status), + /// Never modified once the `Waiter` is in the linked list. + next: ?*Waiter, + /// Never modified once the `Waiter` is in the linked list. + tid: std.Thread.Id, + }; + fn lock(m: *ParkingMutex) void { + state: switch (State.unlocked) { // assume 'unlocked' to optimize for uncontended case + .unlocked => continue :state m.state.cmpxchgWeak( + .unlocked, + .locked_once, + .acquire, // acquire lock + .monotonic, + ) orelse { + @branchHint(.likely); + return; + }, + + .locked_once, _ => |last_state| { + const old_waiter = last_state.waiter(); + const self_tid = if (Thread.current) |t| t.id else std.Thread.getCurrentId(); + var waiter: Waiter = .{ + .next = old_waiter, + .status = .init(.{ .cancelation = .parked, .awaitable = .null }), + .tid = self_tid, + }; + if (m.state.cmpxchgWeak( + .fromWaiter(old_waiter), + .fromWaiter(&waiter), + .release, // release `waiter` + .monotonic, + )) |new_state| { + continue :state new_state; + } + // We're now in the list of waiters---park until we're given the lock. + park(.none, m, &waiter.status) catch |err| switch (err) { + error.Timeout => unreachable, + }; + // We now hold the lock. 
+ assert(waiter.status.load(.monotonic).cancelation == .none); + return; + }, + } + } + fn unlock(m: *ParkingMutex) void { + state: switch (State.locked_once) { // assume 'locked_once' to optimize for uncontended case + .unlocked => unreachable, // we hold the lock + + .locked_once => continue :state m.state.cmpxchgWeak( + .locked_once, + .unlocked, + .release, // release lock + .acquire, // acquire any `Waiter` memory + ) orelse { + @branchHint(.likely); + return; + }, + + _ => |last_state| { + // The logic here does not have ABA problems, and does some accesses non-atomically, + // because `Waiter.next` is owned by the lock holder (that's us!) once the waiter is + // in the linked list, up until we set `Waiter.status` to `.none`. + + // Run through the waiter list to the end to ensure fairness. This is obviously not + // ideal, but it shouldn't be a big deal in practice provided the critical section + // is fairly small (so we won't get too many threads contending the mutex at once). + // There's a *chance* we could get away with a LIFO queue for our use case, but I + // don't wanna risk that. + var parent: ?*Waiter = null; + var waiter: *Waiter = last_state.waiter().?; + while (waiter.next) |next| { + parent = waiter; + waiter = next; + } + // `waiter` is next in line for the lock. Remove them from the list. + if (parent) |p| { + assert(p.next == waiter); + p.next = null; + } else { + // We're waking the last waiter, so clear the list head. + if (m.state.cmpxchgWeak( + .fromWaiter(last_state.waiter().?), + .locked_once, + .acquire, + .acquire, // acquire any new `Waiter` memory + )) |new_state| { + continue :state new_state; + } + } + // Now we're ready to actually hand the lock over to them. 
+ const tid = waiter.tid; // load this before the store below potentially invalidates `waiter` + waiter.status.store(.{ .cancelation = .none, .awaitable = .null }, .release); // release lock + unpark(&.{tid}, m); + return; + }, + } + } +}; + +fn timeoutToWindowsInterval(timeout: Io.Timeout) ?windows.LARGE_INTEGER { + // ntdll only supports two combinations: + // * real-time (`.real`) sleeps with absolute deadlines + // * monotonic (`.awake`/`.boot`) sleeps with relative durations + const clock = switch (timeout) { + .none => return null, + .duration => |d| d.clock, + .deadline => |d| d.clock, + }; + switch (clock) { + .cpu_process, .cpu_thread => unreachable, // cannot sleep for CPU time + .real => { + const deadline = switch (timeout) { + .none => unreachable, + .duration => |d| nowWindows(clock).addDuration(d.raw), + .deadline => |d| d.raw, + }; + return @intCast(@max(@divTrunc(deadline.nanoseconds, 100), 0)); + }, + .awake, .boot => { + const duration = switch (timeout) { + .none => unreachable, + .duration => |d| d.raw, + .deadline => |d| nowWindows(clock).durationTo(d.raw), + }; + return @intCast(@min(@divTrunc(-duration.nanoseconds, 100), -1)); + }, + } +} + +fn park( + timeout: Io.Timeout, + /// This value has no semantic effect, but may allow the OS to optimize the operation. + addr_hint: ?*const anyopaque, + /// The API on NetBSD and Illumos sucks and can unpark spuriously (well, it *can't*, but signals + /// cause an indistinguishable unblock, and libpthread really likes to leave unparks pending). + /// As such, on these targets only, this `status` is checked to determine if an unpark is real. 
+ /// There is otherwise no way to differentiate a real unpark from a spurious one. + status: *std.atomic.Value(Thread.Status), +) error{Timeout}!void { comptime assert(use_parking_futex or use_parking_sleep); switch (native_os) { .windows => { - var timeout_buf: windows.LARGE_INTEGER = undefined; - const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) { - .none => null, - .deadline => |timestamp| continue :timeout .{ .duration = .{ - .clock = timestamp.clock, - .raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw), - } }, - .duration => |duration| { - _ = duration.clock; // Windows only supports monotonic - timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100)); - break :timeout &timeout_buf; - }, - }; + const raw_timeout = timeoutToWindowsInterval(timeout); // `RtlWaitOnAddress` passes the futex address in as the first argument to this call, // but it's unclear what that actually does, especially since `NtAlertThreadByThreadId` // does *not* accept the address so the kernel can't really be using it as a hint. An @@ -17284,7 +17447,10 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { // this parameter). However, to err on the side of caution, let's match the behavior of // `RtlWaitOnAddress` and pass the pointer, in case the kernel ever does something // stupid such as trying to dereference it. 
- switch (windows.ntdll.NtWaitForAlertByThreadId(addr_hint, raw_timeout)) { + switch (windows.ntdll.NtWaitForAlertByThreadId( + addr_hint, + if (raw_timeout) |*t| t else null, + )) { .ALERTED => return, .TIMEOUT => return error.Timeout, else => unreachable, @@ -17303,19 +17469,23 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { break :timeout .{ &ts_buf, false, duration.clock == .real }; }, }; - switch (posix.errno(std.c._lwp_park( - if (clock_real) .REALTIME else .MONOTONIC, - .{ .ABSTIME = abstime }, - ts, - 0, - addr_hint, - null, - ))) { - .SUCCESS, .ALREADY, .INTR => return, - .TIMEDOUT => return error.Timeout, - .INVAL => unreachable, - .SRCH => unreachable, - else => unreachable, + // It's okay to pass the same timeout in a loop. If it's a duration, the OS actually + // writes the remaining time into the buffer when the syscall returns. + while (status.load(.monotonic).cancelation == .parked) { + switch (posix.errno(std.c._lwp_park( + if (clock_real) .REALTIME else .MONOTONIC, + .{ .ABSTIME = abstime }, + ts, + 0, + addr_hint, + null, + ))) { + .SUCCESS, .ALREADY, .INTR => {}, + .TIMEDOUT => return error.Timeout, + .INVAL => unreachable, + .SRCH => unreachable, + else => unreachable, + } } }, .illumos => @panic("TODO: illumos lwp_park"), @@ -17323,24 +17493,8 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { } } -fn deadlineToWindowsInterval(t: *Io.Threaded, deadline: Io.Clock.Timestamp) windows.LARGE_INTEGER { - // ntdll only supports two combinations: - // * real-time (`.real`) sleeps with absolute deadlines - // * monotonic (`.awake`/`.boot`) sleeps with relative durations - switch (deadline.clock) { - .cpu_process, .cpu_thread => return 0, - .real => { - return @intCast(@max(@divTrunc(deadline.raw.nanoseconds, 100), 0)); - }, - .awake, .boot => { - const duration = deadline.durationFromNow(ioBasic(t)); - return @intCast(@min(@divTrunc(-duration.raw.nanoseconds, 100), -1)); - }, - } -} - 
const UnparkTid = switch (native_os) { - // `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles? + // `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread IDs? .windows => usize, else => std.Thread.Id, };