diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 4b1ebbc59b..60df1bdd81 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -29,8 +29,8 @@ const ws2_32 = std.os.windows.ws2_32;
 /// * scanning environment variables on some targets
 /// * memory-mapping when mmap or equivalent is not available
 allocator: Allocator,
-mutex: std.Thread.Mutex = .{},
-cond: std.Thread.Condition = .{},
+mutex: Mutex = .init,
+cond: Condition = .init,
 run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
 stack_size: usize,
@@ -1486,8 +1486,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded;
 pub const global_single_threaded: *Threaded = &global_single_threaded_instance;
 
 pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
-    t.mutex.lock();
-    defer t.mutex.unlock();
+    mutexLockUncancelable(&t.mutex);
+    defer mutexUnlock(&t.mutex);
     t.async_limit = new_limit;
 }
@@ -1508,11 +1508,11 @@ pub fn deinit(t: *Threaded) void {
 fn join(t: *Threaded) void {
     if (builtin.single_threaded) return;
     {
-        t.mutex.lock();
-        defer t.mutex.unlock();
+        mutexLockUncancelable(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         t.join_requested = true;
     }
-    t.cond.broadcast();
+    condBroadcast(&t.cond);
     t.wait_group.wait();
 }
@@ -1574,20 +1574,20 @@ fn worker(t: *Threaded) void {
 
     defer t.wait_group.finish();
 
-    t.mutex.lock();
-    defer t.mutex.unlock();
+    mutexLockUncancelable(&t.mutex);
+    defer mutexUnlock(&t.mutex);
 
     while (true) {
         while (t.run_queue.popFirst()) |runnable_node| {
-            t.mutex.unlock();
+            mutexUnlock(&t.mutex);
             thread.cancel_protection = .unblocked;
             const runnable: *Runnable = @fieldParentPtr("node", runnable_node);
             runnable.startFn(runnable, &thread, t);
-            t.mutex.lock();
+            mutexLockUncancelable(&t.mutex);
             t.busy_count -= 1;
         }
         if (t.join_requested) break;
-        t.cond.wait(&t.mutex);
+        condWait(&t.cond, &t.mutex);
     }
 }
@@ -2004,12 +2004,12 @@ fn async(
         },
     };
 
-    t.mutex.lock();
+    mutexLockUncancelable(&t.mutex);
 
    const busy_count = t.busy_count;
 
     if (busy_count >= @intFromEnum(t.async_limit)) {
-        t.mutex.unlock();
+        mutexUnlock(&t.mutex);
         future.destroy(gpa);
         start(context.ptr, result.ptr);
         return null;
     }
@@ -2023,7 +2023,7 @@ const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         t.wait_group.finish();
         t.busy_count = busy_count;
-        t.mutex.unlock();
+        mutexUnlock(&t.mutex);
         future.destroy(gpa);
         start(context.ptr, result.ptr);
         return null;
     };
@@ -2033,8 +2033,8 @@
     t.run_queue.prepend(&future.runnable.node);
 
-    t.mutex.unlock();
-    t.cond.signal();
+    mutexUnlock(&t.mutex);
+    condSignal(&t.cond);
 
     return @ptrCast(future);
 }
@@ -2056,8 +2056,8 @@ fn concurrent(
     };
     errdefer future.destroy(gpa);
 
-    t.mutex.lock();
-    defer t.mutex.unlock();
+    mutexLockUncancelable(&t.mutex);
+    defer mutexUnlock(&t.mutex);
 
     const busy_count = t.busy_count;
@@ -2080,7 +2080,7 @@ fn concurrent(
     t.run_queue.prepend(&future.runnable.node);
 
-    t.cond.signal();
+    condSignal(&t.cond);
 
     return @ptrCast(future);
 }
@@ -2101,12 +2101,12 @@ fn groupAsync(
         error.OutOfMemory => return groupAsyncEager(start, context.ptr),
     };
 
-    t.mutex.lock();
+    mutexLockUncancelable(&t.mutex);
 
     const busy_count = t.busy_count;
 
     if (busy_count >= @intFromEnum(t.async_limit)) {
-        t.mutex.unlock();
+        mutexUnlock(&t.mutex);
         task.destroy(gpa);
         return groupAsyncEager(start, context.ptr);
     }
@@ -2119,7 +2119,7 @@ const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         t.wait_group.finish();
         t.busy_count = busy_count;
-        t.mutex.unlock();
+        mutexUnlock(&t.mutex);
         task.destroy(gpa);
         return groupAsyncEager(start, context.ptr);
     };
@@ -2136,8 +2136,8 @@ fn groupAsync(
     }, .monotonic);
     t.run_queue.prepend(&task.runnable.node);
 
-    t.mutex.unlock();
-    t.cond.signal();
+    mutexUnlock(&t.mutex);
+    condSignal(&t.cond);
 }
 
 fn groupAsyncEager(
     start: *const fn (context: *const anyopaque) Io.Cancelable!void,
@@ -2201,8 +2201,8 @@ fn groupConcurrent(
     };
     errdefer task.destroy(gpa);
 
-    t.mutex.lock();
-    defer t.mutex.unlock();
+    mutexLockUncancelable(&t.mutex);
+    defer mutexUnlock(&t.mutex);
 
     const busy_count = t.busy_count;
@@ -2233,7 +2233,7 @@ fn groupConcurrent(
     }, .monotonic);
     t.run_queue.prepend(&task.runnable.node);
 
-    t.cond.signal();
+    condSignal(&t.cond);
 }
 
 fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
@@ -3838,8 +3838,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
 fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION {
     if (!t.system_basic_information.initialized.load(.acquire)) {
-        t.mutex.lock();
-        defer t.mutex.unlock();
+        mutexLockUncancelable(&t.mutex);
+        defer mutexUnlock(&t.mutex);
 
         switch (windows.ntdll.NtQuerySystemInformation(
             .SystemBasicInformation,
@@ -14373,8 +14373,8 @@ const WindowsEnvironStrings = struct {
 };
 
 fn scanEnviron(t: *Threaded) void {
-    t.mutex.lock();
-    defer t.mutex.unlock();
+    mutexLockUncancelable(&t.mutex);
+    defer mutexUnlock(&t.mutex);
 
     if (t.environ.initialized) return;
     t.environ.initialized = true;
@@ -14729,8 +14729,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
 fn getDevNullFd(t: *Threaded) !posix.fd_t {
     {
-        t.mutex.lock();
-        defer t.mutex.unlock();
+        mutexLockUncancelable(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.null_file.fd != -1) return t.null_file.fd;
     }
     const mode: u32 = 0;
@@ -14741,8 +14741,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t {
         .SUCCESS => {
             syscall.finish();
             const fresh_fd: posix.fd_t = @intCast(rc);
-            t.mutex.lock(); // Another thread might have won the race.
-            defer t.mutex.unlock();
+            mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.null_file.fd != -1) {
                 posix.close(fresh_fd);
                 return t.null_file.fd;
             }
@@ -15402,8 +15402,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
 fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
     {
-        t.mutex.lock();
-        defer t.mutex.unlock();
+        mutexLockUncancelable(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.random_file.handle) |handle| return handle;
     }
@@ -15437,8 +15437,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
     )) {
         .SUCCESS => {
             syscall.finish();
-            t.mutex.lock(); // Another thread might have won the race.
-            defer t.mutex.unlock();
+            mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.random_file.handle) |prev_handle| {
                 windows.CloseHandle(fresh_handle);
                 return prev_handle;
             }
@@ -15458,8 +15458,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
 fn getNulHandle(t: *Threaded) !windows.HANDLE {
     {
-        t.mutex.lock();
-        defer t.mutex.unlock();
+        mutexLockUncancelable(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.null_file.handle) |handle| return handle;
     }
@@ -15505,8 +15505,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
     )) {
         .SUCCESS => {
             syscall.finish();
-            t.mutex.lock(); // Another thread might have won the race.
-            defer t.mutex.unlock();
+            mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.null_file.handle) |prev_handle| {
                 windows.CloseHandle(fresh_handle);
                 return prev_handle;
             }
@@ -16551,15 +16551,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void {
 }
 
 fn randomMainThread(t: *Threaded, buffer: []u8) void {
-    t.mutex.lock();
-    defer t.mutex.unlock();
+    mutexLockUncancelable(&t.mutex);
+    defer mutexUnlock(&t.mutex);
 
     if (!t.csprng.isInitialized()) {
         @branchHint(.unlikely);
         var seed: [Csprng.seed_len]u8 = undefined;
         {
-            t.mutex.unlock();
-            defer t.mutex.lock();
+            mutexUnlock(&t.mutex);
+            defer mutexLockUncancelable(&t.mutex);
 
             const prev = swapCancelProtection(t, .blocked);
             defer _ = swapCancelProtection(t, prev);
@@ -16744,8 +16744,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void {
 fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
     {
-        t.mutex.lock();
-        defer t.mutex.unlock();
+        mutexLockUncancelable(&t.mutex);
+        defer mutexUnlock(&t.mutex);
         if (t.random_file.fd == -2) return error.EntropyUnavailable;
         if (t.random_file.fd != -1) return t.random_file.fd;
@@ -16785,8 +16785,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
         .SUCCESS => {
             syscall.finish();
             if (!statx.mask.TYPE) return error.EntropyUnavailable;
-            t.mutex.lock(); // Another thread might have won the race.
-            defer t.mutex.unlock();
+            mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.random_file.fd >= 0) {
                 posix.close(fd);
                 return t.random_file.fd;
             }
@@ -16813,8 +16813,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
     switch (posix.errno(fstat_sym(fd, &stat))) {
         .SUCCESS => {
             syscall.finish();
-            t.mutex.lock(); // Another thread might have won the race.
-            defer t.mutex.unlock();
+            mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
+            defer mutexUnlock(&t.mutex);
             if (t.random_file.fd >= 0) {
                 posix.close(fd);
                 return t.random_file.fd;
             }
@@ -16878,13 +16878,13 @@ const parking_futex = struct {
         /// avoid a race.
        num_waiters: std.atomic.Value(u32),
         /// Protects `waiters`.
-        mutex: std.Thread.Mutex,
+        mutex: Mutex,
         waiters: std.DoublyLinkedList,
 
         /// Prevent false sharing between buckets.
         _: void align(std.atomic.cache_line) = {},
 
-        const init: Bucket = .{ .num_waiters = .init(0), .mutex = .{}, .waiters = .{} };
+        const init: Bucket = .{ .num_waiters = .init(0), .mutex = .init, .waiters = .{} };
     };
 
     const Waiter = struct {
@@ -16947,8 +16947,8 @@ const parking_futex = struct {
         var status_buf: std.atomic.Value(Thread.Status) = undefined;
 
         {
-            bucket.mutex.lock();
-            defer bucket.mutex.unlock();
+            mutexLockUncancelable(&bucket.mutex);
+            defer mutexUnlock(&bucket.mutex);
 
             _ = bucket.num_waiters.fetchAdd(1, .acquire);
@@ -17017,8 +17017,8 @@ const parking_futex = struct {
            .parked => {
                 // We saw a timeout and updated our own status from `.parked` to `.none`. It is
                 // our responsibility to remove `waiter` from `bucket`.
-                bucket.mutex.lock();
-                defer bucket.mutex.unlock();
+                mutexLockUncancelable(&bucket.mutex);
+                defer mutexUnlock(&bucket.mutex);
                 bucket.waiters.remove(&waiter.node);
                 assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
             },
@@ -17057,8 +17057,8 @@ const parking_futex = struct {
         // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
         var waking_head: ?*std.DoublyLinkedList.Node = null;
         {
-            bucket.mutex.lock();
-            defer bucket.mutex.unlock();
+            mutexLockUncancelable(&bucket.mutex);
+            defer mutexUnlock(&bucket.mutex);
 
             var num_removed: u32 = 0;
             var it = bucket.waiters.first;
@@ -17113,8 +17113,8 @@ const parking_futex = struct {
     fn removeCanceledWaiter(waiter: *Waiter) void {
         const bucket = bucketForAddress(waiter.address);
-        bucket.mutex.lock();
-        defer bucket.mutex.unlock();
+        mutexLockUncancelable(&bucket.mutex);
+        defer mutexUnlock(&bucket.mutex);
         bucket.waiters.remove(&waiter.node);
         assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
         waiter.done.store(true, .release); // potentially invalidates `waiter.*`
@@ -18102,3 +18102,141 @@ fn eventSet(event: *Io.Event) void {
         .waiting => Thread.futexWake(@ptrCast(event), std.math.maxInt(u32)),
     }
 }
+
+const Condition = if (!is_windows) Io.Condition else struct {
+    condition: windows.CONDITION_VARIABLE,
+    const init: @This() = .{ .condition = .{} };
+};
+
+/// Same as `Io.Condition.broadcast` but avoids the VTable.
+fn condBroadcast(cond: *Condition) void {
+    if (is_windows) return windows.ntdll.RtlWakeAllConditionVariable(&cond.condition);
+    var prev_state = cond.state.load(.monotonic);
+    while (prev_state.waiters > prev_state.signals) {
+        @branchHint(.unlikely);
+        prev_state = cond.state.cmpxchgWeak(prev_state, .{
+            .waiters = prev_state.waiters,
+            .signals = prev_state.waiters,
+        }, .release, .monotonic) orelse {
+            // Update the epoch to tell the waiting threads that there are new signals for them.
+            // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen
+            // between it observing the epoch and sleeping on it, but this is extraordinarily
+            // unlikely due to the precise number of calls required.
+            _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
+            Thread.futexWake(&cond.epoch.raw, prev_state.waiters - prev_state.signals);
+            return;
+        };
+    }
+}
+
+/// Same as `Io.Condition.signal` but avoids the VTable.
+fn condSignal(cond: *Condition) void {
+    if (is_windows) return windows.ntdll.RtlWakeConditionVariable(&cond.condition);
+    var prev_state = cond.state.load(.monotonic);
+    while (prev_state.waiters > prev_state.signals) {
+        @branchHint(.unlikely);
+        prev_state = cond.state.cmpxchgWeak(prev_state, .{
+            .waiters = prev_state.waiters,
+            .signals = prev_state.signals + 1,
+        }, .release, .monotonic) orelse {
+            // Update the epoch to tell the waiting threads that there are new signals for them.
+            // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen
+            // between it observing the epoch and sleeping on it, but this is extraordinarily
+            // unlikely due to the precise number of calls required.
+            _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
+            Thread.futexWake(&cond.epoch.raw, 1);
+            return;
+        };
+    }
+}
+
+/// Same as `Io.Condition.waitUncancelable` but avoids the VTable.
+fn condWait(cond: *Condition, mutex: *Mutex) void {
+    if (is_windows) {
+        _ = windows.kernel32.SleepConditionVariableSRW(&cond.condition, &mutex.srwlock, windows.INFINITE, 0);
+        return;
+    }
+    var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
+
+    {
+        const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic);
+        assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters
+    }
+
+    mutexUnlock(mutex);
+    defer mutexLockUncancelable(mutex);
+
+    while (true) {
+        Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null);
+
+        epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` load
+
+        var prev_state = cond.state.load(.monotonic);
+        while (prev_state.signals > 0) {
+            prev_state = cond.state.cmpxchgWeak(prev_state, .{
+                .waiters = prev_state.waiters - 1,
+                .signals = prev_state.signals - 1,
+            }, .acquire, .monotonic) orelse {
+                // We successfully consumed a signal.
+                return;
+            };
+        }
+    }
+}
+
+const Mutex = if (!is_windows) Io.Mutex else struct {
+    srwlock: windows.SRWLOCK,
+    const init: @This() = .{ .srwlock = .{} };
+};
+
+/// Same as `Io.Mutex.lock` but avoids the VTable.
+fn mutexLock(m: *Io.Mutex) Io.Cancelable!void {
+    const initial_state = m.state.cmpxchgWeak(
+        .unlocked,
+        .locked_once,
+        .acquire,
+        .monotonic,
+    ) orelse {
+        @branchHint(.likely);
+        return;
+    };
+    if (initial_state == .contended) {
+        try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
+    }
+    while (m.state.swap(.contended, .acquire) != .unlocked) {
+        try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
+    }
+}
+
+/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
+fn mutexLockUncancelable(m: *Mutex) void {
+    if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock);
+    const initial_state = m.state.cmpxchgWeak(
+        .unlocked,
+        .locked_once,
+        .acquire,
+        .monotonic,
+    ) orelse {
+        @branchHint(.likely);
+        return;
+    };
+    if (initial_state == .contended) {
+        Thread.futexWaitUncancelable(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
+    }
+    while (m.state.swap(.contended, .acquire) != .unlocked) {
+        Thread.futexWaitUncancelable(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
+    }
+}
+
+/// Same as `Io.Mutex.unlock` but avoids the VTable.
+fn mutexUnlock(m: *Mutex) void {
+    if (is_windows) return windows.ntdll.RtlReleaseSRWLockExclusive(&m.srwlock);
+    switch (m.state.swap(.unlocked, .release)) {
+        .unlocked => unreachable,
+        .locked_once => {},
+        .contended => {
+            @branchHint(.unlikely);
+            Thread.futexWake(@ptrCast(&m.state.raw), 1);
+        },
+    }
+}
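
Note on the futex protocol (not part of the patch): the non-Windows paths of mutexLockUncancelable and mutexUnlock above implement the classic three-state futex mutex, where unlock only issues a futex wake when some waiter has marked the lock contended. The sketch below is a minimal, self-contained rendering of that protocol against the public std.Thread.Futex API; SketchMutex and State are invented names for illustration and are not part of std.Io.Threaded.

// sketch_mutex.zig — illustrative only; run with `zig test sketch_mutex.zig`.
const std = @import("std");

const State = enum(u32) { unlocked, locked_once, contended };

const SketchMutex = struct {
    state: std.atomic.Value(u32) = .init(@intFromEnum(State.unlocked)),

    fn lock(m: *SketchMutex) void {
        // Fast path: a single CAS takes an uncontended lock.
        if (m.state.cmpxchgWeak(
            @intFromEnum(State.unlocked),
            @intFromEnum(State.locked_once),
            .acquire,
            .monotonic,
        ) == null) return;
        // Slow path: advertise contention, then sleep until the holder
        // hands the lock over. swap() returning .unlocked means we won.
        while (m.state.swap(@intFromEnum(State.contended), .acquire) !=
            @intFromEnum(State.unlocked))
        {
            std.Thread.Futex.wait(&m.state, @intFromEnum(State.contended));
        }
    }

    fn unlock(m: *SketchMutex) void {
        // Only pay for a futex wake when a waiter recorded contention;
        // this is why .locked_once and .contended are distinct states.
        if (m.state.swap(@intFromEnum(State.unlocked), .release) ==
            @intFromEnum(State.contended))
        {
            std.Thread.Futex.wake(&m.state, 1);
        }
    }
};

test "sketch mutex serializes increments" {
    var m: SketchMutex = .{};
    var counter: u64 = 0;
    const worker = struct {
        fn run(mu: *SketchMutex, n: *u64) void {
            for (0..1000) |_| {
                mu.lock();
                defer mu.unlock();
                n.* += 1;
            }
        }
    };
    var threads: [4]std.Thread = undefined;
    for (&threads) |*th| th.* = try std.Thread.spawn(.{}, worker.run, .{ &m, &counter });
    for (threads) |th| th.join();
    try std.testing.expectEqual(@as(u64, 4000), counter);
}

The extra .locked_once state is what lets the common uncontended unlock skip the wake entirely.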
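
Note on condWait's discipline (not part of the patch): worker() above holds the pool mutex, re-checks its predicate in a loop, and lets condWait release and reacquire the mutex around each sleep; condWait's inner loop only returns once it has actually consumed a signal. Below is a minimal sketch of that same discipline using the public std.Thread.Mutex and std.Thread.Condition; the Queue type and its fields are invented for this example.

// sketch_condition.zig — illustrative only.
const std = @import("std");

const Queue = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    items: u32 = 0,
    closed: bool = false,

    // Mirrors the worker() loop shape: hold the mutex, loop on the
    // predicate, and let wait() release/reacquire the mutex around
    // each sleep. Returns the remaining count, or null on shutdown.
    fn take(q: *Queue) ?u32 {
        q.mutex.lock();
        defer q.mutex.unlock();
        while (q.items == 0) {
            if (q.closed) return null;
            q.cond.wait(&q.mutex);
        }
        q.items -= 1;
        return q.items;
    }

    fn put(q: *Queue) void {
        {
            q.mutex.lock();
            defer q.mutex.unlock();
            q.items += 1;
        }
        // Signal after releasing the mutex, as the patched async()/
        // groupAsync() paths do, so the woken thread can acquire it
        // without immediately blocking again.
        q.cond.signal();
    }

    fn close(q: *Queue) void {
        {
            q.mutex.lock();
            defer q.mutex.unlock();
            q.closed = true;
        }
        // Every sleeper must observe the shutdown, hence broadcast,
        // mirroring join() in the patch.
        q.cond.broadcast();
    }
};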
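
Note on the `.init` defaults (not part of the patch): the first hunk swaps `= .{}` for `= .init` because Mutex and Condition are now target-dependent types (SRWLOCK/CONDITION_VARIABLE wrappers on Windows, Io.Mutex/Io.Condition elsewhere), and a named init declaration gives every variant one initializer spelling, whereas `.{}` only compiles when the selected variant defaults all of its fields. A toy sketch of the pattern; MutexImpl and its stand-in fields are invented for this example.

// sketch_init.zig — illustrative only.
const builtin = @import("builtin");

const MutexImpl = if (builtin.os.tag == .windows) struct {
    // Stand-in for windows.SRWLOCK, which has no defaulted fields here.
    ptr: ?*anyopaque,
    const init: @This() = .{ .ptr = null };
} else struct {
    // Stand-in for the futex word used on other targets.
    state: u32,
    const init: @This() = .{ .state = 0 };
};

const Example = struct {
    mutex: MutexImpl = .init, // one initializer spelling, every target
};

test "decl literal picks the right variant" {
    const e: Example = .{};
    _ = e;
}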