diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 0627eed34a..735efa9992 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -3963,10 +3963,7 @@ pub fn dirOpenFileWtf16( // kernel bug with retry attempts. syscall.finish(); if (max_attempts - attempt == 0) return error.SharingViolation; - try parking_sleep.sleep(.{ .duration = .{ - .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), - .clock = .awake, - } }); + try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); attempt += 1; syscall = try .start(); continue; @@ -3988,10 +3985,7 @@ pub fn dirOpenFileWtf16( // fixed by sleeping and retrying until the error goes away. syscall.finish(); if (max_attempts - attempt == 0) return error.SharingViolation; - try parking_sleep.sleep(.{ .duration = .{ - .raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1), - .clock = .awake, - } }); + try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1); attempt += 1; syscall = try .start(); continue; @@ -9638,6 +9632,9 @@ fn nowPosix(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; + return nowInner(clock); +} +fn nowInner(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { return switch (native_os) { .windows => nowWindows(clock), .wasi => nowWasi(clock), @@ -9687,7 +9684,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (use_parking_sleep) return parking_sleep.sleep(timeout); + if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t))); if (native_os == .wasi) return sleepWasi(t, timeout); if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout); return sleepNanosleep(t, timeout); @@ -14197,10 +14194,7 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE { // this other than retrying the creation after the OS finishes // the deletion. syscall.finish(); - try parking_sleep.sleep(.{ .duration = .{ - .raw = .fromMilliseconds(1), - .clock = .awake, - } }); + try parking_sleep.windowsRetrySleep(1); syscall = try .start(); continue; }, @@ -15570,9 +15564,13 @@ const parking_futex = struct { /// /// * Removing the `Waiter` from `Bucket.waiters` /// * Decrementing `Bucket.num_waiters` - /// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope - /// while it is still in the `Bucket`). + /// * Atomically setting `done` (after this, the `Waiter` may go out of scope at any time, + /// so must not be referenced again) + /// * Unparking the thread (last, so that the unparked thread definitely sees `done`) thread_status: *std.atomic.Value(Thread.Status), + /// Initially `false`. Whoever updates `thread_status` to `.none`/`.canceling` will update + /// this to `true` once they are done with the `Waiter`, just before unparking `tid`. + done: std.atomic.Value(bool), }; fn bucketForAddress(address: usize) *Bucket { @@ -15611,6 +15609,7 @@ const parking_futex = struct { .address = @intFromPtr(ptr), .tid = self_tid, .thread_status = undefined, // populated in critical section + .done = .init(false), }; var status_buf: std.atomic.Value(Thread.Status) = undefined; @@ -15667,40 +15666,43 @@ const parking_futex = struct { bucket.waiters.append(&waiter.node); } - if (park(timeout, ptr)) { - // We were unparked by either `wake` or cancelation, so our current status is either - // `.none` or `.canceling`. In either case, they've already removed `waiter` from - // `bucket`, so we have nothing more to do! + const deadline: ?Io.Clock.Timestamp = switch (timeout) { + .none => null, + .duration => |d| .{ + .raw = (nowInner(d.clock) catch unreachable).addDuration(d.raw), + .clock = d.clock, + }, + .deadline => |d| d, + }; + while (park(deadline, ptr)) { + if (waiter.done.load(.acquire)) return; // all done! } else |err| switch (err) { - error.Timeout => { - // We're not out of the woods yet: an unpark could race with the timeout. - const old_status = waiter.thread_status.fetchAnd( - .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, - .monotonic, - ); - switch (old_status.cancelation) { - .parked => { - // No race. It is our responsibility to remove `waiter` from `bucket`. - // New status is `.none`. - bucket.mutex.lock(); - defer bucket.mutex.unlock(); - bucket.waiters.remove(&waiter.node); - assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); - }, - .none, .canceling => { - // Race condition: the timeout was reached, then `wake` or a canceler tried - // to unpark us. Whoever did that will remove us from `bucket`. Wait for - // that (and drop the unpark request in doing so). - // New status is `.none` or `.canceling` respectively. - park(.none, ptr) catch |e| switch (e) { + error.Timeout => switch (waiter.thread_status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ).cancelation) { + .parked => { + // We saw a timeout and updated our own status from `.parked` to `.none`. It is + // our responsibility to remove `waiter` from `bucket`. + bucket.mutex.lock(); + defer bucket.mutex.unlock(); + bucket.waiters.remove(&waiter.node); + assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + }, + .none, .canceling => { + // Race condition: the timeout was reached, then `wake` or a cancelation tried + // to update our status. They won the race, so wait for them to do the cleanup. + // They'll tell us by setting `waiter.done` and unparking us. + while (!waiter.done.load(.acquire)) { + park(null, ptr) catch |e| switch (e) { error.Timeout => unreachable, }; - }, - .canceled => unreachable, - .blocked => unreachable, - .blocked_windows_dns => unreachable, - .blocked_canceling => unreachable, - } + } + }, + .canceled => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, }, } } @@ -15748,9 +15750,6 @@ const parking_futex = struct { waiter.node.next = waking_head; waking_head = &waiter.node; num_removed += 1; - // Signal to `waiter` that they're about to be unparked, in case we're racing with their - // timeout. See corresponding logic in `wake`. - waiter.address = 0; } _ = bucket.num_waiters.fetchSub(num_removed, .monotonic); @@ -15765,6 +15764,8 @@ const parking_futex = struct { const waiter: *Waiter = @fieldParentPtr("node", node); unpark_buf[unpark_len] = waiter.tid; unpark_len += 1; + waiter.done.store(true, .release); + // `waiter.*` is now potentially invalid so must not be referenced again. if (unpark_len == unpark_buf.len) { unpark(&unpark_buf, ptr); unpark_len = 0; @@ -15781,13 +15782,14 @@ const parking_futex = struct { defer bucket.mutex.unlock(); bucket.waiters.remove(&waiter.node); assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + waiter.done.store(true, .release); // potentially invalidates `waiter.*` } }; const parking_sleep = struct { comptime { assert(use_parking_sleep); } - fn sleep(timeout: Io.Timeout) Io.Cancelable!void { + fn sleep(deadline: ?Io.Clock.Timestamp) Io.SleepError!void { const opt_thread = Thread.current; cancelable: { const thread = opt_thread orelse break :cancelable; @@ -15796,85 +15798,90 @@ const parking_sleep = struct { .unblocked => {}, } thread.futex_waiter = null; - { - const old_status = thread.status.fetchOr( - .{ .cancelation = @enumFromInt(0b001), .awaitable = .null }, - .release, // release `thread.futex_waiter` - ); - switch (old_status.cancelation) { - .none => {}, // status is now `.parked` - .canceling => return error.Canceled, // status is now `.canceled` - .canceled => break :cancelable, // status is still `.canceled` - .parked => unreachable, + const orig_status = thread.status.fetchOr( + .{ .cancelation = @enumFromInt(0b001), .awaitable = .null }, + .release, // release `thread.futex_waiter` + ); + switch (orig_status.cancelation) { + .none => {}, // status is now `.parked` + .canceling => return error.Canceled, // status is now `.canceled` + .canceled => break :cancelable, // status is still `.canceled` + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + while (park(deadline, null)) { + // Either a cancelation or a spurious unpark; let's see which! + switch (thread.status.load(.monotonic).cancelation) { + .parked => continue, // spurious unpark; keep sleeping + .canceling => { + // We got canceled; update our state and return. + thread.status.store( + .{ .cancelation = .canceled, .awaitable = orig_status.awaitable }, + .monotonic, + ); + return error.Canceled; + }, + .none => unreachable, + .canceled => unreachable, .blocked => unreachable, .blocked_windows_dns => unreachable, .blocked_canceling => unreachable, } - } - if (park(timeout, null)) { - // The only reason this could possibly happen is cancelation. - const old_status = thread.status.load(.monotonic); - assert(old_status.cancelation == .canceling); - thread.status.store( - .{ .cancelation = .canceled, .awaitable = old_status.awaitable }, - .monotonic, - ); - return error.Canceled; } else |err| switch (err) { - error.Timeout => { - // We're not out of the woods yet: an unpark could race with the timeout. - const old_status = thread.status.fetchAnd( - .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, - .monotonic, - ); - switch (old_status.cancelation) { - .parked => return, // No race; new status is `.none` - .canceling => { - // Race condition: the timeout was reached, then someone tried to unpark - // us for a cancelation. Whoever did that will have called `unpark`, so - // drop that unpark request by waiting for it. - // Status is still `.canceling`. - park(.none, null) catch |e| switch (e) { - error.Timeout => unreachable, - }; - return; - }, - .none => unreachable, - .canceled => unreachable, - .blocked => unreachable, - .blocked_windows_dns => unreachable, - .blocked_canceling => unreachable, - } + error.Timeout => switch (thread.status.fetchAnd( + .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, + .monotonic, + ).cancelation) { + // We updated our own status from `.parked` to `.none`. + .parked => return, // new status is `.none` + .canceling => { + // Timeout raced with a cancelation. We don't need to do anything, but + // the next `park` on this thread will see a spurious unpark. + // Status is still `.canceling`. + return; + }, + .none => unreachable, + .canceled => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, }, } } - // Uncancelable sleep; we expect not to be manually unparked. - if (park(timeout, null)) { - unreachable; // unexpected unpark + // Uncancelable sleep; this case is very simple. + while (park(deadline, null)) { + // Definitely spurious; nothing to do. } else |err| switch (err) { error.Timeout => return, } } + /// Sleep for approximately `ms` awake milliseconds in an attempt to work around Windows kernel bugs. + fn windowsRetrySleep(ms: u32) (Io.Cancelable || Io.UnexpectedError)!void { + const now_timestamp = nowWindows(.awake) catch unreachable; // '.awake' is supported on Windows + const deadline = now_timestamp.addDuration(.fromMilliseconds(ms)); + parking_sleep.sleep(.{ .raw = deadline, .clock = .awake }) catch |err| switch (err) { + error.UnsupportedClock => unreachable, + else => |e| return e, + }; + } }; +/// Spurious wakeups are possible. +/// /// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation. -fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { +fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void { comptime assert(use_parking_futex or use_parking_sleep); switch (builtin.target.os.tag) { .windows => { var timeout_buf: windows.LARGE_INTEGER = undefined; - const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) { - .none => null, - .deadline => |timestamp| continue :timeout .{ .duration = .{ - .clock = timestamp.clock, - .raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw), - } }, - .duration => |duration| { - _ = duration.clock; // Windows only supports monotonic - timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100)); - break :timeout &timeout_buf; - }, - }; + const raw_timeout: ?*windows.LARGE_INTEGER = if (opt_deadline) |deadline| timeout: { + const now_timestamp = nowWindows(deadline.clock) catch unreachable; + const nanoseconds = now_timestamp.durationTo(deadline.raw).nanoseconds; + timeout_buf = @intCast(@divTrunc(-nanoseconds, 100)); + break :timeout &timeout_buf; + } else null; // `RtlWaitOnAddress` passes the futex address in as the first argument to this call, // but it's unclear what that actually does, especially since `NtAlertThreadByThreadId` // does *not* accept the address so the kernel can't really be using it as a hint. An @@ -15896,20 +15903,13 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void { }, .netbsd => { var ts_buf: posix.timespec = undefined; - const ts: ?*posix.timespec, const abstime: bool, const clock_real: bool = switch (timeout) { - .none => .{ null, false, false }, - .deadline => |timestamp| timeout: { - ts_buf = timestampToPosix(timestamp.raw.nanoseconds); - break :timeout .{ &ts_buf, true, timestamp.clock == .real }; - }, - .duration => |duration| timeout: { - ts_buf = timestampToPosix(duration.raw.nanoseconds); - break :timeout .{ &ts_buf, false, duration.clock == .real }; - }, - }; + const ts: ?*posix.timespec, const clock_real: bool = if (opt_deadline) |deadline| timeout: { + ts_buf = timestampToPosix(deadline.raw.nanoseconds); + break :timeout .{ &ts_buf, deadline.clock == .real }; + } else .{ null, true }; switch (posix.errno(std.c._lwp_park( if (clock_real) .REALTIME else .MONOTONIC, - .{ .ABSTIME = abstime }, + .{ .ABSTIME = true }, ts, 0, addr_hint,