Io.Threaded: add ParkingMutex, and deal with spurious unparks on NetBSD

We can't use Io.Mutex in parking_futex; instead, we need a simple
parking-based mutex implementation. That's fairly simple to do.

Also deal with spurious unparks on NetBSD, where they *can* happen (as
opposed to Windows, where they cannot).
This commit is contained in:
Matthew Lugg 2026-02-03 22:14:42 +00:00
parent 7c08f77efa
commit 6d6532dd9e
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E

View file

@ -2662,7 +2662,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
while (b.pending.head != .none and b.completions.head == .none) {
var delay_interval: windows.LARGE_INTEGER = interval: {
const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER);
break :interval t.deadlineToWindowsInterval(d);
break :interval timeoutToWindowsInterval(.{ .deadline = d }).?;
};
const alertable_syscall = try AlertableSyscall.start();
const delay_rc = windows.ntdll.NtDelayExecution(windows.TRUE, &delay_interval);
@ -4339,7 +4339,10 @@ fn dirCreateFileWindows(
// kernel bug with retry attempts.
syscall.finish();
if (max_attempts - attempt == 0) return error.FileBusy;
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
try parking_sleep.sleep(.{ .duration = .{
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
.clock = .awake,
} });
attempt += 1;
syscall = try .start();
continue;
@ -4352,7 +4355,10 @@ fn dirCreateFileWindows(
// fixed by sleeping and retrying until the error goes away.
syscall.finish();
if (max_attempts - attempt == 0) return error.FileBusy;
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
try parking_sleep.sleep(.{ .duration = .{
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
.clock = .awake,
} });
attempt += 1;
syscall = try .start();
continue;
@ -7382,7 +7388,10 @@ fn dirReadLinkWindows(dir: Dir, sub_path: []const u8, buffer: []u8) Dir.ReadLink
// kernel bug with retry attempts.
syscall.finish();
if (max_attempts - attempt == 0) return error.FileBusy;
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
try parking_sleep.sleep(.{ .duration = .{
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
.clock = .awake,
} });
attempt += 1;
syscall = try .start();
continue;
@ -7395,7 +7404,10 @@ fn dirReadLinkWindows(dir: Dir, sub_path: []const u8, buffer: []u8) Dir.ReadLink
// fixed by sleeping and retrying until the error goes away.
syscall.finish();
if (max_attempts - attempt == 0) return error.FileBusy;
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
try parking_sleep.sleep(.{ .duration = .{
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
.clock = .awake,
} });
attempt += 1;
syscall = try .start();
continue;
@ -10956,7 +10968,7 @@ fn nowWasi(clock: Io.Clock) Io.Timestamp {
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (timeout == .none) return;
if (use_parking_sleep) return parking_sleep.sleep(timeout.toTimestamp(ioBasic(t)));
if (use_parking_sleep) return parking_sleep.sleep(timeout);
if (native_os == .wasi) return sleepWasi(t, timeout);
if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout);
return sleepNanosleep(t, timeout);
@ -14363,7 +14375,7 @@ const Wsa = struct {
fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void {
const wsa = &t.wsa;
try mutexLock(&wsa.mutex);
mutexLock(&wsa.mutex);
defer mutexUnlock(&wsa.mutex);
switch (wsa.status) {
.uninitialized => {
@ -16943,7 +16955,7 @@ const parking_futex = struct {
/// avoid a race.
num_waiters: std.atomic.Value(u32),
/// Protects `waiters`.
mutex: Io.Mutex,
mutex: ParkingMutex,
waiters: std.DoublyLinkedList,
/// Prevent false sharing between buckets.
@ -17007,8 +17019,8 @@ const parking_futex = struct {
var status_buf: std.atomic.Value(Thread.Status) = undefined;
{
mutexLock(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
bucket.mutex.lock();
defer bucket.mutex.unlock();
_ = bucket.num_waiters.fetchAdd(1, .acquire);
@ -17059,7 +17071,7 @@ const parking_futex = struct {
bucket.waiters.append(&waiter.node);
}
if (park(timeout, ptr)) {
if (park(timeout, ptr, waiter.thread_status)) {
// We were unparked by either `wake` or cancelation, so our current status is either
// `.none` or `.canceling`. In either case, they've already removed `waiter` from
// `bucket`, so we have nothing more to do!
@ -17084,7 +17096,7 @@ const parking_futex = struct {
// to unpark us. Whoever did that will remove us from `bucket`. Wait for
// that (and drop the unpark request in doing so).
// New status is `.none` or `.canceling` respectively.
park(.none, ptr) catch |e| switch (e) {
park(.none, ptr, waiter.thread_status) catch |e| switch (e) {
error.Timeout => unreachable,
};
},
@ -17114,8 +17126,8 @@ const parking_futex = struct {
// of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
var waking_head: ?*std.DoublyLinkedList.Node = null;
{
mutexLock(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
bucket.mutex.lock();
defer bucket.mutex.unlock();
var num_removed: u32 = 0;
var it = bucket.waiters.first;
@ -17171,8 +17183,8 @@ const parking_futex = struct {
fn removeCanceledWaiter(waiter: *Waiter) void {
const bucket = bucketForAddress(waiter.address);
mutexLock(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
bucket.mutex.lock();
defer bucket.mutex.unlock();
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
}
@ -17206,7 +17218,7 @@ const parking_sleep = struct {
.blocked_canceling => unreachable,
}
}
if (park(timeout, null)) {
if (park(timeout, null, &thread.status)) {
// The only reason this could possibly happen is cancelation.
const old_status = thread.status.load(.monotonic);
assert(old_status.cancelation == .canceling);
@ -17229,7 +17241,7 @@ const parking_sleep = struct {
// us for a cancelation. Whoever did that will have called `unpark`, so
// drop that unpark request by waiting for it.
// Status is still `.canceling`.
park(.none, null) catch |e| switch (e) {
park(.none, null, &thread.status) catch |e| switch (e) {
error.Timeout => unreachable,
};
return;
@ -17245,32 +17257,183 @@ const parking_sleep = struct {
}
}
// Uncancelable sleep; we expect not to be manually unparked.
if (park(timeout, null)) {
var dummy_status: std.atomic.Value(Thread.Status) = .init(.{ .cancelation = .parked, .awaitable = .null });
if (park(timeout, null, &dummy_status)) {
unreachable; // unexpected unpark
} else |err| switch (err) {
error.Timeout => return,
}
}
};
const ParkingMutex = struct {
state: std.atomic.Value(State),
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
const init: ParkingMutex = .{ .state = .init(.unlocked) };
comptime {
assert(use_parking_futex);
}
const State = enum(usize) {
unlocked = 1,
/// This value is intentionally 0 so that `waiter` returns `null`.
locked_once = 0,
/// Contended; value is a `*Waiter`.
_,
/// Returns the head of the waiter list. Illegal to call if `s == .unlocked`.
fn waiter(s: State) ?*Waiter {
return @ptrFromInt(@intFromEnum(s));
}
/// Returns a locked state where `w` is contending the lock.
/// If `w` is `null`, returns `.locked_once`.
fn fromWaiter(w: ?*Waiter) State {
return @enumFromInt(@intFromPtr(w));
}
};
const Waiter = struct {
status: std.atomic.Value(Thread.Status),
/// Never modified once the `Waiter` is in the linked list.
next: ?*Waiter,
/// Never modified once the `Waiter` is in the linked list.
tid: std.Thread.Id,
};
fn lock(m: *ParkingMutex) void {
state: switch (State.unlocked) { // assume 'unlocked' to optimize for uncontended case
.unlocked => continue :state m.state.cmpxchgWeak(
.unlocked,
.locked_once,
.acquire, // acquire lock
.monotonic,
) orelse {
@branchHint(.likely);
return;
},
.locked_once, _ => |last_state| {
const old_waiter = last_state.waiter();
const self_tid = if (Thread.current) |t| t.id else std.Thread.getCurrentId();
var waiter: Waiter = .{
.next = old_waiter,
.status = .init(.{ .cancelation = .parked, .awaitable = .null }),
.tid = self_tid,
};
if (m.state.cmpxchgWeak(
.fromWaiter(old_waiter),
.fromWaiter(&waiter),
.release, // release `waiter`
.monotonic,
)) |new_state| {
continue :state new_state;
}
// We're now in the list of waiters---park until we're given the lock.
park(.none, m, &waiter.status) catch |err| switch (err) {
error.Timeout => unreachable,
};
// We now hold the lock.
assert(waiter.status.load(.monotonic).cancelation == .none);
return;
},
}
}
fn unlock(m: *ParkingMutex) void {
state: switch (State.locked_once) { // assume 'locked_once' to optimize for uncontended case
.unlocked => unreachable, // we hold the lock
.locked_once => continue :state m.state.cmpxchgWeak(
.locked_once,
.unlocked,
.release, // release lock
.acquire, // acquire any `Waiter` memory
) orelse {
@branchHint(.likely);
return;
},
_ => |last_state| {
// The logic here does not have ABA problems, and does some accesses non-atomically,
// because `Waiter.next` is owned by the lock holder (that's us!) once the waiter is
// in the linked list, up until we set `Waiter.status` to `.none`.
// Run through the waiter list to the end to ensure fairness. This is obviously not
// ideal, but it shouldn't be a big deal in practice provided the critical section
// is fairly small (so we won't get too many threads contending the mutex at once).
// There's a *chance* we could get away with a LIFO queue for our use case, but I
// don't wanna risk that.
var parent: ?*Waiter = null;
var waiter: *Waiter = last_state.waiter().?;
while (waiter.next) |next| {
parent = waiter;
waiter = next;
}
// `waiter` is next in line for the lock. Remove them from the list.
if (parent) |p| {
assert(p.next == waiter);
p.next = null;
} else {
// We're waking the last waiter, so clear the list head.
if (m.state.cmpxchgWeak(
.fromWaiter(last_state.waiter().?),
.locked_once,
.acquire,
.acquire, // acquire any new `Waiter` memory
)) |new_state| {
continue :state new_state;
}
}
// Now we're ready to actually hand the lock over to them.
const tid = waiter.tid; // load this before the store below potentially invalidates `waiter`
waiter.status.store(.{ .cancelation = .none, .awaitable = .null }, .release); // release lock
unpark(&.{tid}, m);
return;
},
}
}
};
fn timeoutToWindowsInterval(timeout: Io.Timeout) ?windows.LARGE_INTEGER {
// ntdll only supports two combinations:
// * real-time (`.real`) sleeps with absolute deadlines
// * monotonic (`.awake`/`.boot`) sleeps with relative durations
const clock = switch (timeout) {
.none => return null,
.duration => |d| d.clock,
.deadline => |d| d.clock,
};
switch (clock) {
.cpu_process, .cpu_thread => unreachable, // cannot sleep for CPU time
.real => {
const deadline = switch (timeout) {
.none => unreachable,
.duration => |d| nowWindows(clock).addDuration(d.raw),
.deadline => |d| d.raw,
};
return @intCast(@max(@divTrunc(deadline.nanoseconds, 100), 0));
},
.awake, .boot => {
const duration = switch (timeout) {
.none => unreachable,
.duration => |d| d.raw,
.deadline => |d| nowWindows(clock).durationTo(d.raw),
};
return @intCast(@min(@divTrunc(-duration.nanoseconds, 100), -1));
},
}
}
fn park(
timeout: Io.Timeout,
/// This value has no semantic effect, but may allow the OS to optimize the operation.
addr_hint: ?*const anyopaque,
/// The API on NetBSD and Illumos sucks and can unpark spuriously (well, it *can't*, but signals
/// cause an indistinguishable unblock, and libpthread really likes to leave unparks pending).
/// As such, on these targets only, this `status` is checked to determine if an unpark is real.
/// no way to differentiate
status: *std.atomic.Value(Thread.Status),
) error{Timeout}!void {
comptime assert(use_parking_futex or use_parking_sleep);
switch (native_os) {
.windows => {
var timeout_buf: windows.LARGE_INTEGER = undefined;
const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) {
.none => null,
.deadline => |timestamp| continue :timeout .{ .duration = .{
.clock = timestamp.clock,
.raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw),
} },
.duration => |duration| {
_ = duration.clock; // Windows only supports monotonic
timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100));
break :timeout &timeout_buf;
},
};
const raw_timeout = timeoutToWindowsInterval(timeout);
// `RtlWaitOnAddress` passes the futex address in as the first argument to this call,
// but it's unclear what that actually does, especially since `NtAlertThreadByThreadId`
// does *not* accept the address so the kernel can't really be using it as a hint. An
@ -17284,7 +17447,10 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
// this parameter). However, to err on the side of caution, let's match the behavior of
// `RtlWaitOnAddress` and pass the pointer, in case the kernel ever does something
// stupid such as trying to dereference it.
switch (windows.ntdll.NtWaitForAlertByThreadId(addr_hint, raw_timeout)) {
switch (windows.ntdll.NtWaitForAlertByThreadId(
addr_hint,
if (raw_timeout) |*t| t else null,
)) {
.ALERTED => return,
.TIMEOUT => return error.Timeout,
else => unreachable,
@ -17303,19 +17469,23 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
break :timeout .{ &ts_buf, false, duration.clock == .real };
},
};
switch (posix.errno(std.c._lwp_park(
if (clock_real) .REALTIME else .MONOTONIC,
.{ .ABSTIME = abstime },
ts,
0,
addr_hint,
null,
))) {
.SUCCESS, .ALREADY, .INTR => return,
.TIMEDOUT => return error.Timeout,
.INVAL => unreachable,
.SRCH => unreachable,
else => unreachable,
// It's okay to pass the same timeout in a loop. If it's a duration, the OS actually
// writes the remaining time into the buffer when the syscall returns.
while (status.load(.monotonic).cancelation == .parked) {
switch (posix.errno(std.c._lwp_park(
if (clock_real) .REALTIME else .MONOTONIC,
.{ .ABSTIME = abstime },
ts,
0,
addr_hint,
null,
))) {
.SUCCESS, .ALREADY, .INTR => {},
.TIMEDOUT => return error.Timeout,
.INVAL => unreachable,
.SRCH => unreachable,
else => unreachable,
}
}
},
.illumos => @panic("TODO: illumos lwp_park"),
@ -17323,24 +17493,8 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
}
}
fn deadlineToWindowsInterval(t: *Io.Threaded, deadline: Io.Clock.Timestamp) windows.LARGE_INTEGER {
// ntdll only supports two combinations:
// * real-time (`.real`) sleeps with absolute deadlines
// * monotonic (`.awake`/`.boot`) sleeps with relative durations
switch (deadline.clock) {
.cpu_process, .cpu_thread => return 0,
.real => {
return @intCast(@max(@divTrunc(deadline.raw.nanoseconds, 100), 0));
},
.awake, .boot => {
const duration = deadline.durationFromNow(ioBasic(t));
return @intCast(@min(@divTrunc(-duration.raw.nanoseconds, 100), -1));
},
}
}
const UnparkTid = switch (native_os) {
// `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread handles?
// `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread IDs?
.windows => usize,
else => std.Thread.Id,
};