diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 6255d96978..5464330bd0 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -29,8 +29,8 @@ const ws2_32 = std.os.windows.ws2_32; /// * scanning environment variables on some targets /// * memory-mapping when mmap or equivalent is not available allocator: Allocator, -mutex: Mutex = .init, -cond: Condition = .init, +mutex: Io.Mutex = .init, +cond: Io.Condition = .init, run_queue: std.SinglyLinkedList = .{}, join_requested: bool = false, stack_size: usize, @@ -1505,8 +1505,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded; pub const global_single_threaded: *Threaded = &global_single_threaded_instance; pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); t.async_limit = new_limit; } @@ -1527,8 +1527,8 @@ pub fn deinit(t: *Threaded) void { fn join(t: *Threaded) void { if (builtin.single_threaded) return; { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); t.join_requested = true; } condBroadcast(&t.cond); @@ -1593,16 +1593,16 @@ fn worker(t: *Threaded) void { defer t.wait_group.finish(); - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); while (true) { while (t.run_queue.popFirst()) |runnable_node| { - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); thread.cancel_protection = .unblocked; const runnable: *Runnable = @fieldParentPtr("node", runnable_node); runnable.startFn(runnable, &thread, t); - mutexLockInternal(&t.mutex); + mutexLock(&t.mutex); t.busy_count -= 1; } if (t.join_requested) break; @@ -2025,12 +2025,12 @@ fn async( }, }; - mutexLockInternal(&t.mutex); + mutexLock(&t.mutex); const busy_count = t.busy_count; if (busy_count >= @intFromEnum(t.async_limit)) { - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); future.destroy(gpa); start(context.ptr, result.ptr); return null; @@ -2044,7 +2044,7 @@ fn async( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { t.wait_group.finish(); t.busy_count = busy_count; - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); future.destroy(gpa); start(context.ptr, result.ptr); return null; @@ -2054,7 +2054,7 @@ fn async( t.run_queue.prepend(&future.runnable.node); - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); condSignal(&t.cond); return @ptrCast(future); } @@ -2077,8 +2077,8 @@ fn concurrent( }; errdefer future.destroy(gpa); - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); const busy_count = t.busy_count; @@ -2122,12 +2122,12 @@ fn groupAsync( error.OutOfMemory => return groupAsyncEager(start, context.ptr), }; - mutexLockInternal(&t.mutex); + mutexLock(&t.mutex); const busy_count = t.busy_count; if (busy_count >= @intFromEnum(t.async_limit)) { - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); task.destroy(gpa); return groupAsyncEager(start, context.ptr); } @@ -2140,7 +2140,7 @@ fn groupAsync( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { t.wait_group.finish(); t.busy_count = busy_count; - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); task.destroy(gpa); return groupAsyncEager(start, context.ptr); }; @@ -2157,7 +2157,7 @@ fn groupAsync( }, .monotonic); t.run_queue.prepend(&task.runnable.node); - mutexUnlockInternal(&t.mutex); + mutexUnlock(&t.mutex); condSignal(&t.cond); } fn groupAsyncEager( @@ -2222,8 +2222,8 @@ fn groupConcurrent( }; errdefer task.destroy(gpa); - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); const busy_count = t.busy_count; @@ -3847,8 +3847,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat { fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION { if (!t.system_basic_information.initialized.load(.acquire)) { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); switch (windows.ntdll.NtQuerySystemInformation( .SystemBasicInformation, @@ -14359,10 +14359,9 @@ const Wsa = struct { }; fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void { - const t_io = io(t); const wsa = &t.wsa; - try wsa.mutex.lock(t_io); - defer wsa.mutex.unlock(t_io); + try mutexLock(&wsa.mutex); + defer mutexUnlock(&wsa.mutex); switch (wsa.status) { .uninitialized => { var wsa_data: ws2_32.WSADATA = undefined; @@ -14433,8 +14432,8 @@ const WindowsEnvironStrings = struct { }; fn scanEnviron(t: *Threaded) void { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.environ.initialized) return; t.environ.initialized = true; @@ -14789,8 +14788,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp fn getDevNullFd(t: *Threaded) !posix.fd_t { { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.null_file.fd != -1) return t.null_file.fd; } const mode: u32 = 0; @@ -14801,8 +14800,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t { .SUCCESS => { syscall.finish(); const fresh_fd: posix.fd_t = @intCast(rc); - mutexLockInternal(&t.mutex); // Another thread might have won the race. - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.null_file.fd != -1) { posix.close(fresh_fd); return t.null_file.fd; @@ -15462,8 +15461,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.random_file.handle) |handle| return handle; } @@ -15497,8 +15496,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { )) { .SUCCESS => { syscall.finish(); - mutexLockInternal(&t.mutex); // Another thread might have won the race. - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.random_file.handle) |prev_handle| { windows.CloseHandle(fresh_handle); return prev_handle; @@ -15518,8 +15517,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { fn getNulHandle(t: *Threaded) !windows.HANDLE { { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.null_file.handle) |handle| return handle; } @@ -15565,8 +15564,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE { )) { .SUCCESS => { syscall.finish(); - mutexLockInternal(&t.mutex); // Another thread might have won the race. - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.null_file.handle) |prev_handle| { windows.CloseHandle(fresh_handle); return prev_handle; @@ -16611,15 +16610,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void { } fn randomMainThread(t: *Threaded, buffer: []u8) void { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); if (!t.csprng.isInitialized()) { @branchHint(.unlikely); var seed: [Csprng.seed_len]u8 = undefined; { - mutexUnlockInternal(&t.mutex); - defer mutexLockInternal(&t.mutex); + mutexUnlock(&t.mutex); + defer mutexLock(&t.mutex); const prev = swapCancelProtection(t, .blocked); defer _ = swapCancelProtection(t, prev); @@ -16804,8 +16803,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void { fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t { { - mutexLockInternal(&t.mutex); - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.random_file.fd == -2) return error.EntropyUnavailable; if (t.random_file.fd != -1) return t.random_file.fd; @@ -16845,8 +16844,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t { .SUCCESS => { syscall.finish(); if (!statx.mask.TYPE) return error.EntropyUnavailable; - mutexLockInternal(&t.mutex); // Another thread might have won the race. - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.random_file.fd >= 0) { posix.close(fd); return t.random_file.fd; @@ -16873,8 +16872,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t { switch (posix.errno(fstat_sym(fd, &stat))) { .SUCCESS => { syscall.finish(); - mutexLockInternal(&t.mutex); // Another thread might have won the race. - defer mutexUnlockInternal(&t.mutex); + mutexLock(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.random_file.fd >= 0) { posix.close(fd); return t.random_file.fd; @@ -16938,7 +16937,7 @@ const parking_futex = struct { /// avoid a race. num_waiters: std.atomic.Value(u32), /// Protects `waiters`. - mutex: Mutex, + mutex: Io.Mutex, waiters: std.DoublyLinkedList, /// Prevent false sharing between buckets. @@ -17007,8 +17006,8 @@ const parking_futex = struct { var status_buf: std.atomic.Value(Thread.Status) = undefined; { - mutexLockInternal(&bucket.mutex); - defer mutexUnlockInternal(&bucket.mutex); + mutexLock(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); _ = bucket.num_waiters.fetchAdd(1, .acquire); @@ -17077,8 +17076,8 @@ const parking_futex = struct { .parked => { // We saw a timeout and updated our own status from `.parked` to `.none`. It is // our responsibility to remove `waiter` from `bucket`. - mutexLockInternal(&bucket.mutex); - defer mutexUnlockInternal(&bucket.mutex); + mutexLock(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); }, @@ -17117,8 +17116,8 @@ const parking_futex = struct { // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`. var waking_head: ?*std.DoublyLinkedList.Node = null; { - mutexLockInternal(&bucket.mutex); - defer mutexUnlockInternal(&bucket.mutex); + mutexLock(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); var num_removed: u32 = 0; var it = bucket.waiters.first; @@ -17173,8 +17172,8 @@ const parking_futex = struct { fn removeCanceledWaiter(waiter: *Waiter) void { const bucket = bucketForAddress(waiter.address); - mutexLockInternal(&bucket.mutex); - defer mutexUnlockInternal(&bucket.mutex); + mutexLock(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); waiter.done.store(true, .release); // potentially invalidates `waiter.*` @@ -18160,14 +18159,8 @@ fn eventSet(event: *Io.Event) void { } } -const Condition = if (!is_windows) Io.Condition else struct { - condition: windows.CONDITION_VARIABLE, - const init: @This() = .{ .condition = .{} }; -}; - /// Same as `Io.Condition.broadcast` but avoids the VTable. -fn condBroadcast(cond: *Condition) void { - if (is_windows) return windows.ntdll.RtlWakeAllConditionVariable(&cond.condition); +fn condBroadcast(cond: *Io.Condition) void { var prev_state = cond.state.load(.monotonic); while (prev_state.waiters > prev_state.signals) { @branchHint(.unlikely); @@ -18187,8 +18180,7 @@ fn condBroadcast(cond: *Condition) void { } /// Same as `Io.Condition.signal` but avoids the VTable. -fn condSignal(cond: *Condition) void { - if (is_windows) return windows.ntdll.RtlWakeConditionVariable(&cond.condition); +fn condSignal(cond: *Io.Condition) void { var prev_state = cond.state.load(.monotonic); while (prev_state.waiters > prev_state.signals) { @branchHint(.unlikely); @@ -18208,11 +18200,7 @@ fn condSignal(cond: *Condition) void { } /// Same as `Io.Condition.waitUncancelable` but avoids the VTable. -fn condWait(cond: *Condition, mutex: *Mutex) void { - if (is_windows) { - _ = windows.kernel32.SleepConditionVariableSRW(&cond.condition, &mutex.srwlock, windows.INFINITE, 0); - return; - } +fn condWait(cond: *Io.Condition, mutex: *Io.Mutex) void { var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load { @@ -18220,8 +18208,8 @@ fn condWait(cond: *Condition, mutex: *Mutex) void { assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters } - mutexUnlockInternal(mutex); - defer mutexLockInternal(mutex); + mutexUnlock(mutex); + defer mutexLock(mutex); while (true) { Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null); @@ -18241,16 +18229,6 @@ fn condWait(cond: *Condition, mutex: *Mutex) void { } } -const Mutex = if (!is_windows) Io.Mutex else struct { - srwlock: windows.SRWLOCK, - const init: @This() = .{ .srwlock = .{} }; -}; - -fn mutexLockInternal(m: *Mutex) void { - if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock); - return mutexLock(m); -} - /// Same as `Io.Mutex.lockUncancelable` but avoids the VTable. pub fn mutexLock(m: *Io.Mutex) void { const initial_state = m.state.cmpxchgWeak( @@ -18270,11 +18248,6 @@ pub fn mutexLock(m: *Io.Mutex) void { } } -fn mutexUnlockInternal(m: *Mutex) void { - if (is_windows) return windows.ntdll.RtlReleaseSRWLockExclusive(&m.srwlock); - return mutexUnlock(m); -} - /// Same as `Io.Mutex.unlock` but avoids the VTable. pub fn mutexUnlock(m: *Io.Mutex) void { switch (m.state.swap(.unlocked, .release)) {