diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 129ad59543..be0bb284c3 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -628,6 +628,7 @@ const Thread = struct { cancel_protection: Io.CancelProtection, /// Always released when `Status.cancelation` is set to `.parked`. futex_waiter: if (use_parking_futex) ?*parking_futex.Waiter else ?noreturn, + unpark_flag: UnparkFlag, csprng: Csprng, @@ -1018,6 +1019,7 @@ const Thread = struct { if (thread.futex_waiter) |futex_waiter| { parking_futex.removeCanceledWaiter(futex_waiter); } + if (need_unpark_flag) setUnparkFlag(&thread.unpark_flag); unpark(&.{thread.id}, null); return false; }, @@ -1559,6 +1561,7 @@ fn worker(t: *Threaded) void { }), .cancel_protection = .unblocked, .futex_waiter = undefined, + .unpark_flag = unpark_flag_init, .csprng = .{}, }; Thread.current = &thread; @@ -17007,6 +17010,7 @@ const parking_futex = struct { /// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope /// while it is still in the `Bucket`). thread_status: *std.atomic.Value(Thread.Status), + unpark_flag: if (need_unpark_flag) *UnparkFlag else void, }; fn bucketForAddress(address: usize) *Bucket { @@ -17045,9 +17049,11 @@ const parking_futex = struct { .address = @intFromPtr(ptr), .tid = self_tid, .thread_status = undefined, // populated in critical section + .unpark_flag = undefined, // populated in critical section }; var status_buf: std.atomic.Value(Thread.Status) = undefined; + var unpark_flag_buf: UnparkFlag = unpark_flag_init; { bucket.mutex.lock(); @@ -17062,7 +17068,7 @@ const parking_futex = struct { // This is in the critical section to avoid marking the thread as parked until we're // certain that we're actually going to park. - waiter.thread_status = status: { + waiter.thread_status, waiter.unpark_flag = status: { cancelable: { if (uncancelable) break :cancelable; const thread = opt_thread orelse break :cancelable; @@ -17090,19 +17096,19 @@ const parking_futex = struct { .blocked_canceling => unreachable, } // We could now be unparked for a cancelation at any time! - break :status &thread.status; + break :status .{ &thread.status, if (need_unpark_flag) &thread.unpark_flag }; } // This is an uncancelable wait, so just use `status_buf`. Note that the value of // `status_buf.awaitable` is irrelevant because this is only visible to futex code, // while only cancelation cares about `awaitable`. status_buf.raw = .{ .cancelation = .parked, .awaitable = .null }; - break :status &status_buf; + break :status .{ &status_buf, if (need_unpark_flag) &unpark_flag_buf }; }; bucket.waiters.append(&waiter.node); } - if (park(timeout, ptr, waiter.thread_status)) { + if (park(timeout, ptr, waiter.unpark_flag)) { // We were unparked by either `wake` or cancelation, so our current status is either // `.none` or `.canceling`. In either case, they've already removed `waiter` from // `bucket`, so we have nothing more to do! @@ -17127,7 +17133,7 @@ const parking_futex = struct { // to unpark us. Whoever did that will remove us from `bucket`. Wait for // that (and drop the unpark request in doing so). // New status is `.none` or `.canceling` respectively. - park(.none, ptr, waiter.thread_status) catch |e| switch (e) { + park(.none, ptr, waiter.unpark_flag) catch |e| switch (e) { error.Timeout => unreachable, }; }, @@ -17201,6 +17207,7 @@ const parking_futex = struct { waking_head = node.next; const waiter: *Waiter = @fieldParentPtr("node", node); unpark_buf[unpark_len] = waiter.tid; + if (need_unpark_flag) setUnparkFlag(waiter.unpark_flag); unpark_len += 1; if (unpark_len == unpark_buf.len) { unpark(&unpark_buf, ptr); @@ -17249,7 +17256,7 @@ const parking_sleep = struct { .blocked_canceling => unreachable, } } - if (park(timeout, null, &thread.status)) { + if (park(timeout, null, if (need_unpark_flag) &thread.unpark_flag)) { // The only reason this could possibly happen is cancelation. const old_status = thread.status.load(.monotonic); assert(old_status.cancelation == .canceling); @@ -17272,7 +17279,7 @@ const parking_sleep = struct { // us for a cancelation. Whoever did that will have called `unpark`, so // drop that unpark request by waiting for it. // Status is still `.canceling`. - park(.none, null, &thread.status) catch |e| switch (e) { + park(.none, null, if (need_unpark_flag) &thread.unpark_flag) catch |e| switch (e) { error.Timeout => unreachable, }; return; @@ -17288,8 +17295,8 @@ const parking_sleep = struct { } } // Uncancelable sleep; we expect not to be manually unparked. - var dummy_status: std.atomic.Value(Thread.Status) = .init(.{ .cancelation = .parked, .awaitable = .null }); - if (park(timeout, null, &dummy_status)) { + var dummy_flag: UnparkFlag = unpark_flag_init; + if (park(timeout, null, if (need_unpark_flag) &dummy_flag)) { unreachable; // unexpected unpark } else |err| switch (err) { error.Timeout => return, @@ -17322,7 +17329,7 @@ const ParkingMutex = struct { } }; const Waiter = struct { - status: std.atomic.Value(Thread.Status), + unpark_flag: UnparkFlag, /// Never modified once the `Waiter` is in the linked list. next: ?*Waiter, /// Never modified once the `Waiter` is in the linked list. @@ -17345,7 +17352,7 @@ const ParkingMutex = struct { const self_tid = if (Thread.current) |t| t.id else std.Thread.getCurrentId(); var waiter: Waiter = .{ .next = old_waiter, - .status = .init(.{ .cancelation = .parked, .awaitable = .null }), + .unpark_flag = unpark_flag_init, .tid = self_tid, }; if (m.state.cmpxchgWeak( @@ -17357,11 +17364,9 @@ const ParkingMutex = struct { continue :state new_state; } // We're now in the list of waiters---park until we're given the lock. - park(.none, m, &waiter.status) catch |err| switch (err) { + park(.none, m, if (need_unpark_flag) &waiter.unpark_flag) catch |err| switch (err) { error.Timeout => unreachable, }; - // We now hold the lock. - assert(waiter.status.load(.monotonic).cancelation == .none); return; }, } @@ -17383,7 +17388,7 @@ const ParkingMutex = struct { _ => |last_state| { // The logic here does not have ABA problems, and does some accesses non-atomically, // because `Waiter.next` is owned by the lock holder (that's us!) once the waiter is - // in the linked list, up until we set `Waiter.status` to `.none`. + // in the linked list, up until we unpark the waiter. // Run through the waiter list to the end to ensure fairness. This is obviously not // ideal, but it shouldn't be a big deal in practice provided the critical section @@ -17412,8 +17417,8 @@ const ParkingMutex = struct { } } // Now we're ready to actually hand the lock over to them. - const tid = waiter.tid; // load this before the store below potentially invalidates `waiter` - waiter.status.store(.{ .cancelation = .none, .awaitable = .null }, .release); // release lock + const tid = waiter.tid; // load before the unpark below potentially invalidates `waiter` + if (need_unpark_flag) setUnparkFlag(&waiter.unpark_flag); unpark(&.{tid}, m); return; }, @@ -17451,15 +17456,34 @@ fn timeoutToWindowsInterval(timeout: Io.Timeout) ?windows.LARGE_INTEGER { } } +/// The API on NetBSD and Illumos sucks and can unpark spuriously (well, it *can't*, but signals +/// cause an indistinguishable unblock, and libpthread really likes to leave unparks pending). +/// As such, on these targets only, we need to pass around a flag to track whether a thread is +/// "actually" being unparked. +const need_unpark_flag = switch (native_os) { + .netbsd, .illumos => true, + else => false, +}; +const UnparkFlag = if (need_unpark_flag) std.atomic.Value(bool) else void; +const unpark_flag_init: UnparkFlag = if (need_unpark_flag) .init(false); +/// Must be called before `unpark`. After this function is called, the thread may be unparked at any +/// time, so the caller must not reference values on its stack. +fn setUnparkFlag(f: *UnparkFlag) void { + f.store(true, .release); +} + +/// The type passed into `unpark` for the thread ID. You'd think this was just a `std.Thread.Id`, +/// but it seems that someone at Microsoft forgot how big their TIDs are supposed to be. +const UnparkTid = switch (native_os) { + .windows => usize, + else => std.Thread.Id, +}; + fn park( timeout: Io.Timeout, /// This value has no semantic effect, but may allow the OS to optimize the operation. addr_hint: ?*const anyopaque, - /// The API on NetBSD and Illumos sucks and can unpark spuriously (well, it *can't*, but signals - /// cause an indistinguishable unblock, and libpthread really likes to leave unparks pending). - /// As such, on these targets only, this `status` is checked to determine if an unpark is real. - /// no way to differentiate - status: *std.atomic.Value(Thread.Status), + unpark_flag: if (need_unpark_flag) *UnparkFlag else void, ) error{Timeout}!void { comptime assert(use_parking_futex or use_parking_sleep); switch (native_os) { @@ -17502,7 +17526,7 @@ fn park( }; // It's okay to pass the same timeout in a loop. If it's a duration, the OS actually // writes the remaining time into the buffer when the syscall returns. - while (status.load(.monotonic).cancelation == .parked) { + while (!unpark_flag.swap(false, .acquire)) { switch (posix.errno(std.c._lwp_park( if (clock_real) .REALTIME else .MONOTONIC, .{ .ABSTIME = abstime }, @@ -17523,12 +17547,6 @@ fn park( else => comptime unreachable, } } - -const UnparkTid = switch (native_os) { - // `NtAlertMultipleThreadByThreadId` is weird and wants 64-bit thread IDs? - .windows => usize, - else => std.Thread.Id, -}; /// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. fn unpark(tids: []const UnparkTid, addr_hint: ?*const anyopaque) void { comptime assert(use_parking_futex or use_parking_sleep);