mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 01:04:43 +01:00
std.Io.Threaded: spurious unparks are possible
Apparently the thread parking APIs on Windows and NetBSD aren't as good as I thought---or, at least, the way they're *used* makes them not as good. It's perfectly possible to use these APIs in a way where you don't trigger spurious wakeups, but standard primitives (SRWLOCK on Windows, pthread bits on NetBSD) are perfectly happy to leave pending unparks sitting around, meaning in practice you have to assume spurious unparks are possible. This brings me great sadness... but we soldier on!
This commit is contained in:
parent
4df6119335
commit
c518593e97
1 changed files with 122 additions and 122 deletions
|
|
@ -3963,10 +3963,7 @@ pub fn dirOpenFileWtf16(
|
|||
// kernel bug with retry attempts.
|
||||
syscall.finish();
|
||||
if (max_attempts - attempt == 0) return error.SharingViolation;
|
||||
try parking_sleep.sleep(.{ .duration = .{
|
||||
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
|
||||
.clock = .awake,
|
||||
} });
|
||||
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
|
||||
attempt += 1;
|
||||
syscall = try .start();
|
||||
continue;
|
||||
|
|
@ -3988,10 +3985,7 @@ pub fn dirOpenFileWtf16(
|
|||
// fixed by sleeping and retrying until the error goes away.
|
||||
syscall.finish();
|
||||
if (max_attempts - attempt == 0) return error.SharingViolation;
|
||||
try parking_sleep.sleep(.{ .duration = .{
|
||||
.raw = .fromMilliseconds((@as(u32, 1) << attempt) >> 1),
|
||||
.clock = .awake,
|
||||
} });
|
||||
try parking_sleep.windowsRetrySleep((@as(u32, 1) << attempt) >> 1);
|
||||
attempt += 1;
|
||||
syscall = try .start();
|
||||
continue;
|
||||
|
|
@ -9638,6 +9632,9 @@ fn nowPosix(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
|||
// `Io` entry point for reading a clock: takes the opaque `userdata` pointer plus the
// requested `clock`, and returns whatever `nowInner(clock)` returns.
fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
||||
// Recover the `Threaded` instance from the opaque pointer.
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
// The instance is not needed to read the clock, so it is explicitly discarded.
_ = t;
|
||||
// All of the actual work lives in `nowInner`, which takes only the clock and can
// therefore also be called from code that has no `Threaded` pointer at hand.
return nowInner(clock);
|
||||
}
|
||||
fn nowInner(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
||||
return switch (native_os) {
|
||||
.windows => nowWindows(clock),
|
||||
.wasi => nowWasi(clock),
|
||||
|
|
@ -9687,7 +9684,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
|||
|
||||
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
if (use_parking_sleep) return parking_sleep.sleep(timeout);
|
||||
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t)));
|
||||
if (native_os == .wasi) return sleepWasi(t, timeout);
|
||||
if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout);
|
||||
return sleepNanosleep(t, timeout);
|
||||
|
|
@ -14197,10 +14194,7 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
|
|||
// this other than retrying the creation after the OS finishes
|
||||
// the deletion.
|
||||
syscall.finish();
|
||||
try parking_sleep.sleep(.{ .duration = .{
|
||||
.raw = .fromMilliseconds(1),
|
||||
.clock = .awake,
|
||||
} });
|
||||
try parking_sleep.windowsRetrySleep(1);
|
||||
syscall = try .start();
|
||||
continue;
|
||||
},
|
||||
|
|
@ -15570,9 +15564,13 @@ const parking_futex = struct {
|
|||
///
|
||||
/// * Removing the `Waiter` from `Bucket.waiters`
|
||||
/// * Decrementing `Bucket.num_waiters`
|
||||
/// * Unparking the thread (*after* the above, so that the `Waiter` does not go out of scope
|
||||
/// while it is still in the `Bucket`).
|
||||
/// * Atomically setting `done` (after this, the `Waiter` may go out of scope at any time,
|
||||
/// so must not be referenced again)
|
||||
/// * Unparking the thread (last, so that the unparked thread definitely sees `done`)
|
||||
thread_status: *std.atomic.Value(Thread.Status),
|
||||
/// Initially `false`. Whoever updates `thread_status` to `.none`/`.canceling` will update
|
||||
/// this to `true` once they are done with the `Waiter`, just before unparking `tid`.
|
||||
done: std.atomic.Value(bool),
|
||||
};
|
||||
|
||||
fn bucketForAddress(address: usize) *Bucket {
|
||||
|
|
@ -15611,6 +15609,7 @@ const parking_futex = struct {
|
|||
.address = @intFromPtr(ptr),
|
||||
.tid = self_tid,
|
||||
.thread_status = undefined, // populated in critical section
|
||||
.done = .init(false),
|
||||
};
|
||||
|
||||
var status_buf: std.atomic.Value(Thread.Status) = undefined;
|
||||
|
|
@ -15667,40 +15666,43 @@ const parking_futex = struct {
|
|||
bucket.waiters.append(&waiter.node);
|
||||
}
|
||||
|
||||
if (park(timeout, ptr)) {
|
||||
// We were unparked by either `wake` or cancelation, so our current status is either
|
||||
// `.none` or `.canceling`. In either case, they've already removed `waiter` from
|
||||
// `bucket`, so we have nothing more to do!
|
||||
const deadline: ?Io.Clock.Timestamp = switch (timeout) {
|
||||
.none => null,
|
||||
.duration => |d| .{
|
||||
.raw = (nowInner(d.clock) catch unreachable).addDuration(d.raw),
|
||||
.clock = d.clock,
|
||||
},
|
||||
.deadline => |d| d,
|
||||
};
|
||||
while (park(deadline, ptr)) {
|
||||
if (waiter.done.load(.acquire)) return; // all done!
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => {
|
||||
// We're not out of the woods yet: an unpark could race with the timeout.
|
||||
const old_status = waiter.thread_status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
);
|
||||
switch (old_status.cancelation) {
|
||||
.parked => {
|
||||
// No race. It is our responsibility to remove `waiter` from `bucket`.
|
||||
// New status is `.none`.
|
||||
bucket.mutex.lock();
|
||||
defer bucket.mutex.unlock();
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
},
|
||||
.none, .canceling => {
|
||||
// Race condition: the timeout was reached, then `wake` or a canceler tried
|
||||
// to unpark us. Whoever did that will remove us from `bucket`. Wait for
|
||||
// that (and drop the unpark request in doing so).
|
||||
// New status is `.none` or `.canceling` respectively.
|
||||
park(.none, ptr) catch |e| switch (e) {
|
||||
error.Timeout => switch (waiter.thread_status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
).cancelation) {
|
||||
.parked => {
|
||||
// We saw a timeout and updated our own status from `.parked` to `.none`. It is
|
||||
// our responsibility to remove `waiter` from `bucket`.
|
||||
bucket.mutex.lock();
|
||||
defer bucket.mutex.unlock();
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
},
|
||||
.none, .canceling => {
|
||||
// Race condition: the timeout was reached, then `wake` or a cancelation tried
|
||||
// to update our status. They won the race, so wait for them to do the cleanup.
|
||||
// They'll tell us by setting `waiter.done` and unparking us.
|
||||
while (!waiter.done.load(.acquire)) {
|
||||
park(null, ptr) catch |e| switch (e) {
|
||||
error.Timeout => unreachable,
|
||||
};
|
||||
},
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_windows_dns => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
}
|
||||
}
|
||||
},
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_windows_dns => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -15748,9 +15750,6 @@ const parking_futex = struct {
|
|||
waiter.node.next = waking_head;
|
||||
waking_head = &waiter.node;
|
||||
num_removed += 1;
|
||||
// Signal to `waiter` that they're about to be unparked, in case we're racing with their
|
||||
// timeout. See corresponding logic in `wake`.
|
||||
waiter.address = 0;
|
||||
}
|
||||
|
||||
_ = bucket.num_waiters.fetchSub(num_removed, .monotonic);
|
||||
|
|
@ -15765,6 +15764,8 @@ const parking_futex = struct {
|
|||
const waiter: *Waiter = @fieldParentPtr("node", node);
|
||||
unpark_buf[unpark_len] = waiter.tid;
|
||||
unpark_len += 1;
|
||||
waiter.done.store(true, .release);
|
||||
// `waiter.*` is now potentially invalid so must not be referenced again.
|
||||
if (unpark_len == unpark_buf.len) {
|
||||
unpark(&unpark_buf, ptr);
|
||||
unpark_len = 0;
|
||||
|
|
@ -15781,13 +15782,14 @@ const parking_futex = struct {
|
|||
defer bucket.mutex.unlock();
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
waiter.done.store(true, .release); // potentially invalidates `waiter.*`
|
||||
}
|
||||
};
|
||||
const parking_sleep = struct {
|
||||
comptime {
|
||||
assert(use_parking_sleep);
|
||||
}
|
||||
fn sleep(timeout: Io.Timeout) Io.Cancelable!void {
|
||||
fn sleep(deadline: ?Io.Clock.Timestamp) Io.SleepError!void {
|
||||
const opt_thread = Thread.current;
|
||||
cancelable: {
|
||||
const thread = opt_thread orelse break :cancelable;
|
||||
|
|
@ -15796,85 +15798,90 @@ const parking_sleep = struct {
|
|||
.unblocked => {},
|
||||
}
|
||||
thread.futex_waiter = null;
|
||||
{
|
||||
const old_status = thread.status.fetchOr(
|
||||
.{ .cancelation = @enumFromInt(0b001), .awaitable = .null },
|
||||
.release, // release `thread.futex_waiter`
|
||||
);
|
||||
switch (old_status.cancelation) {
|
||||
.none => {}, // status is now `.parked`
|
||||
.canceling => return error.Canceled, // status is now `.canceled`
|
||||
.canceled => break :cancelable, // status is still `.canceled`
|
||||
.parked => unreachable,
|
||||
const orig_status = thread.status.fetchOr(
|
||||
.{ .cancelation = @enumFromInt(0b001), .awaitable = .null },
|
||||
.release, // release `thread.futex_waiter`
|
||||
);
|
||||
switch (orig_status.cancelation) {
|
||||
.none => {}, // status is now `.parked`
|
||||
.canceling => return error.Canceled, // status is now `.canceled`
|
||||
.canceled => break :cancelable, // status is still `.canceled`
|
||||
.parked => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_windows_dns => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
}
|
||||
while (park(deadline, null)) {
|
||||
// Either a cancelation or a spurious unpark; let's see which!
|
||||
switch (thread.status.load(.monotonic).cancelation) {
|
||||
.parked => continue, // spurious unpark; keep sleeping
|
||||
.canceling => {
|
||||
// We got canceled; update our state and return.
|
||||
thread.status.store(
|
||||
.{ .cancelation = .canceled, .awaitable = orig_status.awaitable },
|
||||
.monotonic,
|
||||
);
|
||||
return error.Canceled;
|
||||
},
|
||||
.none => unreachable,
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_windows_dns => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
}
|
||||
}
|
||||
if (park(timeout, null)) {
|
||||
// The only reason this could possibly happen is cancelation.
|
||||
const old_status = thread.status.load(.monotonic);
|
||||
assert(old_status.cancelation == .canceling);
|
||||
thread.status.store(
|
||||
.{ .cancelation = .canceled, .awaitable = old_status.awaitable },
|
||||
.monotonic,
|
||||
);
|
||||
return error.Canceled;
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => {
|
||||
// We're not out of the woods yet: an unpark could race with the timeout.
|
||||
const old_status = thread.status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
);
|
||||
switch (old_status.cancelation) {
|
||||
.parked => return, // No race; new status is `.none`
|
||||
.canceling => {
|
||||
// Race condition: the timeout was reached, then someone tried to unpark
|
||||
// us for a cancelation. Whoever did that will have called `unpark`, so
|
||||
// drop that unpark request by waiting for it.
|
||||
// Status is still `.canceling`.
|
||||
park(.none, null) catch |e| switch (e) {
|
||||
error.Timeout => unreachable,
|
||||
};
|
||||
return;
|
||||
},
|
||||
.none => unreachable,
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_windows_dns => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
}
|
||||
error.Timeout => switch (thread.status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
.monotonic,
|
||||
).cancelation) {
|
||||
// We updated our own status from `.parked` to `.none`.
|
||||
.parked => return, // new status is `.none`
|
||||
.canceling => {
|
||||
// Timeout raced with a cancelation. We don't need to do anything, but
|
||||
// the next `park` on this thread will see a spurious unpark.
|
||||
// Status is still `.canceling`.
|
||||
return;
|
||||
},
|
||||
.none => unreachable,
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
.blocked_windows_dns => unreachable,
|
||||
.blocked_canceling => unreachable,
|
||||
},
|
||||
}
|
||||
}
|
||||
// Uncancelable sleep; we expect not to be manually unparked.
|
||||
if (park(timeout, null)) {
|
||||
unreachable; // unexpected unpark
|
||||
// Uncancelable sleep; this case is very simple.
|
||||
while (park(deadline, null)) {
|
||||
// Definitely spurious; nothing to do.
|
||||
} else |err| switch (err) {
|
||||
error.Timeout => return,
|
||||
}
|
||||
}
|
||||
/// Sleep for approximately `ms` awake milliseconds in an attempt to work around Windows kernel bugs.
///
/// The sleep is expressed as an absolute deadline on the `.awake` clock (current time plus
/// `ms` milliseconds) and handed to `parking_sleep.sleep`. Any error from that call other
/// than `error.UnsupportedClock` is returned to the caller unchanged.
|
||||
fn windowsRetrySleep(ms: u32) (Io.Cancelable || Io.UnexpectedError)!void {
|
||||
const now_timestamp = nowWindows(.awake) catch unreachable; // '.awake' is supported on Windows
|
||||
// Convert the relative delay into an absolute point in time on the same clock.
const deadline = now_timestamp.addDuration(.fromMilliseconds(ms));
|
||||
parking_sleep.sleep(.{ .raw = deadline, .clock = .awake }) catch |err| switch (err) {
|
||||
// Asserted impossible here; presumably for the same reason as above (the deadline
// uses the `.awake` clock) -- NOTE(review): confirm against `parking_sleep.sleep`.
error.UnsupportedClock => unreachable,
|
||||
// Everything else is propagated as-is.
else => |e| return e,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
/// Spurious wakeups are possible.
|
||||
///
|
||||
/// `addr_hint` has no semantic effect, but may allow the OS to optimize this operation.
|
||||
fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
|
||||
fn park(opt_deadline: ?std.Io.Clock.Timestamp, addr_hint: ?*const anyopaque) error{Timeout}!void {
|
||||
comptime assert(use_parking_futex or use_parking_sleep);
|
||||
switch (builtin.target.os.tag) {
|
||||
.windows => {
|
||||
var timeout_buf: windows.LARGE_INTEGER = undefined;
|
||||
const raw_timeout: ?*windows.LARGE_INTEGER = timeout: switch (timeout) {
|
||||
.none => null,
|
||||
.deadline => |timestamp| continue :timeout .{ .duration = .{
|
||||
.clock = timestamp.clock,
|
||||
.raw = (nowWindows(timestamp.clock) catch unreachable).durationTo(timestamp.raw),
|
||||
} },
|
||||
.duration => |duration| {
|
||||
_ = duration.clock; // Windows only supports monotonic
|
||||
timeout_buf = @intCast(@divTrunc(-duration.raw.nanoseconds, 100));
|
||||
break :timeout &timeout_buf;
|
||||
},
|
||||
};
|
||||
const raw_timeout: ?*windows.LARGE_INTEGER = if (opt_deadline) |deadline| timeout: {
|
||||
const now_timestamp = nowWindows(deadline.clock) catch unreachable;
|
||||
const nanoseconds = now_timestamp.durationTo(deadline.raw).nanoseconds;
|
||||
timeout_buf = @intCast(@divTrunc(-nanoseconds, 100));
|
||||
break :timeout &timeout_buf;
|
||||
} else null;
|
||||
// `RtlWaitOnAddress` passes the futex address in as the first argument to this call,
|
||||
// but it's unclear what that actually does, especially since `NtAlertThreadByThreadId`
|
||||
// does *not* accept the address so the kernel can't really be using it as a hint. An
|
||||
|
|
@ -15896,20 +15903,13 @@ fn park(timeout: Io.Timeout, addr_hint: ?*const anyopaque) error{Timeout}!void {
|
|||
},
|
||||
.netbsd => {
|
||||
var ts_buf: posix.timespec = undefined;
|
||||
const ts: ?*posix.timespec, const abstime: bool, const clock_real: bool = switch (timeout) {
|
||||
.none => .{ null, false, false },
|
||||
.deadline => |timestamp| timeout: {
|
||||
ts_buf = timestampToPosix(timestamp.raw.nanoseconds);
|
||||
break :timeout .{ &ts_buf, true, timestamp.clock == .real };
|
||||
},
|
||||
.duration => |duration| timeout: {
|
||||
ts_buf = timestampToPosix(duration.raw.nanoseconds);
|
||||
break :timeout .{ &ts_buf, false, duration.clock == .real };
|
||||
},
|
||||
};
|
||||
const ts: ?*posix.timespec, const clock_real: bool = if (opt_deadline) |deadline| timeout: {
|
||||
ts_buf = timestampToPosix(deadline.raw.nanoseconds);
|
||||
break :timeout .{ &ts_buf, deadline.clock == .real };
|
||||
} else .{ null, true };
|
||||
switch (posix.errno(std.c._lwp_park(
|
||||
if (clock_real) .REALTIME else .MONOTONIC,
|
||||
.{ .ABSTIME = abstime },
|
||||
.{ .ABSTIME = true },
|
||||
ts,
|
||||
0,
|
||||
addr_hint,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue