std: remove error.BrokenPipe from file reads, add error.EndOfStream

and make reading file streaming allowed to return 0 byte reads.
According to Microsoft documentation, on Windows it is possible to get
0-byte reads from pipes when 0-byte writes are made.
This commit is contained in:
Andrew Kelley 2026-01-27 15:31:23 -08:00
parent fdf1ee973e
commit 8a80b54640
8 changed files with 82 additions and 96 deletions

View file

@ -187,7 +187,7 @@ pub const VTable = struct {
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
/// Returns 0 on end of stream.
/// Returns 0 if reading at or past the end.
fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize,
fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
@ -263,18 +263,18 @@ pub const Operation = union(enum) {
status: Status(void) = .{ .unstarted = {} },
};
/// Returns 0 on end of stream.
/// May return 0 reads which is different than `error.EndOfStream`.
pub const FileReadStreaming = struct {
file: File,
data: []const []u8,
status: Status(Error!usize) = .{ .unstarted = {} },
pub const Error = error{
pub const Error = UnendingError || error{EndOfStream};
pub const UnendingError = error{
InputOutput,
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
ConnectionResetByPeer,
/// File was not opened with read capability.
NotOpenForReading,

View file

@ -552,11 +552,13 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void {
});
}
pub const ReadStreamingError = error{EndOfStream} || Reader.Error;
/// Returns 0 on stream end or if `buffer` has no space available for data.
///
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usize {
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) ReadStreamingError!usize {
var operation: Io.Operation = .{ .file_read_streaming = .{
.file = file,
.data = buffer,
@ -570,7 +572,6 @@ pub const ReadPositionalError = error{
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
/// Non-blocking has been enabled, and reading from the file descriptor
/// would block.
WouldBlock,

View file

@ -15,10 +15,9 @@ pub const Context = struct {
fr: File.Reader,
vec: [1][]u8,
err: ?Error,
eos: bool,
};
pub const Error = Allocator.Error || File.Reader.Error || Io.ConcurrentError;
pub const Error = Allocator.Error || File.ReadStreamingError || Io.ConcurrentError;
/// Trailing:
/// * `contexts: [len]Context`
@ -85,7 +84,6 @@ pub fn init(mr: *MultiReader, gpa: Allocator, io: Io, streams: *Streams, files:
},
.vec = .{&.{}},
.err = null,
.eos = false,
};
const operations = streams.operations();
const ring = streams.ring();
@ -198,8 +196,10 @@ fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void {
},
error.EndOfStream => |e| return e,
};
if (context.err != null) return error.ReadFailed;
if (context.eos) return error.EndOfStream;
if (context.err) |err| switch (err) {
error.EndOfStream => |e| return e,
else => return error.ReadFailed,
};
}
pub const FillError = Io.Batch.WaitError || error{
@ -225,10 +225,6 @@ pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillE
context.err = err;
continue;
};
if (n == 0) {
context.eos = true;
continue;
}
const r = &context.fr.interface;
r.end += n;
if (r.buffer.len - r.end < unused_capacity) {

View file

@ -26,7 +26,7 @@ size_err: ?SizeError = null,
seek_err: ?SeekError = null,
interface: Io.Reader,
pub const Error = Io.Operation.FileReadStreaming.Error || Io.Cancelable;
pub const Error = Io.Operation.FileReadStreaming.UnendingError || Io.Cancelable;
pub const SizeError = File.StatError || error{
/// Occurs if, for example, the file handle is a network socket and therefore does not have a size.
@ -280,14 +280,16 @@ fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = r.file.readStreaming(io, dest) catch |err| {
r.err = err;
return error.ReadFailed;
const n = r.file.readStreaming(io, dest) catch |err| switch (err) {
error.EndOfStream => {
r.size = r.pos;
return error.EndOfStream;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;
@ -335,14 +337,16 @@ fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, &data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = file.readStreaming(io, dest) catch |err| {
r.err = err;
return error.ReadFailed;
const n = file.readStreaming(io, dest) catch |err| switch (err) {
error.EndOfStream => {
r.size = r.pos;
return error.EndOfStream;
},
else => |e| {
r.err = e;
return error.ReadFailed;
},
};
if (n == 0) {
r.size = r.pos;
return error.EndOfStream;
}
r.pos += n;
if (n > data_size) {
r.interface.end += n - data_size;

View file

@ -8583,14 +8583,14 @@ fn fileClose(userdata: ?*anyopaque, files: []const File) void {
for (files) |file| posix.close(file.handle);
}
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.ReadStreamingError!usize {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
if (is_windows) return fileReadStreamingWindows(file, data);
return fileReadStreamingPosix(file, data);
}
fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingError!usize {
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
for (data) |buf| {
@ -8611,28 +8611,24 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
.SUCCESS => {
syscall.finish();
if (nread == 0) return error.EndOfStream;
return nread;
},
.INTR, .TIMEDOUT => {
try syscall.checkCancel();
continue;
},
else => |e| {
syscall.finish();
switch (e) {
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.BADF => return error.IsDir, // File operation on directory.
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
.NOTCAPABLE => return error.AccessDenied,
else => |err| return posix.unexpectedErrno(err),
}
},
.BADF => return syscall.fail(error.IsDir), // File operation on directory.
.IO => return syscall.fail(error.InputOutput),
.ISDIR => return syscall.fail(error.IsDir),
.NOBUFS => return syscall.fail(error.SystemResources),
.NOMEM => return syscall.fail(error.SystemResources),
.NOTCONN => return syscall.fail(error.SocketUnconnected),
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.NOTCAPABLE => return syscall.fail(error.AccessDenied),
.INVAL => |err| return syscall.errnoBug(err),
.FAULT => |err| return syscall.errnoBug(err),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
@ -8643,36 +8639,33 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.Reader.Error!usiz
switch (posix.errno(rc)) {
.SUCCESS => {
syscall.finish();
if (rc == 0) return error.EndOfStream;
return @intCast(rc);
},
.INTR, .TIMEDOUT => {
try syscall.checkCancel();
continue;
},
else => |e| {
.BADF => {
syscall.finish();
switch (e) {
.INVAL => |err| return errnoBug(err),
.FAULT => |err| return errnoBug(err),
.AGAIN => return error.WouldBlock,
.BADF => {
if (native_os == .wasi) return error.IsDir; // File operation on directory.
return error.NotOpenForReading;
},
.IO => return error.InputOutput,
.ISDIR => return error.IsDir,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.NOTCONN => return error.SocketUnconnected,
.CONNRESET => return error.ConnectionResetByPeer,
else => |err| return posix.unexpectedErrno(err),
}
if (native_os == .wasi) return error.IsDir; // File operation on directory.
return error.NotOpenForReading;
},
.AGAIN => return syscall.fail(error.WouldBlock),
.IO => return syscall.fail(error.InputOutput),
.ISDIR => return syscall.fail(error.IsDir),
.NOBUFS => return syscall.fail(error.SystemResources),
.NOMEM => return syscall.fail(error.SystemResources),
.NOTCONN => return syscall.fail(error.SocketUnconnected),
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.INVAL => |err| return syscall.errnoBug(err),
.FAULT => |err| return syscall.errnoBug(err),
else => |err| return syscall.unexpectedErrno(err),
}
}
}
fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!usize {
fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize {
var io_status_block: windows.IO_STATUS_BLOCK = undefined;
if (ntReadFile(file.handle, data, &io_status_block)) |result| switch (result) {
.status => return ntReadFileResult(&io_status_block),
@ -8707,11 +8700,9 @@ fn fileReadStreamingWindows(file: File, data: []const []u8) File.Reader.Error!us
fn ntReadFileResult(io_status_block: *windows.IO_STATUS_BLOCK) !usize {
switch (io_status_block.u.Status) {
.SUCCESS => {
assert(io_status_block.Information != 0);
return io_status_block.Information;
},
.END_OF_FILE, .PIPE_BROKEN => return 0,
.SUCCESS => return io_status_block.Information,
.END_OF_FILE => return error.EndOfStream,
.PIPE_BROKEN => return error.EndOfStream,
.PENDING => unreachable,
.INVALID_DEVICE_REQUEST => return error.IsDir,
.LOCK_NOT_GRANTED => return error.LockViolation,
@ -8749,15 +8740,9 @@ fn ntReadFile(handle: windows.HANDLE, data: []const []u8, iosb: *windows.IO_STAT
return .pending;
},
.SUCCESS => {
// Only END_OF_FILE is the true end.
if (iosb.Information == 0) {
try syscall.checkCancel();
continue;
} else {
syscall.finish();
iosb.u.Status = .SUCCESS;
return .status;
}
syscall.finish();
iosb.u.Status = .SUCCESS;
return .status;
},
.CANCELLED => {
try syscall.checkCancel();

View file

@ -984,7 +984,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
var bytes_read: usize = 0;
while (true) {
const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) {
error.WouldBlock => break,
error.WouldBlock, error.EndOfStream => break,
else => |e| {
std.log.debug("failed to read child progress data: {t}", .{e});
main_storage.completed_count = 0;
@ -992,7 +992,6 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
continue :main_loop;
},
};
if (n == 0) break;
if (opt_saved_metadata) |m| {
if (m.remaining_read_trash_bytes > 0) {
assert(bytes_read == 0);

View file

@ -176,19 +176,21 @@ pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions)
while (remaining > 0) {
try batch.wait(io, options.timeout);
while (batch.next()) |op| {
const n = try reads[op].file_read_streaming.status.result;
if (n == 0) {
remaining -= 1;
} else {
lists[op].items.len += n;
if (lists[op].items.len > @intFromEnum(limits[op])) return error.StreamTooLong;
if (options.allocator) |gpa| try lists[op].ensureUnusedCapacity(gpa, 1);
const cap = lists[op].unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
vecs[op][0] = cap;
reads[op].file_read_streaming.status = .{ .unstarted = {} };
batch.add(op);
}
const n = reads[op].file_read_streaming.status.result catch |err| switch (err) {
error.EndOfStream => {
remaining -= 1;
continue;
},
else => |e| return e,
};
lists[op].items.len += n;
if (lists[op].items.len > @intFromEnum(limits[op])) return error.StreamTooLong;
if (options.allocator) |gpa| try lists[op].ensureUnusedCapacity(gpa, 1);
const cap = lists[op].unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
vecs[op][0] = cap;
reads[op].file_read_streaming.status = .{ .unstarted = {} };
batch.add(op);
}
}
}

View file

@ -420,7 +420,6 @@ pub fn resolveTargetQuery(io: Io, query: Target.Query) DetectError!Target {
error.Canceled => |e| return e,
error.Unexpected => |e| return e,
error.WouldBlock => return error.Unexpected,
error.BrokenPipe => return error.Unexpected,
error.ConnectionResetByPeer => return error.Unexpected,
error.NotOpenForReading => return error.Unexpected,
error.SocketUnconnected => return error.Unexpected,