Revert "std.Io.Threaded: work around parking futex bug"

This reverts commit 5312063138.
Matthew Lugg 2026-02-03 19:42:54 +00:00
parent 184c8f9545
commit 56a43fb86f
GPG key ID: 3F5B7DCCBF4AF02E


@@ -29,8 +29,8 @@ const ws2_32 = std.os.windows.ws2_32;
 /// * scanning environment variables on some targets
 /// * memory-mapping when mmap or equivalent is not available
 allocator: Allocator,
-mutex: Mutex = .init,
-cond: Condition = .init,
+mutex: Io.Mutex = .init,
+cond: Io.Condition = .init,
 run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
 stack_size: usize,
@@ -1505,8 +1505,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded;
 pub const global_single_threaded: *Threaded = &global_single_threaded_instance;
 pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
-    mutexLockInternal(&t.mutex);
-    defer mutexUnlockInternal(&t.mutex);
+    mutexLock(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     t.async_limit = new_limit;
 }
@@ -1527,8 +1527,8 @@ pub fn deinit(t: *Threaded) void {
 fn join(t: *Threaded) void {
     if (builtin.single_threaded) return;
     {
-        mutexLockInternal(&t.mutex);
-        defer mutexUnlockInternal(&t.mutex);
+        mutexLock(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         t.join_requested = true;
     }
     condBroadcast(&t.cond);
@@ -1593,16 +1593,16 @@ fn worker(t: *Threaded) void {
     defer t.wait_group.finish();
-    mutexLockInternal(&t.mutex);
-    defer mutexUnlockInternal(&t.mutex);
+    mutexLock(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     while (true) {
         while (t.run_queue.popFirst()) |runnable_node| {
-            mutexUnlockInternal(&t.mutex);
+            mutexUnlock(&t.mutex);
             thread.cancel_protection = .unblocked;
             const runnable: *Runnable = @fieldParentPtr("node", runnable_node);
             runnable.startFn(runnable, &thread, t);
-            mutexLockInternal(&t.mutex);
+            mutexLock(&t.mutex);
             t.busy_count -= 1;
         }
         if (t.join_requested) break;
@@ -2025,12 +2025,12 @@ fn async(
         },
     };
-    mutexLockInternal(&t.mutex);
+    mutexLock(&t.mutex);
     const busy_count = t.busy_count;
     if (busy_count >= @intFromEnum(t.async_limit)) {
-        mutexUnlockInternal(&t.mutex);
+        mutexUnlock(&t.mutex);
         future.destroy(gpa);
         start(context.ptr, result.ptr);
         return null;
@@ -2044,7 +2044,7 @@ fn async(
     const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         t.wait_group.finish();
         t.busy_count = busy_count;
-        mutexUnlockInternal(&t.mutex);
+        mutexUnlock(&t.mutex);
         future.destroy(gpa);
         start(context.ptr, result.ptr);
         return null;
@@ -2054,7 +2054,7 @@ fn async(
     t.run_queue.prepend(&future.runnable.node);
-    mutexUnlockInternal(&t.mutex);
+    mutexUnlock(&t.mutex);
     condSignal(&t.cond);
     return @ptrCast(future);
 }
@@ -2077,8 +2077,8 @@ fn concurrent(
     };
     errdefer future.destroy(gpa);
-    mutexLockInternal(&t.mutex);
-    defer mutexUnlockInternal(&t.mutex);
+    mutexLock(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     const busy_count = t.busy_count;
@@ -2122,12 +2122,12 @@ fn groupAsync(
         error.OutOfMemory => return groupAsyncEager(start, context.ptr),
     };
-    mutexLockInternal(&t.mutex);
+    mutexLock(&t.mutex);
     const busy_count = t.busy_count;
     if (busy_count >= @intFromEnum(t.async_limit)) {
-        mutexUnlockInternal(&t.mutex);
+        mutexUnlock(&t.mutex);
         task.destroy(gpa);
         return groupAsyncEager(start, context.ptr);
     }
@@ -2140,7 +2140,7 @@ fn groupAsync(
     const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         t.wait_group.finish();
         t.busy_count = busy_count;
-        mutexUnlockInternal(&t.mutex);
+        mutexUnlock(&t.mutex);
         task.destroy(gpa);
         return groupAsyncEager(start, context.ptr);
     };
@@ -2157,7 +2157,7 @@ fn groupAsync(
     }, .monotonic);
     t.run_queue.prepend(&task.runnable.node);
-    mutexUnlockInternal(&t.mutex);
+    mutexUnlock(&t.mutex);
     condSignal(&t.cond);
 }
 fn groupAsyncEager(
@@ -2222,8 +2222,8 @@ fn groupConcurrent(
     };
     errdefer task.destroy(gpa);
-    mutexLockInternal(&t.mutex);
-    defer mutexUnlockInternal(&t.mutex);
+    mutexLock(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     const busy_count = t.busy_count;
@@ -3847,8 +3847,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
 fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION {
     if (!t.system_basic_information.initialized.load(.acquire)) {
-        mutexLockInternal(&t.mutex);
-        defer mutexUnlockInternal(&t.mutex);
+        mutexLock(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         switch (windows.ntdll.NtQuerySystemInformation(
             .SystemBasicInformation,
@@ -14359,10 +14359,9 @@ const Wsa = struct {
 };
 fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void {
-    const t_io = io(t);
     const wsa = &t.wsa;
-    try wsa.mutex.lock(t_io);
-    defer wsa.mutex.unlock(t_io);
+    try mutexLock(&wsa.mutex);
+    defer mutexUnlock(&wsa.mutex);
     switch (wsa.status) {
         .uninitialized => {
             var wsa_data: ws2_32.WSADATA = undefined;
@@ -14433,8 +14432,8 @@ const WindowsEnvironStrings = struct {
 };
 fn scanEnviron(t: *Threaded) void {
-    mutexLockInternal(&t.mutex);
-    defer mutexUnlockInternal(&t.mutex);
+    mutexLock(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     if (t.environ.initialized) return;
     t.environ.initialized = true;
@@ -14789,8 +14788,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
 fn getDevNullFd(t: *Threaded) !posix.fd_t {
     {
-        mutexLockInternal(&t.mutex);
-        defer mutexUnlockInternal(&t.mutex);
+        mutexLock(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.null_file.fd != -1) return t.null_file.fd;
     }
     const mode: u32 = 0;
@@ -14801,8 +14800,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t {
         .SUCCESS => {
             syscall.finish();
             const fresh_fd: posix.fd_t = @intCast(rc);
-            mutexLockInternal(&t.mutex); // Another thread might have won the race.
-            defer mutexUnlockInternal(&t.mutex);
+            mutexLock(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.null_file.fd != -1) {
                 posix.close(fresh_fd);
                 return t.null_file.fd;
@@ -15462,8 +15461,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
 fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
     {
-        mutexLockInternal(&t.mutex);
-        defer mutexUnlockInternal(&t.mutex);
+        mutexLock(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.random_file.handle) |handle| return handle;
     }
@@ -15497,8 +15496,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
     )) {
         .SUCCESS => {
             syscall.finish();
-            mutexLockInternal(&t.mutex); // Another thread might have won the race.
-            defer mutexUnlockInternal(&t.mutex);
+            mutexLock(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.random_file.handle) |prev_handle| {
                 windows.CloseHandle(fresh_handle);
                 return prev_handle;
@@ -15518,8 +15517,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
 fn getNulHandle(t: *Threaded) !windows.HANDLE {
     {
-        mutexLockInternal(&t.mutex);
-        defer mutexUnlockInternal(&t.mutex);
+        mutexLock(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.null_file.handle) |handle| return handle;
     }
@@ -15565,8 +15564,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
     )) {
         .SUCCESS => {
             syscall.finish();
-            mutexLockInternal(&t.mutex); // Another thread might have won the race.
-            defer mutexUnlockInternal(&t.mutex);
+            mutexLock(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.null_file.handle) |prev_handle| {
                 windows.CloseHandle(fresh_handle);
                 return prev_handle;
@@ -16611,15 +16610,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void {
 }
 fn randomMainThread(t: *Threaded, buffer: []u8) void {
-    mutexLockInternal(&t.mutex);
-    defer mutexUnlockInternal(&t.mutex);
+    mutexLock(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     if (!t.csprng.isInitialized()) {
         @branchHint(.unlikely);
         var seed: [Csprng.seed_len]u8 = undefined;
         {
-            mutexUnlockInternal(&t.mutex);
-            defer mutexLockInternal(&t.mutex);
+            mutexUnlock(&t.mutex);
+            defer mutexLock(&t.mutex);
             const prev = swapCancelProtection(t, .blocked);
             defer _ = swapCancelProtection(t, prev);
@@ -16804,8 +16803,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void {
 fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
     {
-        mutexLockInternal(&t.mutex);
-        defer mutexUnlockInternal(&t.mutex);
+        mutexLock(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.random_file.fd == -2) return error.EntropyUnavailable;
         if (t.random_file.fd != -1) return t.random_file.fd;
@@ -16845,8 +16844,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
         .SUCCESS => {
             syscall.finish();
             if (!statx.mask.TYPE) return error.EntropyUnavailable;
-            mutexLockInternal(&t.mutex); // Another thread might have won the race.
-            defer mutexUnlockInternal(&t.mutex);
+            mutexLock(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.random_file.fd >= 0) {
                 posix.close(fd);
                 return t.random_file.fd;
@@ -16873,8 +16872,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
     switch (posix.errno(fstat_sym(fd, &stat))) {
         .SUCCESS => {
             syscall.finish();
-            mutexLockInternal(&t.mutex); // Another thread might have won the race.
-            defer mutexUnlockInternal(&t.mutex);
+            mutexLock(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.random_file.fd >= 0) {
                 posix.close(fd);
                 return t.random_file.fd;
@@ -16938,7 +16937,7 @@ const parking_futex = struct {
         /// avoid a race.
         num_waiters: std.atomic.Value(u32),
         /// Protects `waiters`.
-        mutex: Mutex,
+        mutex: Io.Mutex,
         waiters: std.DoublyLinkedList,
         /// Prevent false sharing between buckets.
@@ -17007,8 +17006,8 @@ const parking_futex = struct {
         var status_buf: std.atomic.Value(Thread.Status) = undefined;
         {
-            mutexLockInternal(&bucket.mutex);
-            defer mutexUnlockInternal(&bucket.mutex);
+            mutexLock(&bucket.mutex);
+            defer mutexUnlock(&bucket.mutex);
             _ = bucket.num_waiters.fetchAdd(1, .acquire);
@@ -17077,8 +17076,8 @@ const parking_futex = struct {
             .parked => {
                 // We saw a timeout and updated our own status from `.parked` to `.none`. It is
                 // our responsibility to remove `waiter` from `bucket`.
-                mutexLockInternal(&bucket.mutex);
-                defer mutexUnlockInternal(&bucket.mutex);
+                mutexLock(&bucket.mutex);
+                defer mutexUnlock(&bucket.mutex);
                 bucket.waiters.remove(&waiter.node);
                 assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
             },
@@ -17117,8 +17116,8 @@ const parking_futex = struct {
         // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
         var waking_head: ?*std.DoublyLinkedList.Node = null;
         {
-            mutexLockInternal(&bucket.mutex);
-            defer mutexUnlockInternal(&bucket.mutex);
+            mutexLock(&bucket.mutex);
+            defer mutexUnlock(&bucket.mutex);
             var num_removed: u32 = 0;
             var it = bucket.waiters.first;
@@ -17173,8 +17172,8 @@ const parking_futex = struct {
     fn removeCanceledWaiter(waiter: *Waiter) void {
         const bucket = bucketForAddress(waiter.address);
-        mutexLockInternal(&bucket.mutex);
-        defer mutexUnlockInternal(&bucket.mutex);
+        mutexLock(&bucket.mutex);
+        defer mutexUnlock(&bucket.mutex);
         bucket.waiters.remove(&waiter.node);
         assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
         waiter.done.store(true, .release); // potentially invalidates `waiter.*`
@@ -18160,14 +18159,8 @@ fn eventSet(event: *Io.Event) void {
     }
 }
-const Condition = if (!is_windows) Io.Condition else struct {
-    condition: windows.CONDITION_VARIABLE,
-    const init: @This() = .{ .condition = .{} };
-};
 /// Same as `Io.Condition.broadcast` but avoids the VTable.
-fn condBroadcast(cond: *Condition) void {
-    if (is_windows) return windows.ntdll.RtlWakeAllConditionVariable(&cond.condition);
+fn condBroadcast(cond: *Io.Condition) void {
     var prev_state = cond.state.load(.monotonic);
     while (prev_state.waiters > prev_state.signals) {
         @branchHint(.unlikely);
@@ -18187,8 +18180,8 @@ fn condBroadcast(cond: *Condition) void {
 }
 /// Same as `Io.Condition.signal` but avoids the VTable.
-fn condSignal(cond: *Condition) void {
-    if (is_windows) return windows.ntdll.RtlWakeConditionVariable(&cond.condition);
+fn condSignal(cond: *Io.Condition) void {
     var prev_state = cond.state.load(.monotonic);
     while (prev_state.waiters > prev_state.signals) {
         @branchHint(.unlikely);
@@ -18208,11 +18200,7 @@ fn condSignal(cond: *Condition) void {
 }
 /// Same as `Io.Condition.waitUncancelable` but avoids the VTable.
-fn condWait(cond: *Condition, mutex: *Mutex) void {
-    if (is_windows) {
-        _ = windows.kernel32.SleepConditionVariableSRW(&cond.condition, &mutex.srwlock, windows.INFINITE, 0);
-        return;
-    }
+fn condWait(cond: *Io.Condition, mutex: *Io.Mutex) void {
     var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
     {
@@ -18220,8 +18208,8 @@ fn condWait(cond: *Condition, mutex: *Mutex) void {
         assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters
     }
-    mutexUnlockInternal(mutex);
-    defer mutexLockInternal(mutex);
+    mutexUnlock(mutex);
+    defer mutexLock(mutex);
     while (true) {
         Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null);
@@ -18241,16 +18229,6 @@ fn condWait(cond: *Condition, mutex: *Mutex) void {
     }
 }
-const Mutex = if (!is_windows) Io.Mutex else struct {
-    srwlock: windows.SRWLOCK,
-    const init: @This() = .{ .srwlock = .{} };
-};
-fn mutexLockInternal(m: *Mutex) void {
-    if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock);
-    return mutexLock(m);
-}
 /// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
 pub fn mutexLock(m: *Io.Mutex) void {
     const initial_state = m.state.cmpxchgWeak(
@@ -18270,11 +18248,6 @@ pub fn mutexLock(m: *Io.Mutex) void {
     }
 }
-fn mutexUnlockInternal(m: *Mutex) void {
-    if (is_windows) return windows.ntdll.RtlReleaseSRWLockExclusive(&m.srwlock);
-    return mutexUnlock(m);
-}
 /// Same as `Io.Mutex.unlock` but avoids the VTable.
 pub fn mutexUnlock(m: *Io.Mutex) void {
     switch (m.state.swap(.unlocked, .release)) {