diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 4b1ebbc59b..11708af071 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -29,8 +29,8 @@ const ws2_32 = std.os.windows.ws2_32; /// * scanning environment variables on some targets /// * memory-mapping when mmap or equivalent is not available allocator: Allocator, -mutex: std.Thread.Mutex = .{}, -cond: std.Thread.Condition = .{}, +mutex: Io.Mutex = .init, +cond: Io.Condition = .init, run_queue: std.SinglyLinkedList = .{}, join_requested: bool = false, stack_size: usize, @@ -1486,8 +1486,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded; pub const global_single_threaded: *Threaded = &global_single_threaded_instance; pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); t.async_limit = new_limit; } @@ -1508,11 +1508,11 @@ pub fn deinit(t: *Threaded) void { fn join(t: *Threaded) void { if (builtin.single_threaded) return; { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); t.join_requested = true; } - t.cond.broadcast(); + condBroadcast(&t.cond); t.wait_group.wait(); } @@ -1574,20 +1574,20 @@ fn worker(t: *Threaded) void { defer t.wait_group.finish(); - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); while (true) { while (t.run_queue.popFirst()) |runnable_node| { - t.mutex.unlock(); + mutexUnlock(&t.mutex); thread.cancel_protection = .unblocked; const runnable: *Runnable = @fieldParentPtr("node", runnable_node); runnable.startFn(runnable, &thread, t); - t.mutex.lock(); + mutexLockUncancelable(&t.mutex); t.busy_count -= 1; } if (t.join_requested) break; - t.cond.wait(&t.mutex); + condWait(&t.cond, &t.mutex); } } @@ -2004,12 +2004,12 @@ fn async( }, }; - t.mutex.lock(); + mutexLockUncancelable(&t.mutex); const busy_count = 
t.busy_count; if (busy_count >= @intFromEnum(t.async_limit)) { - t.mutex.unlock(); + mutexUnlock(&t.mutex); future.destroy(gpa); start(context.ptr, result.ptr); return null; @@ -2023,7 +2023,7 @@ fn async( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { t.wait_group.finish(); t.busy_count = busy_count; - t.mutex.unlock(); + mutexUnlock(&t.mutex); future.destroy(gpa); start(context.ptr, result.ptr); return null; @@ -2033,8 +2033,8 @@ fn async( t.run_queue.prepend(&future.runnable.node); - t.mutex.unlock(); - t.cond.signal(); + mutexUnlock(&t.mutex); + condSignal(&t.cond); return @ptrCast(future); } @@ -2056,8 +2056,8 @@ fn concurrent( }; errdefer future.destroy(gpa); - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); const busy_count = t.busy_count; @@ -2080,7 +2080,7 @@ fn concurrent( t.run_queue.prepend(&future.runnable.node); - t.cond.signal(); + condSignal(&t.cond); return @ptrCast(future); } @@ -2101,12 +2101,12 @@ fn groupAsync( error.OutOfMemory => return groupAsyncEager(start, context.ptr), }; - t.mutex.lock(); + mutexLockUncancelable(&t.mutex); const busy_count = t.busy_count; if (busy_count >= @intFromEnum(t.async_limit)) { - t.mutex.unlock(); + mutexUnlock(&t.mutex); task.destroy(gpa); return groupAsyncEager(start, context.ptr); } @@ -2119,7 +2119,7 @@ fn groupAsync( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch { t.wait_group.finish(); t.busy_count = busy_count; - t.mutex.unlock(); + mutexUnlock(&t.mutex); task.destroy(gpa); return groupAsyncEager(start, context.ptr); }; @@ -2136,8 +2136,8 @@ fn groupAsync( }, .monotonic); t.run_queue.prepend(&task.runnable.node); - t.mutex.unlock(); - t.cond.signal(); + mutexUnlock(&t.mutex); + condSignal(&t.cond); } fn groupAsyncEager( start: *const fn (context: *const anyopaque) Io.Cancelable!void, @@ -2201,8 +2201,8 @@ fn groupConcurrent( }; errdefer task.destroy(gpa); - 
t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); const busy_count = t.busy_count; @@ -2233,7 +2233,7 @@ fn groupConcurrent( }, .monotonic); t.run_queue.prepend(&task.runnable.node); - t.cond.signal(); + condSignal(&t.cond); } fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void { @@ -3838,8 +3838,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat { fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION { if (!t.system_basic_information.initialized.load(.acquire)) { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); switch (windows.ntdll.NtQuerySystemInformation( .SystemBasicInformation, @@ -14299,10 +14299,9 @@ const Wsa = struct { }; fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void { - const t_io = io(t); const wsa = &t.wsa; - try wsa.mutex.lock(t_io); - defer wsa.mutex.unlock(t_io); + try mutexLock(&wsa.mutex); + defer mutexUnlock(&wsa.mutex); switch (wsa.status) { .uninitialized => { var wsa_data: ws2_32.WSADATA = undefined; @@ -14373,8 +14372,8 @@ const WindowsEnvironStrings = struct { }; fn scanEnviron(t: *Threaded) void { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.environ.initialized) return; t.environ.initialized = true; @@ -14729,8 +14728,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp fn getDevNullFd(t: *Threaded) !posix.fd_t { { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.null_file.fd != -1) return t.null_file.fd; } const mode: u32 = 0; @@ -14741,8 +14740,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t { .SUCCESS => { syscall.finish(); const fresh_fd: posix.fd_t = @intCast(rc); - t.mutex.lock(); // Another thread might have won the 
race. - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.null_file.fd != -1) { posix.close(fresh_fd); return t.null_file.fd; @@ -15402,8 +15401,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.random_file.handle) |handle| return handle; } @@ -15437,8 +15436,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { )) { .SUCCESS => { syscall.finish(); - t.mutex.lock(); // Another thread might have won the race. - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.random_file.handle) |prev_handle| { windows.CloseHandle(fresh_handle); return prev_handle; @@ -15458,8 +15457,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE { fn getNulHandle(t: *Threaded) !windows.HANDLE { { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.null_file.handle) |handle| return handle; } @@ -15505,8 +15504,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE { )) { .SUCCESS => { syscall.finish(); - t.mutex.lock(); // Another thread might have won the race. - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); // Another thread might have won the race. 
+ defer mutexUnlock(&t.mutex); if (t.null_file.handle) |prev_handle| { windows.CloseHandle(fresh_handle); return prev_handle; @@ -16551,15 +16550,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void { } fn randomMainThread(t: *Threaded, buffer: []u8) void { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); if (!t.csprng.isInitialized()) { @branchHint(.unlikely); var seed: [Csprng.seed_len]u8 = undefined; { - t.mutex.unlock(); - defer t.mutex.lock(); + mutexUnlock(&t.mutex); + defer mutexLockUncancelable(&t.mutex); const prev = swapCancelProtection(t, .blocked); defer _ = swapCancelProtection(t, prev); @@ -16744,8 +16743,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void { fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t { { - t.mutex.lock(); - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); + defer mutexUnlock(&t.mutex); if (t.random_file.fd == -2) return error.EntropyUnavailable; if (t.random_file.fd != -1) return t.random_file.fd; @@ -16785,8 +16784,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t { .SUCCESS => { syscall.finish(); if (!statx.mask.TYPE) return error.EntropyUnavailable; - t.mutex.lock(); // Another thread might have won the race. - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); // Another thread might have won the race. + defer mutexUnlock(&t.mutex); if (t.random_file.fd >= 0) { posix.close(fd); return t.random_file.fd; @@ -16813,8 +16812,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t { switch (posix.errno(fstat_sym(fd, &stat))) { .SUCCESS => { syscall.finish(); - t.mutex.lock(); // Another thread might have won the race. - defer t.mutex.unlock(); + mutexLockUncancelable(&t.mutex); // Another thread might have won the race. 
+ defer mutexUnlock(&t.mutex); if (t.random_file.fd >= 0) { posix.close(fd); return t.random_file.fd; @@ -16878,13 +16877,13 @@ const parking_futex = struct { /// avoid a race. num_waiters: std.atomic.Value(u32), /// Protects `waiters`. - mutex: std.Thread.Mutex, + mutex: Io.Mutex, waiters: std.DoublyLinkedList, /// Prevent false sharing between buckets. _: void align(std.atomic.cache_line) = {}, - const init: Bucket = .{ .num_waiters = .init(0), .mutex = .{}, .waiters = .{} }; + const init: Bucket = .{ .num_waiters = .init(0), .mutex = .init, .waiters = .{} }; }; const Waiter = struct { @@ -16947,8 +16946,8 @@ const parking_futex = struct { var status_buf: std.atomic.Value(Thread.Status) = undefined; { - bucket.mutex.lock(); - defer bucket.mutex.unlock(); + mutexLockUncancelable(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); _ = bucket.num_waiters.fetchAdd(1, .acquire); @@ -17017,8 +17016,8 @@ const parking_futex = struct { .parked => { // We saw a timeout and updated our own status from `.parked` to `.none`. It is // our responsibility to remove `waiter` from `bucket`. - bucket.mutex.lock(); - defer bucket.mutex.unlock(); + mutexLockUncancelable(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); }, @@ -17057,8 +17056,8 @@ const parking_futex = struct { // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`. 
var waking_head: ?*std.DoublyLinkedList.Node = null; { - bucket.mutex.lock(); - defer bucket.mutex.unlock(); + mutexLockUncancelable(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); var num_removed: u32 = 0; var it = bucket.waiters.first; @@ -17113,8 +17112,8 @@ const parking_futex = struct { fn removeCanceledWaiter(waiter: *Waiter) void { const bucket = bucketForAddress(waiter.address); - bucket.mutex.lock(); - defer bucket.mutex.unlock(); + mutexLockUncancelable(&bucket.mutex); + defer mutexUnlock(&bucket.mutex); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); waiter.done.store(true, .release); // potentially invalidates `waiter.*` @@ -18102,3 +18101,123 @@ fn eventSet(event: *Io.Event) void { .waiting => Thread.futexWake(@ptrCast(event), std.math.maxInt(u32)), } } + +/// Same as `Io.Condition.broadcast` but avoids the VTable. +fn condBroadcast(cond: *Io.Condition) void { + var prev_state = cond.state.load(.monotonic); + while (prev_state.waiters > prev_state.signals) { + @branchHint(.unlikely); + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters, + .signals = prev_state.waiters, + }, .release, .monotonic) orelse { + // Update the epoch to tell the waiting threads that there are new signals for them. + // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen + // between it observing the epoch and sleeping on it, but this is extraordinarily + // unlikely due to the precise number of calls required. + _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update + Thread.futexWake(&cond.epoch.raw, prev_state.waiters - prev_state.signals); + return; + }; + } +} + +/// Same as `Io.Condition.signal` but avoids the VTable. 
+fn condSignal(cond: *Io.Condition) void { + var prev_state = cond.state.load(.monotonic); + while (prev_state.waiters > prev_state.signals) { + @branchHint(.unlikely); + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters, + .signals = prev_state.signals + 1, + }, .release, .monotonic) orelse { + // Update the epoch to tell the waiting threads that there are new signals for them. + // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen + // between it observing the epoch and sleeping on it, but this is extraordinarily + // unlikely due to the precise number of calls required. + _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update + Thread.futexWake(&cond.epoch.raw, 1); + return; + }; + } +} + +/// Same as `Io.Condition.waitUncancelable` but avoids the VTable. +fn condWait(cond: *Io.Condition, mutex: *Io.Mutex) void { + var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load + + { + const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic); + assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters + } + + mutexUnlock(mutex); + defer mutexLockUncancelable(mutex); + + while (true) { + Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null); + + epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` load + + var prev_state = cond.state.load(.monotonic); + while (prev_state.signals > 0) { + prev_state = cond.state.cmpxchgWeak(prev_state, .{ + .waiters = prev_state.waiters - 1, + .signals = prev_state.signals - 1, + }, .acquire, .monotonic) orelse { + // We successfully consumed a signal. + return; + }; + } + } +} + +/// Same as `Io.Mutex.lock` but avoids the VTable. 
+fn mutexLock(m: *Io.Mutex) Io.Cancelable!void { + const initial_state = m.state.cmpxchgWeak( + .unlocked, + .locked_once, + .acquire, + .monotonic, + ) orelse { + @branchHint(.likely); + return; + }; + if (initial_state == .contended) { + try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null); + } + while (m.state.swap(.contended, .acquire) != .unlocked) { + try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null); + } +} + +/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable. +fn mutexLockUncancelable(m: *Io.Mutex) void { + const initial_state = m.state.cmpxchgWeak( + .unlocked, + .locked_once, + .acquire, + .monotonic, + ) orelse { + @branchHint(.likely); + return; + }; + if (initial_state == .contended) { + Thread.futexWaitUncancelable(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null); + } + while (m.state.swap(.contended, .acquire) != .unlocked) { + Thread.futexWaitUncancelable(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null); + } +} + +/// Same as `Io.Mutex.unlock` but avoids the VTable. +fn mutexUnlock(m: *Io.Mutex) void { + switch (m.state.swap(.unlocked, .release)) { + .unlocked => unreachable, + .locked_once => {}, + .contended => { + @branchHint(.unlikely); + Thread.futexWake(@ptrCast(&m.state.raw), 1); + }, + } +}