mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 01:24:49 +01:00
Revert "std.Io.Threaded: spurious unparks are possible"
It turns out that at least on Windows, spurious unparks are *not*
possible, and in fact triggering them breaks some RTL synchronization
primitives. For instance, if you have a pending unpark going into a
contended `RtlEnterCriticalSection` call, it will never unblock. In
other words, the Windows API worked exactly how I thought it did, and
it's only the NetBSD/Illumos one which is dumb. This is actually exactly
why Windows 8 introduced the parking API despite alertable sleeps being
a thing!
This commit doesn't yet deal with making NetBSD work, nor does it even
compile I imagine. The next commit will fix everything back up.
This reverts commit c518593e97.
This commit is contained in:
parent
56a43fb86f
commit
7c08f77efa
1 changed files with 123 additions and 121 deletions
|
|
@ -4955,7 +4955,10 @@ pub fn dirOpenFileWtf16(
|
|||
// kernel bug with retry attempts.
|
||||
syscall.finish();
|
||||
if (max_attempts - attempt == 0) return error.FileBusy;
|
||||
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
|
||||
try parking_sleep.sleep(.{ .duration = .{
|
||||
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
|
||||
.clock = .awake,
|
||||
} });
|
||||
attempt += 1;
|
||||
syscall = try .start();
|
||||
continue;
|
||||
|
|
@ -4977,7 +4980,10 @@ pub fn dirOpenFileWtf16(
|
|||
// fixed by sleeping and retrying until the error goes away.
|
||||
syscall.finish();
|
||||
if (max_attempts - attempt == 0) return error.FileBusy;
|
||||
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
|
||||
try parking_sleep.sleep(.{ .duration = .{
|
||||
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
|
||||
.clock = .awake,
|
||||
} });
|
||||
attempt += 1;
|
||||
syscall = try .start();
|
||||
continue;
|
||||
|
|
@ -10823,9 +10829,6 @@ fn nowPosix(clock: Io.Clock) Io.Timestamp {
|
|||
fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Timestamp {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
_ = t;
|
||||
return nowInner(clock);
|
||||
}
|
||||
fn nowInner(clock: Io.Clock) Io.Timestamp {
|
||||
return switch (native_os) {
|
||||
.windows => nowWindows(clock),
|
||||
.wasi => nowWasi(clock),
|
||||
|
|
@ -15582,7 +15585,10 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
|
|||
// this other than retrying the creation after the OS finishes
|
||||
// the deletion.
|
||||
syscall.finish();
|
||||
try parking_sleep.windowsRetrySleep(1);
|
||||
try parking_sleep.sleep(.{ .duration = .{
|
||||
.raw = .fromMilliseconds(1),
|
||||
.clock = .awake,
|
||||
} });
|
||||
syscall = try .start();
|
||||
continue;
|
||||
},
|
||||
|
|
@ -16955,13 +16961,9 @@ const parking_futex = struct {
|
|||
///
|
||||
/// * Removing the `Waiter` from `Bucket.waiters`
|
||||
/// * Decrementing `Bucket.num_waiters`
|
||||
/// * Atomically setting `done` (after this, the `Waiter` may go out of scope at any time,
|
||||
/// so must not be referenced again)
|
||||
/// * Unparking the thread (last, so that the unparked thread definitely sees `done`)
|
||||
/// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope
|
||||
/// while it is still in the `Bucket`).
|
||||
thread_status: *std.atomic.Value(Thread.Status),
|
||||
/// Initially `false`. Whoever updates `thread_status` to `.none`/`.canceling` will update
|
||||
/// this to `true` once they are done with the `Waiter`, just before unparking `tid`.
|
||||
done: std.atomic.Value(bool),
|
||||
};
|
||||
|
||||
fn bucketForAddress(address: usize) *Bucket {
|
||||
|
|
@ -17000,7 +17002,6 @@ const parking_futex = struct {
|
|||
.address = @intFromPtr(ptr),
|
||||
.tid = self_tid,
|
||||
.thread_status = undefined, // populated in critical section
|
||||
.done = .init(false),
|
||||
};
|
||||
|
||||
var status_buf: std.atomic.Value(Thread.Status) = undefined;
|
||||
|
|
@ -17058,44 +17059,41 @@ const parking_futex = struct {
|
|||
bucket.waiters.append(&waiter.node);
|
||||
}
|
||||
|
||||
const deadline: ?Io.Clock.Timestamp = switch (timeout) {
|
||||
.none => null,
|
||||
.duration => |d| .{
|
||||
.raw = nowInner(d.clock).addDuration(d.raw),
|
||||
.clock = d.clock,
|
||||
},
|
||||
.deadline => |d| d,
|
||||
};
|
||||
while (park(deadline, ptr)) {
|
||||
if (waiter.done.load(.acquire)) return; // all done!
|
||||
if (park(timeout, ptr)) {
|
||||
// We were unparked by either `wake` or cancelation, so our current status is either
|
||||
// `.none` or `.canceling`. In either case, they've already removed `waiter` from
|
||||
// `bucket`, so we have nothing more to do!
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => switch (waiter.thread_status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
).cancelation) {
|
||||
.parked => {
|
||||
// We saw a timeout and updated our own status from `.parked` to `.none`. It is
|
||||
// our responsibility to remove `waiter` from `bucket`.
|
||||
mutexLock(&bucket.mutex);
|
||||
defer mutexUnlock(&bucket.mutex);
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
},
|
||||
.none, .canceling => {
|
||||
// Race condition: the timeout was reached, then `wake` or a cancelation tried
|
||||
// to update our status. They won the race, so wait for them to do the cleanup.
|
||||
// They'll tell us by setting `waiter.done` and unparking us.
|
||||
while (!waiter.done.load(.acquire)) {
|
||||
park(null, ptr) catch |e| switch (e) {
|
||||
error.Timeout => {
|
||||
// We're not out of the woods yet: an unpark could race with the timeout.
|
||||
const old_status = waiter.thread_status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
);
|
||||
switch (old_status.cancelation) {
|
||||
.parked => {
|
||||
// No race. It is our responsibility to remove `waiter` from `bucket`.
|
||||
// New status is `.none`.
|
||||
bucket.mutex.lock();
|
||||
defer bucket.mutex.unlock();
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
},
|
||||
.none, .canceling => {
|
||||
// Race condition: the timeout was reached, then `wake` or a canceler tried
|
||||
// to unpark us. Whoever did that will remove us from `bucket`. Wait for
|
||||
// that (and drop the unpark request in doing so).
|
||||
// New status is `.none` or `.canceling` respectively.
|
||||
park(.none, ptr) catch |e| switch (e) {
|
||||
error.Timeout => unreachable,
|
||||
};
|
||||
}
|
||||
},
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_alertable => unreachable,
|
||||
.blocked_alertable_canceling => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
},
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_alertable => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
.blocked_alertable_canceling => unreachable,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -17144,6 +17142,9 @@ const parking_futex = struct {
|
|||
waiter.node.next = waking_head;
|
||||
waking_head = &waiter.node;
|
||||
num_removed += 1;
|
||||
// Signal to `waiter` that they're about to be unparked, in case we're racing with their
|
||||
// timeout. See corresponding logic in `wake`.
|
||||
waiter.address = 0;
|
||||
}
|
||||
|
||||
_ = bucket.num_waiters.fetchSub(num_removed, .monotonic);
|
||||
|
|
@ -17158,8 +17159,6 @@ const parking_futex = struct {
|
|||
const waiter: *Waiter = @fieldParentPtr("node", node);
|
||||
unpark_buf[unpark_len] = waiter.tid;
|
||||
unpark_len += 1;
|
||||
waiter.done.store(true, .release);
|
||||
// `waiter.*` is now potentially invalid so must not be referenced again.
|
||||
if (unpark_len == unpark_buf.len) {
|
||||
unpark(&unpark_buf, ptr);
|
||||
unpark_len = 0;
|
||||
|
|
@ -17176,14 +17175,13 @@ const parking_futex = struct {
|
|||
defer mutexUnlock(&bucket.mutex);
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
waiter.done.store(true, .release); // potentially invalidates `waiter.*`
|
||||
}
|
||||
};
|
||||
const parking_sleep = struct {
|
||||
comptime {
|
||||
assert(use_parking_sleep);
|
||||
}
|
||||
fn sleep(deadline: ?Io.Clock.Timestamp) Io.Cancelable!void {
|
||||
fn sleep(timeout: Io.Timeout) Io.Cancelable!void {
|
||||
const opt_thread = Thread.current;
|
||||
cancelable: {
|
||||
const thread = opt_thread orelse break :cancelable;
|
||||
|
|
@ -17192,90 +17190,87 @@ const parking_sleep = struct {
|
|||
.unblocked => {},
|
||||
}
|
||||
thread.futex_waiter = null;
|
||||
const orig_status = thread.status.fetchOr(
|
||||
.{ .cancelation = @enumFromInt(0b001), .awaitable = .null },
|
||||
.release, // release `thread.futex_waiter`
|
||||
);
|
||||
switch (orig_status.cancelation) {
|
||||
.none => {}, // status is now `.parked`
|
||||
.canceling => return error.Canceled, // status is now `.canceled`
|
||||
.canceled => break :cancelable, // status is still `.canceled`
|
||||
.parked => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_alertable => unreachable,
|
||||
.blocked_alertable_canceling => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
}
|
||||
while (park(deadline, null)) {
|
||||
// Either a cancelation or a spurious unpark; let's see which!
|
||||
switch (thread.status.load(.monotonic).cancelation) {
|
||||
.parked => continue, // spurious unpark; keep sleeping
|
||||
.canceling => {
|
||||
// We got canceled; update our state and return.
|
||||
thread.status.store(
|
||||
.{ .cancelation = .canceled, .awaitable = orig_status.awaitable },
|
||||
.monotonic,
|
||||
);
|
||||
return error.Canceled;
|
||||
},
|
||||
.none => unreachable,
|
||||
.canceled => unreachable,
|
||||
{
|
||||
const old_status = thread.status.fetchOr(
|
||||
.{ .cancelation = @enumFromInt(0b001), .awaitable = .null },
|
||||
.release, // release `thread.futex_waiter`
|
||||
);
|
||||
switch (old_status.cancelation) {
|
||||
.none => {}, // status is now `.parked`
|
||||
.canceling => return error.Canceled, // status is now `.canceled`
|
||||
.canceled => break :cancelable, // status is still `.canceled`
|
||||
.parked => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_alertable => unreachable,
|
||||
.blocked_alertable_canceling => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
}
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => switch (thread.status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
}
|
||||
if (park(timeout, null)) {
|
||||
// The only reason this could possibly happen is cancelation.
|
||||
const old_status = thread.status.load(.monotonic);
|
||||
assert(old_status.cancelation == .canceling);
|
||||
thread.status.store(
|
||||
.{ .cancelation = .canceled, .awaitable = old_status.awaitable },
|
||||
.monotonic,
|
||||
).cancelation) {
|
||||
// We updated our own status from `.parked` to `.none`.
|
||||
.parked => return, // new status is `.none`
|
||||
.canceling => {
|
||||
// Timeout raced with a cancelation. We don't need to do anything, but
|
||||
// the next `park` on this thread will see a spurious unpark.
|
||||
// Status is still `.canceling`.
|
||||
return;
|
||||
},
|
||||
.none => unreachable,
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_alertable => unreachable,
|
||||
.blocked_alertable_canceling => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
);
|
||||
return error.Canceled;
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => {
|
||||
// We're not out of the woods yet: an unpark could race with the timeout.
|
||||
const old_status = thread.status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
);
|
||||
switch (old_status.cancelation) {
|
||||
.parked => return, // No race; new status is `.none`
|
||||
.canceling => {
|
||||
// Race condition: the timeout was reached, then someone tried to unpark
|
||||
// us for a cancelation. Whoever did that will have called `unpark`, so
|
||||
// drop that unpark request by waiting for it.
|
||||
// Status is still `.canceling`.
|
||||
park(.none, null) catch |e| switch (e) {
|
||||
error.Timeout => unreachable,
|
||||
};
|
||||
return;
|
||||
},
|
||||
.none => unreachable,
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_alertable => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
.blocked_alertable_canceling => unreachable,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// Uncancelable sleep; this case is very simple.
|
||||
while (park(deadline, null)) {
|
||||
// Definitely spurious; nothing to do.
|
||||
// Uncancelable sleep; we expect not to be manually unparked.
|
||||
if (park(timeout, null)) {
|
||||
unreachable; // unexpected unpark
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => return,
|
||||
}
|
||||
}
|
||||
/// Sleep for approximately `ms` awake milliseconds in an attempt to work around Windows kernel bugs.
|
||||
fn windowsRetrySleep(ms: u32) (Io.Cancelable || Io.UnexpectedError)!void {
|
||||
const now_timestamp = nowWindows(.awake); // '.awake' is supported on Windows
|
||||
const deadline = now_timestamp.addDuration(.fromMilliseconds(ms));
|
||||
try parking_sleep.sleep(.{ .raw = deadline, .clock = .awake });
|
||||
}
|
||||
};
|
||||
|
||||
/// Spurious wakeups are possible.
|
||||
///
|
||||
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
|
||||
fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
|
||||
fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
|
||||
comptime assert(use_parking_futex or use_parking_sleep);
|
||||
switch (native_os) {
|
||||
.windows => {
|
||||
var timeout_buf: windows.LARGE_INTEGER = undefined;
|
||||
const raw_timeout: ?*windows.LARGE_INTEGER = if (opt_deadline) |deadline| timeout: {
|
||||
const now_timestamp = nowWindows(deadline.clock);
|
||||
const nanoseconds = now_timestamp.durationTo(deadline.raw).nanoseconds;
|
||||
timeout_buf = @intCast(@divTrunc(-nanoseconds, 100));
|
||||
break :timeout &timeout_buf;
|
||||
} else null;
|
||||
const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) {
|
||||
.none => null,
|
||||
.deadline => |timestamp| continue :timeout .{ .duration = .{
|
||||
.clock = timestamp.clock,
|
||||
.raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw),
|
||||
} },
|
||||
.duration => |duration| {
|
||||
_ = duration.clock; // Windows only supports monotonic
|
||||
timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100));
|
||||
break :timeout &timeout_buf;
|
||||
},
|
||||
};
|
||||
// `RtlWaitOnAddress` passes the futex address in as the first argument to this call,
|
||||
// but it's unclear what that actually does, especially since `NtAlertThreadByThreadId`
|
||||
// does *not* accept the address so the kernel can't really be using it as a hint. An
|
||||
|
|
@ -17297,13 +17292,20 @@ fn park(opt_deadline: ?Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{T
|
|||
},
|
||||
.netbsd => {
|
||||
var ts_buf: posix.timespec = undefined;
|
||||
const ts: ?*posix.timespec, const clock_real: bool = if (opt_deadline) |deadline| timeout: {
|
||||
ts_buf = timestampToPosix(deadline.raw.nanoseconds);
|
||||
break :timeout .{ &ts_buf, deadline.clock == .real };
|
||||
} else .{ null, true };
|
||||
const ts: ?*posix.timespec, const abstime: bool, const clock_real: bool = switch (timeout) {
|
||||
.none => .{ null, false, false },
|
||||
.deadline => |timestamp| timeout: {
|
||||
ts_buf = timestampToPosix(timestamp.raw.nanoseconds);
|
||||
break :timeout .{ &ts_buf, true, timestamp.clock == .real };
|
||||
},
|
||||
.duration => |duration| timeout: {
|
||||
ts_buf = timestampToPosix(duration.raw.nanoseconds);
|
||||
break :timeout .{ &ts_buf, false, duration.clock == .real };
|
||||
},
|
||||
};
|
||||
switch (posix.errno(std.c._lwp_park(
|
||||
if (clock_real) .REALTIME else .MONOTONIC,
|
||||
.{ .ABSTIME = true },
|
||||
.{ .ABSTIME = abstime },
|
||||
ts,
|
||||
0,
|
||||
addr_hint,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue