//! A cross-platform interface that abstracts all I/O operations and
//! concurrency. It includes:
//! * file system
//! * networking
//! * processes
//! * time and sleeping
//! * randomness
//! * async, await, concurrent, and cancel
//! * concurrent queues
//! * wait groups and select
//! * mutexes, futexes, events, and conditions
//! * memory mapped files
//! This interface allows programmers to write optimal, reusable code while
//! participating in these operations.
const Io = @This();
const builtin = @import("builtin");
const is_windows = builtin.os.tag == .windows;
const std = @import("std.zig");
const windows = std.os.windows;
const posix = std.posix;
const math = std.math;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
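/// Blocking utility for reading from several file streams concurrently (such
/// as the stdout and stderr of a child process), buffering whatever data
/// arrives into per-stream readers.
///
/// A minimal sketch (assuming `child` is a `std.process.Child` spawned with
/// piped stdout and stderr):
/// ```
/// var poller = std.Io.poll(gpa, enum { stdout, stderr }, .{
///     .stdout = child.stdout.?,
///     .stderr = child.stderr.?,
/// });
/// defer poller.deinit();
/// while (try poller.poll()) {}
/// const stdout_data = poller.reader(.stdout).buffered();
/// ```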
pub fn poll(
gpa: Allocator,
comptime StreamEnum: type,
files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
var result: Poller(StreamEnum) = .{
.gpa = gpa,
.readers = @splat(.failing),
.poll_fds = undefined,
.windows = if (is_windows) .{
.first_read_done = false,
.overlapped = [1]windows.OVERLAPPED{
std.mem.zeroes(windows.OVERLAPPED),
} ** enum_fields.len,
.small_bufs = undefined,
.active = .{
.count = 0,
.handles_buf = undefined,
.stream_map = undefined,
},
} else {},
};
inline for (enum_fields, 0..) |field, i| {
if (is_windows) {
result.windows.active.handles_buf[i] = @field(files, field.name).handle;
} else {
result.poll_fds[i] = .{
.fd = @field(files, field.name).handle,
.events = posix.POLL.IN,
.revents = undefined,
};
}
}
return result;
}
pub fn Poller(comptime StreamEnum: type) type {
return struct {
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
const PollFd = if (is_windows) void else posix.pollfd;
gpa: Allocator,
readers: [enum_fields.len]Reader,
poll_fds: [enum_fields.len]PollFd,
windows: if (is_windows) struct {
first_read_done: bool,
overlapped: [enum_fields.len]windows.OVERLAPPED,
small_bufs: [enum_fields.len][128]u8,
active: struct {
count: math.IntFittingRange(0, enum_fields.len),
handles_buf: [enum_fields.len]windows.HANDLE,
stream_map: [enum_fields.len]StreamEnum,
pub fn removeAt(self: *@This(), index: u32) void {
assert(index < self.count);
for (index + 1..self.count) |i| {
self.handles_buf[i - 1] = self.handles_buf[i];
self.stream_map[i - 1] = self.stream_map[i];
}
self.count -= 1;
}
},
} else void,
const Self = @This();
pub fn deinit(self: *Self) void {
const gpa = self.gpa;
if (is_windows) {
// cancel any pending IO to prevent clobbering OVERLAPPED value
for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
_ = windows.kernel32.CancelIo(h);
}
}
inline for (&self.readers) |*r| gpa.free(r.buffer);
self.* = undefined;
}
pub fn poll(self: *Self) !bool {
if (is_windows) {
return pollWindows(self, null);
} else {
return pollPosix(self, null);
}
}
pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool {
if (is_windows) {
return pollWindows(self, nanoseconds);
} else {
return pollPosix(self, nanoseconds);
}
}
pub fn reader(self: *Self, which: StreamEnum) *Reader {
return &self.readers[@intFromEnum(which)];
}
pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 {
const gpa = self.gpa;
const r = reader(self, which);
if (r.seek == 0) {
const new = try gpa.realloc(r.buffer, r.end);
r.buffer = &.{};
r.end = 0;
return new;
}
const new = try gpa.dupe(u8, r.buffered());
gpa.free(r.buffer);
r.buffer = &.{};
r.seek = 0;
r.end = 0;
return new;
}
fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
const bump_amt = 512;
const gpa = self.gpa;
if (!self.windows.first_read_done) {
var already_read_data = false;
for (0..enum_fields.len) |i| {
const handle = self.windows.active.handles_buf[i];
switch (try windowsAsyncReadToFifoAndQueueSmallRead(
gpa,
handle,
&self.windows.overlapped[i],
&self.readers[i],
&self.windows.small_bufs[i],
bump_amt,
)) {
.populated, .empty => |state| {
if (state == .populated) already_read_data = true;
self.windows.active.handles_buf[self.windows.active.count] = handle;
self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
self.windows.active.count += 1;
},
.closed => {}, // don't add to the wait_objects list
.closed_populated => {
// don't add to the wait_objects list, but we did already get data
already_read_data = true;
},
}
}
self.windows.first_read_done = true;
if (already_read_data) return true;
}
while (true) {
if (self.windows.active.count == 0) return false;
const status = windows.kernel32.WaitForMultipleObjects(
self.windows.active.count,
&self.windows.active.handles_buf,
0,
if (nanoseconds) |ns|
@min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1)
else
windows.INFINITE,
);
if (status == windows.WAIT_FAILED)
return windows.unexpectedError(windows.GetLastError());
if (status == windows.WAIT_TIMEOUT)
return true;
if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1)
unreachable;
const active_idx = status - windows.WAIT_OBJECT_0;
const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
const handle = self.windows.active.handles_buf[active_idx];
const overlapped = &self.windows.overlapped[stream_idx];
const stream_reader = &self.readers[stream_idx];
const small_buf = &self.windows.small_bufs[stream_idx];
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
.success => |n| n,
.closed => {
self.windows.active.removeAt(active_idx);
continue;
},
.aborted => unreachable,
};
const buf = small_buf[0..num_bytes_read];
const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len);
@memcpy(dest[0..buf.len], buf);
advanceBufferEnd(stream_reader, buf.len);
switch (try windowsAsyncReadToFifoAndQueueSmallRead(
gpa,
handle,
overlapped,
stream_reader,
small_buf,
bump_amt,
)) {
.empty => {}, // irrelevant, we already got data from the small buffer
.populated => {},
.closed,
.closed_populated, // identical, since we already got data from the small buffer
=> self.windows.active.removeAt(active_idx),
}
return true;
}
}
fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
const gpa = self.gpa;
// We ask for ensureUnusedCapacity with this much extra space. This
// has more of an effect on small reads because once the reads
// start to get larger the amount of space an ArrayList will
// allocate grows exponentially.
const bump_amt = 512;
const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP;
const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns|
std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32)
else
-1);
if (events_len == 0) {
for (self.poll_fds) |poll_fd| {
if (poll_fd.fd != -1) return true;
} else return false;
}
var keep_polling = false;
for (&self.poll_fds, &self.readers) |*poll_fd, *r| {
// Try reading whatever is available before checking the error
// conditions.
// It's still possible to read after a POLL.HUP is received, so
// always check if there's some data waiting to be read first.
if (poll_fd.revents & posix.POLL.IN != 0) {
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
const amt = try posix.read(poll_fd.fd, buf);
advanceBufferEnd(r, amt);
if (amt == 0) {
// Remove the fd when the EOF condition is met.
poll_fd.fd = -1;
} else {
keep_polling = true;
}
} else if (poll_fd.revents & err_mask != 0) {
// Exclude the fds that signaled an error.
poll_fd.fd = -1;
} else if (poll_fd.fd != -1) {
keep_polling = true;
}
}
return keep_polling;
}
/// Returns a slice into the unused capacity of `buffer` with at least
/// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary.
///
/// After calling this function, typically the caller will follow up with a
/// call to `advanceBufferEnd` to report the actual number of bytes buffered.
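///
/// A sketch of the typical sequence (mirroring `pollPosix` above; `fd` is a
/// hypothetical file descriptor):
/// ```
/// const dest = try writableSliceGreedyAlloc(r, gpa, 512);
/// const n = try posix.read(fd, dest);
/// advanceBufferEnd(r, n);
/// ```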
fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 {
{
const unused = r.buffer[r.end..];
if (unused.len >= min_len) return unused;
}
if (r.seek > 0) {
const data = r.buffer[r.seek..r.end];
@memmove(r.buffer[0..data.len], data);
r.seek = 0;
r.end = data.len;
}
{
var list: std.ArrayList(u8) = .{
.items = r.buffer[0..r.end],
.capacity = r.buffer.len,
};
defer r.buffer = list.allocatedSlice();
try list.ensureUnusedCapacity(allocator, min_len);
}
const unused = r.buffer[r.end..];
assert(unused.len >= min_len);
return unused;
}
/// After writing directly into the unused capacity of `buffer`, this function
/// updates `end` so that users of `Reader` can receive the data.
fn advanceBufferEnd(r: *Reader, n: usize) void {
assert(n <= r.buffer.len - r.end);
r.end += n;
}
/// The `ReadFile` documentation states that `lpNumberOfBytesRead` does not have a meaningful
/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
/// compatibility, we point it to this dummy variable, which we never otherwise access.
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
var win_dummy_bytes_read: u32 = undefined;
/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
/// is available. `handle` must have no pending asynchronous operation.
fn windowsAsyncReadToFifoAndQueueSmallRead(
gpa: Allocator,
handle: windows.HANDLE,
overlapped: *windows.OVERLAPPED,
r: *Reader,
small_buf: *[128]u8,
bump_amt: usize,
) !enum { empty, populated, closed_populated, closed } {
var read_any_data = false;
while (true) {
const fifo_read_pending = while (true) {
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);
if (0 == windows.kernel32.ReadFile(
handle,
buf.ptr,
buf_len,
&win_dummy_bytes_read,
overlapped,
)) switch (windows.GetLastError()) {
.IO_PENDING => break true,
.BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
else => |err| return windows.unexpectedError(err),
};
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
.success => |n| n,
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => unreachable,
};
read_any_data = true;
advanceBufferEnd(r, num_bytes_read);
if (num_bytes_read == buf_len) {
// We filled the buffer, so there's probably more data available.
continue;
} else {
// We didn't fill the buffer, so assume we're out of data.
// There is no pending read.
break false;
}
};
if (fifo_read_pending) cancel_read: {
// Cancel the pending read into the FIFO.
_ = windows.kernel32.CancelIo(handle);
// We have to wait for the handle to be signaled, i.e. for the cancelation to complete.
switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
windows.WAIT_OBJECT_0 => {},
windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
else => unreachable,
}
// If it completed before we canceled, make sure to tell the FIFO!
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
.success => |n| n,
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => break :cancel_read,
};
read_any_data = true;
advanceBufferEnd(r, num_bytes_read);
}
// Try to queue the small read into `small_buf`.
if (0 == windows.kernel32.ReadFile(
handle,
small_buf,
small_buf.len,
&win_dummy_bytes_read,
overlapped,
)) switch (windows.GetLastError()) {
.IO_PENDING => {
// small read pending as intended
return if (read_any_data) .populated else .empty;
},
.BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
else => |err| return windows.unexpectedError(err),
};
// We got data back this time. Write it to the FIFO and run the main loop again.
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
.success => |n| n,
.closed => return if (read_any_data) .closed_populated else .closed,
.aborted => unreachable,
};
const buf = small_buf[0..num_bytes_read];
const dest = try writableSliceGreedyAlloc(r, gpa, buf.len);
@memcpy(dest[0..buf.len], buf);
advanceBufferEnd(r, buf.len);
read_any_data = true;
}
}
/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
///
/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the
/// operation immediately returns data:
/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
/// erroneous results."
/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
/// get the actual number of bytes read."
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
fn windowsGetReadResult(
handle: windows.HANDLE,
overlapped: *windows.OVERLAPPED,
allow_aborted: bool,
) !union(enum) {
success: u32,
closed,
aborted,
} {
var num_bytes_read: u32 = undefined;
if (0 == windows.kernel32.GetOverlappedResult(
handle,
overlapped,
&num_bytes_read,
0,
)) switch (windows.GetLastError()) {
.BROKEN_PIPE => return .closed,
.OPERATION_ABORTED => |err| if (allow_aborted) {
return .aborted;
} else {
return windows.unexpectedError(err);
},
else => |err| return windows.unexpectedError(err),
};
return .{ .success = num_bytes_read };
}
};
}
/// Given an enum, returns a struct with fields of that enum, each field
/// representing an I/O stream for polling.
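///
/// For example (a hypothetical two-stream enum):
/// ```
/// const Streams = enum { stdout, stderr };
/// // PollFiles(Streams) is equivalent to:
/// // struct { stdout: Io.File, stderr: Io.File }
/// ```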
pub fn PollFiles(comptime StreamEnum: type) type {
return @Struct(.auto, null, std.meta.fieldNames(StreamEnum), &@splat(Io.File), &@splat(.{}));
}
userdata: ?*anyopaque,
vtable: *const VTable,
pub const Threaded = @import("Io/Threaded.zig");
pub const Evented = switch (builtin.os.tag) {
.linux => switch (builtin.cpu.arch) {
.x86_64, .aarch64 => IoUring,
else => void, // context-switching code not implemented yet
},
.dragonfly, .freebsd, .netbsd, .openbsd, .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => switch (builtin.cpu.arch) {
.x86_64, .aarch64 => Kqueue,
else => void, // context-switching code not implemented yet
},
else => void,
};
pub const Kqueue = @import("Io/Kqueue.zig");
pub const IoUring = @import("Io/IoUring.zig");
pub const Reader = @import("Io/Reader.zig");
pub const Writer = @import("Io/Writer.zig");
pub const net = @import("Io/net.zig");
pub const Dir = @import("Io/Dir.zig");
pub const File = @import("Io/File.zig");
pub const Terminal = @import("Io/Terminal.zig");
pub const VTable = struct {
/// If it returns `null`, it means `result` has already been populated and
/// `await` will be a no-op.
///
/// When this function returns non-null, the implementation guarantees that
/// a unit of concurrency has been assigned to the returned task.
///
/// Thread-safe.
async: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// The pointer of this slice is an "eager" result value.
/// The length is the size in bytes of the result type.
/// This pointer's lifetime expires directly after the call to this function.
result: []u8,
result_alignment: std.mem.Alignment,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
/// Thread-safe.
concurrent: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
result_len: usize,
result_alignment: std.mem.Alignment,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ConcurrentError!*AnyFuture,
/// This function is only called when `async` returns a non-null value.
///
/// Thread-safe.
await: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// The same value that was returned from `async`.
any_future: *AnyFuture,
/// Points to a buffer where the result is written.
/// The length is equal to the size in bytes of the result type.
result: []u8,
result_alignment: std.mem.Alignment,
) void,
/// Equivalent to `await` but initiates cancel request.
///
/// This function is only called when `async` returns a non-null value.
///
/// Thread-safe.
cancel: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// The same value that was returned from `async`.
any_future: *AnyFuture,
/// Points to a buffer where the result is written.
/// The length is equal to the size in bytes of the result type.
result: []u8,
result_alignment: std.mem.Alignment,
) void,
/// When this function returns, the implementation guarantees that `start` has
/// either already been called, or a unit of concurrency has been assigned
/// to the task of calling the function.
///
/// Thread-safe.
groupAsync: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// Owner of the spawned async task.
group: *Group,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) Cancelable!void,
) void,
/// Thread-safe.
groupConcurrent: *const fn (
/// Corresponds to `Io.userdata`.
userdata: ?*anyopaque,
/// Owner of the spawned async task.
group: *Group,
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) Cancelable!void,
) ConcurrentError!void,
groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
recancel: *const fn (?*anyopaque) void,
swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection,
checkCancel: *const fn (?*anyopaque) Cancelable!void,
/// Blocks until one of the futures from the list has a result ready, such
/// that awaiting it will not block. Returns that index.
select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,
futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void,
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus,
dirCreateDirPathOpen: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions, Dir.OpenOptions) Dir.CreateDirPathOpenError!Dir,
dirOpenDir: *const fn (?*anyopaque, Dir, []const u8, Dir.OpenOptions) Dir.OpenError!Dir,
dirStat: *const fn (?*anyopaque, Dir) Dir.StatError!Dir.Stat,
dirStatFile: *const fn (?*anyopaque, Dir, []const u8, Dir.StatFileOptions) Dir.StatFileError!File.Stat,
dirAccess: *const fn (?*anyopaque, Dir, []const u8, Dir.AccessOptions) Dir.AccessError!void,
dirCreateFile: *const fn (?*anyopaque, Dir, []const u8, File.CreateFlags) File.OpenError!File,
dirCreateFileAtomic: *const fn (?*anyopaque, Dir, []const u8, Dir.CreateFileAtomicOptions) Dir.CreateFileAtomicError!File.Atomic,
dirOpenFile: *const fn (?*anyopaque, Dir, []const u8, File.OpenFlags) File.OpenError!File,
dirClose: *const fn (?*anyopaque, []const Dir) void,
dirRead: *const fn (?*anyopaque, *Dir.Reader, []Dir.Entry) Dir.Reader.Error!usize,
dirRealPath: *const fn (?*anyopaque, Dir, out_buffer: []u8) Dir.RealPathError!usize,
dirRealPathFile: *const fn (?*anyopaque, Dir, path_name: []const u8, out_buffer: []u8) Dir.RealPathFileError!usize,
dirDeleteFile: *const fn (?*anyopaque, Dir, []const u8) Dir.DeleteFileError!void,
dirDeleteDir: *const fn (?*anyopaque, Dir, []const u8) Dir.DeleteDirError!void,
dirRename: *const fn (?*anyopaque, old_dir: Dir, old_sub_path: []const u8, new_dir: Dir, new_sub_path: []const u8) Dir.RenameError!void,
dirRenamePreserve: *const fn (?*anyopaque, old_dir: Dir, old_sub_path: []const u8, new_dir: Dir, new_sub_path: []const u8) Dir.RenamePreserveError!void,
dirSymLink: *const fn (?*anyopaque, Dir, target_path: []const u8, sym_link_path: []const u8, Dir.SymLinkFlags) Dir.SymLinkError!void,
dirReadLink: *const fn (?*anyopaque, Dir, sub_path: []const u8, buffer: []u8) Dir.ReadLinkError!usize,
dirSetOwner: *const fn (?*anyopaque, Dir, ?File.Uid, ?File.Gid) Dir.SetOwnerError!void,
dirSetFileOwner: *const fn (?*anyopaque, Dir, []const u8, ?File.Uid, ?File.Gid, Dir.SetFileOwnerOptions) Dir.SetFileOwnerError!void,
dirSetPermissions: *const fn (?*anyopaque, Dir, Dir.Permissions) Dir.SetPermissionsError!void,
dirSetFilePermissions: *const fn (?*anyopaque, Dir, []const u8, File.Permissions, Dir.SetFilePermissionsOptions) Dir.SetFilePermissionsError!void,
dirSetTimestamps: *const fn (?*anyopaque, Dir, []const u8, Dir.SetTimestampsOptions) Dir.SetTimestampsError!void,
dirHardLink: *const fn (?*anyopaque, old_dir: Dir, old_sub_path: []const u8, new_dir: Dir, new_sub_path: []const u8, Dir.HardLinkOptions) Dir.HardLinkError!void,
fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat,
fileLength: *const fn (?*anyopaque, File) File.LengthError!u64,
fileClose: *const fn (?*anyopaque, []const File) void,
fileWriteStreaming: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize) File.Writer.Error!usize,
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
/// May return 0-byte reads, which is distinct from `error.EndOfStream`.
fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.ReadStreamingError!usize,
/// Returns 0 if reading at or past the end.
fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize,
fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
fileSync: *const fn (?*anyopaque, File) File.SyncError!void,
fileIsTty: *const fn (?*anyopaque, File) Cancelable!bool,
fileEnableAnsiEscapeCodes: *const fn (?*anyopaque, File) File.EnableAnsiEscapeCodesError!void,
fileSupportsAnsiEscapeCodes: *const fn (?*anyopaque, File) Cancelable!bool,
fileSetLength: *const fn (?*anyopaque, File, u64) File.SetLengthError!void,
fileSetOwner: *const fn (?*anyopaque, File, ?File.Uid, ?File.Gid) File.SetOwnerError!void,
fileSetPermissions: *const fn (?*anyopaque, File, File.Permissions) File.SetPermissionsError!void,
fileSetTimestamps: *const fn (?*anyopaque, File, File.SetTimestampsOptions) File.SetTimestampsError!void,
fileLock: *const fn (?*anyopaque, File, File.Lock) File.LockError!void,
fileTryLock: *const fn (?*anyopaque, File, File.Lock) File.LockError!bool,
fileUnlock: *const fn (?*anyopaque, File) void,
fileDowngradeLock: *const fn (?*anyopaque, File) File.DowngradeLockError!void,
fileRealPath: *const fn (?*anyopaque, File, out_buffer: []u8) File.RealPathError!usize,
fileHardLink: *const fn (?*anyopaque, File, Dir, []const u8, File.HardLinkOptions) File.HardLinkError!void,
fileMemoryMapCreate: *const fn (?*anyopaque, File, File.MemoryMap.CreateOptions) File.MemoryMap.CreateError!File.MemoryMap,
fileMemoryMapDestroy: *const fn (?*anyopaque, *File.MemoryMap) void,
fileMemoryMapSetLength: *const fn (?*anyopaque, *File.MemoryMap, usize) File.MemoryMap.SetLengthError!void,
fileMemoryMapRead: *const fn (?*anyopaque, *File.MemoryMap) File.ReadPositionalError!void,
fileMemoryMapWrite: *const fn (?*anyopaque, *File.MemoryMap) File.WritePositionalError!void,
processExecutableOpen: *const fn (?*anyopaque, File.OpenFlags) std.process.OpenExecutableError!File,
processExecutablePath: *const fn (?*anyopaque, buffer: []u8) std.process.ExecutablePathError!usize,
lockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!LockedStderr,
tryLockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!?LockedStderr,
unlockStderr: *const fn (?*anyopaque) void,
processSetCurrentDir: *const fn (?*anyopaque, Dir) std.process.SetCurrentDirError!void,
processReplace: *const fn (?*anyopaque, std.process.ReplaceOptions) std.process.ReplaceError,
processReplacePath: *const fn (?*anyopaque, Dir, std.process.ReplaceOptions) std.process.ReplaceError,
processSpawn: *const fn (?*anyopaque, std.process.SpawnOptions) std.process.SpawnError!std.process.Child,
processSpawnPath: *const fn (?*anyopaque, Dir, std.process.SpawnOptions) std.process.SpawnError!std.process.Child,
childWait: *const fn (?*anyopaque, *std.process.Child) std.process.Child.WaitError!std.process.Child.Term,
childKill: *const fn (?*anyopaque, *std.process.Child) void,
progressParentFile: *const fn (?*anyopaque) std.Progress.ParentFileError!File,
now: *const fn (?*anyopaque, Clock) Clock.Error!Timestamp,
sleep: *const fn (?*anyopaque, Timeout) SleepError!void,
random: *const fn (?*anyopaque, buffer: []u8) void,
randomSecure: *const fn (?*anyopaque, buffer: []u8) RandomSecureError!void,
netListenIp: *const fn (?*anyopaque, address: net.IpAddress, net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
netAccept: *const fn (?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream,
netBindIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
netConnectIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream,
netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle,
netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
/// Returns 0 on end of stream.
netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
netWriteFile: *const fn (?*anyopaque, net.Socket.Handle, header: []const u8, *Io.File.Reader, Io.Limit) net.Stream.Writer.WriteFileError!usize,
netClose: *const fn (?*anyopaque, handle: []const net.Socket.Handle) void,
netShutdown: *const fn (?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void,
netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
};
pub const Limit = enum(usize) {
nothing = 0,
unlimited = std.math.maxInt(usize),
_,
/// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`.
pub fn limited(n: usize) Limit {
return @enumFromInt(n);
}
/// Any value greater than `std.math.maxInt(usize)` is interpreted to mean
/// `.unlimited`.
pub fn limited64(n: u64) Limit {
return @enumFromInt(@min(n, std.math.maxInt(usize)));
}
pub fn countVec(data: []const []const u8) Limit {
var total: usize = 0;
for (data) |d| total += d.len;
return .limited(total);
}
pub fn min(a: Limit, b: Limit) Limit {
return @enumFromInt(@min(@intFromEnum(a), @intFromEnum(b)));
}
pub fn minInt(l: Limit, n: usize) usize {
return @min(n, @intFromEnum(l));
}
pub fn minInt64(l: Limit, n: u64) usize {
return @min(n, @intFromEnum(l));
}
pub fn slice(l: Limit, s: []u8) []u8 {
return s[0..l.minInt(s.len)];
}
pub fn sliceConst(l: Limit, s: []const u8) []const u8 {
return s[0..l.minInt(s.len)];
}
pub fn toInt(l: Limit) ?usize {
return switch (l) {
else => @intFromEnum(l),
.unlimited => null,
};
}
/// Reduces a slice to account for the limit, leaving room for one extra
/// byte above the limit, allowing for the use case of differentiating
/// between end-of-stream and reaching the limit.
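///
/// For example, a limit of 4 applied to a 10-byte buffer yields a 5-byte
/// slice; filling all 5 bytes proves the stream extends past the limit:
/// ```
/// const l: Limit = .limited(4);
/// var buf: [10]u8 = undefined;
/// assert(l.slice1(&buf).len == 5);
/// ```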
pub fn slice1(l: Limit, non_empty_buffer: []u8) []u8 {
assert(non_empty_buffer.len >= 1);
return non_empty_buffer[0..@min(@intFromEnum(l) +| 1, non_empty_buffer.len)];
}
pub fn nonzero(l: Limit) bool {
return @intFromEnum(l) > 0;
}
/// Return a new limit reduced by `amount` or return `null` indicating
/// limit would be exceeded.
pub fn subtract(l: Limit, amount: usize) ?Limit {
if (l == .unlimited) return .unlimited;
if (amount > @intFromEnum(l)) return null;
return @enumFromInt(@intFromEnum(l) - amount);
}
};
pub const Cancelable = error{
/// Caller has requested the async operation to stop.
Canceled,
};
pub const UnexpectedError = error{
/// The Operating System returned an undocumented error code.
///
/// This error is in theory not possible, but it would be better
/// to handle this error than to invoke undefined behavior.
///
/// When this error code is observed, it usually means the Zig Standard
/// Library needs a small patch to add the error code to the error set for
/// the respective function.
Unexpected,
};
pub const Clock = enum {
/// A settable system-wide clock that measures real (i.e. wall-clock)
/// time. This clock is affected by discontinuous jumps in the system
/// time (e.g., if the system administrator manually changes the
/// clock), and by frequency adjustments performed by NTP and similar
/// applications.
///
/// This clock normally counts the number of seconds since 1970-01-01
/// 00:00:00 Coordinated Universal Time (UTC) except that it ignores
/// leap seconds; near a leap second it is typically adjusted by NTP to
/// stay roughly in sync with UTC.
///
/// Timestamps returned by implementations of this clock represent time
/// elapsed since 1970-01-01T00:00:00Z, the POSIX/Unix epoch, ignoring
/// leap seconds. This is colloquially known as "Unix time". If the
/// underlying OS uses a different epoch for native timestamps (e.g.,
/// Windows, which uses 1601-01-01) they are translated accordingly.
real,
/// A nonsettable system-wide clock that represents time since some
/// unspecified point in the past.
///
/// Monotonic: Guarantees that the time returned by consecutive calls
/// will not go backwards, but successive calls may return identical
/// (not-increased) time values.
///
/// Not affected by discontinuous jumps in the system time (e.g., if
/// the system administrator manually changes the clock), but may be
/// affected by frequency adjustments.
///
/// This clock expresses intent to **exclude time that the system is
/// suspended**. However, implementations may be unable to satisfy
/// this, and may include that time.
///
/// * On Linux, corresponds to `CLOCK_MONOTONIC`.
/// * On macOS, corresponds to `CLOCK_UPTIME_RAW`.
awake,
/// Identical to `awake` except it expresses intent to **include time
/// that the system is suspended**, however, due to limitations it may
/// behave identically to `awake`.
///
/// * On Linux, corresponds to `CLOCK_BOOTTIME`.
/// * On macOS, corresponds to `CLOCK_MONOTONIC_RAW`.
boot,
/// Tracks the amount of CPU time, in user or kernel mode, consumed by the
/// calling process.
cpu_process,
/// Tracks the amount of CPU time, in user or kernel mode, consumed by the
/// calling thread.
cpu_thread,
pub const Error = error{UnsupportedClock} || UnexpectedError;
/// This function is not cancelable: it does not block, and more importantly,
/// the cancelation logic itself may want to check the time.
pub fn now(clock: Clock, io: Io) Error!Io.Timestamp {
return io.vtable.now(io.userdata, clock);
}
pub const Timestamp = struct {
raw: Io.Timestamp,
clock: Clock,
/// This function is not cancelable: it does not block, and more importantly,
/// the cancelation logic itself may want to check the time.
pub fn now(io: Io, clock: Clock) Error!Clock.Timestamp {
return .{
.raw = try io.vtable.now(io.userdata, clock),
.clock = clock,
};
}
pub fn wait(t: Clock.Timestamp, io: Io) SleepError!void {
return io.vtable.sleep(io.userdata, .{ .deadline = t });
}
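/// For example, measuring elapsed time on the `awake` clock (a sketch; the
/// work being timed is elided):
/// ```
/// const start = try Clock.Timestamp.now(io, .awake);
/// // ... do work ...
/// const end = try Clock.Timestamp.now(io, .awake);
/// const elapsed = start.durationTo(end);
/// ```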
pub fn durationTo(from: Clock.Timestamp, to: Clock.Timestamp) Clock.Duration {
assert(from.clock == to.clock);
return .{
.raw = from.raw.durationTo(to.raw),
.clock = from.clock,
};
}
pub fn addDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
assert(from.clock == duration.clock);
return .{
.raw = from.raw.addDuration(duration.raw),
.clock = from.clock,
};
}
pub fn subDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
assert(from.clock == duration.clock);
return .{
.raw = from.raw.subDuration(duration.raw),
.clock = from.clock,
};
}
pub fn fromNow(io: Io, duration: Clock.Duration) Error!Clock.Timestamp {
return .{
.clock = duration.clock,
.raw = (try duration.clock.now(io)).addDuration(duration.raw),
};
}
pub fn untilNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
const now_ts = try Clock.Timestamp.now(io, timestamp.clock);
return timestamp.durationTo(now_ts);
}
pub fn durationFromNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
const now_ts = try timestamp.clock.now(io);
return .{
.clock = timestamp.clock,
.raw = now_ts.durationTo(timestamp.raw),
};
}
pub fn toClock(t: Clock.Timestamp, io: Io, clock: Clock) Error!Clock.Timestamp {
if (t.clock == clock) return t;
const now_old = try t.clock.now(io);
const now_new = try clock.now(io);
const duration = now_old.durationTo(t.raw);
return .{
.clock = clock,
.raw = now_new.addDuration(duration),
};
}
pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool {
assert(lhs.clock == rhs.clock);
return std.math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds);
}
};
pub const Duration = struct {
raw: Io.Duration,
clock: Clock,
pub fn sleep(duration: Clock.Duration, io: Io) SleepError!void {
return io.vtable.sleep(io.userdata, .{ .duration = duration });
}
};
};
pub const Timestamp = struct {
nanoseconds: i96,
pub const zero: Timestamp = .{ .nanoseconds = 0 };
pub fn durationTo(from: Timestamp, to: Timestamp) Duration {
return .{ .nanoseconds = to.nanoseconds - from.nanoseconds };
}
pub fn addDuration(from: Timestamp, duration: Duration) Timestamp {
return .{ .nanoseconds = from.nanoseconds + duration.nanoseconds };
}
pub fn subDuration(from: Timestamp, duration: Duration) Timestamp {
return .{ .nanoseconds = from.nanoseconds - duration.nanoseconds };
}
pub fn withClock(t: Timestamp, clock: Clock) Clock.Timestamp {
return .{ .raw = t, .clock = clock };
}
pub fn fromNanoseconds(x: i96) Timestamp {
return .{ .nanoseconds = x };
}
pub fn toMilliseconds(t: Timestamp) i64 {
return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_ms));
}
pub fn toSeconds(t: Timestamp) i64 {
return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_s));
}
pub fn toNanoseconds(t: Timestamp) i96 {
return t.nanoseconds;
}
pub fn formatNumber(t: Timestamp, w: *std.Io.Writer, n: std.fmt.Number) std.Io.Writer.Error!void {
return w.printInt(t.nanoseconds, n.mode.base() orelse 10, n.case, .{
.precision = n.precision,
.width = n.width,
.alignment = n.alignment,
.fill = n.fill,
});
}
};
pub const Duration = struct {
nanoseconds: i96,
pub const zero: Duration = .{ .nanoseconds = 0 };
pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) };
pub fn fromNanoseconds(x: i96) Duration {
return .{ .nanoseconds = x };
}
pub fn fromMilliseconds(x: i64) Duration {
return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_ms };
}
pub fn fromSeconds(x: i64) Duration {
return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_s };
}
pub fn toMilliseconds(d: Duration) i64 {
return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_ms));
}
pub fn toSeconds(d: Duration) i64 {
return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_s));
}
pub fn toNanoseconds(d: Duration) i96 {
return d.nanoseconds;
}
};
/// Declares under what conditions an operation should return `error.Timeout`.
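///
/// For example, a five-second timeout measured on the `awake` clock (a sketch):
/// ```
/// const timeout: Io.Timeout = .{ .duration = .{
///     .raw = .fromSeconds(5),
///     .clock = .awake,
/// } };
/// ```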
pub const Timeout = union(enum) {
none,
duration: Clock.Duration,
deadline: Clock.Timestamp,
pub const Error = error{ Timeout, UnsupportedClock };
pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
return switch (t) {
.none => null,
.duration => |d| try .fromNow(io, d),
.deadline => |d| d,
};
}
pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration {
return switch (t) {
.none => null,
.duration => |d| d,
.deadline => |d| try d.durationFromNow(io),
};
}
pub fn sleep(timeout: Timeout, io: Io) SleepError!void {
return io.vtable.sleep(io.userdata, timeout);
}
};
pub const AnyFuture = opaque {};
pub fn Future(Result: type) type {
return struct {
any_future: ?*AnyFuture,
result: Result,
/// Equivalent to `await` but places a cancelation request. This causes the task to receive
/// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
/// call to a function in `Io` which can return `error.Canceled`.
///
/// After cancelation of a task is requested, only the next cancelation point in that task
/// will return `error.Canceled`: future points will not re-signal the cancelation. As such,
/// it is usually a bug to ignore `error.Canceled`. However, to defer handling cancelation
/// requests, see also `recancel` and `CancelProtection`.
///
/// Idempotent. Not threadsafe.
pub fn cancel(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
io.vtable.cancel(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
f.any_future = null;
return f.result;
}
/// Idempotent. Not threadsafe.
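///
/// A sketch of typical usage (assuming `future` was returned from `Io.async`;
/// `computeThing` and `arg` are hypothetical):
/// ```
/// var future = io.async(computeThing, .{arg});
/// const result = future.await(io);
/// ```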
pub fn await(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
io.vtable.await(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
f.any_future = null;
return f.result;
}
};
}
/// An unordered set of tasks which can only be awaited or canceled as a whole.
/// Tasks are spawned in the group with `Group.async` and `Group.concurrent`.
///
/// The resources associated with each task are *guaranteed* to be released when
/// the individual task returns, as opposed to when the whole group completes or
/// is awaited. For this reason, it is not a resource leak to have a long-lived
/// group which concurrent tasks are repeatedly added to. However, asynchronous
/// tasks are not guaranteed to run until `Group.await` or `Group.cancel` is
/// called, so adding async tasks to a group without ever awaiting it may leak
/// resources.
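///
/// A minimal sketch (assuming `worker` is a `fn (usize) Cancelable!void`):
/// ```
/// var group: Io.Group = .init;
/// for (0..4) |i| group.async(io, worker, .{i});
/// try group.await(io);
/// ```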
pub const Group = struct {
/// This value indicates whether or not a group has pending tasks. `null`
/// means there are no pending tasks, and no resources associated with the
/// group, so `await` and `cancel` return immediately without calling the
/// implementation. This means that `token` must be accessed atomically to
/// avoid racing with the check in `await` and `cancel`.
token: std.atomic.Value(?*anyopaque),
/// This value is available for the implementation to use as it wishes.
state: usize,
pub const init: Group = .{ .token = .init(null), .state = 0 };
/// Equivalent to `Io.async`, except the task is spawned in this `Group`
/// instead of becoming associated with a `Future`.
///
/// The return type of `function` must be coercible to `Cancelable!void`.
///
/// Once this function is called, there are resources associated with the
/// group. To release those resources, `Group.await` or `Group.cancel` must
/// eventually be called.
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
return @call(.auto, function, args_casted.*);
}
};
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}
/// Equivalent to `Io.concurrent`, except the task is spawned in this
/// `Group` instead of becoming associated with a `Future`.
///
/// The return type of `function` must be coercible to `Cancelable!void`.
///
/// Once this function is called, there are resources associated with the
/// group. To release those resources, `Group.await` or `Group.cancel` must
/// eventually be called.
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
return @call(.auto, function, args_casted.*);
}
};
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}
/// Blocks until all tasks of the group finish. During this time,
/// cancelation requests propagate to all members of the group, and
/// will also cause `error.Canceled` to be returned when the group
/// does ultimately finish.
///
/// Idempotent. Not threadsafe.
///
/// It is safe to call this function concurrently with `Group.async` or
/// `Group.concurrent`, provided that the group does not complete until
/// the call to `Group.async` or `Group.concurrent` returns.
pub fn await(g: *Group, io: Io) Cancelable!void {
const token = g.token.load(.acquire) orelse return;
try io.vtable.groupAwait(io.userdata, g, token);
assert(g.token.raw == null);
}
/// Equivalent to `await` but immediately requests cancelation on all
/// members of the group.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
///
/// Idempotent. Not threadsafe.
///
/// It is safe to call this function concurrently with `Group.async` or
/// `Group.concurrent`, provided that the group does not complete until
/// the call to `Group.async` or `Group.concurrent` returns.
pub fn cancel(g: *Group, io: Io) void {
const token = g.token.load(.acquire) orelse return;
io.vtable.groupCancel(io.userdata, g, token);
assert(g.token.raw == null);
}
};
/// Asserts that `error.Canceled` was returned from a prior cancelation point, and "re-arms" the
/// cancelation request, so that `error.Canceled` will be returned again from the next cancelation
/// point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn recancel(io: Io) void {
io.vtable.recancel(io.userdata);
}
/// In rare cases, it is desirable to completely block cancelation notification, so that a region
/// of code can run uninterrupted before `error.Canceled` is potentially observed. Therefore, every
/// task has a "cancel protection" state which indicates whether or not `Io` functions can introduce
/// cancelation points.
///
/// To modify a task's cancel protection state, see `swapCancelProtection`.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub const CancelProtection = enum {
/// Any call to an `Io` function with `error.Canceled` in its error set is a cancelation point.
///
/// This is the default state, which all tasks are created in.
unblocked,
/// No `Io` function introduces a cancelation point (`error.Canceled` will never be returned).
blocked,
};
/// Updates the current task's cancel protection state (see `CancelProtection`).
///
/// The typical usage for this function is to protect a block of code from cancelation:
/// ```
/// const old_cancel_protect = io.swapCancelProtection(.blocked);
/// defer _ = io.swapCancelProtection(old_cancel_protect);
/// doSomeWork() catch |err| switch (err) {
/// error.Canceled => unreachable,
/// };
/// ```
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn swapCancelProtection(io: Io, new: CancelProtection) CancelProtection {
return io.vtable.swapCancelProtection(io.userdata, new);
}
/// This function acts as a pure cancelation point (subject to protection; see `CancelProtection`)
/// and does nothing else. In other words, it returns `error.Canceled` if there is an outstanding
/// non-blocked cancelation request, but otherwise is a no-op.
///
/// It is rarely necessary to call this function. The primary use case is in long-running CPU-bound
/// tasks which may need to respond to cancelation before completing. Short tasks, or those which
/// perform other `Io` operations (and hence have other cancelation points), will typically already
/// respond quickly to cancelation requests.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
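///
/// For example, in a long CPU-bound loop (a sketch; `items` and `processItem`
/// are hypothetical):
/// ```
/// for (items) |item| {
///     try io.checkCancel();
///     processItem(item);
/// }
/// ```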
pub fn checkCancel(io: Io) Cancelable!void {
return io.vtable.checkCancel(io.userdata);
}
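/// Spawns tasks whose results are delivered through a queue as they complete,
/// allowing the caller to react to whichever finishes first.
///
/// A minimal sketch (`fetchInt` and `fetchName` are hypothetical functions
/// whose return types match the corresponding union fields):
/// ```
/// const Result = union(enum) { int: u32, name: []const u8 };
/// var buffer: [2]Result = undefined;
/// var select: Io.Select(Result) = .init(io, &buffer);
/// select.async(.int, fetchInt, .{});
/// select.async(.name, fetchName, .{});
/// const first = try select.await();
/// select.cancel();
/// ```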
pub fn Select(comptime U: type) type {
return struct {
io: Io,
group: Group,
queue: Queue(U),
outstanding: usize,
const S = @This();
pub const Union = U;
pub const Field = std.meta.FieldEnum(U);
pub fn init(io: Io, buffer: []U) S {
return .{
.io = io,
.queue = .init(buffer),
.group = .init,
.outstanding = 0,
};
}
/// Calls `function` with `args` asynchronously. The resource spawned is
/// owned by the select.
///
/// `function` must have return type matching the `field` field of `Union`.
///
/// `function` *may* be called immediately, before `async` returns.
///
/// When this function returns, it is guaranteed that `function` has
/// already been called and completed, or it has successfully been
/// assigned a unit of concurrency.
///
/// After this is called, `await` or `cancel` must be called before the
/// select is deinitialized.
///
/// Threadsafe.
///
/// Related:
/// * `Io.async`
/// * `Group.async`
pub fn async(
s: *S,
comptime field: Field,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) void {
const Context = struct {
select: *S,
args: @TypeOf(args),
fn start(type_erased_context: *const anyopaque) Cancelable!void {
const context: *const @This() = @ptrCast(@alignCast(type_erased_context));
const elem = @unionInit(U, @tagName(field), @call(.auto, function, context.args));
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
error.Closed => unreachable,
};
}
};
const context: Context = .{ .select = s, .args = args };
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
}
/// Blocks until another task of the select finishes.
///
/// Asserts there is at least one more `outstanding` task.
///
/// Not threadsafe.
pub fn await(s: *S) Cancelable!U {
s.outstanding -= 1;
return s.queue.getOne(s.io) catch |err| switch (err) {
error.Canceled => |e| return e,
error.Closed => unreachable,
};
}
/// Equivalent to `await` but requests cancelation on all remaining
/// tasks owned by the select.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
///
/// It is illegal to call `await` after this.
///
/// Idempotent. Not threadsafe.
pub fn cancel(s: *S) void {
s.outstanding = 0;
s.group.cancel(s.io);
}
};
}
/// Atomically checks if the value at `ptr` equals `expected`, and if so, blocks until either:
///
/// * a matching (same `ptr` argument) `futexWake` call occurs, or
/// * a spurious ("random") wakeup occurs.
///
/// Typically, `futexWake` should be called immediately after updating the value at `ptr.*`, to
/// unblock tasks using `futexWait` to wait for the value to change from what it previously was.
///
/// The caller is responsible for identifying spurious wakeups if necessary, typically by checking
/// the value at `ptr.*`.
///
/// Asserts that `T` is 4 bytes in length and has a well-defined layout with no padding bits.
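///
/// A minimal wait/wake sketch over a shared `u32` (assuming `state` is shared
/// between a waiting task and a notifying task):
/// ```
/// // waiter: block while the value is still 0
/// while (@atomicLoad(u32, &state, .acquire) == 0)
///     try io.futexWait(u32, &state, 0);
/// // notifier: publish the new value, then wake one waiter
/// @atomicStore(u32, &state, 1, .release);
/// io.futexWake(u32, &state, 1);
/// ```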
pub fn futexWait(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) Cancelable!void {
return futexWaitTimeout(io, T, ptr, expected, .none);
}
/// Same as `futexWait`, except also unblocks if `timeout` expires. As with `futexWait`, spurious
/// wakeups are possible. It remains the caller's responsibility to differentiate between these
/// three possible wake-up reasons if necessary.
pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T, timeout: Timeout) Cancelable!void {
const expected_int: u32 = switch (@typeInfo(T)) {
.@"enum" => @bitCast(@intFromEnum(expected)),
else => @bitCast(expected),
};
return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_int, timeout);
}
/// Same as `futexWait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void {
const expected_int: u32 = switch (@typeInfo(T)) {
.@"enum" => @bitCast(@intFromEnum(expected)),
else => @bitCast(expected),
};
io.vtable.futexWaitUncancelable(io.userdata, @ptrCast(ptr), expected_int);
}
/// Unblocks pending futex waits on `ptr`, up to a limit of `max_waiters` calls.
pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, max_waiters: u32) void {
comptime assert(@sizeOf(T) == @sizeOf(u32));
if (max_waiters == 0) return;
return io.vtable.futexWake(io.userdata, @ptrCast(ptr), max_waiters);
}
/// Mutex is a synchronization primitive which enforces atomic access to a
/// shared region of code known as the "critical section".
///
/// Mutex is an extern struct so that it may be used as a field inside another
/// extern struct. Having a guaranteed memory layout including mutexes is
/// important for IPC over shared memory (mmap).
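///
/// A minimal sketch of guarding a critical section (assuming `io` in scope):
/// ```
/// var mutex: Io.Mutex = .init;
/// try mutex.lock(io);
/// defer mutex.unlock(io);
/// // ... critical section ...
/// ```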
pub const Mutex = extern struct {
state: std.atomic.Value(State),
pub const init: Mutex = .{ .state = .init(.unlocked) };
pub const State = enum(u32) {
unlocked,
locked_once,
contended,
};
pub fn tryLock(m: *Mutex) bool {
switch (m.state.cmpxchgWeak(
.unlocked,
.locked_once,
.acquire,
.monotonic,
) orelse return true) {
.unlocked => unreachable,
.locked_once, .contended => return false,
}
}
pub fn lock(m: *Mutex, io: Io) Cancelable!void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
.locked_once,
.acquire,
.monotonic,
) orelse {
@branchHint(.likely);
return;
};
if (initial_state == .contended) {
try io.futexWait(State, &m.state.raw, .contended);
}
while (m.state.swap(.contended, .acquire) != .unlocked) {
try io.futexWait(State, &m.state.raw, .contended);
}
}
/// Same as `lock`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn lockUncancelable(m: *Mutex, io: Io) void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
.locked_once,
.acquire,
.monotonic,
) orelse {
@branchHint(.likely);
return;
};
if (initial_state == .contended) {
io.futexWaitUncancelable(State, &m.state.raw, .contended);
}
while (m.state.swap(.contended, .acquire) != .unlocked) {
io.futexWaitUncancelable(State, &m.state.raw, .contended);
}
}
pub fn unlock(m: *Mutex, io: Io) void {
switch (m.state.swap(.unlocked, .release)) {
.unlocked => unreachable,
.locked_once => {},
.contended => {
@branchHint(.unlikely);
io.futexWake(State, &m.state.raw, 1);
},
}
}
};
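/// A condition variable for blocking until signaled, used together with a
/// `Mutex`. Spurious wakeups are possible, so the waited-on predicate must be
/// re-checked in a loop. A minimal sketch (assuming a shared `ready` flag
/// protected by `mutex`):
/// ```
/// try mutex.lock(io);
/// defer mutex.unlock(io);
/// while (!ready) try cond.wait(io, &mutex);
/// ```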
pub const Condition = struct {
state: std.atomic.Value(State),
/// Incremented whenever the condition is signaled
epoch: std.atomic.Value(u32),
const State = packed struct(u32) {
waiters: u16,
signals: u16,
};
pub const init: Condition = .{
.state = .init(.{ .waiters = 0, .signals = 0 }),
.epoch = .init(0),
};
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
try waitInner(cond, io, mutex, false);
}
/// Same as `wait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
waitInner(cond, io, mutex, true) catch |err| switch (err) {
error.Canceled => unreachable,
};
}
fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, uncancelable: bool) Cancelable!void {
var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
{
const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic);
assert(prev_state.waiters < math.maxInt(u16)); // overflow caused by too many waiters
}
mutex.unlock(io);
defer mutex.lockUncancelable(io);
while (true) {
const result = if (uncancelable)
io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch)
else
io.futexWait(u32, &cond.epoch.raw, epoch);
epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` load
// Even on error, try to consume a pending signal first. Otherwise a race might
// cause a signal to get stuck in the state with no corresponding waiter.
{
var prev_state = cond.state.load(.monotonic);
while (prev_state.signals > 0) {
prev_state = cond.state.cmpxchgWeak(prev_state, .{
.waiters = prev_state.waiters - 1,
.signals = prev_state.signals - 1,
}, .acquire, .monotonic) orelse {
// We successfully consumed a signal.
return;
};
}
}
// There are no more signals available; this was a spurious wakeup or an error. If it
// was an error, we will remove ourselves as a waiter and return that error. Otherwise,
// we'll loop back to the futex wait.
result catch |err| {
const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic);
assert(prev_state.waiters > 0); // underflow caused by illegal state
return err;
};
}
}
pub fn signal(cond: *Condition, io: Io) void {
var prev_state = cond.state.load(.monotonic);
while (prev_state.waiters > prev_state.signals) {
@branchHint(.unlikely);
prev_state = cond.state.cmpxchgWeak(prev_state, .{
.waiters = prev_state.waiters,
.signals = prev_state.signals + 1,
}, .release, .monotonic) orelse {
// Update the epoch to tell the waiting threads that there are new signals for them.
// Note that a waiting thread could miss a wake if *exactly* (1<<32)-1 wakes happen
// between it observing the epoch and sleeping on it, but this is extraordinarily
// unlikely due to the precise number of calls required.
_ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
io.futexWake(u32, &cond.epoch.raw, 1);
return;
};
}
}
pub fn broadcast(cond: *Condition, io: Io) void {
var prev_state = cond.state.load(.monotonic);
while (prev_state.waiters > prev_state.signals) {
@branchHint(.unlikely);
prev_state = cond.state.cmpxchgWeak(prev_state, .{
.waiters = prev_state.waiters,
.signals = prev_state.waiters,
}, .release, .monotonic) orelse {
// Update the epoch to tell the waiting threads that there are new signals for them.
// Note that a waiting thread could miss a wake if *exactly* (1<<32)-1 wakes happen
// between it observing the epoch and sleeping on it, but this is extraordinarily
// unlikely due to the precise number of calls required.
_ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
io.futexWake(u32, &cond.epoch.raw, prev_state.waiters - prev_state.signals);
return;
};
}
}
};
/// Logical boolean flag which can be set and unset and supports a "wait until set" operation.
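/// A minimal sketch (one task blocks in `wait` until another calls `set`):
/// ```
/// var event: Io.Event = .unset;
/// // waiting task:
/// try event.wait(io);
/// // setting task:
/// event.set(io);
/// ```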
pub const Event = enum(u32) {
unset,
waiting,
is_set,
/// Returns whether the logical boolean is `true`.
pub fn isSet(event: *const Event) bool {
return switch (@atomicLoad(Event, event, .acquire)) {
.unset, .waiting => false,
.is_set => true,
};
}
/// Blocks until the logical boolean is `true`.
pub fn wait(event: *Event, io: Io) Io.Cancelable!void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => {},
.is_set => return,
};
errdefer {
// Ideally we would restore the event back to `.unset` instead of `.waiting`, but there
// might be other threads waiting on the event. In theory we could track the *number* of
// waiting threads in the unused bits of the `Event`, but that has its own problem: the
// waiters would wake up when a *new waiter* was added. So it's easiest to just leave
// the state at `.waiting`---at worst it causes one redundant call to `futexWake`.
}
while (true) {
try io.futexWait(Event, event, .waiting);
switch (@atomicLoad(Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => continue,
.is_set => return,
}
}
}
/// Same as `wait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(event: *Event, io: Io) void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => {},
.is_set => return,
};
while (true) {
io.futexWaitUncancelable(Event, event, .waiting);
switch (@atomicLoad(Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => continue,
.is_set => return,
}
}
}
pub const WaitTimeoutError = error{Timeout} || Cancelable;
/// Blocks the calling thread until either the logical boolean is set, the timeout expires, or a
/// spurious wakeup occurs. If the timeout expires or a spurious wakeup occurs, `error.Timeout`
/// is returned.
pub fn waitTimeout(event: *Event, io: Io, timeout: Timeout) WaitTimeoutError!void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => assert(!builtin.single_threaded), // invalid state
.is_set => return,
};
errdefer {
// Ideally we would restore the event back to `.unset` instead of `.waiting`, but there
// might be other threads waiting on the event. In theory we could track the *number* of
// waiting threads in the unused bits of the `Event`, but that has its own problem: the
// waiters would wake up when a *new waiter* was added. So it's easiest to just leave
// the state at `.waiting`---at worst it causes one redundant call to `futexWake`.
}
try io.futexWaitTimeout(Event, event, .waiting, timeout);
switch (@atomicLoad(Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => return error.Timeout,
.is_set => return,
}
}
/// Sets the logical boolean to true, and hence unblocks any pending calls to `wait`. The
/// logical boolean remains true until `reset` is called, so future calls to `set` have no
/// semantic effect.
///
/// Any memory accesses prior to a `set` call are "released", so that if this `set` call causes
/// `isSet` to return `true` or a wait to finish, the observing tasks are
/// guaranteed to see those memory accesses.
pub fn set(e: *Event, io: Io) void {
switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) {
.unset, .is_set => {},
.waiting => io.futexWake(Event, e, std.math.maxInt(u32)),
}
}
/// Sets the logical boolean to false.
///
/// Assumes that there is no pending call to `wait` or `waitUncancelable`.
///
/// However, concurrent calls to `isSet`, `set`, and `reset` are allowed.
pub fn reset(e: *Event) void {
@atomicStore(Event, e, .unset, .monotonic);
}
};
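// Usage sketch (illustrative; the helper functions are hypothetical): one task
// blocks on the event while another sets it. Unlike `Condition`, an `Event`
// needs no associated mutex and stays set until `reset` is called.
fn exampleEventHandoff(io: Io) Cancelable!void {
var event: Event = .unset;
var setter = io.async(setEventTask, .{ io, &event });
defer setter.await(io);
// Returns once `setEventTask` has called `set` (or immediately, if it
// already ran to completion inside `async`).
try event.wait(io);
}
fn setEventTask(io: Io, event: *Event) void {
event.set(io);
}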
pub const QueueClosedError = error{Closed};
/// Type-erased implementation backing `Queue`. All lengths and buffer
/// contents are measured in bytes; `Queue(Elem)` scales counts by
/// `@sizeOf(Elem)` when delegating to this type.
pub const TypeErasedQueue = struct {
mutex: Mutex,
closed: bool,
/// Ring buffer. This data is logically *after* queued getters.
buffer: []u8,
start: usize,
len: usize,
putters: std.DoublyLinkedList,
getters: std.DoublyLinkedList,
const Put = struct {
remaining: []const u8,
needed: usize,
condition: Condition,
node: std.DoublyLinkedList.Node,
};
const Get = struct {
remaining: []u8,
needed: usize,
condition: Condition,
node: std.DoublyLinkedList.Node,
};
pub fn init(buffer: []u8) TypeErasedQueue {
return .{
.mutex = .init,
.closed = false,
.buffer = buffer,
.start = 0,
.len = 0,
.putters = .{},
.getters = .{},
};
}
/// Marks the queue as closed and wakes all pending getters and putters.
/// Subsequent `put` calls return `error.Closed`; `get` calls drain any
/// remaining buffered elements before returning `error.Closed`.
pub fn close(q: *TypeErasedQueue, io: Io) void {
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
q.closed = true;
{
var it = q.getters.first;
while (it) |node| : (it = node.next) {
const getter: *Get = @alignCast(@fieldParentPtr("node", node));
getter.condition.signal(io);
}
}
{
var it = q.putters.first;
while (it) |node| : (it = node.next) {
const putter: *Put = @alignCast(@fieldParentPtr("node", node));
putter.condition.signal(io);
}
}
}
pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) (QueueClosedError || Cancelable)!usize {
assert(elements.len >= min);
if (elements.len == 0) return 0;
try q.mutex.lock(io);
defer q.mutex.unlock(io);
return q.putLocked(io, elements, min, false);
}
/// Same as `put`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) QueueClosedError!usize {
assert(elements.len >= min);
if (elements.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
return q.putLocked(io, elements, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
error.Closed => |e| return e,
};
}
/// Returns the contiguous free region of the ring buffer beginning at the
/// logical write position (`start + len`, wrapped), or `null` if the buffer
/// is full. For example, with `buffer.len == 8`, `start == 6`, and
/// `len == 4`, the occupied bytes wrap around (indices 6, 7, 0, 1), so this
/// returns `buffer[2..6]`.
fn puttableSlice(q: *const TypeErasedQueue) ?[]u8 {
const unwrapped_index = q.start + q.len;
const wrapped_index, const overflow = @subWithOverflow(unwrapped_index, q.buffer.len);
const slice = switch (overflow) {
1 => q.buffer[unwrapped_index..],
0 => q.buffer[wrapped_index..q.start],
};
return if (slice.len > 0) slice else null;
}
fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
// A closed queue cannot be added to, even if there is space in the buffer.
if (q.closed) return error.Closed;
// Getters have first priority on the data, and only when the getters
// queue is empty do we start populating the buffer.
// The number of elements we add immediately, before possibly blocking.
var n: usize = 0;
while (q.getters.popFirst()) |getter_node| {
const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node));
const copy_len = @min(getter.remaining.len, elements.len - n);
assert(copy_len > 0);
@memcpy(getter.remaining[0..copy_len], elements[n..][0..copy_len]);
getter.remaining = getter.remaining[copy_len..];
getter.needed -|= copy_len;
n += copy_len;
if (getter.needed == 0) {
getter.condition.signal(io);
} else {
assert(n == elements.len); // we didn't have enough elements for the getter
q.getters.prepend(getter_node);
}
if (n == elements.len) return elements.len;
}
while (q.puttableSlice()) |slice| {
const copy_len = @min(slice.len, elements.len - n);
assert(copy_len > 0);
@memcpy(slice[0..copy_len], elements[n..][0..copy_len]);
q.len += copy_len;
n += copy_len;
if (n == elements.len) return elements.len;
}
// Don't block if we hit the target.
if (n >= target) return n;
var pending: Put = .{
.remaining = elements[n..],
.needed = target - n,
.condition = .init,
.node = .{},
};
q.putters.append(&pending.node);
defer if (pending.needed > 0) q.putters.remove(&pending.node);
while (pending.needed > 0 and !q.closed) {
if (uncancelable) {
pending.condition.waitUncancelable(io, &q.mutex);
continue;
}
pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
error.Canceled => if (pending.remaining.len == elements.len) {
// Canceled while waiting, and appended no elements.
return error.Canceled;
} else {
// Canceled while waiting, but appended some elements, so report those first.
io.recancel();
return elements.len - pending.remaining.len;
},
};
}
if (pending.remaining.len == elements.len) {
// The queue was closed while we were waiting. We appended no elements.
assert(q.closed);
return error.Closed;
}
return elements.len - pending.remaining.len;
}
pub fn get(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) (QueueClosedError || Cancelable)!usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
try q.mutex.lock(io);
defer q.mutex.unlock(io);
return q.getLocked(io, buffer, min, false);
}
/// Same as `get`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) QueueClosedError!usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
error.Closed => |e| return e,
};
}
/// Returns the contiguous occupied region of the ring buffer beginning at
/// `start`, or `null` if the buffer is empty. When the data wraps past the
/// end of the buffer, only the pre-wrap portion is returned; callers loop
/// to consume the remainder.
fn gettableSlice(q: *const TypeErasedQueue) ?[]const u8 {
const overlong_slice = q.buffer[q.start..];
const slice = overlong_slice[0..@min(overlong_slice.len, q.len)];
return if (slice.len > 0) slice else null;
}
fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
// The ring buffer gets first priority, then data should come from any
// queued putters, then finally the ring buffer should be filled with
// data from putters so they can be resumed.
// The number of elements we received immediately, before possibly blocking.
var n: usize = 0;
while (q.gettableSlice()) |slice| {
const copy_len = @min(slice.len, buffer.len - n);
assert(copy_len > 0);
@memcpy(buffer[n..][0..copy_len], slice[0..copy_len]);
q.start += copy_len;
if (q.buffer.len - q.start == 0) q.start = 0;
q.len -= copy_len;
n += copy_len;
if (n == buffer.len) {
q.fillRingBufferFromPutters(io);
return buffer.len;
}
}
// Copy directly from putters into buffer.
while (q.putters.popFirst()) |putter_node| {
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
const copy_len = @min(putter.remaining.len, buffer.len - n);
assert(copy_len > 0);
@memcpy(buffer[n..][0..copy_len], putter.remaining[0..copy_len]);
putter.remaining = putter.remaining[copy_len..];
putter.needed -|= copy_len;
n += copy_len;
if (putter.needed == 0) {
putter.condition.signal(io);
} else {
assert(n == buffer.len); // we didn't have enough space for the putter
q.putters.prepend(putter_node);
}
if (n == buffer.len) {
q.fillRingBufferFromPutters(io);
return buffer.len;
}
}
// No need to call `fillRingBufferFromPutters` from this point onwards,
// because we emptied the ring buffer *and* the putter queue!
// Don't block if we hit the target or if the queue is closed. Return how
// many elements we could get immediately, unless the queue was closed and
// empty, in which case report `error.Closed`.
if (n == 0 and q.closed) return error.Closed;
if (n >= target or q.closed) return n;
var pending: Get = .{
.remaining = buffer[n..],
.needed = target - n,
.condition = .init,
.node = .{},
};
q.getters.append(&pending.node);
defer if (pending.needed > 0) q.getters.remove(&pending.node);
while (pending.needed > 0 and !q.closed) {
if (uncancelable) {
pending.condition.waitUncancelable(io, &q.mutex);
continue;
}
pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
error.Canceled => if (pending.remaining.len == buffer.len) {
// Canceled while waiting, and received no elements.
return error.Canceled;
} else {
// Canceled while waiting, but received some elements, so report those first.
io.recancel();
return buffer.len - pending.remaining.len;
},
};
}
if (pending.remaining.len == buffer.len) {
// The queue was closed while we were waiting. We received no elements.
assert(q.closed);
return error.Closed;
}
return buffer.len - pending.remaining.len;
}
/// Called when there is nonzero space available in the ring buffer and
/// potentially putters waiting. The mutex is already held and the task is
/// to copy putter data to the ring buffer and signal any putters whose
/// buffers have been fully copied.
fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void {
while (q.putters.popFirst()) |putter_node| {
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
while (q.puttableSlice()) |slice| {
const copy_len = @min(slice.len, putter.remaining.len);
assert(copy_len > 0);
@memcpy(slice[0..copy_len], putter.remaining[0..copy_len]);
q.len += copy_len;
putter.remaining = putter.remaining[copy_len..];
putter.needed -|= copy_len;
if (putter.needed == 0) {
putter.condition.signal(io);
break;
}
} else {
q.putters.prepend(putter_node);
break;
}
}
}
};
/// Many-producer, many-consumer, thread-safe queue with runtime-configurable buffer size.
/// When buffer is empty, consumers suspend and are resumed by producers.
/// When buffer is full, producers suspend and are resumed by consumers.
pub fn Queue(Elem: type) type {
return struct {
type_erased: TypeErasedQueue,
pub fn init(buffer: []Elem) @This() {
return .{ .type_erased = .init(@ptrCast(buffer)) };
}
pub fn close(q: *@This(), io: Io) void {
q.type_erased.close(io);
}
/// Appends elements to the end of the queue, potentially blocking if
/// there is insufficient capacity. Returns when any one of the
/// following conditions is satisfied:
///
/// * At least `target` elements have been added to the queue
/// * The queue is closed
/// * The current task is canceled
///
/// Returns how many of `elements` have been added to the queue, if any.
/// If an error is returned, no elements have been added.
///
/// If the queue is closed or the task is canceled, but some items were
/// already added before the closure or cancelation, then `put` may
/// return a number lower than `target`, in which case future calls are
/// guaranteed to return `error.Canceled` or `error.Closed`.
///
/// A return value of 0 is only possible if `target` is 0, in which case
/// the call is guaranteed to queue as many of `elements` as is possible
/// *without* blocking.
///
/// Asserts that `elements.len >= target`.
pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize {
return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `put` but blocks until all elements have been added to the queue.
///
/// If the queue is closed or the task is canceled, `error.Closed` or `error.Canceled`
/// is returned, and it is unspecified how many, if any, of `elements` were
/// added to the queue prior to cancelation or closure.
pub fn putAll(q: *@This(), io: Io, elements: []const Elem) (QueueClosedError || Cancelable)!void {
const n = try q.put(io, elements, elements.len);
if (n != elements.len) {
_ = try q.put(io, elements[n..], elements.len - n);
unreachable; // partial `put` implies queue was closed or we were canceled
}
}
/// Same as `put`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) QueueClosedError!usize {
return @divExact(try q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Appends `item` to the end of the queue, blocking if the queue is full.
pub fn putOne(q: *@This(), io: Io, item: Elem) (QueueClosedError || Cancelable)!void {
assert(try q.put(io, &.{item}, 1) == 1);
}
/// Same as `putOne`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) QueueClosedError!void {
assert(try q.putUncancelable(io, &.{item}, 1) == 1);
}
/// Receives elements from the beginning of the queue, potentially blocking
/// if there are insufficient elements currently in the queue. Returns when
/// any one of the following conditions is satisfied:
///
/// * At least `target` elements have been received from the queue
/// * The queue is closed and contains no buffered elements
/// * The current task is canceled
///
/// Returns how many elements of `buffer` have been populated, if any.
/// If an error is returned, no elements have been populated.
///
/// If the queue is closed or the task is canceled, but some items were
/// already received before the closure or cancelation, then `get` may
/// return a number lower than `target`, in which case future calls are
/// guaranteed to return `error.Canceled` or `error.Closed`.
///
/// A return value of 0 is only possible if `target` is 0, in which case
/// the call is guaranteed to fill as much of `buffer` as is possible
/// *without* blocking.
///
/// Asserts that `buffer.len >= target`.
pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize {
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `get`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) QueueClosedError!usize {
return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Receives one element from the beginning of the queue, blocking if the queue is empty.
pub fn getOne(q: *@This(), io: Io) (QueueClosedError || Cancelable)!Elem {
var buf: [1]Elem = undefined;
assert(try q.get(io, &buf, 1) == 1);
return buf[0];
}
/// Same as `getOne`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getOneUncancelable(q: *@This(), io: Io) QueueClosedError!Elem {
var buf: [1]Elem = undefined;
assert(try q.getUncancelable(io, &buf, 1) == 1);
return buf[0];
}
/// Returns buffer length in `Elem` units.
pub fn capacity(q: *const @This()) usize {
return @divExact(q.type_erased.buffer.len, @sizeOf(Elem));
}
};
}
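// Usage sketch (illustrative; the helper functions are hypothetical): a bounded
// queue of `u32` shared between a producer task and the consuming caller. The
// buffer is sized to hold the producer's whole output so the sketch cannot
// deadlock even under `async`'s weak guarantee (the producer may run to
// completion before `async` returns).
fn exampleQueueSum(io: Io) Cancelable!u64 {
var buffer: [8]u32 = undefined;
var queue: Queue(u32) = .init(&buffer);
var producer = io.async(produceNumbers, .{ io, &queue });
defer producer.await(io);
var sum: u64 = 0;
while (queue.getOne(io)) |elem| {
sum += elem;
} else |err| switch (err) {
error.Closed => {}, // Producer finished; all buffered elements were drained.
error.Canceled => |e| return e,
}
return sum;
}
fn produceNumbers(io: Io, queue: *Queue(u32)) void {
defer queue.close(io);
queue.putAll(io, &.{ 1, 2, 3, 4, 5, 6, 7, 8 }) catch {};
}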
/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called.
///
/// `function` *may* be called immediately, before `async` returns. This has
/// weaker guarantees than `concurrent`, making it more portable and reusable.
///
/// When this function returns, it is guaranteed that `function` has already
/// been called and completed, or it has successfully been assigned a unit of
/// concurrency.
///
/// See also:
/// * `Group`
pub fn async(
io: Io,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque, result: *anyopaque) void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const result_casted: *Result = @ptrCast(@alignCast(result));
result_casted.* = @call(.auto, function, args_casted.*);
}
};
var future: Future(Result) = undefined;
future.any_future = io.vtable.async(
io.userdata,
@ptrCast(&future.result),
.of(Result),
@ptrCast(&args),
.of(Args),
TypeErased.start,
);
return future;
}
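// Usage sketch (illustrative; `computeLength` is a hypothetical stand-in for
// any blocking work): the caller may do other work between `async` and
// `await`; the result is only guaranteed available after `await` returns.
fn exampleAsyncAwait(io: Io) usize {
var future = io.async(computeLength, .{"some input"});
// ... unrelated work can happen here ...
return future.await(io);
}
fn computeLength(s: []const u8) usize {
return s.len;
}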
pub const ConcurrentError = error{
/// May occur due to a temporary condition such as resource exhaustion, or
/// to the Io implementation not supporting concurrency.
ConcurrencyUnavailable,
};
/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called, allowing the caller
/// to progress while waiting for any `Io` operations.
///
/// This has a stronger guarantee than `async`, placing restrictions on what
/// kinds of `Io` implementations are supported. By calling `async` instead, one
/// allows, for example, stackful single-threaded blocking I/O.
pub fn concurrent(
io: Io,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) ConcurrentError!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(context: *const anyopaque, result: *anyopaque) void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const result_casted: *Result = @ptrCast(@alignCast(result));
result_casted.* = @call(.auto, function, args_casted.*);
}
};
var future: Future(Result) = undefined;
future.any_future = try io.vtable.concurrent(
io.userdata,
@sizeOf(Result),
.of(Result),
@ptrCast(&args),
.of(Args),
TypeErased.start,
);
return future;
}
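// Usage sketch (illustrative; `expensiveWork` is hypothetical): unlike `async`,
// `concurrent` can fail, so a common pattern is falling back to running the
// work inline when no unit of concurrency is available.
fn exampleConcurrentFallback(io: Io) u64 {
var future = io.concurrent(expensiveWork, .{}) catch |err| switch (err) {
// The Io implementation cannot provide concurrency right now (or at
// all); run the work directly on the current task instead.
error.ConcurrencyUnavailable => return expensiveWork(),
};
return future.await(io);
}
fn expensiveWork() u64 {
return 42;
}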
pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;
pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void {
return io.vtable.sleep(io.userdata, .{ .duration = .{
.raw = duration,
.clock = clock,
} });
}
/// Given a struct in which each field is a `*Future`, returns a union with the
/// same field names, where each field's type is the corresponding future's
/// result type.
pub fn SelectUnion(S: type) type {
const struct_fields = @typeInfo(S).@"struct".fields;
var names: [struct_fields.len][]const u8 = undefined;
var types: [struct_fields.len]type = undefined;
for (struct_fields, &names, &types) |struct_field, *union_field_name, *UnionFieldType| {
const FieldFuture = @typeInfo(struct_field.type).pointer.child;
union_field_name.* = struct_field.name;
UnionFieldType.* = @FieldType(FieldFuture, "result");
}
return @Union(.auto, std.meta.FieldEnum(S), &names, &types, &@splat(.{}));
}
/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type,
/// and can be different for each field.
pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) {
const U = SelectUnion(@TypeOf(s));
const S = @TypeOf(s);
const fields = @typeInfo(S).@"struct".fields;
var futures: [fields.len]*AnyFuture = undefined;
inline for (fields, &futures) |field, *any_future| {
const future = @field(s, field.name);
any_future.* = future.any_future orelse return @unionInit(U, field.name, future.result);
}
switch (try io.vtable.select(io.userdata, &futures)) {
inline 0...(fields.len - 1) => |selected_index| {
const field_name = fields[selected_index].name;
return @unionInit(U, field_name, @field(s, field_name).await(io));
},
else => unreachable,
}
}
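// Usage sketch (illustrative; the fetch functions are hypothetical, and
// error-path cleanup is elided for brevity): race two futures and consume
// whichever completes first. The losing future must still be awaited so that
// its result storage remains valid until it completes.
fn exampleSelect(io: Io) Cancelable!u32 {
var fast = io.async(fetchFast, .{});
var slow = io.async(fetchSlow, .{});
switch (try io.select(.{ .fast = &fast, .slow = &slow })) {
.fast => |value| {
_ = slow.await(io);
return value;
},
.slow => |value| {
_ = fast.await(io);
return value;
},
}
}
fn fetchFast() u32 {
return 1;
}
fn fetchSlow() u32 {
return 2;
}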
pub const LockedStderr = struct {
file_writer: *File.Writer,
terminal_mode: Terminal.Mode,
pub fn terminal(ls: LockedStderr) Terminal {
return .{
.writer = &ls.file_writer.interface,
.mode = ls.terminal_mode,
};
}
pub fn clear(ls: LockedStderr, buffer: []u8) Cancelable!void {
const fw = ls.file_writer;
std.Progress.clearWrittenWithEscapeCodes(fw) catch |err| switch (err) {
error.WriteFailed => switch (fw.err.?) {
error.Canceled => |e| return e,
else => {},
},
};
fw.interface.flush() catch |err| switch (err) {
error.WriteFailed => switch (fw.err.?) {
error.Canceled => |e| return e,
else => {},
},
};
fw.interface.buffer = buffer;
}
};
/// For doing application-level writes to the standard error stream.
/// Also coordinates with debug-level writes that are ignorant of the Io
/// interface and its implementations. When this returns, `std.process.stderr_thread_mutex`
/// will be locked.
///
/// See also:
/// * `tryLockStderr`
pub fn lockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!LockedStderr {
const ls = try io.vtable.lockStderr(io.userdata, terminal_mode);
try ls.clear(buffer);
return ls;
}
/// Same as `lockStderr` but non-blocking.
pub fn tryLockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!?LockedStderr {
const ls = (try io.vtable.tryLockStderr(io.userdata, buffer, terminal_mode)) orelse return null;
try ls.clear(buffer);
return ls;
}
pub fn unlockStderr(io: Io) void {
return io.vtable.unlockStderr(io.userdata);
}
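// Usage sketch (illustrative): lock stderr, print through the caller-provided
// buffer, then flush before unlocking, since `buffer` does not outlive this
// function. Write errors are ignored for brevity.
fn exampleStderrMessage(io: Io) Cancelable!void {
var buffer: [256]u8 = undefined;
const locked = try io.lockStderr(&buffer, null);
defer io.unlockStderr();
const w = &locked.file_writer.interface;
w.print("something went wrong\n", .{}) catch {};
w.flush() catch {};
}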
/// Obtains entropy from a cryptographically secure pseudo-random number
/// generator.
///
/// The implementation *may* store RNG state in process memory and use it to
/// fill `buffer`.
///
/// The randomness is seeded by `randomSecure`, or a less secure mechanism upon
/// failure.
///
/// Threadsafe.
///
/// See also `randomSecure`.
pub fn random(io: Io, buffer: []u8) void {
return io.vtable.random(io.userdata, buffer);
}
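// Usage sketch (illustrative): turning the entropy interface into an integer.
fn exampleRandomInt(io: Io) u64 {
var bytes: [8]u8 = undefined;
io.random(&bytes);
return std.mem.readInt(u64, &bytes, .little);
}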
pub const RandomSecureError = error{EntropyUnavailable} || Cancelable;
/// Obtains cryptographically secure entropy from outside the process.
///
/// Always makes a syscall, or otherwise avoids dependency on process memory,
/// in order to obtain fresh randomness. Does not rely on stored RNG state.
///
/// Does not have any fallback mechanisms; returns `error.EntropyUnavailable`
/// if any problems occur.
///
/// Threadsafe.
///
/// See also `random`.
pub fn randomSecure(io: Io, buffer: []u8) RandomSecureError!void {
return io.vtable.randomSecure(io.userdata, buffer);
}
test {
_ = net;
_ = File;
_ = Dir;
_ = Reader;
_ = Writer;
_ = Evented;
_ = Threaded;
_ = @import("Io/test.zig");
}