From 8a80b5464022a9fb4320e37f9dfc2aa83539ff07 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 27 Jan 2026 15:31:23 -0800 Subject: [PATCH] std: remove error.BrokenPipe from file reads, add error.EndOfStream and make reading file streaming allowed to return 0 byte reads. According to Microsoft documentation, on Windows it is possible to get 0-byte reads from pipes when 0-byte writes are made. --- lib/std/Io.zig | 8 ++-- lib/std/Io/File.zig | 5 +- lib/std/Io/File/MultiReader.zig | 14 ++---- lib/std/Io/File/Reader.zig | 34 +++++++------ lib/std/Io/Threaded.zig | 85 ++++++++++++++------------------- lib/std/Progress.zig | 3 +- lib/std/process/Child.zig | 28 ++++++----- lib/std/zig/system.zig | 1 - 8 files changed, 82 insertions(+), 96 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 438fb01529..c00e4619e8 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -187,7 +187,7 @@ pub const VTable = struct { fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize, fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize, fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize, - /// Returns 0 on end of stream. + /// Returns 0 if reading at or past the end. fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize, fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void, fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void, @@ -263,18 +263,18 @@ pub const Operation = union(enum) { status: Status(void) = .{ .unstarted = {} }, }; - /// Returns 0 on end of stream. + /// May return 0 reads which is different than `error.EndOfStream`. pub const FileReadStreaming = struct { file: File, data: []const []u8, status: Status(Error!usize) = .{ .unstarted = {} }, - pub const Error = error{ + pub const Error = UnendingError || error{EndOfStream}; + pub const UnendingError = error{ InputOutput, SystemResources, /// Trying to read a directory file descriptor as if it were a file. IsDir, - BrokenPipe, ConnectionResetByPeer, /// File was not opened with read capability. NotOpenForReading, diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index c545b60222..df5b3b5a53 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -552,11 +552,13 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void { }); } +pub const ReadStreamingError = error{EndOfStream} || Reader.Error; + /// Returns 0 on stream end or if `buffer` has no space available for data. /// /// See also: /// * `reader` -pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usize { +pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize { var operation: Io.Operation = .{ .file_read_streaming = .{ .file = file, .data = buffer, @@ -570,7 +572,6 @@ pub const ReadPositionalError = error{ SystemResources, /// Trying to read a directory file descriptor as if it were a file. IsDir, - BrokenPipe, /// Non-blocking has been enabled, and reading from the file descriptor /// would block. WouldBlock, diff --git a/lib/std/Io/File/MultiReader.zig b/lib/std/Io/File/MultiReader.zig index 0cfa777e96..08ad76000c 100644 --- a/lib/std/Io/File/MultiReader.zig +++ b/lib/std/Io/File/MultiReader.zig @@ -15,10 +15,9 @@ pub const Context = struct { fr: File.Reader, vec: [1][]u8, err: ?Error, - eos: bool, }; -pub const Error = Allocator.Error || File.Reader.Error || Io.ConcurrentError; +pub const Error = Allocator.Error || File.ReadStreamingError || Io.ConcurrentError; /// Trailing: /// * `contexts: [len]Context` @@ -85,7 +84,6 @@ pub fn init(mr: *MultiReader, gpa: Allocator, io: Io, streams: *Streams, files: }, .vec = .{&.{}}, .err = null, - .eos = false, }; const operations = streams.operations(); const ring = streams.ring(); @@ -198,8 +196,10 @@ fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void { }, error.EndOfStream => |e| return e, }; - if (context.err != null) return error.ReadFailed; - if (context.eos) return error.EndOfStream; + if (context.err) |err| switch (err) { + error.EndOfStream => |e| return e, + else => return error.ReadFailed, + }; } pub const FillError = Io.Batch.WaitError || error{ @@ -225,10 +225,6 @@ pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillE context.err = err; continue; }; - if (n == 0) { - context.eos = true; - continue; - } const r = &context.fr.interface; r.end += n; if (r.buffer.len - r.end < unused_capacity) { diff --git a/lib/std/Io/File/Reader.zig b/lib/std/Io/File/Reader.zig index 7703521d7e..effd000df8 100644 --- a/lib/std/Io/File/Reader.zig +++ b/lib/std/Io/File/Reader.zig @@ -26,7 +26,7 @@ size_err: ?SizeError = null, seek_err: ?SeekError = null, interface: Io.Reader, -pub const Error = Io.Operation.FileReadStreaming.Error || Io.Cancelable; +pub const Error = Io.Operation.FileReadStreaming.UnendingError || Io.Cancelable; pub const SizeError = File.StatError || error{ /// Occurs if, for example, the file handle is a network socket and therefore does not have a size. @@ -280,14 +280,16 @@ fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize { const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data); const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); - const n = r.file.readStreaming(io, dest) catch |err| { - r.err = err; - return error.ReadFailed; + const n = r.file.readStreaming(io, dest) catch |err| switch (err) { + error.EndOfStream => { + r.size = r.pos; + return error.EndOfStream; + }, + else => |e| { + r.err = e; + return error.ReadFailed; + }, }; - if (n == 0) { - r.size = r.pos; - return error.EndOfStream; - } r.pos += n; if (n > data_size) { r.interface.end += n - data_size; @@ -335,14 +337,16 @@ fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize { const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, &data); const dest = iovecs_buffer[0..dest_n]; assert(dest[0].len > 0); - const n = file.readStreaming(io, dest) catch |err| { - r.err = err; - return error.ReadFailed; + const n = file.readStreaming(io, dest) catch |err| switch (err) { + error.EndOfStream => { + r.size = r.pos; + return error.EndOfStream; + }, + else => |e| { + r.err = e; + return error.ReadFailed; + }, }; - if (n == 0) { - r.size = r.pos; - return error.EndOfStream; - } r.pos += n; if (n > data_size) { r.interface.end += n - data_size; diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 00f14ed741..c9c38c6b29 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -8583,14 +8583,14 @@ fn fileClose(userdata: ?*anyopaque, files: []const File) void { for (files) |file| posix.close(file.handle); } -fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize { +fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.ReadStreamingError!usize { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; if (is_windows) return fileReadStreamingWindows(file, data); return fileReadStreamingPosix(file, data); } -fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usize { +fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingError!usize { var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined; var i: usize = 0; for (data) |buf| { @@ -8611,28 +8611,24 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) { .SUCCESS => { syscall.finish(); + if (nread == 0) return error.EndOfStream; return nread; }, .INTR, .TIMEDOUT => { try syscall.checkCancel(); continue; }, - else => |e| { - syscall.finish(); - switch (e) { - .INVAL => |err| return errnoBug(err), - .FAULT => |err| return errnoBug(err), - .BADF => return error.IsDir, // File operation on directory. - .IO => return error.InputOutput, - .ISDIR => return error.IsDir, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketUnconnected, - .CONNRESET => return error.ConnectionResetByPeer, - .NOTCAPABLE => return error.AccessDenied, - else => |err| return posix.unexpectedErrno(err), - } - }, + .BADF => return syscall.fail(error.IsDir), // File operation on directory. + .IO => return syscall.fail(error.InputOutput), + .ISDIR => return syscall.fail(error.IsDir), + .NOBUFS => return syscall.fail(error.SystemResources), + .NOMEM => return syscall.fail(error.SystemResources), + .NOTCONN => return syscall.fail(error.SocketUnconnected), + .CONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .NOTCAPABLE => return syscall.fail(error.AccessDenied), + .INVAL => |err| return syscall.errnoBug(err), + .FAULT => |err| return syscall.errnoBug(err), + else => |err| return syscall.unexpectedErrno(err), } } } @@ -8643,36 +8639,33 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz switch (posix.errno(rc)) { .SUCCESS => { syscall.finish(); + if (rc == 0) return error.EndOfStream; return @intCast(rc); }, .INTR, .TIMEDOUT => { try syscall.checkCancel(); continue; }, - else => |e| { + .BADF => { syscall.finish(); - switch (e) { - .INVAL => |err| return errnoBug(err), - .FAULT => |err| return errnoBug(err), - .AGAIN => return error.WouldBlock, - .BADF => { - if (native_os == .wasi) return error.IsDir; // File operation on directory. - return error.NotOpenForReading; - }, - .IO => return error.InputOutput, - .ISDIR => return error.IsDir, - .NOBUFS => return error.SystemResources, - .NOMEM => return error.SystemResources, - .NOTCONN => return error.SocketUnconnected, - .CONNRESET => return error.ConnectionResetByPeer, - else => |err| return posix.unexpectedErrno(err), - } + if (native_os == .wasi) return error.IsDir; // File operation on directory. + return error.NotOpenForReading; }, + .AGAIN => return syscall.fail(error.WouldBlock), + .IO => return syscall.fail(error.InputOutput), + .ISDIR => return syscall.fail(error.IsDir), + .NOBUFS => return syscall.fail(error.SystemResources), + .NOMEM => return syscall.fail(error.SystemResources), + .NOTCONN => return syscall.fail(error.SocketUnconnected), + .CONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .INVAL => |err| return syscall.errnoBug(err), + .FAULT => |err| return syscall.errnoBug(err), + else => |err| return syscall.unexpectedErrno(err), } } } -fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize { +fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize { var io_status_block: windows.IO_STATUS_BLOCK = undefined; if (ntReadFile(file.handle, data, &io_status_block)) |result| switch (result) { .status => return ntReadFileResult(&io_status_block), @@ -8707,11 +8700,9 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!us fn ntReadFileResult(io_status_block: *windows.IO_STATUS_BLOCK) !usize { switch (io_status_block.u.Status) { - .SUCCESS => { - assert(io_status_block.Information != 0); - return io_status_block.Information; - }, - .END_OF_FILE, .PIPE_BROKEN => return 0, + .SUCCESS => return io_status_block.Information, + .END_OF_FILE => return error.EndOfStream, + .PIPE_BROKEN => return error.EndOfStream, .PENDING => unreachable, .INVALID_DEVICE_REQUEST => return error.IsDir, .LOCK_NOT_GRANTED => return error.LockViolation, @@ -8749,15 +8740,9 @@ fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STAT return .pending; }, .SUCCESS => { - // Only END_OF_FILE is the true end. - if (iosb.Information == 0) { - try syscall.checkCancel(); - continue; - } else { - syscall.finish(); - iosb.u.Status = .SUCCESS; - return .status; - } + syscall.finish(); + iosb.u.Status = .SUCCESS; + return .status; }, .CANCELLED => { try syscall.checkCancel(); diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index 5ccc46778b..d0ee9e556f 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -984,7 +984,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff var bytes_read: usize = 0; while (true) { const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) { - error.WouldBlock => break, + error.WouldBlock, error.EndOfStream => break, else => |e| { std.log.debug("failed to read child progress data: {t}", .{e}); main_storage.completed_count = 0; @@ -992,7 +992,6 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff continue :main_loop; }, }; - if (n == 0) break; if (opt_saved_metadata) |m| { if (m.remaining_read_trash_bytes > 0) { assert(bytes_read == 0); diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig index fe6dfa389d..e226fb7a9b 100644 --- a/lib/std/process/Child.zig +++ b/lib/std/process/Child.zig @@ -176,19 +176,21 @@ pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) while (remaining > 0) { try batch.wait(io, options.timeout); while (batch.next()) |op| { - const n = try reads[op].file_read_streaming.status.result; - if (n == 0) { - remaining -= 1; - } else { - lists[op].items.len += n; - if (lists[op].items.len > @intFromEnum(limits[op])) return error.StreamTooLong; - if (options.allocator) |gpa| try lists[op].ensureUnusedCapacity(gpa, 1); - const cap = lists[op].unusedCapacitySlice(); - if (cap.len == 0) return error.StreamTooLong; - vecs[op][0] = cap; - reads[op].file_read_streaming.status = .{ .unstarted = {} }; - batch.add(op); - } + const n = reads[op].file_read_streaming.status.result catch |err| switch (err) { + error.EndOfStream => { + remaining -= 1; + continue; + }, + else => |e| return e, + }; + lists[op].items.len += n; + if (lists[op].items.len > @intFromEnum(limits[op])) return error.StreamTooLong; + if (options.allocator) |gpa| try lists[op].ensureUnusedCapacity(gpa, 1); + const cap = lists[op].unusedCapacitySlice(); + if (cap.len == 0) return error.StreamTooLong; + vecs[op][0] = cap; + reads[op].file_read_streaming.status = .{ .unstarted = {} }; + batch.add(op); } } } diff --git a/lib/std/zig/system.zig b/lib/std/zig/system.zig index 5046e2f51b..b32b554dee 100644 --- a/lib/std/zig/system.zig +++ b/lib/std/zig/system.zig @@ -420,7 +420,6 @@ pub fn resolveTargetQuery(io: Io, query: Target.Query) DetectError!Target { error.Canceled => |e| return e, error.Unexpected => |e| return e, error.WouldBlock => return error.Unexpected, - error.BrokenPipe => return error.Unexpected, error.ConnectionResetByPeer => return error.Unexpected, error.NotOpenForReading => return error.Unexpected, error.SocketUnconnected => return error.Unexpected,