std: finish implementing futexWait with timer

This commit is contained in:
Andrew Kelley 2025-12-10 18:14:43 -08:00
parent 3a6e15449b
commit 95b0399d1b
3 changed files with 39 additions and 35 deletions

View file

@ -1147,7 +1147,6 @@ const GroupClosure = struct {
const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const event: *Io.Event = @ptrCast(&group.context);
current_thread.current_closure = closure;
current_thread.cancel_protection = .unblocked;

View file

@ -24,7 +24,7 @@ terminal_mode: TerminalMode,
update_worker: ?Io.Future(void),
/// Atomically set by SIGWINCH as well as the root done() function.
redraw_event: Io.ResetEvent,
redraw_event: Io.Event,
/// Indicates a request to shut down and reset global state.
/// Accessed atomically.
done: bool,
@ -333,8 +333,9 @@ pub const Node = struct {
}
} else {
@atomicStore(bool, &global_progress.done, true, .monotonic);
global_progress.redraw_event.set();
if (global_progress.update_worker) |worker| worker.await(global_progress.io);
const io = global_progress.io;
global_progress.redraw_event.set(io);
if (global_progress.update_worker) |*worker| worker.await(io);
}
}
@ -421,7 +422,7 @@ pub const StartFailure = union(enum) {
unstarted,
spawn_ipc_worker: error{ConcurrencyUnavailable},
spawn_update_worker: error{ConcurrencyUnavailable},
parse_env_var: error{},
parse_env_var: error{ InvalidCharacter, Overflow },
};
const node_storage_buffer_len = 83;
@ -452,7 +453,7 @@ const noop_impl = builtin.single_threaded or switch (builtin.os.tag) {
/// Call `Node.end` when done.
///
/// If an error occurs, `start_failure` will be populated.
pub fn start(options: Options, io: Io) Node {
pub fn start(io: Io, options: Options) Node {
// Ensure there is only 1 global Progress object.
if (global_progress.node_end_index != 0) {
debug_start_trace.dump();
@ -467,8 +468,8 @@ pub fn start(options: Options, io: Io) Node {
assert(options.draw_buffer.len >= 200);
global_progress.draw_buffer = options.draw_buffer;
global_progress.refresh_rate_ns = options.refresh_rate_ns;
global_progress.initial_delay_ns = options.initial_delay_ns;
global_progress.refresh_rate_ns = @intCast(options.refresh_rate_ns.toNanoseconds());
global_progress.initial_delay_ns = @intCast(options.initial_delay_ns.toNanoseconds());
if (noop_impl)
return Node.none;
@ -541,9 +542,13 @@ pub fn setStatus(new_status: Status) void {
}
/// Returns whether a resize is needed to learn the terminal size.
fn wait(timeout_ns: u64) bool {
const resize_flag = if (global_progress.redraw_event.timedWait(timeout_ns)) |_| true else |err| switch (err) {
error.Timeout => false,
fn wait(io: Io, timeout_ns: u64) bool {
const timeout: Io.Timeout = .{ .duration = .{
.clock = .awake,
.raw = .fromNanoseconds(timeout_ns),
} };
const resize_flag = if (global_progress.redraw_event.waitTimeout(io, timeout)) |_| true else |err| switch (err) {
error.Timeout, error.Canceled => false,
};
global_progress.redraw_event.reset();
return resize_flag or (global_progress.cols == 0);
@ -555,34 +560,34 @@ fn updateThreadRun(io: Io) void {
var serialized_buffer: Serialized.Buffer = undefined;
{
const resize_flag = wait(global_progress.initial_delay_ns);
const resize_flag = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
maybeUpdateSize(resize_flag);
const buffer, _ = computeRedraw(&serialized_buffer);
if (io.tryLockStderrWriter(&.{})) |w| {
if (io.tryLockStderrWriter(&.{})) |fw| {
defer io.unlockStderrWriter();
global_progress.need_clear = true;
w.writeAll(buffer) catch return;
fw.writeAllUnescaped(buffer) catch return;
}
}
while (true) {
const resize_flag = wait(global_progress.refresh_rate_ns);
const resize_flag = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
const w = io.lockStderrWriter(&.{}) catch return;
const fw = io.lockStderrWriter(&.{}) catch return;
defer io.unlockStderrWriter();
return clearWrittenWithEscapeCodes(w) catch {};
return clearWrittenWithEscapeCodes(fw) catch {};
}
maybeUpdateSize(resize_flag);
const buffer, _ = computeRedraw(&serialized_buffer);
if (io.tryLockStderrWriter(&.{})) |w| {
if (io.tryLockStderrWriter(&.{})) |fw| {
defer io.unlockStderrWriter();
global_progress.need_clear = true;
w.writeAll(buffer) catch return;
fw.writeAllUnescaped(buffer) catch return;
}
}
}
@ -599,22 +604,22 @@ fn windowsApiUpdateThreadRun(io: Io) void {
var serialized_buffer: Serialized.Buffer = undefined;
{
const resize_flag = wait(global_progress.initial_delay_ns);
const resize_flag = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
maybeUpdateSize(resize_flag);
const buffer, const nl_n = computeRedraw(&serialized_buffer);
if (io.tryLockStderrWriter()) |w| {
if (io.tryLockStderrWriter()) |fw| {
defer io.unlockStderrWriter();
windowsApiWriteMarker();
global_progress.need_clear = true;
w.writeAll(buffer) catch return;
fw.writeAllUnescaped(buffer) catch return;
windowsApiMoveToMarker(nl_n) catch return;
}
}
while (true) {
const resize_flag = wait(global_progress.refresh_rate_ns);
const resize_flag = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
_ = io.lockStderrWriter() catch return;
@ -625,24 +630,24 @@ fn windowsApiUpdateThreadRun(io: Io) void {
maybeUpdateSize(resize_flag);
const buffer, const nl_n = computeRedraw(&serialized_buffer);
if (io.tryLockStderrWriter()) |w| {
if (io.tryLockStderrWriter()) |fw| {
defer io.unlockStderrWriter();
clearWrittenWindowsApi() catch return;
windowsApiWriteMarker();
global_progress.need_clear = true;
w.writeAll(buffer) catch return;
fw.writeAllUnescaped(buffer) catch return;
windowsApiMoveToMarker(nl_n) catch return;
}
}
}
fn ipcThreadRun(io: Io, file: Io.File) anyerror!void {
fn ipcThreadRun(io: Io, file: Io.File) void {
// Store this data in the thread so that it does not need to be part of the
// linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
{
_ = wait(global_progress.initial_delay_ns);
_ = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic))
return;
@ -654,7 +659,7 @@ fn ipcThreadRun(io: Io, file: Io.File) anyerror!void {
}
while (true) {
_ = wait(global_progress.refresh_rate_ns);
_ = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic))
return;
@ -1504,7 +1509,7 @@ fn handleSigWinch(sig: posix.SIG, info: *const posix.siginfo_t, ctx_ptr: ?*anyop
_ = info;
_ = ctx_ptr;
assert(sig == .WINCH);
global_progress.redraw_event.set();
global_progress.redraw_event.set(global_progress.io);
}
const have_sigwinch = switch (builtin.os.tag) {

View file

@ -816,9 +816,9 @@ pub fn writeStackTrace(st: *const StackTrace, writer: *Writer, fwm: File.Writer.
}
/// A thin wrapper around `writeStackTrace` which writes to stderr and ignores write errors.
pub fn dumpStackTrace(st: *const StackTrace) void {
const stderr, const tty_config = lockStderrWriter(&.{});
const stderr = lockStderrWriter(&.{});
defer unlockStderrWriter();
writeStackTrace(st, stderr, tty_config) catch |err| switch (err) {
writeStackTrace(st, &stderr.interface, stderr.mode) catch |err| switch (err) {
error.WriteFailed => {},
};
}
@ -1682,21 +1682,21 @@ pub fn ConfigurableTrace(comptime size: usize, comptime stack_frame_count: usize
pub fn dump(t: @This()) void {
if (!enabled) return;
const stderr, const tty_config = lockStderrWriter(&.{});
const stderr = lockStderrWriter(&.{});
defer unlockStderrWriter();
const end = @min(t.index, size);
for (t.addrs[0..end], 0..) |frames_array, i| {
stderr.print("{s}:\n", .{t.notes[i]}) catch return;
stderr.interface.print("{s}:\n", .{t.notes[i]}) catch return;
var frames_array_mutable = frames_array;
const frames = mem.sliceTo(frames_array_mutable[0..], 0);
const stack_trace: StackTrace = .{
.index = frames.len,
.instruction_addresses = frames,
};
writeStackTrace(&stack_trace, stderr, tty_config) catch return;
writeStackTrace(&stack_trace, &stderr.interface, stderr.mode) catch return;
}
if (t.index > end) {
stderr.print("{d} more traces not shown; consider increasing trace size\n", .{
stderr.interface.print("{d} more traces not shown; consider increasing trace size\n", .{
t.index - end,
}) catch return;
}