diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index b9581196c0..b518826843 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -386,10 +386,14 @@ pub const ZigProcess = struct { child: std.process.Child, multi_reader_buffer: Io.File.MultiReader.Buffer(2), multi_reader: Io.File.MultiReader, - progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void, + progress_ipc_index: ?if (std.Progress.have_ipc) std.Progress.Ipc.Index else noreturn, pub const StreamEnum = enum { stdout, stderr }; + pub fn saveState(zp: *ZigProcess, prog_node: std.Progress.Node) void { + zp.progress_ipc_index = if (std.Progress.have_ipc) prog_node.takeIpcIndex() else null; + } + pub fn deinit(zp: *ZigProcess, io: Io) void { zp.child.kill(io); zp.multi_reader.deinit(); @@ -417,7 +421,14 @@ pub fn evalZigProcess( if (s.getZigProcess()) |zp| update: { assert(watch); - if (std.Progress.have_ipc) if (zp.progress_ipc_fd) |fd| prog_node.setIpcFd(fd); + if (zp.progress_ipc_index) |ipc_index| prog_node.setIpcIndex(ipc_index); + zp.progress_ipc_index = null; + var exited = false; + defer if (exited) { + s.cast(Compile).?.zig_process = null; + zp.deinit(io); + gpa.destroy(zp); + } else zp.saveState(prog_node); const result = zigProcessUpdate(s, zp, watch, web_server, gpa) catch |err| switch (err) { error.BrokenPipe, error.EndOfStream => |reason| { std.log.info("{s} restart required: {t}", .{ argv[0], reason }); @@ -426,7 +437,7 @@ pub fn evalZigProcess( return s.fail("unable to wait for {s}: {t}", .{ argv[0], e }); }; _ = term; - s.clearZigProcess(gpa); + exited = true; break :update; }, else => |e| return e, @@ -442,7 +453,7 @@ pub fn evalZigProcess( return s.fail("unable to wait for {s}: {t}", .{ argv[0], e }); }; s.result_peak_rss = zp.child.resource_usage_statistics.getMaxRss() orelse 0; - s.clearZigProcess(gpa); + exited = true; try handleChildProcessTerm(s, term); return error.MakeFailed; } @@ -467,19 +478,16 @@ pub fn evalZigProcess( .progress_node = prog_node, }) catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err }); - zp.* = .{ - .child = zp.child, - .multi_reader_buffer = undefined, - .multi_reader = undefined, - .progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {}, - }; zp.multi_reader.init(gpa, io, zp.multi_reader_buffer.toStreams(), &.{ zp.child.stdout.?, zp.child.stderr.?, }); - if (watch) s.setZigProcess(zp); + if (watch) s.cast(Compile).?.zig_process = zp; defer if (!watch) zp.deinit(io); - const result = try zigProcessUpdate(s, zp, watch, web_server, gpa); + const result = result: { + defer if (watch) zp.saveState(prog_node); + break :result try zigProcessUpdate(s, zp, watch, web_server, gpa); + }; if (!watch) { // Send EOF to stdin. @@ -670,26 +678,6 @@ pub fn getZigProcess(s: *Step) ?*ZigProcess { }; } -fn setZigProcess(s: *Step, zp: *ZigProcess) void { - switch (s.id) { - .compile => s.cast(Compile).?.zig_process = zp, - else => unreachable, - } -} - -fn clearZigProcess(s: *Step, gpa: Allocator) void { - switch (s.id) { - .compile => { - const compile = s.cast(Compile).?; - if (compile.zig_process) |zp| { - gpa.destroy(zp); - compile.zig_process = null; - } - }, - else => unreachable, - } -} - fn sendMessage(io: Io, file: Io.File, tag: std.zig.Client.Message.Tag) !void { const header: std.zig.Client.Message.Header = .{ .tag = tag, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 49132e7918..197cc6c657 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -19,7 +19,7 @@ const Alignment = std.mem.Alignment; const assert = std.debug.assert; const posix = std.posix; const windows = std.os.windows; -const ws2_32 = std.os.windows.ws2_32; +const ws2_32 = windows.ws2_32; /// Thread-safe. /// @@ -2609,8 +2609,7 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { // opportunity to find additional ready operations. break :t 0; } - const max_poll_ms = std.math.maxInt(i32); - break :t max_poll_ms; + break :t std.math.maxInt(i32); }; const syscall = try Syscall.start(); const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms); @@ -2730,6 +2729,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout break :allocation allocation; }; @memcpy(slice[0..poll_buffer_len], storage.slice); + storage.slice = slice; } storage.slice[len] = .{ .fd = file.handle, @@ -2783,9 +2783,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout } const d = deadline orelse break :t -1; const duration = d.durationFromNow(t_io); - if (duration.raw.nanoseconds <= 0) return error.Timeout; - const max_poll_ms = std.math.maxInt(i32); - break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds())); + break :t @min(@max(0, duration.raw.toMilliseconds()), std.math.maxInt(i32)); }; const syscall = try Syscall.start(); const rc = posix.system.poll(&poll_buffer, poll_storage.len, timeout_ms); @@ -14420,7 +14418,10 @@ const WindowsEnvironStrings = struct { PATHEXT: ?[:0]const u16 = null, fn scan() WindowsEnvironStrings { - const ptr = windows.peb().ProcessParameters.Environment; + const peb = windows.peb(); + assert(windows.ntdll.RtlEnterCriticalSection(peb.FastPebLock) == .SUCCESS); + defer assert(windows.ntdll.RtlLeaveCriticalSection(peb.FastPebLock) == .SUCCESS); + const ptr = peb.ProcessParameters.Environment; var result: WindowsEnvironStrings = .{}; var i: usize = 0; @@ -14446,7 +14447,7 @@ const WindowsEnvironStrings = struct { inline for (@typeInfo(WindowsEnvironStrings).@"struct".fields) |field| { const field_name_w = comptime std.unicode.wtf8ToWtf16LeStringLiteral(field.name); - if (std.os.windows.eqlIgnoreCaseWtf16(key_w, field_name_w)) @field(result, field.name) = value_w; + if (windows.eqlIgnoreCaseWtf16(key_w, field_name_w)) @field(result, field.name) = value_w; } } @@ -14465,29 +14466,46 @@ fn scanEnviron(t: *Threaded) void { // This value expires with any call that modifies the environment, // which is outside of this Io implementation's control, so references // must be short-lived. - const ptr = windows.peb().ProcessParameters.Environment; + const peb = windows.peb(); + assert(windows.ntdll.RtlEnterCriticalSection(peb.FastPebLock) == .SUCCESS); + defer assert(windows.ntdll.RtlLeaveCriticalSection(peb.FastPebLock) == .SUCCESS); + const ptr = peb.ProcessParameters.Environment; var i: usize = 0; while (ptr[i] != 0) { - const key_start = i; // There are some special environment variables that start with =, // so we need a special case to not treat = as a key/value separator // if it's the first character. // https://devblogs.microsoft.com/oldnewthing/20100506-00/?p=14133 - if (ptr[key_start] == '=') i += 1; - + const key_start = i; + if (ptr[i] == '=') i += 1; while (ptr[i] != 0 and ptr[i] != '=') : (i += 1) {} const key_w = ptr[key_start..i]; - if (std.mem.eql(u16, key_w, &.{ 'N', 'O', '_', 'C', 'O', 'L', 'O', 'R' })) { + + const value_start = i + 1; + while (ptr[i] != 0) : (i += 1) {} // skip over '=' and value + const value_w = ptr[value_start..i]; + i += 1; // skip over null byte + + if (windows.eqlIgnoreCaseWtf16(key_w, &.{ 'N', 'O', '_', 'C', 'O', 'L', 'O', 'R' })) { t.environ.exist.NO_COLOR = true; - } else if (std.mem.eql(u16, key_w, &.{ 'C', 'L', 'I', 'C', 'O', 'L', 'O', 'R', '_', 'F', 'O', 'R', 'C', 'E' })) { + } else if (windows.eqlIgnoreCaseWtf16(key_w, &.{ 'C', 'L', 'I', 'C', 'O', 'L', 'O', 'R', '_', 'F', 'O', 'R', 'C', 'E' })) { t.environ.exist.CLICOLOR_FORCE = true; + } else if (windows.eqlIgnoreCaseWtf16(key_w, &.{ 'Z', 'I', 'G', '_', 'P', 'R', 'O', 'G', 'R', 'E', 'S', 'S' })) { + t.environ.zig_progress_file = file: { + var value_buf: [std.fmt.count("{d}", .{std.math.maxInt(usize)})]u8 = undefined; + const len = std.unicode.calcWtf8Len(value_w); + if (len > value_buf.len) break :file error.UnrecognizedFormat; + assert(std.unicode.wtf16LeToWtf8(&value_buf, value_w) == len); + break :file .{ + .handle = @ptrFromInt(std.fmt.parseInt(usize, value_buf[0..len], 10) catch + break :file error.UnrecognizedFormat), + .flags = .{ .nonblocking = true }, + }; + }; } comptime assert(@sizeOf(Environ.String) == 0); - - while (ptr[i] != 0) : (i += 1) {} // skip over '=' and value - i += 1; // skip over null byte } } else if (native_os == .wasi and !builtin.link_libc) { var environ_count: usize = undefined; @@ -14549,20 +14567,9 @@ fn scanEnviron(t: *Threaded) void { t.environ.exist.CLICOLOR_FORCE = true; } else if (std.mem.eql(u8, key, "ZIG_PROGRESS")) { t.environ.zig_progress_file = file: { - const int = std.fmt.parseInt(switch (@typeInfo(File.Handle)) { - .int => |int_info| @Int( - .unsigned, - int_info.bits - @intFromBool(int_info.signedness == .signed), - ), - .pointer => usize, - else => break :file error.UnsupportedOperation, - }, value, 10) catch break :file error.UnrecognizedFormat; break :file .{ - .handle = switch (@typeInfo(File.Handle)) { - .int => int, - .pointer => @ptrFromInt(int), - else => comptime unreachable, - }, + .handle = std.fmt.parseInt(u31, value, 10) catch + break :file error.UnrecognizedFormat, .flags = .{ .nonblocking = true }, }; }; @@ -14668,16 +14675,17 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp const any_ignore = (options.stdin == .ignore or options.stdout == .ignore or options.stderr == .ignore); const dev_null_fd = if (any_ignore) try getDevNullFd(t) else undefined; - const prog_pipe: [2]posix.fd_t = p: { - if (options.progress_node.index == .none) { - break :p .{ -1, -1 }; - } else { - // We use CLOEXEC for the same reason as in `pipe_flags`. - break :p try pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); - } - }; + const prog_pipe: [2]posix.fd_t = if (options.progress_node.index != .none) + // We use CLOEXEC for the same reason as in `pipe_flags`. + try pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }) + else + .{ -1, -1 }; errdefer destroyPipe(prog_pipe); + if (native_os == .linux and prog_pipe[0] != -1) { + _ = posix.system.fcntl(prog_pipe[0], posix.F.SETPIPE_SZ, @as(u32, std.Progress.max_packet_len * 2)); + } + var arena_allocator = std.heap.ArenaAllocator.init(t.allocator); defer arena_allocator.deinit(); const arena = arena_allocator.allocator(); @@ -14801,7 +14809,7 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp if (options.stderr == .pipe) posix.close(stderr_pipe[1]); if (prog_pipe[1] != -1) posix.close(prog_pipe[1]); - options.progress_node.setIpcFd(prog_pipe[0]); + options.progress_node.setIpcFile(t, .{ .handle = prog_pipe[0], .flags = .{ .nonblocking = true } }); return .{ .pid = pid, @@ -15259,8 +15267,9 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro const prog_pipe = if (options.progress_node.index != .none) try t.windowsCreatePipe(.{ .server = .{ .attributes = .{ .INHERIT = false }, .mode = .{ .IO = .ASYNCHRONOUS } }, - .client = .{ .attributes = .{ .INHERIT = true }, .mode = .{ .IO = .SYNCHRONOUS_NONALERT } }, + .client = .{ .attributes = .{ .INHERIT = true }, .mode = .{ .IO = .ASYNCHRONOUS } }, .inbound = true, + .quota = std.Progress.max_packet_len * 2, }) else undefined; errdefer if (options.progress_node.index != .none) for (prog_pipe) |handle| windows.CloseHandle(handle); @@ -15476,7 +15485,7 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro if (options.progress_node.index != .none) { windows.CloseHandle(prog_pipe[1]); - options.progress_node.setIpcFd(prog_pipe[0]); + options.progress_node.setIpcFile(t, .{ .handle = prog_pipe[0], .flags = .{ .nonblocking = true } }); } return .{ diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 780f27ec75..f17fed0a5b 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -11,7 +11,7 @@ const windows = std.os.windows; const testing = std.testing; const assert = std.debug.assert; const posix = std.posix; -const Writer = std.Io.Writer; +const Writer = Io.Writer; /// Currently this API only supports this value being set to stderr, which /// happens automatically inside `start`. @@ -21,13 +21,10 @@ io: Io, terminal_mode: TerminalMode, -update_worker: ?Io.Future(void), +update_worker: ?Io.Future(WorkerError!void), /// Atomically set by SIGWINCH as well as the root done() function. redraw_event: Io.Event, -/// Indicates a request to shut down and reset global state. -/// Accessed atomically. -done: bool, need_clear: bool, status: Status, @@ -43,15 +40,19 @@ draw_buffer: []u8, /// This is in a separate array from `node_storage` but with the same length so /// that it can be iterated over efficiently without trashing too much of the /// CPU cache. -node_parents: []Node.Parent, -node_storage: []Node.Storage, -node_freelist_next: []Node.OptionalIndex, +node_parents: [node_storage_buffer_len]Node.Parent, +node_storage: [node_storage_buffer_len]Node.Storage, +node_freelist_next: [node_storage_buffer_len]Node.OptionalIndex, node_freelist: Freelist, /// This is the number of elements in node arrays which have been used so far. Nodes before this /// index are either active, or on the freelist. The remaining nodes are implicitly free. This /// value may at times temporarily exceed the node count. node_end_index: u32, +ipc_next: Ipc.SlotAtomic, +ipc: [ipc_storage_buffer_len]Ipc, +ipc_files: [ipc_storage_buffer_len]Io.File, + start_failure: StartFailure, pub const Status = enum { @@ -77,6 +78,80 @@ const Freelist = packed struct(u32) { generation: u24, }; +pub const Ipc = packed struct(u32) { + /// mutex protecting `file` use, only locked by `serializeIpc` + locked: bool, + /// when unlocked: whether `file` is defined + /// when locked: whether `file` does not need to be closed + valid: bool, + unused: @Int(.unsigned, 32 - 2 - @bitSizeOf(Generation)) = 0, + generation: Generation, + + pub const Slot = std.math.IntFittingRange(0, ipc_storage_buffer_len - 1); + pub const Generation = @Int(.unsigned, 32 - @bitSizeOf(Slot)); + + const SlotAtomic = @Int(.unsigned, std.math.ceilPowerOfTwoAssert(usize, @min(@bitSizeOf(Slot), 8))); + + pub const Index = packed struct(u32) { + slot: Slot, + generation: Generation, + }; + + const Data = struct { + state: State, + bytes_read: u16, + main_index: u8, + start_index: u8, + nodes_len: u8, + + const State = enum { unused, pending, ready }; + + /// No operations have been started on this file. + const unused: Data = .{ + .state = .unused, + .bytes_read = 0, + .main_index = 0, + .start_index = 0, + .nodes_len = 0, + }; + + fn findLastPacket(data: *const Data, buffer: *const [max_packet_len]u8) struct { u16, u16 } { + assert(data.state == .ready); + var packet_start: u16 = 0; + var packet_end: u16 = 0; + const bytes_read = data.bytes_read; + while (bytes_read - packet_end >= 1) { + const nodes_len: u16 = buffer[packet_end]; + const packet_len = 1 + nodes_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent)); + if (packet_end + packet_len > bytes_read) break; + packet_start = packet_end; + packet_end += packet_len; + } + return .{ packet_start, packet_end }; + } + + fn rebase( + data: *Data, + buffer: *[max_packet_len]u8, + vec: *[1][]u8, + batch: *std.Io.Batch, + slot: Slot, + packet_end: u16, + ) void { + assert(data.state == .ready); + const remaining = buffer[packet_end..data.bytes_read]; + @memmove(buffer[0..remaining.len], remaining); + vec.* = .{buffer[remaining.len..]}; + batch.addAt(slot, .{ .file_read_streaming = .{ + .file = global_progress.ipc_files[slot], + .data = vec, + } }); + data.state = .pending; + data.bytes_read = @intCast(remaining.len); + } + }; +}; + pub const TerminalMode = union(enum) { off, ansi_escape_codes, @@ -116,7 +191,7 @@ pub const Node = struct { pub const none: Node = .{ .index = .none }; - pub const max_name_len = 40; + pub const max_name_len = 120; const Storage = extern struct { /// Little endian. @@ -127,25 +202,16 @@ pub const Node = struct { name: [max_name_len]u8 align(@alignOf(usize)), /// Not thread-safe. - fn getIpcFd(s: Storage) ?Io.File.Handle { - return if (s.estimated_total_count == std.math.maxInt(u32)) switch (@typeInfo(Io.File.Handle)) { - .int => @bitCast(s.completed_count), - .pointer => @ptrFromInt(s.completed_count), - else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), - } else null; + fn getIpcIndex(s: Storage) ?Ipc.Index { + return if (s.estimated_total_count == std.math.maxInt(u32)) @bitCast(s.completed_count) else null; } /// Thread-safe. - fn setIpcFd(s: *Storage, fd: Io.File.Handle) void { - const integer: u32 = switch (@typeInfo(Io.File.Handle)) { - .int => @bitCast(fd), - .pointer => @intCast(@intFromPtr(fd)), - else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), - }; + fn setIpcIndex(s: *Storage, ipc_index: Ipc.Index) void { // `estimated_total_count` max int indicates the special state that // causes `completed_count` to be treated as a file descriptor, so // the order here matters. - @atomicStore(u32, &s.completed_count, integer, .monotonic); + @atomicStore(u32, &s.completed_count, @bitCast(ipc_index), .monotonic); @atomicStore(u32, &s.estimated_total_count, std.math.maxInt(u32), .release); // synchronizes with acquire in `serialize` } @@ -155,6 +221,14 @@ pub const Node = struct { s.estimated_total_count = @byteSwap(s.estimated_total_count); } + fn copyRoot(dest: *Node.Storage, src: *align(1) const Node.Storage) void { + dest.* = .{ + .completed_count = src.completed_count, + .estimated_total_count = src.estimated_total_count, + .name = if (src.name[0] == 0) dest.name else src.name, + }; + } + comptime { assert((@sizeOf(Storage) % 4) == 0); } @@ -242,7 +316,7 @@ pub const Node = struct { } const free_index = @atomicRmw(u32, &global_progress.node_end_index, .Add, 1, .monotonic); - if (free_index >= global_progress.node_storage.len) { + if (free_index >= node_storage_buffer_len) { // Ran out of node storage memory. Progress for this node will not be tracked. _ = @atomicRmw(u32, &global_progress.node_end_index, .Sub, 1, .monotonic); return Node.none; @@ -292,15 +366,17 @@ pub const Node = struct { const index = n.index.unwrap() orelse return; const storage = storageByIndex(index); // Avoid u32 max int which is used to indicate a special state. - const saturated = @min(std.math.maxInt(u32) - 1, count); - @atomicStore(u32, &storage.estimated_total_count, saturated, .monotonic); + const saturated_total_count = @min(std.math.maxInt(u32) - 1, count); + @atomicStore(u32, &storage.estimated_total_count, saturated_total_count, .monotonic); } /// Thread-safe. pub fn increaseEstimatedTotalItems(n: Node, count: usize) void { const index = n.index.unwrap() orelse return; const storage = storageByIndex(index); - _ = @atomicRmw(u32, &storage.estimated_total_count, .Add, std.math.lossyCast(u32, count), .monotonic); + // Avoid u32 max int which is used to indicate a special state. + const saturated_total_count = @min(std.math.maxInt(u32) - 1, count); + _ = @atomicRmw(u32, &storage.estimated_total_count, .Add, saturated_total_count, .monotonic); } /// Finish a started `Node`. Thread-safe. @@ -310,11 +386,25 @@ pub const Node = struct { return; } const index = n.index.unwrap() orelse return; + const io = global_progress.io; const parent_ptr = parentByIndex(index); if (@atomicLoad(Node.Parent, parent_ptr, .monotonic).unwrap()) |parent_index| { _ = @atomicRmw(u32, &storageByIndex(parent_index).completed_count, .Add, 1, .monotonic); @atomicStore(Node.Parent, parent_ptr, .unused, .monotonic); + if (storageByIndex(index).getIpcIndex()) |ipc_index| { + const file = global_progress.ipc_files[ipc_index.slot]; + const ipc = @atomicRmw( + Ipc, + &global_progress.ipc[ipc_index.slot], + .And, + .{ .locked = true, .valid = false, .generation = std.math.maxInt(Ipc.Generation) }, + .release, + ); + assert(ipc.valid and ipc.generation == ipc_index.generation); + if (!ipc.locked) file.close(io); + } + const freelist = &global_progress.node_freelist; var old_freelist = @atomicLoad(Freelist, freelist, .monotonic); while (true) { @@ -332,42 +422,52 @@ pub const Node = struct { }; } } else { - @atomicStore(bool, &global_progress.done, true, .monotonic); - const io = global_progress.io; - global_progress.redraw_event.set(io); - if (global_progress.update_worker) |*worker| worker.await(io); + if (global_progress.update_worker) |*worker| worker.cancel(io) catch {}; + for (&global_progress.ipc, &global_progress.ipc_files) |ipc, ipc_file| { + assert(!ipc.locked or !ipc.valid); // missing call to end() + if (ipc.locked or ipc.valid) ipc_file.close(io); + } } } - /// Posix-only. Used by `std.process.Child`. Thread-safe. - pub fn setIpcFd(node: Node, fd: Io.File.Handle) void { + /// Used by `std.process.Child`. Thread-safe. + pub fn setIpcFile(node: Node, expected_io_userdata: ?*anyopaque, file: Io.File) void { const index = node.index.unwrap() orelse return; - switch (@typeInfo(Io.File.Handle)) { - .int => { - assert(fd >= 0); - assert(fd != posix.STDOUT_FILENO); - assert(fd != posix.STDIN_FILENO); - assert(fd != posix.STDERR_FILENO); - }, - .pointer => { - assert(fd != windows.INVALID_HANDLE_VALUE); - }, - else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), - } - storageByIndex(index).setIpcFd(fd); + const io = global_progress.io; + assert(io.userdata == expected_io_userdata); + for (0..ipc_storage_buffer_len) |_| { + const slot: Ipc.Slot = @truncate( + @atomicRmw(Ipc.SlotAtomic, &global_progress.ipc_next, .Add, 1, .monotonic), + ); + if (slot >= ipc_storage_buffer_len) continue; + const ipc_ptr = &global_progress.ipc[slot]; + const ipc = @atomicLoad(Ipc, ipc_ptr, .monotonic); + if (ipc.locked or ipc.valid) continue; + const generation = ipc.generation +% 1; + if (@cmpxchgWeak( + Ipc, + ipc_ptr, + ipc, + .{ .locked = false, .valid = true, .generation = generation }, + .acquire, + .monotonic, + )) |_| continue; + global_progress.ipc_files[slot] = file; + storageByIndex(index).setIpcIndex(.{ .slot = slot, .generation = generation }); + break; + } else file.close(io); } - /// Posix-only. Thread-safe. Assumes the node is storing an IPC file - /// descriptor. - pub fn getIpcFd(node: Node) ?Io.File.Handle { - const index = node.index.unwrap() orelse return null; - const storage = storageByIndex(index); - const int = @atomicLoad(u32, &storage.completed_count, .monotonic); - return switch (@typeInfo(Io.File.Handle)) { - .int => @bitCast(int), - .pointer => @ptrFromInt(int), - else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), - }; + pub fn setIpcIndex(node: Node, ipc_index: Ipc.Index) void { + storageByIndex(node.index.unwrap() orelse return).setIpcIndex(ipc_index); + } + + /// Not thread-safe. + pub fn takeIpcIndex(node: Node) ?Ipc.Index { + const storage = storageByIndex(node.index.unwrap() orelse return null); + assert(storage.estimated_total_count == std.math.maxInt(u32)); + @atomicStore(u32, &storage.estimated_total_count, 0, .monotonic); + return @bitCast(storage.completed_count); } fn storageByIndex(index: Node.Index) *Node.Storage { @@ -387,7 +487,9 @@ pub const Node = struct { const storage = storageByIndex(free_index); @atomicStore(u32, &storage.completed_count, 0, .monotonic); - @atomicStore(u32, &storage.estimated_total_count, std.math.lossyCast(u32, estimated_total_items), .monotonic); + // Avoid u32 max int which is used to indicate a special state. + const saturated_total_count = @min(std.math.maxInt(u32) - 1, estimated_total_items); + @atomicStore(u32, &storage.estimated_total_count, saturated_total_count, .monotonic); const name_len = @min(max_name_len, name.len); copyAtomicStore(storage.name[0..name_len], name[0..name_len]); if (name_len < storage.name.len) @@ -414,16 +516,20 @@ var global_progress: Progress = .{ .rows = 0, .cols = 0, .draw_buffer = undefined, - .done = false, .need_clear = false, .status = .working, - .start_failure = .unstarted, - .node_parents = &node_parents_buffer, - .node_storage = &node_storage_buffer, - .node_freelist_next = &node_freelist_next_buffer, + .node_parents = undefined, + .node_storage = undefined, + .node_freelist_next = undefined, .node_freelist = .{ .head = .none, .generation = 0 }, .node_end_index = 0, + + .ipc_next = 0, + .ipc = undefined, + .ipc_files = undefined, + + .start_failure = .unstarted, }; pub const StartFailure = union(enum) { @@ -433,17 +539,23 @@ pub const StartFailure = union(enum) { parent_ipc: error{ UnsupportedOperation, UnrecognizedFormat }, }; -const node_storage_buffer_len = 83; -var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined; -var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined; -var node_freelist_next_buffer: [node_storage_buffer_len]Node.OptionalIndex = undefined; +/// One less than a power of two ensures `max_packet_len` is already a power of two. +const node_storage_buffer_len = ipc_storage_buffer_len - 1; + +/// Power of two to avoid wasted `ipc_next` increments. +const ipc_storage_buffer_len = 128; + +pub const max_packet_len = std.math.ceilPowerOfTwoAssert( + usize, + 1 + node_storage_buffer_len * (@sizeOf(Node.Storage) + @sizeOf(Node.OptionalIndex)), +); var default_draw_buffer: [4096]u8 = undefined; var debug_start_trace = std.debug.Trace.init; pub const have_ipc = switch (builtin.os.tag) { - .wasi, .freestanding, .windows => false, + .wasi, .freestanding => false, else => true, }; @@ -475,9 +587,9 @@ pub fn start(io: Io, options: Options) Node { } debug_start_trace.add("first initialized here"); - @memset(global_progress.node_parents, .unused); + @memset(&global_progress.node_parents, .unused); + @memset(&global_progress.ipc, .{ .locked = false, .valid = false, .generation = 0 }); const root_node = Node.init(@enumFromInt(0), .none, options.root_name, options.estimated_total_items); - global_progress.done = false; global_progress.node_end_index = 1; assert(options.draw_buffer.len >= 200); @@ -551,58 +663,55 @@ pub fn setStatus(new_status: Status) void { } /// Returns whether a resize is needed to learn the terminal size. -fn wait(io: Io, timeout_ns: u64) bool { +fn wait(io: Io, timeout_ns: u64) Io.Cancelable!bool { const timeout: Io.Timeout = .{ .duration = .{ .clock = .awake, .raw = .fromNanoseconds(timeout_ns), } }; const resize_flag = if (global_progress.redraw_event.waitTimeout(io, timeout)) |_| true else |err| switch (err) { - error.Timeout, error.Canceled => false, + error.Timeout => false, + error.Canceled => |e| return e, }; global_progress.redraw_event.reset(); return resize_flag or (global_progress.cols == 0); } -fn updateTask(io: Io) void { +const WorkerError = error{WindowTooSmall} || Io.ConcurrentError || Io.Cancelable || + Io.File.Writer.Error || Io.Operation.FileReadStreaming.Error; + +fn updateTask(io: Io) WorkerError!void { // Store this data in the thread so that it does not need to be part of the // linker data of the main executable. var serialized_buffer: Serialized.Buffer = undefined; + serialized_buffer.init(); + defer serialized_buffer.batch.cancel(io); // In this function we bypass the wrapper code inside `Io.lockStderr` / // `Io.tryLockStderr` in order to avoid clearing the terminal twice. // We still want to go through the `Io` instance however in case it uses a // task-switching mutex. - { - const resize_flag = wait(io, global_progress.initial_delay_ns); - if (@atomicLoad(bool, &global_progress.done, .monotonic)) return; - maybeUpdateSize(io, resize_flag) catch return; - - const buffer, _ = computeRedraw(&serialized_buffer); - if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| { - defer io.unlockStderr(); - global_progress.need_clear = true; - locked_stderr.file_writer.interface.writeAll(buffer) catch return; - } + try maybeUpdateSize(io, try wait(io, global_progress.initial_delay_ns)); + errdefer { + const cancel_protection = io.swapCancelProtection(.blocked); + defer _ = io.swapCancelProtection(cancel_protection); + const stderr = io.vtable.lockStderr(io.userdata, null) catch |err| switch (err) { + error.Canceled => unreachable, // blocked + }; + defer io.unlockStderr(); + clearWrittenWithEscapeCodes(stderr.file_writer) catch {}; } - while (true) { - const resize_flag = wait(io, global_progress.refresh_rate_ns); - - if (@atomicLoad(bool, &global_progress.done, .monotonic)) { - const stderr = io.vtable.lockStderr(io.userdata, null) catch return; - defer io.unlockStderr(); - return clearWrittenWithEscapeCodes(stderr.file_writer) catch {}; - } - - maybeUpdateSize(io, resize_flag) catch return; - - const buffer, _ = computeRedraw(&serialized_buffer); - if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| { + const buffer, _ = try computeRedraw(io, &serialized_buffer); + if (try io.vtable.tryLockStderr(io.userdata, null)) |locked_stderr| { defer io.unlockStderr(); global_progress.need_clear = true; - locked_stderr.file_writer.interface.writeAll(buffer) catch return; + locked_stderr.file_writer.interface.writeAll(buffer) catch |err| switch (err) { + error.WriteFailed => return locked_stderr.file_writer.err.?, + }; } + + try maybeUpdateSize(io, try wait(io, global_progress.refresh_rate_ns)); } } @@ -614,79 +723,60 @@ fn windowsApiWriteMarker() void { _ = windows.kernel32.WriteConsoleW(handle, &[_]u16{windows_api_start_marker}, 1, &num_chars_written, null); } -fn windowsApiUpdateTask(io: Io) void { +fn windowsApiUpdateTask(io: Io) WorkerError!void { + // Store this data in the thread so that it does not need to be part of the + // linker data of the main executable. var serialized_buffer: Serialized.Buffer = undefined; + serialized_buffer.init(); + defer serialized_buffer.batch.cancel(io); // In this function we bypass the wrapper code inside `Io.lockStderr` / // `Io.tryLockStderr` in order to avoid clearing the terminal twice. // We still want to go through the `Io` instance however in case it uses a // task-switching mutex. - { - const resize_flag = wait(io, global_progress.initial_delay_ns); - if (@atomicLoad(bool, &global_progress.done, .monotonic)) return; - maybeUpdateSize(io, resize_flag) catch return; - - const buffer, const nl_n = computeRedraw(&serialized_buffer); - if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| { - defer io.unlockStderr(); - windowsApiWriteMarker(); - global_progress.need_clear = true; - locked_stderr.file_writer.interface.writeAll(buffer) catch return; - windowsApiMoveToMarker(nl_n) catch return; - } + try maybeUpdateSize(io, try wait(io, global_progress.initial_delay_ns)); + errdefer { + const cancel_protection = io.swapCancelProtection(.blocked); + defer _ = io.swapCancelProtection(cancel_protection); + _ = io.vtable.lockStderr(io.userdata, null) catch |err| switch (err) { + error.Canceled => unreachable, // blocked + }; + defer io.unlockStderr(); + clearWrittenWindowsApi() catch {}; } - while (true) { - const resize_flag = wait(io, global_progress.refresh_rate_ns); - - if (@atomicLoad(bool, &global_progress.done, .monotonic)) { - _ = io.vtable.lockStderr(io.userdata, null) catch return; - defer io.unlockStderr(); - return clearWrittenWindowsApi() catch {}; - } - - maybeUpdateSize(io, resize_flag) catch return; - - const buffer, const nl_n = computeRedraw(&serialized_buffer); + const buffer, const nl_n = try computeRedraw(io, &serialized_buffer); if (io.vtable.tryLockStderr(io.userdata, null) catch return) |locked_stderr| { defer io.unlockStderr(); - clearWrittenWindowsApi() catch return; + try clearWrittenWindowsApi(); windowsApiWriteMarker(); global_progress.need_clear = true; - locked_stderr.file_writer.interface.writeAll(buffer) catch return; + locked_stderr.file_writer.interface.writeAll(buffer) catch |err| switch (err) { + error.WriteFailed => return locked_stderr.file_writer.err.?, + }; windowsApiMoveToMarker(nl_n) catch return; } + + try maybeUpdateSize(io, try wait(io, global_progress.refresh_rate_ns)); } } -fn ipcThreadRun(io: Io, file: Io.File) void { +fn ipcThreadRun(io: Io, file: Io.File) WorkerError!void { // Store this data in the thread so that it does not need to be part of the // linker data of the main executable. var serialized_buffer: Serialized.Buffer = undefined; + serialized_buffer.init(); + defer serialized_buffer.batch.cancel(io); + var fw = file.writerStreaming(io, &.{}); - { - _ = wait(io, global_progress.initial_delay_ns); - - if (@atomicLoad(bool, &global_progress.done, .monotonic)) - return; - - const serialized = serialize(&serialized_buffer); - writeIpc(io, file, serialized) catch |err| switch (err) { - error.BrokenPipe => return, - }; - } - + _ = try io.sleep(.fromNanoseconds(global_progress.initial_delay_ns), .awake); while (true) { - _ = wait(io, global_progress.refresh_rate_ns); - - if (@atomicLoad(bool, &global_progress.done, .monotonic)) - return; - - const serialized = serialize(&serialized_buffer); - writeIpc(io, file, serialized) catch |err| switch (err) { - error.BrokenPipe => return, + writeIpc(&fw.interface, try serialize(io, &serialized_buffer)) catch |err| switch (err) { + error.WriteFailed => return fw.err.?, }; + + _ = try io.sleep(.fromNanoseconds(global_progress.refresh_rate_ns), .awake); } } @@ -865,31 +955,49 @@ const Serialized = struct { const Buffer = struct { parents: [node_storage_buffer_len]Node.Parent, storage: [node_storage_buffer_len]Node.Storage, - map: [node_storage_buffer_len]Node.OptionalIndex, - parents_copy: [node_storage_buffer_len]Node.Parent, - storage_copy: [node_storage_buffer_len]Node.Storage, - ipc_metadata_fds_copy: [node_storage_buffer_len]Fd, - ipc_metadata_copy: [node_storage_buffer_len]SavedMetadata, + ipc_start: u8, + ipc_end: u8, + ipc_data: [ipc_storage_buffer_len]Ipc.Data, + ipc_buffers: [ipc_storage_buffer_len][max_packet_len]u8, + ipc_vecs: [ipc_storage_buffer_len][1][]u8, + batch_storage: [ipc_storage_buffer_len]Io.Operation.Storage, + batch: Io.Batch, - ipc_metadata_fds: [node_storage_buffer_len]Fd, - ipc_metadata: [node_storage_buffer_len]SavedMetadata, + fn init(buffer: *Buffer) void { + buffer.ipc_start = 0; + buffer.ipc_end = 0; + @memset(&buffer.ipc_data, .unused); + buffer.batch = .init(&buffer.batch_storage); + } }; }; -fn serialize(serialized_buffer: *Serialized.Buffer) Serialized { - var serialized_len: usize = 0; - var any_ipc = false; +fn serialize(io: Io, serialized_buffer: *Serialized.Buffer) !Serialized { + var prev_parents: [node_storage_buffer_len]Node.Parent = undefined; + var prev_storage: [node_storage_buffer_len]Node.Storage = undefined; + { + const ipc_start = serialized_buffer.ipc_start; + const ipc_end = serialized_buffer.ipc_end; + @memcpy(prev_parents[ipc_start..ipc_end], serialized_buffer.parents[ipc_start..ipc_end]); + @memcpy(prev_storage[ipc_start..ipc_end], serialized_buffer.storage[ipc_start..ipc_end]); + } // Iterate all of the nodes and construct a serializable copy of the state that can be examined // without atomics. The `@min` call is here because `node_end_index` might briefly exceed the // node count sometimes. - const end_index = @min(@atomicLoad(u32, &global_progress.node_end_index, .monotonic), global_progress.node_storage.len); + const end_index = @min( + @atomicLoad(u32, &global_progress.node_end_index, .monotonic), + node_storage_buffer_len, + ); + var map: [node_storage_buffer_len]Node.OptionalIndex = undefined; + var serialized_len: u8 = 0; + var maybe_ipc_start: ?u8 = null; for ( global_progress.node_parents[0..end_index], global_progress.node_storage[0..end_index], - serialized_buffer.map[0..end_index], - ) |*parent_ptr, *storage_ptr, *map| { + map[0..end_index], + ) |*parent_ptr, *storage_ptr, *map_entry| { const parent = @atomicLoad(Node.Parent, parent_ptr, .monotonic); if (parent == .unused) { // We might read "mixed" node data in this loop, due to weird atomic things @@ -903,17 +1011,17 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized { // parent, it will just not be printed at all. The general idea here is that performance // is more important than 100% correct output every frame, given that this API is likely // to be used in hot paths! - map.* = .none; + map_entry.* = .none; continue; } const dest_storage = &serialized_buffer.storage[serialized_len]; copyAtomicLoad(&dest_storage.name, &storage_ptr.name); - dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .acquire); // sychronizes with release in `setIpcFd` + dest_storage.estimated_total_count = @atomicLoad(u32, &storage_ptr.estimated_total_count, .acquire); // sychronizes with release in `setIpcIndex` dest_storage.completed_count = @atomicLoad(u32, &storage_ptr.completed_count, .monotonic); - any_ipc = any_ipc or (dest_storage.getIpcFd() != null); serialized_buffer.parents[serialized_len] = parent; - map.* = @enumFromInt(serialized_len); + map_entry.* = @enumFromInt(serialized_len); + if (maybe_ipc_start == null and dest_storage.getIpcIndex() != null) maybe_ipc_start = serialized_len; serialized_len += 1; } @@ -922,13 +1030,201 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized { parent.* = switch (parent.*) { .unused => unreachable, .none => .none, - _ => |p| serialized_buffer.map[@intFromEnum(p)].toParent(), + _ => |p| map[@intFromEnum(p)].toParent(), }; } + // Fill pipe buffers. + const batch = &serialized_buffer.batch; + batch.awaitConcurrent(io, .{ + .duration = .{ .raw = .zero, .clock = .awake }, + }) catch |err| switch (err) { + error.Timeout => {}, + else => |e| return e, + }; + var ready_len: u8 = 0; + while (batch.next()) |operation| switch (operation.index) { + 0...ipc_storage_buffer_len - 1 => { + const ipc_data = &serialized_buffer.ipc_data[operation.index]; + ipc_data.bytes_read += @intCast( + operation.result.file_read_streaming catch |err| switch (err) { + error.EndOfStream => { + const file = global_progress.ipc_files[operation.index]; + const ipc = @atomicRmw( + Ipc, + &global_progress.ipc[operation.index], + .And, + .{ + .locked = false, + .valid = true, + .generation = std.math.maxInt(Ipc.Generation), + }, + .release, + ); + assert(ipc.locked); + if (!ipc.valid) file.close(io); + ipc_data.* = .unused; + continue; + }, + else => |e| return e, + }, + ); + assert(ipc_data.state == .pending); + ipc_data.state = .ready; + ready_len += 1; + }, + else => unreachable, + }; + // Find nodes which correspond to child processes. - if (any_ipc) - serialized_len = serializeIpc(serialized_len, serialized_buffer); + const ipc_start = maybe_ipc_start orelse serialized_len; + serialized_buffer.ipc_start = ipc_start; + for ( + serialized_buffer.parents[ipc_start..serialized_len], + serialized_buffer.storage[ipc_start..serialized_len], + ipc_start.., + ) |main_parent, *main_storage, main_index| { + if (main_parent == .unused) continue; + const ipc_index = main_storage.getIpcIndex() orelse continue; + const ipc = &global_progress.ipc[ipc_index.slot]; + const ipc_data = &serialized_buffer.ipc_data[ipc_index.slot]; + state: switch (ipc_data.state) { + .unused => { + if (@cmpxchgWeak( + Ipc, + ipc, + .{ .locked = false, .valid = true, .generation = ipc_index.generation }, + .{ .locked = true, .valid = true, .generation = ipc_index.generation }, + .acquire, + .monotonic, + )) |_| continue; + + const ipc_vec = &serialized_buffer.ipc_vecs[ipc_index.slot]; + ipc_vec.* = .{&serialized_buffer.ipc_buffers[ipc_index.slot]}; + batch.addAt(ipc_index.slot, .{ .file_read_streaming = .{ + .file = global_progress.ipc_files[ipc_index.slot], + .data = ipc_vec, + } }); + + ipc_data.* = .{ + .state = .pending, + .bytes_read = 0, + .main_index = @intCast(main_index), + .start_index = serialized_len, + .nodes_len = 0, + }; + main_storage.completed_count = 0; + main_storage.estimated_total_count = 0; + }, + .pending => { + const start_index = ipc_data.start_index; + const nodes_len = @min(ipc_data.nodes_len, node_storage_buffer_len - serialized_len); + + main_storage.copyRoot(&prev_storage[ipc_data.main_index]); + @memcpy( + serialized_buffer.storage[serialized_len..][0..nodes_len], + prev_storage[start_index..][0..nodes_len], + ); + for ( + serialized_buffer.parents[serialized_len..][0..nodes_len], + prev_parents[serialized_len..][0..nodes_len], + ) |*parent, prev_parent| parent.* = switch (prev_parent) { + .none, .unused => .none, + _ => if (@intFromEnum(prev_parent) == ipc_data.main_index) + @enumFromInt(main_index) + else if (@intFromEnum(prev_parent) >= start_index and + @intFromEnum(prev_parent) < start_index + nodes_len) + @enumFromInt(@intFromEnum(prev_parent) - start_index + serialized_len) + else + .none, + }; + + ipc_data.main_index = @intCast(main_index); + ipc_data.start_index = serialized_len; + ipc_data.nodes_len = nodes_len; + serialized_len += nodes_len; + }, + .ready => { + const ipc_buffer = &serialized_buffer.ipc_buffers[ipc_index.slot]; + const packet_start, const packet_end = ipc_data.findLastPacket(ipc_buffer); + const packet_is_empty = packet_end - packet_start <= 1; + if (!packet_is_empty) { + const storage, const parents, const nodes_len = packet_contents: { + var packet_index: usize = packet_start; + const nodes_len: u16 = ipc_buffer[packet_index]; + packet_index += 1; + const storage_bytes = + ipc_buffer[packet_index..][0 .. nodes_len * @sizeOf(Node.Storage)]; + packet_index += storage_bytes.len; + const parents_bytes = + ipc_buffer[packet_index..][0 .. nodes_len * @sizeOf(Node.Parent)]; + packet_index += parents_bytes.len; + assert(packet_index == packet_end); + const storage: []align(1) const Node.Storage = @ptrCast(storage_bytes); + const parents: []align(1) const Node.Parent = @ptrCast(parents_bytes); + const children_nodes_len = + @min(nodes_len - 1, node_storage_buffer_len - serialized_len); + break :packet_contents .{ storage, parents, children_nodes_len }; + }; + + // Mount the root here. + main_storage.copyRoot(&storage[0]); + if (is_big_endian) main_storage.byteSwap(); + + // Copy the rest of the tree to the end. + const serialized_storage = + serialized_buffer.storage[serialized_len..][0..nodes_len]; + @memcpy(serialized_storage, storage[1..][0..nodes_len]); + if (is_big_endian) for (serialized_storage) |*s| s.byteSwap(); + + // Patch up parent pointers taking into account how the subtree is mounted. + for ( + serialized_buffer.parents[serialized_len..][0..nodes_len], + parents[1..][0..nodes_len], + ) |*parent, prev_parent| parent.* = switch (prev_parent) { + // Fix bad data so the rest of the code does not see `unused`. + .none, .unused => .none, + // Root node is being mounted here. + @as(Node.Parent, @enumFromInt(0)) => @enumFromInt(main_index), + // Other nodes mounted at the end. + // Don't trust child data; if the data is outside the expected range, + // ignore the data. This also handles the case when data was truncated. + _ => if (@intFromEnum(prev_parent) <= nodes_len) + @enumFromInt(@intFromEnum(prev_parent) - 1 + serialized_len) + else + .none, + }; + + ipc_data.main_index = @intCast(main_index); + ipc_data.start_index = serialized_len; + ipc_data.nodes_len = nodes_len; + serialized_len += nodes_len; + } + const ipc_vec = &serialized_buffer.ipc_vecs[ipc_index.slot]; + ipc_data.rebase(ipc_buffer, ipc_vec, batch, ipc_index.slot, packet_end); + ready_len -= 1; + if (packet_is_empty) continue :state .pending; + }, + } + } + serialized_buffer.ipc_end = serialized_len; + + // Ignore data from unused pipes. This ensures that if a child process exists we will + // eventually see `EndOfStream` and close the pipe. + if (ready_len > 0) for ( + &serialized_buffer.ipc_data, + &serialized_buffer.ipc_buffers, + &serialized_buffer.ipc_vecs, + 0.., + ) |*ipc_data, *ipc_buffer, *ipc_vec, ipc_slot| switch (ipc_data.state) { + .unused, .pending => {}, + .ready => { + _, const packet_end = ipc_data.findLastPacket(ipc_buffer); + ipc_data.rebase(ipc_buffer, ipc_vec, batch, @intCast(ipc_slot), packet_end); + ready_len -= 1; + }, + }; + assert(ready_len == 0); return .{ .parents = serialized_buffer.parents[0..serialized_len], @@ -936,252 +1232,10 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized { }; } -const SavedMetadata = struct { - remaining_read_trash_bytes: u16, - main_index: u8, - start_index: u8, - nodes_len: u8, -}; +fn computeRedraw(io: Io, serialized_buffer: *Serialized.Buffer) !struct { []u8, usize } { + if (global_progress.rows == 0 or global_progress.cols == 0) return error.WindowTooSmall; -const Fd = enum(i32) { - _, - - fn init(fd: Io.File.Handle) Fd { - return @enumFromInt(if (is_windows) @as(isize, @bitCast(@intFromPtr(fd))) else fd); - } - - fn get(fd: Fd) Io.File.Handle { - return if (is_windows) - @ptrFromInt(@as(usize, @bitCast(@as(isize, @intFromEnum(fd))))) - else - @intFromEnum(fd); - } -}; - -var ipc_metadata_len: u8 = 0; - -fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buffer) usize { - const io = global_progress.io; - const ipc_metadata_fds_copy = &serialized_buffer.ipc_metadata_fds_copy; - const ipc_metadata_copy = &serialized_buffer.ipc_metadata_copy; - const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds; - const ipc_metadata = &serialized_buffer.ipc_metadata; - - var serialized_len = start_serialized_len; - var pipe_buf: [2 * 4096]u8 = undefined; - - const old_ipc_metadata_fds = ipc_metadata_fds_copy[0..ipc_metadata_len]; - const old_ipc_metadata = ipc_metadata_copy[0..ipc_metadata_len]; - ipc_metadata_len = 0; - - main_loop: for ( - serialized_buffer.parents[0..serialized_len], - serialized_buffer.storage[0..serialized_len], - 0.., - ) |main_parent, *main_storage, main_index| { - if (main_parent == .unused) continue; - const file: Io.File = .{ - .handle = main_storage.getIpcFd() orelse continue, - .flags = .{ .nonblocking = true }, - }; - const opt_saved_metadata = findOld(file.handle, old_ipc_metadata_fds, old_ipc_metadata); - var bytes_read: usize = 0; - while (true) { - const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) { - error.WouldBlock, error.EndOfStream => break, - else => |e| { - std.log.debug("failed to read child progress data: {t}", .{e}); - main_storage.completed_count = 0; - main_storage.estimated_total_count = 0; - continue :main_loop; - }, - }; - if (opt_saved_metadata) |m| { - if (m.remaining_read_trash_bytes > 0) { - assert(bytes_read == 0); - if (m.remaining_read_trash_bytes >= n) { - m.remaining_read_trash_bytes = @intCast(m.remaining_read_trash_bytes - n); - continue; - } - const src = pipe_buf[m.remaining_read_trash_bytes..n]; - @memmove(pipe_buf[0..src.len], src); - m.remaining_read_trash_bytes = 0; - bytes_read = src.len; - continue; - } - } - bytes_read += n; - } - // Ignore all but the last message on the pipe. - var input: []u8 = pipe_buf[0..bytes_read]; - if (input.len == 0) { - serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, file.handle); - continue; - } - - const storage, const parents = while (true) { - const subtree_len: usize = input[0]; - const expected_bytes = 1 + subtree_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent)); - if (input.len < expected_bytes) { - // Ignore short reads. We'll handle the next full message when it comes instead. - const remaining_read_trash_bytes: u16 = @intCast(expected_bytes - input.len); - serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, file.handle); - continue :main_loop; - } - if (input.len > expected_bytes) { - input = input[expected_bytes..]; - continue; - } - const storage_bytes = input[1..][0 .. subtree_len * @sizeOf(Node.Storage)]; - const parents_bytes = input[1 + storage_bytes.len ..][0 .. subtree_len * @sizeOf(Node.Parent)]; - break .{ - std.mem.bytesAsSlice(Node.Storage, storage_bytes), - std.mem.bytesAsSlice(Node.Parent, parents_bytes), - }; - }; - - const nodes_len: u8 = @intCast(@min(parents.len - 1, serialized_buffer.storage.len - serialized_len)); - - // Remember in case the pipe is empty on next update. - ipc_metadata_fds[ipc_metadata_len] = Fd.init(file.handle); - ipc_metadata[ipc_metadata_len] = .{ - .remaining_read_trash_bytes = 0, - .start_index = @intCast(serialized_len), - .nodes_len = nodes_len, - .main_index = @intCast(main_index), - }; - ipc_metadata_len += 1; - - // Mount the root here. - copyRoot(main_storage, &storage[0]); - if (is_big_endian) main_storage.byteSwap(); - - // Copy the rest of the tree to the end. - const storage_dest = serialized_buffer.storage[serialized_len..][0..nodes_len]; - @memcpy(storage_dest, storage[1..][0..nodes_len]); - - // Always little-endian over the pipe. - if (is_big_endian) for (storage_dest) |*s| s.byteSwap(); - - // Patch up parent pointers taking into account how the subtree is mounted. - for (serialized_buffer.parents[serialized_len..][0..nodes_len], parents[1..][0..nodes_len]) |*dest, p| { - dest.* = switch (p) { - // Fix bad data so the rest of the code does not see `unused`. - .none, .unused => .none, - // Root node is being mounted here. - @as(Node.Parent, @enumFromInt(0)) => @enumFromInt(main_index), - // Other nodes mounted at the end. - // Don't trust child data; if the data is outside the expected range, ignore the data. - // This also handles the case when data was truncated. - _ => |off| if (@intFromEnum(off) > nodes_len) - .none - else - @enumFromInt(serialized_len + @intFromEnum(off) - 1), - }; - } - - serialized_len += nodes_len; - } - - // Save a copy in case any pipes are empty on the next update. - @memcpy(serialized_buffer.parents_copy[0..serialized_len], serialized_buffer.parents[0..serialized_len]); - @memcpy(serialized_buffer.storage_copy[0..serialized_len], serialized_buffer.storage[0..serialized_len]); - @memcpy(ipc_metadata_fds_copy[0..ipc_metadata_len], ipc_metadata_fds[0..ipc_metadata_len]); - @memcpy(ipc_metadata_copy[0..ipc_metadata_len], ipc_metadata[0..ipc_metadata_len]); - - return serialized_len; -} - -fn copyRoot(dest: *Node.Storage, src: *align(1) Node.Storage) void { - dest.* = .{ - .completed_count = src.completed_count, - .estimated_total_count = src.estimated_total_count, - .name = if (src.name[0] == 0) dest.name else src.name, - }; -} - -fn findOld( - ipc_fd: Io.File.Handle, - old_metadata_fds: []Fd, - old_metadata: []SavedMetadata, -) ?*SavedMetadata { - for (old_metadata_fds, old_metadata) |fd, *m| { - if (fd.get() == ipc_fd) - return m; - } - return null; -} - -fn useSavedIpcData( - start_serialized_len: usize, - serialized_buffer: *Serialized.Buffer, - main_storage: *Node.Storage, - main_index: usize, - opt_saved_metadata: ?*SavedMetadata, - remaining_read_trash_bytes: u16, - fd: Io.File.Handle, -) usize { - const parents_copy = &serialized_buffer.parents_copy; - const storage_copy = &serialized_buffer.storage_copy; - const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds; - const ipc_metadata = &serialized_buffer.ipc_metadata; - - const saved_metadata = opt_saved_metadata orelse { - main_storage.completed_count = 0; - main_storage.estimated_total_count = 0; - if (remaining_read_trash_bytes > 0) { - ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd); - ipc_metadata[ipc_metadata_len] = .{ - .remaining_read_trash_bytes = remaining_read_trash_bytes, - .start_index = @intCast(start_serialized_len), - .nodes_len = 0, - .main_index = @intCast(main_index), - }; - ipc_metadata_len += 1; - } - return start_serialized_len; - }; - - const start_index = saved_metadata.start_index; - const nodes_len = @min(saved_metadata.nodes_len, serialized_buffer.storage.len - start_serialized_len); - const old_main_index = saved_metadata.main_index; - - ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd); - ipc_metadata[ipc_metadata_len] = .{ - .remaining_read_trash_bytes = remaining_read_trash_bytes, - .start_index = @intCast(start_serialized_len), - .nodes_len = nodes_len, - .main_index = @intCast(main_index), - }; - ipc_metadata_len += 1; - - const parents = parents_copy[start_index..][0..nodes_len]; - const storage = storage_copy[start_index..][0..nodes_len]; - - copyRoot(main_storage, &storage_copy[old_main_index]); - - @memcpy(serialized_buffer.storage[start_serialized_len..][0..storage.len], storage); - - for (serialized_buffer.parents[start_serialized_len..][0..parents.len], parents) |*dest, p| { - dest.* = switch (p) { - .none, .unused => .none, - _ => |prev| d: { - if (@intFromEnum(prev) == old_main_index) { - break :d @enumFromInt(main_index); - } else if (@intFromEnum(prev) > nodes_len) { - break :d .none; - } else { - break :d @enumFromInt(@intFromEnum(prev) - start_index + start_serialized_len); - } - }, - }; - } - - return start_serialized_len + storage.len; -} - -fn computeRedraw(serialized_buffer: *Serialized.Buffer) struct { []u8, usize } { - const serialized = serialize(serialized_buffer); + const serialized = try serialize(io, serialized_buffer); // Now we can analyze our copy of the graph without atomics, reconstructing // children lists which do not exist in the canonical data. These are @@ -1416,9 +1470,7 @@ fn withinRowLimit(p: *Progress, nl_n: usize) bool { return nl_n + 2 < p.rows; } -var remaining_write_trash_bytes: usize = 0; - -fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!void { +fn writeIpc(writer: *Io.Writer, serialized: Serialized) Io.Writer.Error!void { // Byteswap if necessary to ensure little endian over the pipe. This is // needed because the parent or child process might be running in qemu. if (is_big_endian) for (serialized.storage) |*s| s.byteSwap(); @@ -1429,62 +1481,8 @@ fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!voi const storage = std.mem.sliceAsBytes(serialized.storage); const parents = std.mem.sliceAsBytes(serialized.parents); - var vecs: [3][]const u8 = .{ header, storage, parents }; - - // Ensures the packet can fit in the pipe buffer. - const upper_bound_msg_len = 1 + node_storage_buffer_len * @sizeOf(Node.Storage) + - node_storage_buffer_len * @sizeOf(Node.OptionalIndex); - comptime assert(upper_bound_msg_len <= 4096); - - while (remaining_write_trash_bytes > 0) { - // We do this in a separate write call to give a better chance for the - // writev below to be in a single packet. - const n = @min(parents.len, remaining_write_trash_bytes); - if (file.writeStreaming(io, &.{}, &.{parents[0..n]}, 1)) |written| { - remaining_write_trash_bytes -= written; - continue; - } else |err| switch (err) { - error.WouldBlock => return, - error.BrokenPipe => return error.BrokenPipe, - else => |e| { - std.log.debug("failed to send progress to parent process: {t}", .{e}); - return error.BrokenPipe; - }, - } - } - - // If this write would block we do not want to keep trying, but we need to - // know if a partial message was written. - if (writevNonblock(io, file, &vecs)) |written| { - const total = header.len + storage.len + parents.len; - if (written < total) { - remaining_write_trash_bytes = total - written; - } - } else |err| switch (err) { - error.WouldBlock => {}, - error.BrokenPipe => return error.BrokenPipe, - else => |e| { - std.log.debug("failed to send progress to parent process: {t}", .{e}); - return error.BrokenPipe; - }, - } -} - -fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error!usize { - var iov_index: usize = 0; - var written: usize = 0; - var total_written: usize = 0; - while (true) { - while (if (iov_index < iov.len) - written >= iov[iov_index].len - else - return total_written) : (iov_index += 1) written -= iov[iov_index].len; - iov[iov_index].ptr += written; - iov[iov_index].len -= written; - written = try file.writeStreaming(io, &.{}, iov, 1); - if (written == 0) return total_written; - total_written += written; - } + var vec = [3][]const u8{ header, storage, parents }; + try writer.writeVecAll(&vec); } fn maybeUpdateSize(io: Io, resize_flag: bool) !void { diff --git a/lib/std/os/linux.zig b/lib/std/os/linux.zig index 4539aaff4e..ef4616b111 100644 --- a/lib/std/os/linux.zig +++ b/lib/std/os/linux.zig @@ -1848,6 +1848,24 @@ pub const F = struct { pub const RDLCK = if (is_sparc) 1 else 0; pub const WRLCK = if (is_sparc) 2 else 1; pub const UNLCK = if (is_sparc) 3 else 2; + + pub const LINUX_SPECIFIC_BASE = 1024; + + pub const SETLEASE = LINUX_SPECIFIC_BASE + 0; + pub const GETLEASE = LINUX_SPECIFIC_BASE + 1; + pub const NOTIFY = LINUX_SPECIFIC_BASE + 2; + pub const DUPFD_QUERY = LINUX_SPECIFIC_BASE + 3; + pub const CREATED_QUERY = LINUX_SPECIFIC_BASE + 4; + pub const CANCELLK = LINUX_SPECIFIC_BASE + 5; + pub const DUPFD_CLOEXEC = LINUX_SPECIFIC_BASE + 6; + pub const SETPIPE_SZ = LINUX_SPECIFIC_BASE + 7; + pub const GETPIPE_SZ = LINUX_SPECIFIC_BASE + 8; + pub const ADD_SEALS = LINUX_SPECIFIC_BASE + 9; + pub const GET_SEALS = LINUX_SPECIFIC_BASE + 10; + pub const GET_RW_HINT = LINUX_SPECIFIC_BASE + 11; + pub const SET_RW_HINT = LINUX_SPECIFIC_BASE + 12; + pub const GET_FILE_RW_HINT = LINUX_SPECIFIC_BASE + 13; + pub const SET_FILE_RW_HINT = LINUX_SPECIFIC_BASE + 14; }; pub const F_OWNER = enum(i32) {