diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 5464330bd0..558263a010 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -4955,7 +4955,10 @@ pub fn dirOpenFileWtf16( // kernel bug with retry attempts. syscall.finish(); if (max_attempts - attempt == 0) return error.FileBusy; - try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), + .clock = .awake, + } }); attempt += 1; syscall = try .start(); continue; @@ -4977,7 +4980,10 @@ pub fn dirOpenFileWtf16( // fixed by sleeping and retrying until the error goes away. syscall.finish(); if (max_attempts - attempt == 0) return error.FileBusy; - try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), + .clock = .awake, + } }); attempt += 1; syscall = try .start(); continue; @@ -10823,9 +10829,6 @@ fn nowPosix(clock: Io.Clock) Io.Timestamp { fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Timestamp { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - return nowInner(clock); -} -fn nowInner(clock: Io.Clock) Io.Timestamp { return switch (native_os) { .windows => nowWindows(clock), .wasi => nowWasi(clock), @@ -15582,7 +15585,10 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE { // this other than retrying the creation after the OS finishes // the deletion. syscall.finish(); - try parking_sleep.windowsRetrySleep(1); + try parking_sleep.sleep(.{ .duration = .{ + .raw = .fromMilliseconds(1), + .clock = .awake, + } }); syscall = try .start(); continue; }, @@ -16955,13 +16961,9 @@ const parking_futex = struct { /// /// * Removing the `Waiter` from `Bucket.waiters` /// * Decrementing `Bucket.num_waiters` - /// * Atomically setting `done` (after this, the `Waiter` may go out of scope at any time, - /// so must not be referenced again) - /// * Unparking the thread (last, so that the unparked thread definitely sees `done`) + /// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope + /// while it is still in the `Bucket`). thread_status: *std.atomic.Value(Thread.Status), - /// Initially `false`. Whoever updates `thread_status` to `.none`/`.canceling` will update - /// this to `true` once they are done with the `Waiter`, just before unparking `tid`. - done: std.atomic.Value(bool), }; fn bucketForAddress(address: usize) *Bucket { @@ -17000,7 +17002,6 @@ const parking_futex = struct { .address = @intFromPtr(ptr), .tid = self_tid, .thread_status = undefined, // populated in critical section - .done = .init(false), }; var status_buf: std.atomic.Value(Thread.Status) = undefined; @@ -17058,44 +17059,41 @@ const parking_futex = struct { bucket.waiters.append(&waiter.node); } - const deadline: ?Io.Clock.Timestamp = switch (timeout) { - .none => null, - .duration => |d| .{ - .raw = nowInner(d.clock).addDuration(d.raw), - .clock = d.clock, - }, - .deadline => |d| d, - }; - while (park(deadline, ptr)) { - if (waiter.done.load(.acquire)) return; // all done! + if (park(timeout, ptr)) { + // We were unparked by either `wake` or cancelation, so our current status is either + // `.none` or `.canceling`. In either case, they've already removed `waiter` from + // `bucket`, so we have nothing more to do! } else |err| switch (err) { - error.Timeout => switch (waiter.thread_status.fetchAnd( - .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, - .monotonic, - ).cancelation) { - .parked => { - // We saw a timeout and updated our own status from `.parked` to `.none`. It is - // our responsibility to remove `waiter` from `bucket`. - mutexLock(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); - bucket.waiters.remove(&waiter.node); - assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); - }, - .none, .canceling => { - // Race condition: the timeout was reached, then `wake` or a cancelation tried - // to update our status. They won the race, so wait for them to do the cleanup. - // They'll tell us by setting `waiter.done` and unparking us. - while (!waiter.done.load(.acquire)) { - park(null, ptr) catch |e| switch (e) { + error.Timeout => { + // We're not out of the woods yet: an unpark could race with the timeout. + const old_status = waiter.thread_status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ); + switch (old_status.cancelation) { + .parked => { + // No race. It is our responsibility to remove `waiter` from `bucket`. + // New status is `.none`. + bucket.mutex.lock(); + defer bucket.mutex.unlock(); + bucket.waiters.remove(&waiter.node); + assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + }, + .none, .canceling => { + // Race condition: the timeout was reached, then `wake` or a canceler tried + // to unpark us. Whoever did that will remove us from `bucket`. Wait for + // that (and drop the unpark request in doing so). + // New status is `.none` or `.canceling` respectively. + park(.none, ptr) catch |e| switch (e) { error.Timeout => unreachable, }; - } - }, - .canceled => unreachable, - .blocked => unreachable, - .blocked_alertable => unreachable, - .blocked_alertable_canceling => unreachable, - .blocked_canceling => unreachable, + }, + .canceled => unreachable, + .blocked => unreachable, + .blocked_alertable => unreachable, + .blocked_canceling => unreachable, + .blocked_alertable_canceling => unreachable, + } }, } } @@ -17144,6 +17142,9 @@ const parking_futex = struct { waiter.node.next = waking_head; waking_head = &waiter.node; num_removed += 1; + // Signal to `waiter` that they're about to be unparked, in case we're racing with their + // timeout. See corresponding logic in `wake`. + waiter.address = 0; } _ = bucket.num_waiters.fetchSub(num_removed, .monotonic); @@ -17158,8 +17159,6 @@ const parking_futex = struct { const waiter: *Waiter = @fieldParentPtr("node", node); unpark_buf[unpark_len] = waiter.tid; unpark_len += 1; - waiter.done.store(true, .release); - // `waiter.*` is now potentially invalid so must not be referenced again. if (unpark_len == unpark_buf.len) { unpark(&unpark_buf, ptr); unpark_len = 0; @@ -17176,14 +17175,13 @@ const parking_futex = struct { defer mutexUnlock(&bucket.mutex); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); - waiter.done.store(true, .release); // potentially invalidates `waiter.*` } }; const parking_sleep = struct { comptime { assert(use_parking_sleep); } - fn sleep(deadline: ?Io.Clock.Timestamp) Io.Cancelable!void { + fn sleep(timeout: Io.Timeout) Io.Cancelable!void { const opt_thread = Thread.current; cancelable: { const thread = opt_thread orelse break :cancelable; @@ -17192,90 +17190,87 @@ const parking_sleep = struct { .unblocked => {}, } thread.futex_waiter = null; - const orig_status = thread.status.fetchOr( - .{ .cancelation = @enumFromInt(0b001), .awaitable = .null }, - .release, // release `thread.futex_waiter` - ); - switch (orig_status.cancelation) { - .none => {}, // status is now `.parked` - .canceling => return error.Canceled, // status is now `.canceled` - .canceled => break :cancelable, // status is still `.canceled` - .parked => unreachable, - .blocked => unreachable, - .blocked_alertable => unreachable, - .blocked_alertable_canceling => unreachable, - .blocked_canceling => unreachable, - } - while (park(deadline, null)) { - // Either a cancelation or a spurious unpark; let's see which! - switch (thread.status.load(.monotonic).cancelation) { - .parked => continue, // spurious unpark; keep sleeping - .canceling => { - // We got canceled; update our state and return. - thread.status.store( - .{ .cancelation = .canceled, .awaitable = orig_status.awaitable }, - .monotonic, - ); - return error.Canceled; - }, - .none => unreachable, - .canceled => unreachable, + { + const old_status = thread.status.fetchOr( + .{ .cancelation = @enumFromInt(0b001), .awaitable = .null }, + .release, // release `thread.futex_waiter` + ); + switch (old_status.cancelation) { + .none => {}, // status is now `.parked` + .canceling => return error.Canceled, // status is now `.canceled` + .canceled => break :cancelable, // status is still `.canceled` + .parked => unreachable, .blocked => unreachable, .blocked_alertable => unreachable, .blocked_alertable_canceling => unreachable, .blocked_canceling => unreachable, } - } else |err| switch (err) { - error.Timeout => switch (thread.status.fetchAnd( - .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + } + if (park(timeout, null)) { + // The only reason this could possibly happen is cancelation. + const old_status = thread.status.load(.monotonic); + assert(old_status.cancelation == .canceling); + thread.status.store( + .{ .cancelation = .canceled, .awaitable = old_status.awaitable }, .monotonic, - ).cancelation) { - // We updated our own status from `.parked` to `.none`. - .parked => return, // new status is `.none` - .canceling => { - // Timeout raced with a cancelation. We don't need to do anything, but - // the next `park` on this thread will see a spurious unpark. - // Status is still `.canceling`. - return; - }, - .none => unreachable, - .canceled => unreachable, - .blocked => unreachable, - .blocked_alertable => unreachable, - .blocked_alertable_canceling => unreachable, - .blocked_canceling => unreachable, + ); + return error.Canceled; + } else |err| switch (err) { + error.Timeout => { + // We're not out of the woods yet: an unpark could race with the timeout. + const old_status = thread.status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ); + switch (old_status.cancelation) { + .parked => return, // No race; new status is `.none` + .canceling => { + // Race condition: the timeout was reached, then someone tried to unpark + // us for a cancelation. Whoever did that will have called `unpark`, so + // drop that unpark request by waiting for it. + // Status is still `.canceling`. + park(.none, null) catch |e| switch (e) { + error.Timeout => unreachable, + }; + return; + }, + .none => unreachable, + .canceled => unreachable, + .blocked => unreachable, + .blocked_alertable => unreachable, + .blocked_canceling => unreachable, + .blocked_alertable_canceling => unreachable, + } }, } } - // Uncancelable sleep; this case is very simple. - while (park(deadline, null)) { - // Definitely spurious; nothing to do. + // Uncancelable sleep; we expect not to be manually unparked. + if (park(timeout, null)) { + unreachable; // unexpected unpark } else |err| switch (err) { error.Timeout => return, } } - /// Sleep for approximately `ms` awake milliseconds in an attempt to work around Windows kernel bugs. - fn windowsRetrySleep(ms: u32) (Io.Cancelable || Io.UnexpectedError)!void { - const now_timestamp = nowWindows(.awake); // '.awake' is supported on Windows - const deadline = now_timestamp.addDuration(.fromMilliseconds(ms)); - try parking_sleep.sleep(.{ .raw = deadline, .clock = .awake }); - } }; -/// Spurious wakeups are possible. -/// /// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. -fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void { +fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { comptime assert(use_parking_futex or use_parking_sleep); switch (native_os) { .windows => { var timeout_buf: windows.LARGE_INTEGER = undefined; - const raw_timeout: ?*windows.LARGE_INTEGER = if (opt_deadline) |deadline| timeout: { - const now_timestamp = nowWindows(deadline.clock); - const nanoseconds = now_timestamp.durationTo(deadline.raw).nanoseconds; - timeout_buf = @intCast(@divTrunc(-nanoseconds, 100)); - break :timeout &timeout_buf; - } else null; + const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) { + .none => null, + .deadline => |timestamp| continue :timeout .{ .duration = .{ + .clock = timestamp.clock, + .raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw), + } }, + .duration => |duration| { + _ = duration.clock; // Windows only supports monotonic + timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100)); + break :timeout &timeout_buf; + }, + }; // `RtlWaitOnAddress` passes the futex address in as the first argument to this call, // but it's unclear what that actually does, especially since `NtAlertThreadByThreadId` // does *not* accept the address so the kernel can't really be using it as a hint. An @@ -17297,13 +17292,20 @@ fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{T }, .netbsd => { var ts_buf: posix.timespec = undefined; - const ts: ?*posix.timespec, const clock_real: bool = if (opt_deadline) |deadline| timeout: { - ts_buf = timestampToPosix(deadline.raw.nanoseconds); - break :timeout .{ &ts_buf, deadline.clock == .real }; - } else .{ null, true }; + const ts: ?*posix.timespec, const abstime: bool, const clock_real: bool = switch (timeout) { + .none => .{ null, false, false }, + .deadline => |timestamp| timeout: { + ts_buf = timestampToPosix(timestamp.raw.nanoseconds); + break :timeout .{ &ts_buf, true, timestamp.clock == .real }; + }, + .duration => |duration| timeout: { + ts_buf = timestampToPosix(duration.raw.nanoseconds); + break :timeout .{ &ts_buf, false, duration.clock == .real }; + }, + }; switch (posix.errno(std.c._lwp_park( if (clock_real) .REALTIME else .MONOTONIC, - .{ .ABSTIME = true }, + .{ .ABSTIME = abstime }, ts, 0, addr_hint,