std.Io.Threaded: use _lwp_park correctly for real this time?

This commit is contained in:
Matthew Lugg 2026-02-04 17:59:21 +00:00 committed by Alex Rønne Petersen
parent fcdde3e4c7
commit a816f9e245
No known key found for this signature in database

View file

@ -628,6 +628,7 @@ const Thread = struct {
cancel_protection: Io.CancelProtection,
/// Always released when `Status.cancelation` is set to `.parked`.
futex_waiter: if (use_parking_futex) ?*parking_futex.Waiter else ?noreturn,
unpark_flag: UnparkFlag,
csprng: Csprng,
@ -1018,6 +1019,7 @@ const Thread = struct {
if (thread.futex_waiter) |futex_waiter| {
parking_futex.removeCanceledWaiter(futex_waiter);
}
if (need_unpark_flag) setUnparkFlag(&thread.unpark_flag);
unpark(&.{thread.id}, null);
return false;
},
@ -1559,6 +1561,7 @@ fn worker(t: *Threaded) void {
}),
.cancel_protection = .unblocked,
.futex_waiter = undefined,
.unpark_flag = unpark_flag_init,
.csprng = .{},
};
Thread.current = &thread;
@ -17007,6 +17010,7 @@ const parking_futex = struct {
/// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope
/// while it is still in the `Bucket`).
thread_status: *std.atomic.Value(Thread.Status),
unpark_flag: if (need_unpark_flag) *UnparkFlag else void,
};
fn bucketForAddress(address: usize) *Bucket {
@ -17045,9 +17049,11 @@ const parking_futex = struct {
.address = @intFromPtr(ptr),
.tid = self_tid,
.thread_status = undefined, // populated in critical section
.unpark_flag = undefined, // populated in critical section
};
var status_buf: std.atomic.Value(Thread.Status) = undefined;
var unpark_flag_buf: UnparkFlag = unpark_flag_init;
{
bucket.mutex.lock();
@ -17062,7 +17068,7 @@ const parking_futex = struct {
// This is in the critical section to avoid marking the thread as parked until we're
// certain that we're actually going to park.
waiter.thread_status = status: {
waiter.thread_status, waiter.unpark_flag = status: {
cancelable: {
if (uncancelable) break :cancelable;
const thread = opt_thread orelse break :cancelable;
@ -17090,19 +17096,19 @@ const parking_futex = struct {
.blocked_canceling => unreachable,
}
// We could now be unparked for a cancelation at any time!
break :status &thread.status;
break :status .{ &thread.status, if (need_unpark_flag) &thread.unpark_flag };
}
// This is an uncancelable wait, so just use `status_buf`. Note that the value of
// `status_buf.awaitable` is irrelevant because this is only visible to futex code,
// while only cancelation cares about `awaitable`.
status_buf.raw = .{ .cancelation = .parked, .awaitable = .null };
break :status &status_buf;
break :status .{ &status_buf, if (need_unpark_flag) &unpark_flag_buf };
};
bucket.waiters.append(&waiter.node);
}
if (park(timeout, ptr, waiter.thread_status)) {
if (park(timeout, ptr, waiter.unpark_flag)) {
// We were unparked by either `wake` or cancelation, so our current status is either
// `.none` or `.canceling`. In either case, they've already removed `waiter` from
// `bucket`, so we have nothing more to do!
@ -17127,7 +17133,7 @@ const parking_futex = struct {
// to unpark us. Whoever did that will remove us from `bucket`. Wait for
// that (and drop the unpark request in doing so).
// New status is `.none` or `.canceling` respectively.
park(.none, ptr, waiter.thread_status) catch |e| switch (e) {
park(.none, ptr, waiter.unpark_flag) catch |e| switch (e) {
error.Timeout => unreachable,
};
},
@ -17201,6 +17207,7 @@ const parking_futex = struct {
waking_head = node.next;
const waiter: *Waiter = @fieldParentPtr("node", node);
unpark_buf[unpark_len] = waiter.tid;
if (need_unpark_flag) setUnparkFlag(waiter.unpark_flag);
unpark_len += 1;
if (unpark_len == unpark_buf.len) {
unpark(&unpark_buf, ptr);
@ -17249,7 +17256,7 @@ const parking_sleep = struct {
.blocked_canceling => unreachable,
}
}
if (park(timeout, null, &thread.status)) {
if (park(timeout, null, if (need_unpark_flag) &thread.unpark_flag)) {
// The only reason this could possibly happen is cancelation.
const old_status = thread.status.load(.monotonic);
assert(old_status.cancelation == .canceling);
@ -17272,7 +17279,7 @@ const parking_sleep = struct {
// us for a cancelation. Whoever did that will have called `unpark`, so
// drop that unpark request by waiting for it.
// Status is still `.canceling`.
park(.none, null, &thread.status) catch |e| switch (e) {
park(.none, null, if (need_unpark_flag) &thread.unpark_flag) catch |e| switch (e) {
error.Timeout => unreachable,
};
return;
@ -17288,8 +17295,8 @@ const parking_sleep = struct {
}
}
// Uncancelable sleep; we expect not to be manually unparked.
var dummy_status: std.atomic.Value(Thread.Status) = .init(.{ .cancelation = .parked, .awaitable = .null });
if (park(timeout, null, &dummy_status)) {
var dummy_flag: UnparkFlag = unpark_flag_init;
if (park(timeout, null, if (need_unpark_flag) &dummy_flag)) {
unreachable; // unexpected unpark
} else |err| switch (err) {
error.Timeout => return,
@ -17322,7 +17329,7 @@ const ParkingMutex = struct {
}
};
const Waiter = struct {
status: std.atomic.Value(Thread.Status),
unpark_flag: UnparkFlag,
/// Never modified once the `Waiter` is in the linked list.
next: ?*Waiter,
/// Never modified once the `Waiter` is in the linked list.
@ -17345,7 +17352,7 @@ const ParkingMutex = struct {
const self_tid = if (Thread.current) |t| t.id else std.Thread.getCurrentId();
var waiter: Waiter = .{
.next = old_waiter,
.status = .init(.{ .cancelation = .parked, .awaitable = .null }),
.unpark_flag = unpark_flag_init,
.tid = self_tid,
};
if (m.state.cmpxchgWeak(
@ -17357,11 +17364,9 @@ const ParkingMutex = struct {
continue :state new_state;
}
// We're now in the list of waiters---park until we're given the lock.
park(.none, m, &waiter.status) catch |err| switch (err) {
park(.none, m, if (need_unpark_flag) &waiter.unpark_flag) catch |err| switch (err) {
error.Timeout => unreachable,
};
// We now hold the lock.
assert(waiter.status.load(.monotonic).cancelation == .none);
return;
},
}
@ -17383,7 +17388,7 @@ const ParkingMutex = struct {
_ => |last_state| {
// The logic here does not have ABA problems, and does some accesses non-atomically,
// because `Waiter.next` is owned by the lock holder (that's us!) once the waiter is
// in the linked list, up until we set `Waiter.status` to `.none`.
// in the linked list, up until we unpark the waiter.
// Run through the waiter list to the end to ensure fairness. This is obviously not
// ideal, but it shouldn't be a big deal in practice provided the critical section
@ -17412,8 +17417,8 @@ const ParkingMutex = struct {
}
}
// Now we're ready to actually hand the lock over to them.
const tid = waiter.tid; // load this before the store below potentially invalidates `waiter`
waiter.status.store(.{ .cancelation = .none, .awaitable = .null }, .release); // release lock
const tid = waiter.tid; // load before the unpark below potentially invalidates `waiter`
if (need_unpark_flag) setUnparkFlag(&waiter.unpark_flag);
unpark(&.{tid}, m);
return;
},
@ -17451,15 +17456,34 @@ fn timeoutToWindowsInterval(timeout: Io.Timeout) ?windows.LARGE_INTEGER {
}
}
/// Whether parking on this target needs an explicit "really unparked" flag.
///
/// On NetBSD and Illumos a parked thread can be unblocked without a matching unpark request:
/// signals cause an unblock that is indistinguishable from a real one, and libpthread tends to
/// leave unparks pending. On those targets only, a flag is passed around to track whether a
/// thread is "actually" being unparked.
const need_unpark_flag = native_os == .netbsd or native_os == .illumos;
/// Storage type for the unpark flag: an atomic `bool` on targets where `need_unpark_flag` is
/// set, and `void` everywhere else.
const UnparkFlag = if (need_unpark_flag) std.atomic.Value(bool) else void;
/// Initial value for an `UnparkFlag`: cleared (`false`) on targets where the flag exists. On all
/// other targets the `if` has no `else` branch, so this is just the `void` value.
const unpark_flag_init: UnparkFlag = if (need_unpark_flag) .init(false);
/// Records that the thread owning `f` is genuinely being unparked by storing `true` into `f`.
///
/// Must be called before `unpark`. After this function is called, the parked thread may resume
/// at any time (even before `unpark` itself runs), so the caller must not afterwards reference
/// values that live on the parked thread's stack, such as a stack-allocated `Waiter`.
///
/// The visible call sites guard this call with `if (need_unpark_flag)`.
fn setUnparkFlag(f: *UnparkFlag) void {
// Release store; `park` consumes the flag with an acquire `swap(false, ...)`.
f.store(true, .release);
}
/// Thread ID type accepted by `unpark`.
///
/// This is `std.Thread.Id` on every target except Windows, where `usize` is used instead because
/// the thread IDs handed to `unpark` there do not have the size of `std.Thread.Id`.
const UnparkTid = if (native_os == .windows) usize else std.Thread.Id;
fn park(
timeout: Io.Timeout,
/// This value has no semantic effect, but may allow the OS to optimize the operation.
addr_hint: ?*const anyopaque,
/// The API on NetBSD and Illumos sucks and can unpark spuriously (well, it *can't*, but signals
/// cause an indistinguishable unblock, and libpthread really likes to leave unparks pending).
/// As such, on these targets only, this `status` is checked to determine if an unpark is real,
/// because there is otherwise no way to differentiate a real unpark from a spurious one.
status: *std.atomic.Value(Thread.Status),
unpark_flag: if (need_unpark_flag) *UnparkFlag else void,
) error{Timeout}!void {
comptime assert(use_parking_futex or use_parking_sleep);
switch (native_os) {
@ -17502,7 +17526,7 @@ fn park(
};
// It's okay to pass the same timeout in a loop. If it's a duration, the OS actually
// writes the remaining time into the buffer when the syscall returns.
while (status.load(.monotonic).cancelation == .parked) {
while (!unpark_flag.swap(false, .acquire)) {
switch (posix.errno(std.c._lwp_park(
if (clock_real) .REALTIME else .MONOTONIC,
.{ .ABSTIME = abstime },
@ -17523,12 +17547,6 @@ fn park(
else => comptime unreachable,
}
}
const UnparkTid = switch (native_os) {
// `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread IDs?
.windows => usize,
else => std.Thread.Id,
};
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
fn unpark(tids: []const UnparkTid, addr_hint: ?*const anyopaque) void {
comptime assert(use_parking_futex or use_parking_sleep);