mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 04:24:33 +01:00
breaking change: the error for renaming over a non-empty directory now returns error.DirNotEmpty rather than error.PathAlreadyExists.
2281 lines
95 KiB
Zig
2281 lines
95 KiB
Zig
const builtin = @import("builtin");
|
|
const is_windows = builtin.os.tag == .windows;
|
|
|
|
const std = @import("std.zig");
|
|
const windows = std.os.windows;
|
|
const posix = std.posix;
|
|
const math = std.math;
|
|
const assert = std.debug.assert;
|
|
const Allocator = std.mem.Allocator;
|
|
const Alignment = std.mem.Alignment;
|
|
|
|
pub const Limit = enum(usize) {
|
|
nothing = 0,
|
|
unlimited = std.math.maxInt(usize),
|
|
_,
|
|
|
|
/// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`.
|
|
pub fn limited(n: usize) Limit {
|
|
return @enumFromInt(n);
|
|
}
|
|
|
|
/// Any value grater than `std.math.maxInt(usize)` is interpreted to mean
|
|
/// `.unlimited`.
|
|
pub fn limited64(n: u64) Limit {
|
|
return @enumFromInt(@min(n, std.math.maxInt(usize)));
|
|
}
|
|
|
|
pub fn countVec(data: []const []const u8) Limit {
|
|
var total: usize = 0;
|
|
for (data) |d| total += d.len;
|
|
return .limited(total);
|
|
}
|
|
|
|
pub fn min(a: Limit, b: Limit) Limit {
|
|
return @enumFromInt(@min(@intFromEnum(a), @intFromEnum(b)));
|
|
}
|
|
|
|
pub fn minInt(l: Limit, n: usize) usize {
|
|
return @min(n, @intFromEnum(l));
|
|
}
|
|
|
|
pub fn minInt64(l: Limit, n: u64) usize {
|
|
return @min(n, @intFromEnum(l));
|
|
}
|
|
|
|
pub fn slice(l: Limit, s: []u8) []u8 {
|
|
return s[0..l.minInt(s.len)];
|
|
}
|
|
|
|
pub fn sliceConst(l: Limit, s: []const u8) []const u8 {
|
|
return s[0..l.minInt(s.len)];
|
|
}
|
|
|
|
pub fn toInt(l: Limit) ?usize {
|
|
return switch (l) {
|
|
else => @intFromEnum(l),
|
|
.unlimited => null,
|
|
};
|
|
}
|
|
|
|
/// Reduces a slice to account for the limit, leaving room for one extra
|
|
/// byte above the limit, allowing for the use case of differentiating
|
|
/// between end-of-stream and reaching the limit.
|
|
pub fn slice1(l: Limit, non_empty_buffer: []u8) []u8 {
|
|
assert(non_empty_buffer.len >= 1);
|
|
return non_empty_buffer[0..@min(@intFromEnum(l) +| 1, non_empty_buffer.len)];
|
|
}
|
|
|
|
pub fn nonzero(l: Limit) bool {
|
|
return @intFromEnum(l) > 0;
|
|
}
|
|
|
|
/// Return a new limit reduced by `amount` or return `null` indicating
|
|
/// limit would be exceeded.
|
|
pub fn subtract(l: Limit, amount: usize) ?Limit {
|
|
if (l == .unlimited) return .unlimited;
|
|
if (amount > @intFromEnum(l)) return null;
|
|
return @enumFromInt(@intFromEnum(l) - amount);
|
|
}
|
|
};
|
|
|
|
pub const Reader = @import("Io/Reader.zig");
|
|
pub const Writer = @import("Io/Writer.zig");
|
|
|
|
pub fn poll(
|
|
gpa: Allocator,
|
|
comptime StreamEnum: type,
|
|
files: PollFiles(StreamEnum),
|
|
) Poller(StreamEnum) {
|
|
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
|
|
var result: Poller(StreamEnum) = .{
|
|
.gpa = gpa,
|
|
.readers = @splat(.failing),
|
|
.poll_fds = undefined,
|
|
.windows = if (is_windows) .{
|
|
.first_read_done = false,
|
|
.overlapped = [1]windows.OVERLAPPED{
|
|
std.mem.zeroes(windows.OVERLAPPED),
|
|
} ** enum_fields.len,
|
|
.small_bufs = undefined,
|
|
.active = .{
|
|
.count = 0,
|
|
.handles_buf = undefined,
|
|
.stream_map = undefined,
|
|
},
|
|
} else {},
|
|
};
|
|
|
|
inline for (enum_fields, 0..) |field, i| {
|
|
if (is_windows) {
|
|
result.windows.active.handles_buf[i] = @field(files, field.name).handle;
|
|
} else {
|
|
result.poll_fds[i] = .{
|
|
.fd = @field(files, field.name).handle,
|
|
.events = posix.POLL.IN,
|
|
.revents = undefined,
|
|
};
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
pub fn Poller(comptime StreamEnum: type) type {
|
|
return struct {
|
|
const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
|
|
const PollFd = if (is_windows) void else posix.pollfd;
|
|
|
|
gpa: Allocator,
|
|
readers: [enum_fields.len]Reader,
|
|
poll_fds: [enum_fields.len]PollFd,
|
|
windows: if (is_windows) struct {
|
|
first_read_done: bool,
|
|
overlapped: [enum_fields.len]windows.OVERLAPPED,
|
|
small_bufs: [enum_fields.len][128]u8,
|
|
active: struct {
|
|
count: math.IntFittingRange(0, enum_fields.len),
|
|
handles_buf: [enum_fields.len]windows.HANDLE,
|
|
stream_map: [enum_fields.len]StreamEnum,
|
|
|
|
pub fn removeAt(self: *@This(), index: u32) void {
|
|
assert(index < self.count);
|
|
for (index + 1..self.count) |i| {
|
|
self.handles_buf[i - 1] = self.handles_buf[i];
|
|
self.stream_map[i - 1] = self.stream_map[i];
|
|
}
|
|
self.count -= 1;
|
|
}
|
|
},
|
|
} else void,
|
|
|
|
const Self = @This();
|
|
|
|
pub fn deinit(self: *Self) void {
|
|
const gpa = self.gpa;
|
|
if (is_windows) {
|
|
// cancel any pending IO to prevent clobbering OVERLAPPED value
|
|
for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
|
|
_ = windows.kernel32.CancelIo(h);
|
|
}
|
|
}
|
|
inline for (&self.readers) |*r| gpa.free(r.buffer);
|
|
self.* = undefined;
|
|
}
|
|
|
|
pub fn poll(self: *Self) !bool {
|
|
if (is_windows) {
|
|
return pollWindows(self, null);
|
|
} else {
|
|
return pollPosix(self, null);
|
|
}
|
|
}
|
|
|
|
pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool {
|
|
if (is_windows) {
|
|
return pollWindows(self, nanoseconds);
|
|
} else {
|
|
return pollPosix(self, nanoseconds);
|
|
}
|
|
}
|
|
|
|
pub fn reader(self: *Self, which: StreamEnum) *Reader {
|
|
return &self.readers[@intFromEnum(which)];
|
|
}
|
|
|
|
pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 {
|
|
const gpa = self.gpa;
|
|
const r = reader(self, which);
|
|
if (r.seek == 0) {
|
|
const new = try gpa.realloc(r.buffer, r.end);
|
|
r.buffer = &.{};
|
|
r.end = 0;
|
|
return new;
|
|
}
|
|
const new = try gpa.dupe(u8, r.buffered());
|
|
gpa.free(r.buffer);
|
|
r.buffer = &.{};
|
|
r.seek = 0;
|
|
r.end = 0;
|
|
return new;
|
|
}
|
|
|
|
fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
|
|
const bump_amt = 512;
|
|
const gpa = self.gpa;
|
|
|
|
if (!self.windows.first_read_done) {
|
|
var already_read_data = false;
|
|
for (0..enum_fields.len) |i| {
|
|
const handle = self.windows.active.handles_buf[i];
|
|
switch (try windowsAsyncReadToFifoAndQueueSmallRead(
|
|
gpa,
|
|
handle,
|
|
&self.windows.overlapped[i],
|
|
&self.readers[i],
|
|
&self.windows.small_bufs[i],
|
|
bump_amt,
|
|
)) {
|
|
.populated, .empty => |state| {
|
|
if (state == .populated) already_read_data = true;
|
|
self.windows.active.handles_buf[self.windows.active.count] = handle;
|
|
self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
|
|
self.windows.active.count += 1;
|
|
},
|
|
.closed => {}, // don't add to the wait_objects list
|
|
.closed_populated => {
|
|
// don't add to the wait_objects list, but we did already get data
|
|
already_read_data = true;
|
|
},
|
|
}
|
|
}
|
|
self.windows.first_read_done = true;
|
|
if (already_read_data) return true;
|
|
}
|
|
|
|
while (true) {
|
|
if (self.windows.active.count == 0) return false;
|
|
|
|
const status = windows.kernel32.WaitForMultipleObjects(
|
|
self.windows.active.count,
|
|
&self.windows.active.handles_buf,
|
|
0,
|
|
if (nanoseconds) |ns|
|
|
@min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1)
|
|
else
|
|
windows.INFINITE,
|
|
);
|
|
if (status == windows.WAIT_FAILED)
|
|
return windows.unexpectedError(windows.GetLastError());
|
|
if (status == windows.WAIT_TIMEOUT)
|
|
return true;
|
|
|
|
if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1)
|
|
unreachable;
|
|
|
|
const active_idx = status - windows.WAIT_OBJECT_0;
|
|
|
|
const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
|
|
const handle = self.windows.active.handles_buf[active_idx];
|
|
|
|
const overlapped = &self.windows.overlapped[stream_idx];
|
|
const stream_reader = &self.readers[stream_idx];
|
|
const small_buf = &self.windows.small_bufs[stream_idx];
|
|
|
|
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
|
|
.success => |n| n,
|
|
.closed => {
|
|
self.windows.active.removeAt(active_idx);
|
|
continue;
|
|
},
|
|
.aborted => unreachable,
|
|
};
|
|
const buf = small_buf[0..num_bytes_read];
|
|
const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len);
|
|
@memcpy(dest[0..buf.len], buf);
|
|
advanceBufferEnd(stream_reader, buf.len);
|
|
|
|
switch (try windowsAsyncReadToFifoAndQueueSmallRead(
|
|
gpa,
|
|
handle,
|
|
overlapped,
|
|
stream_reader,
|
|
small_buf,
|
|
bump_amt,
|
|
)) {
|
|
.empty => {}, // irrelevant, we already got data from the small buffer
|
|
.populated => {},
|
|
.closed,
|
|
.closed_populated, // identical, since we already got data from the small buffer
|
|
=> self.windows.active.removeAt(active_idx),
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
|
|
fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
|
|
const gpa = self.gpa;
|
|
// We ask for ensureUnusedCapacity with this much extra space. This
|
|
// has more of an effect on small reads because once the reads
|
|
// start to get larger the amount of space an ArrayList will
|
|
// allocate grows exponentially.
|
|
const bump_amt = 512;
|
|
|
|
const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP;
|
|
|
|
const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns|
|
|
std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32)
|
|
else
|
|
-1);
|
|
if (events_len == 0) {
|
|
for (self.poll_fds) |poll_fd| {
|
|
if (poll_fd.fd != -1) return true;
|
|
} else return false;
|
|
}
|
|
|
|
var keep_polling = false;
|
|
for (&self.poll_fds, &self.readers) |*poll_fd, *r| {
|
|
// Try reading whatever is available before checking the error
|
|
// conditions.
|
|
// It's still possible to read after a POLL.HUP is received,
|
|
// always check if there's some data waiting to be read first.
|
|
if (poll_fd.revents & posix.POLL.IN != 0) {
|
|
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
|
|
const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
|
|
error.BrokenPipe => 0, // Handle the same as EOF.
|
|
else => |e| return e,
|
|
};
|
|
advanceBufferEnd(r, amt);
|
|
if (amt == 0) {
|
|
// Remove the fd when the EOF condition is met.
|
|
poll_fd.fd = -1;
|
|
} else {
|
|
keep_polling = true;
|
|
}
|
|
} else if (poll_fd.revents & err_mask != 0) {
|
|
// Exclude the fds that signaled an error.
|
|
poll_fd.fd = -1;
|
|
} else if (poll_fd.fd != -1) {
|
|
keep_polling = true;
|
|
}
|
|
}
|
|
return keep_polling;
|
|
}
|
|
|
|
/// Returns a slice into the unused capacity of `buffer` with at least
|
|
/// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary.
|
|
///
|
|
/// After calling this function, typically the caller will follow up with a
|
|
/// call to `advanceBufferEnd` to report the actual number of bytes buffered.
|
|
fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 {
|
|
{
|
|
const unused = r.buffer[r.end..];
|
|
if (unused.len >= min_len) return unused;
|
|
}
|
|
if (r.seek > 0) {
|
|
const data = r.buffer[r.seek..r.end];
|
|
@memmove(r.buffer[0..data.len], data);
|
|
r.seek = 0;
|
|
r.end = data.len;
|
|
}
|
|
{
|
|
var list: std.ArrayList(u8) = .{
|
|
.items = r.buffer[0..r.end],
|
|
.capacity = r.buffer.len,
|
|
};
|
|
defer r.buffer = list.allocatedSlice();
|
|
try list.ensureUnusedCapacity(allocator, min_len);
|
|
}
|
|
const unused = r.buffer[r.end..];
|
|
assert(unused.len >= min_len);
|
|
return unused;
|
|
}
|
|
|
|
/// After writing directly into the unused capacity of `buffer`, this function
|
|
/// updates `end` so that users of `Reader` can receive the data.
|
|
fn advanceBufferEnd(r: *Reader, n: usize) void {
|
|
assert(n <= r.buffer.len - r.end);
|
|
r.end += n;
|
|
}
|
|
|
|
/// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful
|
|
/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
|
|
/// compatibility, we point it to this dummy variables, which we never otherwise access.
|
|
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
|
|
var win_dummy_bytes_read: u32 = undefined;
|
|
|
|
/// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
|
|
/// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
|
|
/// is available. `handle` must have no pending asynchronous operation.
|
|
fn windowsAsyncReadToFifoAndQueueSmallRead(
|
|
gpa: Allocator,
|
|
handle: windows.HANDLE,
|
|
overlapped: *windows.OVERLAPPED,
|
|
r: *Reader,
|
|
small_buf: *[128]u8,
|
|
bump_amt: usize,
|
|
) !enum { empty, populated, closed_populated, closed } {
|
|
var read_any_data = false;
|
|
while (true) {
|
|
const fifo_read_pending = while (true) {
|
|
const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
|
|
const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);
|
|
|
|
if (0 == windows.kernel32.ReadFile(
|
|
handle,
|
|
buf.ptr,
|
|
buf_len,
|
|
&win_dummy_bytes_read,
|
|
overlapped,
|
|
)) switch (windows.GetLastError()) {
|
|
.IO_PENDING => break true,
|
|
.BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
|
|
else => |err| return windows.unexpectedError(err),
|
|
};
|
|
|
|
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
|
|
.success => |n| n,
|
|
.closed => return if (read_any_data) .closed_populated else .closed,
|
|
.aborted => unreachable,
|
|
};
|
|
|
|
read_any_data = true;
|
|
advanceBufferEnd(r, num_bytes_read);
|
|
|
|
if (num_bytes_read == buf_len) {
|
|
// We filled the buffer, so there's probably more data available.
|
|
continue;
|
|
} else {
|
|
// We didn't fill the buffer, so assume we're out of data.
|
|
// There is no pending read.
|
|
break false;
|
|
}
|
|
};
|
|
|
|
if (fifo_read_pending) cancel_read: {
|
|
// Cancel the pending read into the FIFO.
|
|
_ = windows.kernel32.CancelIo(handle);
|
|
|
|
// We have to wait for the handle to be signalled, i.e. for the cancelation to complete.
|
|
switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
|
|
windows.WAIT_OBJECT_0 => {},
|
|
windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
|
|
else => unreachable,
|
|
}
|
|
|
|
// If it completed before we canceled, make sure to tell the FIFO!
|
|
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
|
|
.success => |n| n,
|
|
.closed => return if (read_any_data) .closed_populated else .closed,
|
|
.aborted => break :cancel_read,
|
|
};
|
|
read_any_data = true;
|
|
advanceBufferEnd(r, num_bytes_read);
|
|
}
|
|
|
|
// Try to queue the 1-byte read.
|
|
if (0 == windows.kernel32.ReadFile(
|
|
handle,
|
|
small_buf,
|
|
small_buf.len,
|
|
&win_dummy_bytes_read,
|
|
overlapped,
|
|
)) switch (windows.GetLastError()) {
|
|
.IO_PENDING => {
|
|
// 1-byte read pending as intended
|
|
return if (read_any_data) .populated else .empty;
|
|
},
|
|
.BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
|
|
else => |err| return windows.unexpectedError(err),
|
|
};
|
|
|
|
// We got data back this time. Write it to the FIFO and run the main loop again.
|
|
const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
|
|
.success => |n| n,
|
|
.closed => return if (read_any_data) .closed_populated else .closed,
|
|
.aborted => unreachable,
|
|
};
|
|
const buf = small_buf[0..num_bytes_read];
|
|
const dest = try writableSliceGreedyAlloc(r, gpa, buf.len);
|
|
@memcpy(dest[0..buf.len], buf);
|
|
advanceBufferEnd(r, buf.len);
|
|
read_any_data = true;
|
|
}
|
|
}
|
|
|
|
/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
|
|
/// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
|
|
///
|
|
/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the
|
|
/// operation immediately returns data:
|
|
/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
|
|
/// erroneous results."
|
|
/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
|
|
/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
|
|
/// get the actual number of bytes read."
|
|
/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
|
|
fn windowsGetReadResult(
|
|
handle: windows.HANDLE,
|
|
overlapped: *windows.OVERLAPPED,
|
|
allow_aborted: bool,
|
|
) !union(enum) {
|
|
success: u32,
|
|
closed,
|
|
aborted,
|
|
} {
|
|
var num_bytes_read: u32 = undefined;
|
|
if (0 == windows.kernel32.GetOverlappedResult(
|
|
handle,
|
|
overlapped,
|
|
&num_bytes_read,
|
|
0,
|
|
)) switch (windows.GetLastError()) {
|
|
.BROKEN_PIPE => return .closed,
|
|
.OPERATION_ABORTED => |err| if (allow_aborted) {
|
|
return .aborted;
|
|
} else {
|
|
return windows.unexpectedError(err);
|
|
},
|
|
else => |err| return windows.unexpectedError(err),
|
|
};
|
|
return .{ .success = num_bytes_read };
|
|
}
|
|
};
|
|
}
|
|
|
|
/// Given an enum, returns a struct with fields of that enum, each field
|
|
/// representing an I/O stream for polling.
|
|
pub fn PollFiles(comptime StreamEnum: type) type {
|
|
return @Struct(.auto, null, std.meta.fieldNames(StreamEnum), &@splat(Io.File), &@splat(.{}));
|
|
}
|
|
|
|
test {
|
|
_ = net;
|
|
_ = Reader;
|
|
_ = Writer;
|
|
_ = Evented;
|
|
_ = Threaded;
|
|
_ = @import("Io/test.zig");
|
|
}
|
|
|
|
const Io = @This();
|
|
|
|
pub const Evented = switch (builtin.os.tag) {
|
|
.linux => switch (builtin.cpu.arch) {
|
|
.x86_64, .aarch64 => @import("Io/IoUring.zig"),
|
|
else => void, // context-switching code not implemented yet
|
|
},
|
|
.dragonfly, .freebsd, .netbsd, .openbsd, .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => switch (builtin.cpu.arch) {
|
|
.x86_64, .aarch64 => @import("Io/Kqueue.zig"),
|
|
else => void, // context-switching code not implemented yet
|
|
},
|
|
else => void,
|
|
};
|
|
pub const Threaded = @import("Io/Threaded.zig");
|
|
pub const net = @import("Io/net.zig");
|
|
|
|
userdata: ?*anyopaque,
|
|
vtable: *const VTable,
|
|
|
|
pub const VTable = struct {
|
|
/// If it returns `null` it means `result` has been already populated and
|
|
/// `await` will be a no-op.
|
|
///
|
|
/// When this function returns non-null, the implementation guarantees that
|
|
/// a unit of concurrency has been assigned to the returned task.
|
|
///
|
|
/// Thread-safe.
|
|
async: *const fn (
|
|
/// Corresponds to `Io.userdata`.
|
|
userdata: ?*anyopaque,
|
|
/// The pointer of this slice is an "eager" result value.
|
|
/// The length is the size in bytes of the result type.
|
|
/// This pointer's lifetime expires directly after the call to this function.
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
/// Copied and then passed to `start`.
|
|
context: []const u8,
|
|
context_alignment: std.mem.Alignment,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
) ?*AnyFuture,
|
|
/// Thread-safe.
|
|
concurrent: *const fn (
|
|
/// Corresponds to `Io.userdata`.
|
|
userdata: ?*anyopaque,
|
|
result_len: usize,
|
|
result_alignment: std.mem.Alignment,
|
|
/// Copied and then passed to `start`.
|
|
context: []const u8,
|
|
context_alignment: std.mem.Alignment,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
) ConcurrentError!*AnyFuture,
|
|
/// This function is only called when `async` returns a non-null value.
|
|
///
|
|
/// Thread-safe.
|
|
await: *const fn (
|
|
/// Corresponds to `Io.userdata`.
|
|
userdata: ?*anyopaque,
|
|
/// The same value that was returned from `async`.
|
|
any_future: *AnyFuture,
|
|
/// Points to a buffer where the result is written.
|
|
/// The length is equal to size in bytes of result type.
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
) void,
|
|
/// Equivalent to `await` but initiates cancel request.
|
|
///
|
|
/// This function is only called when `async` returns a non-null value.
|
|
///
|
|
/// Thread-safe.
|
|
cancel: *const fn (
|
|
/// Corresponds to `Io.userdata`.
|
|
userdata: ?*anyopaque,
|
|
/// The same value that was returned from `async`.
|
|
any_future: *AnyFuture,
|
|
/// Points to a buffer where the result is written.
|
|
/// The length is equal to size in bytes of result type.
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
) void,
|
|
|
|
/// When this function returns, implementation guarantees that `start` has
|
|
/// either already been called, or a unit of concurrency has been assigned
|
|
/// to the task of calling the function.
|
|
///
|
|
/// Thread-safe.
|
|
groupAsync: *const fn (
|
|
/// Corresponds to `Io.userdata`.
|
|
userdata: ?*anyopaque,
|
|
/// Owner of the spawned async task.
|
|
group: *Group,
|
|
/// Copied and then passed to `start`.
|
|
context: []const u8,
|
|
context_alignment: std.mem.Alignment,
|
|
start: *const fn (context: *const anyopaque) Cancelable!void,
|
|
) void,
|
|
/// Thread-safe.
|
|
groupConcurrent: *const fn (
|
|
/// Corresponds to `Io.userdata`.
|
|
userdata: ?*anyopaque,
|
|
/// Owner of the spawned async task.
|
|
group: *Group,
|
|
/// Copied and then passed to `start`.
|
|
context: []const u8,
|
|
context_alignment: std.mem.Alignment,
|
|
start: *const fn (context: *const anyopaque) Cancelable!void,
|
|
) ConcurrentError!void,
|
|
groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
|
|
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
|
|
|
|
recancel: *const fn (?*anyopaque) void,
|
|
swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection,
|
|
checkCancel: *const fn (?*anyopaque) Cancelable!void,
|
|
|
|
/// Blocks until one of the futures from the list has a result ready, such
|
|
/// that awaiting it will not block. Returns that index.
|
|
select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,
|
|
|
|
futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void,
|
|
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
|
|
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
|
|
|
|
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
|
|
dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus,
|
|
dirCreateDirPathOpen: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions, Dir.OpenOptions) Dir.CreateDirPathOpenError!Dir,
|
|
dirOpenDir: *const fn (?*anyopaque, Dir, []const u8, Dir.OpenOptions) Dir.OpenError!Dir,
|
|
dirStat: *const fn (?*anyopaque, Dir) Dir.StatError!Dir.Stat,
|
|
dirStatFile: *const fn (?*anyopaque, Dir, []const u8, Dir.StatFileOptions) Dir.StatFileError!File.Stat,
|
|
dirAccess: *const fn (?*anyopaque, Dir, []const u8, Dir.AccessOptions) Dir.AccessError!void,
|
|
dirCreateFile: *const fn (?*anyopaque, Dir, []const u8, File.CreateFlags) File.OpenError!File,
|
|
dirCreateFileAtomic: *const fn (?*anyopaque, Dir, []const u8, Dir.CreateFileAtomicOptions) Dir.CreateFileAtomicError!File.Atomic,
|
|
dirOpenFile: *const fn (?*anyopaque, Dir, []const u8, File.OpenFlags) File.OpenError!File,
|
|
dirClose: *const fn (?*anyopaque, []const Dir) void,
|
|
dirRead: *const fn (?*anyopaque, *Dir.Reader, []Dir.Entry) Dir.Reader.Error!usize,
|
|
dirRealPath: *const fn (?*anyopaque, Dir, out_buffer: []u8) Dir.RealPathError!usize,
|
|
dirRealPathFile: *const fn (?*anyopaque, Dir, path_name: []const u8, out_buffer: []u8) Dir.RealPathFileError!usize,
|
|
dirDeleteFile: *const fn (?*anyopaque, Dir, []const u8) Dir.DeleteFileError!void,
|
|
dirDeleteDir: *const fn (?*anyopaque, Dir, []const u8) Dir.DeleteDirError!void,
|
|
dirRename: *const fn (?*anyopaque, old_dir: Dir, old_sub_path: []const u8, new_dir: Dir, new_sub_path: []const u8) Dir.RenameError!void,
|
|
dirRenamePreserve: *const fn (?*anyopaque, old_dir: Dir, old_sub_path: []const u8, new_dir: Dir, new_sub_path: []const u8) Dir.RenamePreserveError!void,
|
|
dirSymLink: *const fn (?*anyopaque, Dir, target_path: []const u8, sym_link_path: []const u8, Dir.SymLinkFlags) Dir.SymLinkError!void,
|
|
dirReadLink: *const fn (?*anyopaque, Dir, sub_path: []const u8, buffer: []u8) Dir.ReadLinkError!usize,
|
|
dirSetOwner: *const fn (?*anyopaque, Dir, ?File.Uid, ?File.Gid) Dir.SetOwnerError!void,
|
|
dirSetFileOwner: *const fn (?*anyopaque, Dir, []const u8, ?File.Uid, ?File.Gid, Dir.SetFileOwnerOptions) Dir.SetFileOwnerError!void,
|
|
dirSetPermissions: *const fn (?*anyopaque, Dir, Dir.Permissions) Dir.SetPermissionsError!void,
|
|
dirSetFilePermissions: *const fn (?*anyopaque, Dir, []const u8, File.Permissions, Dir.SetFilePermissionsOptions) Dir.SetFilePermissionsError!void,
|
|
dirSetTimestamps: *const fn (?*anyopaque, Dir, []const u8, Dir.SetTimestampsOptions) Dir.SetTimestampsError!void,
|
|
dirHardLink: *const fn (?*anyopaque, old_dir: Dir, old_sub_path: []const u8, new_dir: Dir, new_sub_path: []const u8, Dir.HardLinkOptions) Dir.HardLinkError!void,
|
|
|
|
fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat,
|
|
fileLength: *const fn (?*anyopaque, File) File.LengthError!u64,
|
|
fileClose: *const fn (?*anyopaque, []const File) void,
|
|
fileWriteStreaming: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize) File.Writer.Error!usize,
|
|
fileWritePositional: *const fn (?*anyopaque, File, header: []const u8, data: []const []const u8, splat: usize, offset: u64) File.WritePositionalError!usize,
|
|
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
|
|
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
|
|
/// Returns 0 on end of stream.
|
|
fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.Reader.Error!usize,
|
|
/// Returns 0 on end of stream.
|
|
fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize,
|
|
fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
|
|
fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
|
|
fileSync: *const fn (?*anyopaque, File) File.SyncError!void,
|
|
fileIsTty: *const fn (?*anyopaque, File) Cancelable!bool,
|
|
fileEnableAnsiEscapeCodes: *const fn (?*anyopaque, File) File.EnableAnsiEscapeCodesError!void,
|
|
fileSupportsAnsiEscapeCodes: *const fn (?*anyopaque, File) Cancelable!bool,
|
|
fileSetLength: *const fn (?*anyopaque, File, u64) File.SetLengthError!void,
|
|
fileSetOwner: *const fn (?*anyopaque, File, ?File.Uid, ?File.Gid) File.SetOwnerError!void,
|
|
fileSetPermissions: *const fn (?*anyopaque, File, File.Permissions) File.SetPermissionsError!void,
|
|
fileSetTimestamps: *const fn (?*anyopaque, File, File.SetTimestampsOptions) File.SetTimestampsError!void,
|
|
fileLock: *const fn (?*anyopaque, File, File.Lock) File.LockError!void,
|
|
fileTryLock: *const fn (?*anyopaque, File, File.Lock) File.LockError!bool,
|
|
fileUnlock: *const fn (?*anyopaque, File) void,
|
|
fileDowngradeLock: *const fn (?*anyopaque, File) File.DowngradeLockError!void,
|
|
fileRealPath: *const fn (?*anyopaque, File, out_buffer: []u8) File.RealPathError!usize,
|
|
fileHardLink: *const fn (?*anyopaque, File, Dir, []const u8, File.HardLinkOptions) File.HardLinkError!void,
|
|
|
|
processExecutableOpen: *const fn (?*anyopaque, File.OpenFlags) std.process.OpenExecutableError!File,
|
|
processExecutablePath: *const fn (?*anyopaque, buffer: []u8) std.process.ExecutablePathError!usize,
|
|
lockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!LockedStderr,
|
|
tryLockStderr: *const fn (?*anyopaque, ?Terminal.Mode) Cancelable!?LockedStderr,
|
|
unlockStderr: *const fn (?*anyopaque) void,
|
|
processSetCurrentDir: *const fn (?*anyopaque, Dir) std.process.SetCurrentDirError!void,
|
|
processReplace: *const fn (?*anyopaque, std.process.ReplaceOptions) std.process.ReplaceError,
|
|
processReplacePath: *const fn (?*anyopaque, Dir, std.process.ReplaceOptions) std.process.ReplaceError,
|
|
processSpawn: *const fn (?*anyopaque, std.process.SpawnOptions) std.process.SpawnError!std.process.Child,
|
|
processSpawnPath: *const fn (?*anyopaque, Dir, std.process.SpawnOptions) std.process.SpawnError!std.process.Child,
|
|
childWait: *const fn (?*anyopaque, *std.process.Child) std.process.Child.WaitError!std.process.Child.Term,
|
|
childKill: *const fn (?*anyopaque, *std.process.Child) void,
|
|
|
|
progressParentFile: *const fn (?*anyopaque) std.Progress.ParentFileError!File,
|
|
|
|
now: *const fn (?*anyopaque, Clock) Clock.Error!Timestamp,
|
|
sleep: *const fn (?*anyopaque, Timeout) SleepError!void,
|
|
|
|
random: *const fn (?*anyopaque, buffer: []u8) void,
|
|
randomSecure: *const fn (?*anyopaque, buffer: []u8) RandomSecureError!void,
|
|
|
|
netListenIp: *const fn (?*anyopaque, address: net.IpAddress, net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
|
|
netAccept: *const fn (?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream,
|
|
netBindIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
|
|
netConnectIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream,
|
|
netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle,
|
|
netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
|
|
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
|
|
netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
|
|
/// Returns 0 on end of stream.
|
|
netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
|
|
netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
|
|
netWriteFile: *const fn (?*anyopaque, net.Socket.Handle, header: []const u8, *Io.File.Reader, Io.Limit) net.Stream.Writer.WriteFileError!usize,
|
|
netClose: *const fn (?*anyopaque, handle: []const net.Socket.Handle) void,
|
|
netShutdown: *const fn (?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void,
|
|
netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
|
|
netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
|
|
netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
|
|
};
|
|
|
|
pub const Cancelable = error{
|
|
/// Caller has requested the async operation to stop.
|
|
Canceled,
|
|
};
|
|
|
|
pub const UnexpectedError = error{
|
|
/// The Operating System returned an undocumented error code.
|
|
///
|
|
/// This error is in theory not possible, but it would be better
|
|
/// to handle this error than to invoke undefined behavior.
|
|
///
|
|
/// When this error code is observed, it usually means the Zig Standard
|
|
/// Library needs a small patch to add the error code to the error set for
|
|
/// the respective function.
|
|
Unexpected,
|
|
};
|
|
|
|
pub const Dir = @import("Io/Dir.zig");
|
|
pub const File = @import("Io/File.zig");
|
|
pub const Terminal = @import("Io/Terminal.zig");
|
|
|
|
pub const Clock = enum {
|
|
/// A settable system-wide clock that measures real (i.e. wall-clock)
|
|
/// time. This clock is affected by discontinuous jumps in the system
|
|
/// time (e.g., if the system administrator manually changes the
|
|
/// clock), and by frequency adjustments performed by NTP and similar
|
|
/// applications.
|
|
///
|
|
/// This clock normally counts the number of seconds since 1970-01-01
|
|
/// 00:00:00 Coordinated Universal Time (UTC) except that it ignores
|
|
/// leap seconds; near a leap second it is typically adjusted by NTP to
|
|
/// stay roughly in sync with UTC.
|
|
///
|
|
/// Timestamps returned by implementations of this clock represent time
|
|
/// elapsed since 1970-01-01T00:00:00Z, the POSIX/Unix epoch, ignoring
|
|
/// leap seconds. This is colloquially known as "Unix time". If the
|
|
/// underlying OS uses a different epoch for native timestamps (e.g.,
|
|
/// Windows, which uses 1601-01-01) they are translated accordingly.
|
|
real,
|
|
/// A nonsettable system-wide clock that represents time since some
|
|
/// unspecified point in the past.
|
|
///
|
|
/// Monotonic: Guarantees that the time returned by consecutive calls
|
|
/// will not go backwards, but successive calls may return identical
|
|
/// (not-increased) time values.
|
|
///
|
|
/// Not affected by discontinuous jumps in the system time (e.g., if
|
|
/// the system administrator manually changes the clock), but may be
|
|
/// affected by frequency adjustments.
|
|
///
|
|
/// This clock expresses intent to **exclude time that the system is
|
|
/// suspended**. However, implementations may be unable to satisify
|
|
/// this, and may include that time.
|
|
///
|
|
/// * On Linux, corresponds `CLOCK_MONOTONIC`.
|
|
/// * On macOS, corresponds to `CLOCK_UPTIME_RAW`.
|
|
awake,
|
|
/// Identical to `awake` except it expresses intent to **include time
|
|
/// that the system is suspended**, however, due to limitations it may
|
|
/// behave identically to `awake`.
|
|
///
|
|
/// * On Linux, corresponds `CLOCK_BOOTTIME`.
|
|
/// * On macOS, corresponds to `CLOCK_MONOTONIC_RAW`.
|
|
boot,
|
|
/// Tracks the amount of CPU in user or kernel mode used by the calling
|
|
/// process.
|
|
cpu_process,
|
|
/// Tracks the amount of CPU in user or kernel mode used by the calling
|
|
/// thread.
|
|
cpu_thread,
|
|
|
|
pub const Error = error{UnsupportedClock} || UnexpectedError;
|
|
|
|
/// This function is not cancelable because first of all it does not block,
|
|
/// but more importantly, the cancelation logic itself may want to check
|
|
/// the time.
|
|
pub fn now(clock: Clock, io: Io) Error!Io.Timestamp {
|
|
return io.vtable.now(io.userdata, clock);
|
|
}
|
|
|
|
pub const Timestamp = struct {
|
|
raw: Io.Timestamp,
|
|
clock: Clock,
|
|
|
|
/// This function is not cancelable because first of all it does not block,
|
|
/// but more importantly, the cancelation logic itself may want to check
|
|
/// the time.
|
|
pub fn now(io: Io, clock: Clock) Error!Clock.Timestamp {
|
|
return .{
|
|
.raw = try io.vtable.now(io.userdata, clock),
|
|
.clock = clock,
|
|
};
|
|
}
|
|
|
|
pub fn wait(t: Clock.Timestamp, io: Io) SleepError!void {
|
|
return io.vtable.sleep(io.userdata, .{ .deadline = t });
|
|
}
|
|
|
|
pub fn durationTo(from: Clock.Timestamp, to: Clock.Timestamp) Clock.Duration {
|
|
assert(from.clock == to.clock);
|
|
return .{
|
|
.raw = from.raw.durationTo(to.raw),
|
|
.clock = from.clock,
|
|
};
|
|
}
|
|
|
|
pub fn addDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
|
|
assert(from.clock == duration.clock);
|
|
return .{
|
|
.raw = from.raw.addDuration(duration.raw),
|
|
.clock = from.clock,
|
|
};
|
|
}
|
|
|
|
pub fn subDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
|
|
assert(from.clock == duration.clock);
|
|
return .{
|
|
.raw = from.raw.subDuration(duration.raw),
|
|
.clock = from.clock,
|
|
};
|
|
}
|
|
|
|
pub fn fromNow(io: Io, duration: Clock.Duration) Error!Clock.Timestamp {
|
|
return .{
|
|
.clock = duration.clock,
|
|
.raw = (try duration.clock.now(io)).addDuration(duration.raw),
|
|
};
|
|
}
|
|
|
|
pub fn untilNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
|
|
const now_ts = try Clock.Timestamp.now(io, timestamp.clock);
|
|
return timestamp.durationTo(now_ts);
|
|
}
|
|
|
|
pub fn durationFromNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
|
|
const now_ts = try timestamp.clock.now(io);
|
|
return .{
|
|
.clock = timestamp.clock,
|
|
.raw = now_ts.durationTo(timestamp.raw),
|
|
};
|
|
}
|
|
|
|
pub fn toClock(t: Clock.Timestamp, io: Io, clock: Clock) Error!Clock.Timestamp {
|
|
if (t.clock == clock) return t;
|
|
const now_old = try t.clock.now(io);
|
|
const now_new = try clock.now(io);
|
|
const duration = now_old.durationTo(t);
|
|
return .{
|
|
.clock = clock,
|
|
.raw = now_new.addDuration(duration),
|
|
};
|
|
}
|
|
|
|
pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool {
|
|
assert(lhs.clock == rhs.clock);
|
|
return std.math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds);
|
|
}
|
|
};
|
|
|
|
pub const Duration = struct {
|
|
raw: Io.Duration,
|
|
clock: Clock,
|
|
|
|
pub fn sleep(duration: Clock.Duration, io: Io) SleepError!void {
|
|
return io.vtable.sleep(io.userdata, .{ .duration = duration });
|
|
}
|
|
};
|
|
};
|
|
|
|
pub const Timestamp = struct {
|
|
nanoseconds: i96,
|
|
|
|
pub const zero: Timestamp = .{ .nanoseconds = 0 };
|
|
|
|
pub fn durationTo(from: Timestamp, to: Timestamp) Duration {
|
|
return .{ .nanoseconds = to.nanoseconds - from.nanoseconds };
|
|
}
|
|
|
|
pub fn addDuration(from: Timestamp, duration: Duration) Timestamp {
|
|
return .{ .nanoseconds = from.nanoseconds + duration.nanoseconds };
|
|
}
|
|
|
|
pub fn subDuration(from: Timestamp, duration: Duration) Timestamp {
|
|
return .{ .nanoseconds = from.nanoseconds - duration.nanoseconds };
|
|
}
|
|
|
|
pub fn withClock(t: Timestamp, clock: Clock) Clock.Timestamp {
|
|
return .{ .raw = t, .clock = clock };
|
|
}
|
|
|
|
pub fn fromNanoseconds(x: i96) Timestamp {
|
|
return .{ .nanoseconds = x };
|
|
}
|
|
|
|
pub fn toMilliseconds(t: Timestamp) i64 {
|
|
return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_ms));
|
|
}
|
|
|
|
pub fn toSeconds(t: Timestamp) i64 {
|
|
return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_s));
|
|
}
|
|
|
|
pub fn toNanoseconds(t: Timestamp) i96 {
|
|
return t.nanoseconds;
|
|
}
|
|
|
|
pub fn formatNumber(t: Timestamp, w: *std.Io.Writer, n: std.fmt.Number) std.Io.Writer.Error!void {
|
|
return w.printInt(t.nanoseconds, n.mode.base() orelse 10, n.case, .{
|
|
.precision = n.precision,
|
|
.width = n.width,
|
|
.alignment = n.alignment,
|
|
.fill = n.fill,
|
|
});
|
|
}
|
|
};
|
|
|
|
pub const Duration = struct {
|
|
nanoseconds: i96,
|
|
|
|
pub const zero: Duration = .{ .nanoseconds = 0 };
|
|
pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) };
|
|
|
|
pub fn fromNanoseconds(x: i96) Duration {
|
|
return .{ .nanoseconds = x };
|
|
}
|
|
|
|
pub fn fromMilliseconds(x: i64) Duration {
|
|
return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_ms };
|
|
}
|
|
|
|
pub fn fromSeconds(x: i64) Duration {
|
|
return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_s };
|
|
}
|
|
|
|
pub fn toMilliseconds(d: Duration) i64 {
|
|
return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_ms));
|
|
}
|
|
|
|
pub fn toSeconds(d: Duration) i64 {
|
|
return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_s));
|
|
}
|
|
|
|
pub fn toNanoseconds(d: Duration) i96 {
|
|
return d.nanoseconds;
|
|
}
|
|
};
|
|
|
|
/// Declares under what conditions an operation should return `error.Timeout`.
|
|
pub const Timeout = union(enum) {
|
|
none,
|
|
duration: Clock.Duration,
|
|
deadline: Clock.Timestamp,
|
|
|
|
pub const Error = error{ Timeout, UnsupportedClock };
|
|
|
|
pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
|
|
return switch (t) {
|
|
.none => null,
|
|
.duration => |d| try .fromNow(io, d),
|
|
.deadline => |d| d,
|
|
};
|
|
}
|
|
|
|
pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration {
|
|
return switch (t) {
|
|
.none => null,
|
|
.duration => |d| d,
|
|
.deadline => |d| try d.durationFromNow(io),
|
|
};
|
|
}
|
|
|
|
pub fn sleep(timeout: Timeout, io: Io) SleepError!void {
|
|
return io.vtable.sleep(io.userdata, timeout);
|
|
}
|
|
};
|
|
|
|
pub const AnyFuture = opaque {};
|
|
|
|
pub fn Future(Result: type) type {
|
|
return struct {
|
|
any_future: ?*AnyFuture,
|
|
result: Result,
|
|
|
|
/// Equivalent to `await` but places a cancelation request. This causes the task to receive
|
|
/// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
|
|
/// call to a function in `Io` which can return `error.Canceled`.
|
|
///
|
|
/// After cancelation of a task is requested, only the next cancelation point in that task
|
|
/// will return `error.Canceled`: future points will not re-signal the cancelation. As such,
|
|
/// it is usually a bug to ignore `error.Canceled`. However, to defer handling cancelation
|
|
/// requests, see also `recancel` and `CancelProtection`.
|
|
///
|
|
/// Idempotent. Not threadsafe.
|
|
pub fn cancel(f: *@This(), io: Io) Result {
|
|
const any_future = f.any_future orelse return f.result;
|
|
io.vtable.cancel(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
|
|
f.any_future = null;
|
|
return f.result;
|
|
}
|
|
|
|
/// Idempotent. Not threadsafe.
|
|
pub fn await(f: *@This(), io: Io) Result {
|
|
const any_future = f.any_future orelse return f.result;
|
|
io.vtable.await(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
|
|
f.any_future = null;
|
|
return f.result;
|
|
}
|
|
};
|
|
}
|
|
|
|
/// An unordered set of tasks which can only be awaited or canceled as a whole.
|
|
/// Tasks are spawned in the group with `Group.async` and `Group.concurrent`.
|
|
///
|
|
/// The resources associated with each task are *guaranteed* to be released when
|
|
/// the individual task returns, as opposed to when the whole group completes or
|
|
/// is awaited. For this reason, it is not a resource leak to have a long-lived
|
|
/// group which concurrent tasks are repeatedly added to. However, asynchronous
|
|
/// tasks are not guaranteed to run until `Group.await` or `Group.cancel` is
|
|
/// called, so adding async tasks to a group without ever awaiting it may leak
|
|
/// resources.
|
|
pub const Group = struct {
|
|
/// This value indicates whether or not a group has pending tasks. `null`
|
|
/// means there are no pending tasks, and no resources associated with the
|
|
/// group, so `await` and `cancel` return immediately without calling the
|
|
/// implementation. This means that `token` must be accessed atomically to
|
|
/// avoid racing with the check in `await` and `cancel`.
|
|
token: std.atomic.Value(?*anyopaque),
|
|
/// This value is available for the implementation to use as it wishes.
|
|
state: usize,
|
|
|
|
pub const init: Group = .{ .token = .init(null), .state = 0 };
|
|
|
|
/// Equivalent to `Io.async`, except the task is spawned in this `Group`
|
|
/// instead of becoming associated with a `Future`.
|
|
///
|
|
/// The return type of `function` must be coercible to `Cancelable!void`.
|
|
///
|
|
/// Once this function is called, there are resources associated with the
|
|
/// group. To release those resources, `Group.await` or `Group.cancel` must
|
|
/// eventually be called.
|
|
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
|
|
const Args = @TypeOf(args);
|
|
const TypeErased = struct {
|
|
fn start(context: *const anyopaque) Cancelable!void {
|
|
const args_casted: *const Args = @ptrCast(@alignCast(context));
|
|
return @call(.auto, function, args_casted.*);
|
|
}
|
|
};
|
|
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
|
|
}
|
|
|
|
/// Equivalent to `Io.concurrent`, except the task is spawned in this
|
|
/// `Group` instead of becoming associated with a `Future`.
|
|
///
|
|
/// The return type of `function` must be coercible to `Cancelable!void`.
|
|
///
|
|
/// Once this function is called, there are resources associated with the
|
|
/// group. To release those resources, `Group.await` or `Group.cancel` must
|
|
/// eventually be called.
|
|
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
|
|
const Args = @TypeOf(args);
|
|
const TypeErased = struct {
|
|
fn start(context: *const anyopaque) Cancelable!void {
|
|
const args_casted: *const Args = @ptrCast(@alignCast(context));
|
|
return @call(.auto, function, args_casted.*);
|
|
}
|
|
};
|
|
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
|
|
}
|
|
|
|
/// Blocks until all tasks of the group finish. During this time,
|
|
/// cancelation requests propagate to all members of the group, and
|
|
/// will also cause `error.Canceled` to be returned when the group
|
|
/// does ultimately finish.
|
|
///
|
|
/// Idempotent. Not threadsafe.
|
|
///
|
|
/// It is safe to call this function concurrently with `Group.async` or
|
|
/// `Group.concurrent`, provided that the group does not complete until
|
|
/// the call to `Group.async` or `Group.concurrent` returns.
|
|
pub fn await(g: *Group, io: Io) Cancelable!void {
|
|
const token = g.token.load(.acquire) orelse return;
|
|
try io.vtable.groupAwait(io.userdata, g, token);
|
|
assert(g.token.raw == null);
|
|
}
|
|
|
|
/// Equivalent to `await` but immediately requests cancelation on all
|
|
/// members of the group.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
///
|
|
/// Idempotent. Not threadsafe.
|
|
///
|
|
/// It is safe to call this function concurrently with `Group.async` or
|
|
/// `Group.concurrent`, provided that the group does not complete until
|
|
/// the call to `Group.async` or `Group.concurrent` returns.
|
|
pub fn cancel(g: *Group, io: Io) void {
|
|
const token = g.token.load(.acquire) orelse return;
|
|
io.vtable.groupCancel(io.userdata, g, token);
|
|
assert(g.token.raw == null);
|
|
}
|
|
};
|
|
|
|
/// Asserts that `error.Canceled` was returned from a prior cancelation point, and "re-arms" the
|
|
/// cancelation request, so that `error.Canceled` will be returned again from the next cancelation
|
|
/// point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn recancel(io: Io) void {
|
|
io.vtable.recancel(io.userdata);
|
|
}
|
|
|
|
/// In rare cases, it is desirable to completely block cancelation notification, so that a region
|
|
/// of code can run uninterrupted before `error.Canceled` is potentially observed. Therefore, every
|
|
/// task has a "cancel protection" state which indicates whether or not `Io` functions can introduce
|
|
/// cancelation points.
|
|
///
|
|
/// To modify a task's cancel protection state, see `swapCancelProtection`.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub const CancelProtection = enum {
|
|
/// Any call to an `Io` function with `error.Canceled` in its error set is a cancelation point.
|
|
///
|
|
/// This is the default state, which all tasks are created in.
|
|
unblocked,
|
|
/// No `Io` function introduces a cancelation point (`error.Canceled` will never be returned).
|
|
blocked,
|
|
};
|
|
/// Updates the current task's cancel protection state (see `CancelProtection`).
|
|
///
|
|
/// The typical usage for this function is to protect a block of code from cancelation:
|
|
/// ```
|
|
/// const old_cancel_protect = io.swapCancelProtection(.blocked);
|
|
/// defer _ = io.swapCancelProtection(old_cancel_protect);
|
|
/// doSomeWork() catch |err| switch (err) {
|
|
/// error.Canceled => unreachable,
|
|
/// };
|
|
/// ```
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn swapCancelProtection(io: Io, new: CancelProtection) CancelProtection {
|
|
return io.vtable.swapCancelProtection(io.userdata, new);
|
|
}
|
|
|
|
/// This function acts as a pure cancelation point (subject to protection; see `CancelProtection`)
|
|
/// and does nothing else. In other words, it returns `error.Canceled` if there is an outstanding
|
|
/// non-blocked cancelation request, but otherwise is a no-op.
|
|
///
|
|
/// It is rarely necessary to call this function. The primary use case is in long-running CPU-bound
|
|
/// tasks which may need to respond to cancelation before completing. Short tasks, or those which
|
|
/// perform other `Io` operations (and hence have other cancelation points), will typically already
|
|
/// respond quickly to cancelation requests.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn checkCancel(io: Io) Cancelable!void {
|
|
return io.vtable.checkCancel(io.userdata);
|
|
}
|
|
|
|
pub fn Select(comptime U: type) type {
|
|
return struct {
|
|
io: Io,
|
|
group: Group,
|
|
queue: Queue(U),
|
|
outstanding: usize,
|
|
|
|
const S = @This();
|
|
|
|
pub const Union = U;
|
|
|
|
pub const Field = std.meta.FieldEnum(U);
|
|
|
|
pub fn init(io: Io, buffer: []U) S {
|
|
return .{
|
|
.io = io,
|
|
.queue = .init(buffer),
|
|
.group = .init,
|
|
.outstanding = 0,
|
|
};
|
|
}
|
|
|
|
/// Calls `function` with `args` asynchronously. The resource spawned is
|
|
/// owned by the select.
|
|
///
|
|
/// `function` must have return type matching the `field` field of `Union`.
|
|
///
|
|
/// `function` *may* be called immediately, before `async` returns.
|
|
///
|
|
/// When this function returns, it is guaranteed that `function` has
|
|
/// already been called and completed, or it has successfully been
|
|
/// assigned a unit of concurrency.
|
|
///
|
|
/// After this is called, `wait` or `cancel` must be called before the
|
|
/// select is deinitialized.
|
|
///
|
|
/// Threadsafe.
|
|
///
|
|
/// Related:
|
|
/// * `Io.async`
|
|
/// * `Group.async`
|
|
pub fn async(
|
|
s: *S,
|
|
comptime field: Field,
|
|
function: anytype,
|
|
args: std.meta.ArgsTuple(@TypeOf(function)),
|
|
) void {
|
|
const Context = struct {
|
|
select: *S,
|
|
args: @TypeOf(args),
|
|
fn start(type_erased_context: *const anyopaque) Cancelable!void {
|
|
const context: *const @This() = @ptrCast(@alignCast(type_erased_context));
|
|
const elem = @unionInit(U, @tagName(field), @call(.auto, function, context.args));
|
|
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
|
|
error.Closed => unreachable,
|
|
};
|
|
}
|
|
};
|
|
const context: Context = .{ .select = s, .args = args };
|
|
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
|
|
s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
|
|
}
|
|
|
|
/// Blocks until another task of the select finishes.
|
|
///
|
|
/// Asserts there is at least one more `outstanding` task.
|
|
///
|
|
/// Not threadsafe.
|
|
pub fn await(s: *S) Cancelable!U {
|
|
s.outstanding -= 1;
|
|
return s.queue.getOne(s.io) catch |err| switch (err) {
|
|
error.Canceled => |e| return e,
|
|
error.Closed => unreachable,
|
|
};
|
|
}
|
|
|
|
/// Equivalent to `wait` but requests cancelation on all remaining
|
|
/// tasks owned by the select.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
///
|
|
/// It is illegal to call `wait` after this.
|
|
///
|
|
/// Idempotent. Not threadsafe.
|
|
pub fn cancel(s: *S) void {
|
|
s.outstanding = 0;
|
|
s.group.cancel(s.io);
|
|
}
|
|
};
|
|
}
|
|
|
|
/// Atomically checks if the value at `ptr` equals `expected`, and if so, blocks until either:
|
|
///
|
|
/// * a matching (same `ptr` argument) `futexWake` call occurs, or
|
|
/// * a spurious ("random") wakeup occurs.
|
|
///
|
|
/// Typically, `futexWake` should be called immediately after updating the value at `ptr.*`, to
|
|
/// unblock tasks using `futexWait` to wait for the value to change from what it previously was.
|
|
///
|
|
/// The caller is responsible for identifying spurious wakeups if necessary, typically by checking
|
|
/// the value at `ptr.*`.
|
|
///
|
|
/// Asserts that `T` is 4 bytes in length and has a well-defined layout with no padding bits.
|
|
pub fn futexWait(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) Cancelable!void {
|
|
return futexWaitTimeout(io, T, ptr, expected, .none);
|
|
}
|
|
/// Same as `futexWait`, except also unblocks if `timeout` expires. As with `futexWait`, spurious
|
|
/// wakeups are possible. It remains the caller's responsibility to differentiate between these
|
|
/// three possible wake-up reasons if necessary.
|
|
pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T, timeout: Timeout) Cancelable!void {
|
|
const expected_int: u32 = switch (@typeInfo(T)) {
|
|
.@"enum" => @bitCast(@intFromEnum(expected)),
|
|
else => @bitCast(expected),
|
|
};
|
|
return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_int, timeout);
|
|
}
|
|
/// Same as `futexWait`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void {
|
|
const expected_int: u32 = switch (@typeInfo(T)) {
|
|
.@"enum" => @bitCast(@intFromEnum(expected)),
|
|
else => @bitCast(expected),
|
|
};
|
|
io.vtable.futexWaitUncancelable(io.userdata, @ptrCast(ptr), expected_int);
|
|
}
|
|
/// Unblocks pending futex waits on `ptr`, up to a limit of `max_waiters` calls.
|
|
pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, max_waiters: u32) void {
|
|
comptime assert(@sizeOf(T) == @sizeOf(u32));
|
|
if (max_waiters == 0) return;
|
|
return io.vtable.futexWake(io.userdata, @ptrCast(ptr), max_waiters);
|
|
}
|
|
|
|
/// Mutex is a synchronization primitive which enforces atomic access to a
|
|
/// shared region of code known as the "critical section".
|
|
///
|
|
/// Mutex is an extern struct so that it may be used as a field inside another
|
|
/// extern struct. Having a guaranteed memory layout including mutexes is
|
|
/// important for IPC over shared memory (mmap).
|
|
pub const Mutex = extern struct {
|
|
state: std.atomic.Value(State),
|
|
|
|
pub const init: Mutex = .{ .state = .init(.unlocked) };
|
|
|
|
const State = enum(u32) {
|
|
unlocked,
|
|
locked_once,
|
|
contended,
|
|
};
|
|
|
|
pub fn tryLock(m: *Mutex) bool {
|
|
switch (m.state.cmpxchgWeak(
|
|
.unlocked,
|
|
.locked_once,
|
|
.acquire,
|
|
.monotonic,
|
|
) orelse return true) {
|
|
.unlocked => unreachable,
|
|
.locked_once, .contended => return false,
|
|
}
|
|
}
|
|
|
|
pub fn lock(m: *Mutex, io: Io) Cancelable!void {
|
|
const initial_state = m.state.cmpxchgWeak(
|
|
.unlocked,
|
|
.locked_once,
|
|
.acquire,
|
|
.monotonic,
|
|
) orelse {
|
|
@branchHint(.likely);
|
|
return;
|
|
};
|
|
if (initial_state == .contended) {
|
|
try io.futexWait(State, &m.state.raw, .contended);
|
|
}
|
|
while (m.state.swap(.contended, .acquire) != .unlocked) {
|
|
try io.futexWait(State, &m.state.raw, .contended);
|
|
}
|
|
}
|
|
|
|
/// Same as `lock`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn lockUncancelable(m: *Mutex, io: Io) void {
|
|
const initial_state = m.state.cmpxchgWeak(
|
|
.unlocked,
|
|
.locked_once,
|
|
.acquire,
|
|
.monotonic,
|
|
) orelse {
|
|
@branchHint(.likely);
|
|
return;
|
|
};
|
|
if (initial_state == .contended) {
|
|
io.futexWaitUncancelable(State, &m.state.raw, .contended);
|
|
}
|
|
while (m.state.swap(.contended, .acquire) != .unlocked) {
|
|
io.futexWaitUncancelable(State, &m.state.raw, .contended);
|
|
}
|
|
}
|
|
|
|
pub fn unlock(m: *Mutex, io: Io) void {
|
|
switch (m.state.swap(.unlocked, .release)) {
|
|
.unlocked => unreachable,
|
|
.locked_once => {},
|
|
.contended => {
|
|
@branchHint(.unlikely);
|
|
io.futexWake(State, &m.state.raw, 1);
|
|
},
|
|
}
|
|
}
|
|
};
|
|
|
|
pub const Condition = struct {
|
|
state: std.atomic.Value(State),
|
|
/// Incremented whenever the condition is signaled
|
|
epoch: std.atomic.Value(u32),
|
|
|
|
const State = packed struct(u32) {
|
|
waiters: u16,
|
|
signals: u16,
|
|
};
|
|
|
|
pub const init: Condition = .{
|
|
.state = .init(.{ .waiters = 0, .signals = 0 }),
|
|
.epoch = .init(0),
|
|
};
|
|
|
|
pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
|
|
try waitInner(cond, io, mutex, false);
|
|
}
|
|
|
|
/// Same as `wait`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
|
|
waitInner(cond, io, mutex, true) catch |err| switch (err) {
|
|
error.Canceled => unreachable,
|
|
};
|
|
}
|
|
|
|
fn waitInner(cond: *Condition, io: Io, mutex: *Mutex, uncancelable: bool) Cancelable!void {
|
|
var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
|
|
|
|
{
|
|
const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic);
|
|
assert(prev_state.waiters < math.maxInt(u16)); // overflow caused by too many waiters
|
|
}
|
|
|
|
mutex.unlock(io);
|
|
defer mutex.lockUncancelable(io);
|
|
|
|
while (true) {
|
|
const result = if (uncancelable)
|
|
io.futexWaitUncancelable(u32, &cond.epoch.raw, epoch)
|
|
else
|
|
io.futexWait(u32, &cond.epoch.raw, epoch);
|
|
|
|
epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` laod
|
|
|
|
// Even on error, try to consume a pending signal first. Otherwise a race might
|
|
// cause a signal to get stuck in the state with no corresponding waiter.
|
|
{
|
|
var prev_state = cond.state.load(.monotonic);
|
|
while (prev_state.signals > 0) {
|
|
prev_state = cond.state.cmpxchgWeak(prev_state, .{
|
|
.waiters = prev_state.waiters - 1,
|
|
.signals = prev_state.signals - 1,
|
|
}, .acquire, .monotonic) orelse {
|
|
// We successfully consumed a signal.
|
|
return;
|
|
};
|
|
}
|
|
}
|
|
|
|
// There are no more signals available; this was a spurious wakeup or an error. If it
|
|
// was an error, we will remove ourselves as a waiter and return that error. Otherwise,
|
|
// we'll loop back to the futex wait.
|
|
result catch |err| {
|
|
const prev_state = cond.state.fetchSub(.{ .waiters = 1, .signals = 0 }, .monotonic);
|
|
assert(prev_state.waiters > 0); // underflow caused by illegal state
|
|
return err;
|
|
};
|
|
}
|
|
}
|
|
|
|
pub fn signal(cond: *Condition, io: Io) void {
|
|
var prev_state = cond.state.load(.monotonic);
|
|
while (prev_state.waiters > prev_state.signals) {
|
|
@branchHint(.unlikely);
|
|
prev_state = cond.state.cmpxchgWeak(prev_state, .{
|
|
.waiters = prev_state.waiters,
|
|
.signals = prev_state.signals + 1,
|
|
}, .release, .monotonic) orelse {
|
|
// Update the epoch to tell the waiting threads that there are new signals for them.
|
|
// Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen
|
|
// between it observing the epoch and sleeping on it, but this is extraordinarily
|
|
// unlikely due to the precise number of calls required.
|
|
_ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
|
|
io.futexWake(u32, &cond.epoch.raw, 1);
|
|
return;
|
|
};
|
|
}
|
|
}
|
|
|
|
pub fn broadcast(cond: *Condition, io: Io) void {
|
|
var prev_state = cond.state.load(.monotonic);
|
|
while (prev_state.waiters > prev_state.signals) {
|
|
@branchHint(.unlikely);
|
|
prev_state = cond.state.cmpxchgWeak(prev_state, .{
|
|
.waiters = prev_state.waiters,
|
|
.signals = prev_state.waiters,
|
|
}, .release, .monotonic) orelse {
|
|
// Update the epoch to tell the waiting threads that there are new signals for them.
|
|
// Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen
|
|
// between it observing the epoch and sleeping on it, but this is extraordinarily
|
|
// unlikely due to the precise number of calls required.
|
|
_ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
|
|
io.futexWake(u32, &cond.epoch.raw, prev_state.waiters - prev_state.signals);
|
|
return;
|
|
};
|
|
}
|
|
}
|
|
};
|
|
|
|
/// Logical boolean flag which can be set and unset and supports a "wait until set" operation.
|
|
pub const Event = enum(u32) {
|
|
unset,
|
|
waiting,
|
|
is_set,
|
|
|
|
/// Returns whether the logical boolean is `true`.
|
|
pub fn isSet(event: *const Event) bool {
|
|
return switch (@atomicLoad(Event, event, .acquire)) {
|
|
.unset, .waiting => false,
|
|
.is_set => true,
|
|
};
|
|
}
|
|
|
|
/// Blocks until the logical boolean is `true`.
|
|
pub fn wait(event: *Event, io: Io) Io.Cancelable!void {
|
|
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
|
|
.unset => unreachable,
|
|
.waiting => {},
|
|
.is_set => return,
|
|
};
|
|
errdefer {
|
|
// Ideally we would restore the event back to `.unset` instead of `.waiting`, but there
|
|
// might be other threads waiting on the event. In theory we could track the *number* of
|
|
// waiting threads in the unused bits of the `Event`, but that has its own problem: the
|
|
// waiters would wake up when a *new waiter* was added. So it's easiest to just leave
|
|
// the state at `.waiting`---at worst it causes one redundant call to `futexWake`.
|
|
}
|
|
while (true) {
|
|
try io.futexWait(Event, event, .waiting);
|
|
switch (@atomicLoad(Event, event, .acquire)) {
|
|
.unset => unreachable, // `reset` called before pending `wait` returned
|
|
.waiting => continue,
|
|
.is_set => return,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Same as `wait`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn waitUncancelable(event: *Event, io: Io) void {
|
|
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
|
|
.unset => unreachable,
|
|
.waiting => {},
|
|
.is_set => return,
|
|
};
|
|
while (true) {
|
|
io.futexWaitUncancelable(Event, event, .waiting);
|
|
switch (@atomicLoad(Event, event, .acquire)) {
|
|
.unset => unreachable, // `reset` called before pending `wait` returned
|
|
.waiting => continue,
|
|
.is_set => return,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub const WaitTimeoutError = error{Timeout} || Cancelable;
|
|
|
|
/// Blocks the calling thread until either the logical boolean is set, the timeout expires, or a
|
|
/// spurious wakeup occurs. If the timeout expires or a spurious wakeup occurs, `error.Timeout`
|
|
/// is returned.
|
|
pub fn waitTimeout(event: *Event, io: Io, timeout: Timeout) WaitTimeoutError!void {
|
|
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
|
|
.unset => unreachable,
|
|
.waiting => assert(!builtin.single_threaded), // invalid state
|
|
.is_set => return,
|
|
};
|
|
errdefer {
|
|
// Ideally we would restore the event back to `.unset` instead of `.waiting`, but there
|
|
// might be other threads waiting on the event. In theory we could track the *number* of
|
|
// waiting threads in the unused bits of the `Event`, but that has its own problem: the
|
|
// waiters would wake up when a *new waiter* was added. So it's easiest to just leave
|
|
// the state at `.waiting`---at worst it causes one redundant call to `futexWake`.
|
|
}
|
|
try io.futexWaitTimeout(Event, event, .waiting, timeout);
|
|
switch (@atomicLoad(Event, event, .acquire)) {
|
|
.unset => unreachable, // `reset` called before pending `wait` returned
|
|
.waiting => return error.Timeout,
|
|
.is_set => return,
|
|
}
|
|
}
|
|
|
|
/// Sets the logical boolean to true, and hence unblocks any pending calls to `wait`. The
|
|
/// logical boolean remains true until `reset` is called, so future calls to `set` have no
|
|
/// semantic effect.
|
|
///
|
|
/// Any memory accesses prior to a `set` call are "released", so that if this `set` call causes
|
|
/// `isSet` to return `true` or a wait to finish, those tasks will be able to observe those
|
|
/// memory accesses.
|
|
pub fn set(e: *Event, io: Io) void {
|
|
switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) {
|
|
.unset, .is_set => {},
|
|
.waiting => io.futexWake(Event, e, std.math.maxInt(u32)),
|
|
}
|
|
}
|
|
|
|
/// Sets the logical boolean to false.
|
|
///
|
|
/// Assumes that there is no pending call to `wait` or `waitUncancelable`.
|
|
///
|
|
/// However, concurrent calls to `isSet`, `set`, and `reset` are allowed.
|
|
pub fn reset(e: *Event) void {
|
|
@atomicStore(Event, e, .unset, .monotonic);
|
|
}
|
|
};
|
|
|
|
pub const QueueClosedError = error{Closed};
|
|
|
|
pub const TypeErasedQueue = struct {
|
|
mutex: Mutex,
|
|
closed: bool,
|
|
|
|
/// Ring buffer. This data is logically *after* queued getters.
|
|
buffer: []u8,
|
|
start: usize,
|
|
len: usize,
|
|
|
|
putters: std.DoublyLinkedList,
|
|
getters: std.DoublyLinkedList,
|
|
|
|
const Put = struct {
|
|
remaining: []const u8,
|
|
needed: usize,
|
|
condition: Condition,
|
|
node: std.DoublyLinkedList.Node,
|
|
};
|
|
|
|
const Get = struct {
|
|
remaining: []u8,
|
|
needed: usize,
|
|
condition: Condition,
|
|
node: std.DoublyLinkedList.Node,
|
|
};
|
|
|
|
pub fn init(buffer: []u8) TypeErasedQueue {
|
|
return .{
|
|
.mutex = .init,
|
|
.closed = false,
|
|
.buffer = buffer,
|
|
.start = 0,
|
|
.len = 0,
|
|
.putters = .{},
|
|
.getters = .{},
|
|
};
|
|
}
|
|
|
|
pub fn close(q: *TypeErasedQueue, io: Io) void {
|
|
q.mutex.lockUncancelable(io);
|
|
defer q.mutex.unlock(io);
|
|
q.closed = true;
|
|
{
|
|
var it = q.getters.first;
|
|
while (it) |node| : (it = node.next) {
|
|
const getter: *Get = @alignCast(@fieldParentPtr("node", node));
|
|
getter.condition.signal(io);
|
|
}
|
|
}
|
|
{
|
|
var it = q.putters.first;
|
|
while (it) |node| : (it = node.next) {
|
|
const putter: *Put = @alignCast(@fieldParentPtr("node", node));
|
|
putter.condition.signal(io);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) (QueueClosedError || Cancelable)!usize {
|
|
assert(elements.len >= min);
|
|
if (elements.len == 0) return 0;
|
|
try q.mutex.lock(io);
|
|
defer q.mutex.unlock(io);
|
|
return q.putLocked(io, elements, min, false);
|
|
}
|
|
|
|
/// Same as `put`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) QueueClosedError!usize {
|
|
assert(elements.len >= min);
|
|
if (elements.len == 0) return 0;
|
|
q.mutex.lockUncancelable(io);
|
|
defer q.mutex.unlock(io);
|
|
return q.putLocked(io, elements, min, true) catch |err| switch (err) {
|
|
error.Canceled => unreachable,
|
|
error.Closed => |e| return e,
|
|
};
|
|
}
|
|
|
|
fn puttableSlice(q: *const TypeErasedQueue) ?[]u8 {
|
|
const unwrapped_index = q.start + q.len;
|
|
const wrapped_index, const overflow = @subWithOverflow(unwrapped_index, q.buffer.len);
|
|
const slice = switch (overflow) {
|
|
1 => q.buffer[unwrapped_index..],
|
|
0 => q.buffer[wrapped_index..q.start],
|
|
};
|
|
return if (slice.len > 0) slice else null;
|
|
}
|
|
|
|
fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
|
|
// A closed queue cannot be added to, even if there is space in the buffer.
|
|
if (q.closed) return error.Closed;
|
|
|
|
// Getters have first priority on the data, and only when the getters
|
|
// queue is empty do we start populating the buffer.
|
|
|
|
// The number of elements we add immediately, before possibly blocking.
|
|
var n: usize = 0;
|
|
|
|
while (q.getters.popFirst()) |getter_node| {
|
|
const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node));
|
|
const copy_len = @min(getter.remaining.len, elements.len - n);
|
|
assert(copy_len > 0);
|
|
@memcpy(getter.remaining[0..copy_len], elements[n..][0..copy_len]);
|
|
getter.remaining = getter.remaining[copy_len..];
|
|
getter.needed -|= copy_len;
|
|
n += copy_len;
|
|
if (getter.needed == 0) {
|
|
getter.condition.signal(io);
|
|
} else {
|
|
assert(n == elements.len); // we didn't have enough elements for the getter
|
|
q.getters.prepend(getter_node);
|
|
}
|
|
if (n == elements.len) return elements.len;
|
|
}
|
|
|
|
while (q.puttableSlice()) |slice| {
|
|
const copy_len = @min(slice.len, elements.len - n);
|
|
assert(copy_len > 0);
|
|
@memcpy(slice[0..copy_len], elements[n..][0..copy_len]);
|
|
q.len += copy_len;
|
|
n += copy_len;
|
|
if (n == elements.len) return elements.len;
|
|
}
|
|
|
|
// Don't block if we hit the target.
|
|
if (n >= target) return n;
|
|
|
|
var pending: Put = .{
|
|
.remaining = elements[n..],
|
|
.needed = target - n,
|
|
.condition = .init,
|
|
.node = .{},
|
|
};
|
|
q.putters.append(&pending.node);
|
|
defer if (pending.needed > 0) q.putters.remove(&pending.node);
|
|
|
|
while (pending.needed > 0 and !q.closed) {
|
|
if (uncancelable) {
|
|
pending.condition.waitUncancelable(io, &q.mutex);
|
|
continue;
|
|
}
|
|
pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
|
|
error.Canceled => if (pending.remaining.len == elements.len) {
|
|
// Canceled while waiting, and appended no elements.
|
|
return error.Canceled;
|
|
} else {
|
|
// Canceled while waiting, but appended some elements, so report those first.
|
|
io.recancel();
|
|
return elements.len - pending.remaining.len;
|
|
},
|
|
};
|
|
}
|
|
if (pending.remaining.len == elements.len) {
|
|
// The queue was closed while we were waiting. We appended no elements.
|
|
assert(q.closed);
|
|
return error.Closed;
|
|
}
|
|
return elements.len - pending.remaining.len;
|
|
}
|
|
|
|
pub fn get(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) (QueueClosedError || Cancelable)!usize {
|
|
assert(buffer.len >= min);
|
|
if (buffer.len == 0) return 0;
|
|
try q.mutex.lock(io);
|
|
defer q.mutex.unlock(io);
|
|
return q.getLocked(io, buffer, min, false);
|
|
}
|
|
|
|
/// Same as `get`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) QueueClosedError!usize {
|
|
assert(buffer.len >= min);
|
|
if (buffer.len == 0) return 0;
|
|
q.mutex.lockUncancelable(io);
|
|
defer q.mutex.unlock(io);
|
|
return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
|
|
error.Canceled => unreachable,
|
|
error.Closed => |e| return e,
|
|
};
|
|
}
|
|
|
|
fn gettableSlice(q: *const TypeErasedQueue) ?[]const u8 {
|
|
const overlong_slice = q.buffer[q.start..];
|
|
const slice = overlong_slice[0..@min(overlong_slice.len, q.len)];
|
|
return if (slice.len > 0) slice else null;
|
|
}
|
|
|
|
fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
|
|
// The ring buffer gets first priority, then data should come from any
|
|
// queued putters, then finally the ring buffer should be filled with
|
|
// data from putters so they can be resumed.
|
|
|
|
// The number of elements we received immediately, before possibly blocking.
|
|
var n: usize = 0;
|
|
|
|
while (q.gettableSlice()) |slice| {
|
|
const copy_len = @min(slice.len, buffer.len - n);
|
|
assert(copy_len > 0);
|
|
@memcpy(buffer[n..][0..copy_len], slice[0..copy_len]);
|
|
q.start += copy_len;
|
|
if (q.buffer.len - q.start == 0) q.start = 0;
|
|
q.len -= copy_len;
|
|
n += copy_len;
|
|
if (n == buffer.len) {
|
|
q.fillRingBufferFromPutters(io);
|
|
return buffer.len;
|
|
}
|
|
}
|
|
|
|
// Copy directly from putters into buffer.
|
|
while (q.putters.popFirst()) |putter_node| {
|
|
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
|
|
const copy_len = @min(putter.remaining.len, buffer.len - n);
|
|
assert(copy_len > 0);
|
|
@memcpy(buffer[n..][0..copy_len], putter.remaining[0..copy_len]);
|
|
putter.remaining = putter.remaining[copy_len..];
|
|
putter.needed -|= copy_len;
|
|
n += copy_len;
|
|
if (putter.needed == 0) {
|
|
putter.condition.signal(io);
|
|
} else {
|
|
assert(n == buffer.len); // we didn't have enough space for the putter
|
|
q.putters.prepend(putter_node);
|
|
}
|
|
if (n == buffer.len) {
|
|
q.fillRingBufferFromPutters(io);
|
|
return buffer.len;
|
|
}
|
|
}
|
|
|
|
// No need to call `fillRingBufferFromPutters` from this point onwards,
|
|
// because we emptied the ring buffer *and* the putter queue!
|
|
|
|
// Don't block if we hit the target or if the queue is closed. Return how
|
|
// many elements we could get immediately, unless the queue was closed and
|
|
// empty, in which case report `error.Closed`.
|
|
if (n == 0 and q.closed) return error.Closed;
|
|
if (n >= target or q.closed) return n;
|
|
|
|
var pending: Get = .{
|
|
.remaining = buffer[n..],
|
|
.needed = target - n,
|
|
.condition = .init,
|
|
.node = .{},
|
|
};
|
|
q.getters.append(&pending.node);
|
|
defer if (pending.needed > 0) q.getters.remove(&pending.node);
|
|
|
|
while (pending.needed > 0 and !q.closed) {
|
|
if (uncancelable) {
|
|
pending.condition.waitUncancelable(io, &q.mutex);
|
|
continue;
|
|
}
|
|
pending.condition.wait(io, &q.mutex) catch |err| switch (err) {
|
|
error.Canceled => if (pending.remaining.len == buffer.len) {
|
|
// Canceled while waiting, and received no elements.
|
|
return error.Canceled;
|
|
} else {
|
|
// Canceled while waiting, but received some elements, so report those first.
|
|
io.recancel();
|
|
return buffer.len - pending.remaining.len;
|
|
},
|
|
};
|
|
}
|
|
if (pending.remaining.len == buffer.len) {
|
|
// The queue was closed while we were waiting. We received no elements.
|
|
assert(q.closed);
|
|
return error.Closed;
|
|
}
|
|
return buffer.len - pending.remaining.len;
|
|
}
|
|
|
|
/// Called when there is nonzero space available in the ring buffer and
|
|
/// potentially putters waiting. The mutex is already held and the task is
|
|
/// to copy putter data to the ring buffer and signal any putters whose
|
|
/// buffers been fully copied.
|
|
fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void {
|
|
while (q.putters.popFirst()) |putter_node| {
|
|
const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
|
|
while (q.puttableSlice()) |slice| {
|
|
const copy_len = @min(slice.len, putter.remaining.len);
|
|
assert(copy_len > 0);
|
|
@memcpy(slice[0..copy_len], putter.remaining[0..copy_len]);
|
|
q.len += copy_len;
|
|
putter.remaining = putter.remaining[copy_len..];
|
|
putter.needed -|= copy_len;
|
|
if (putter.needed == 0) {
|
|
putter.condition.signal(io);
|
|
break;
|
|
}
|
|
} else {
|
|
q.putters.prepend(putter_node);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
/// Many producer, many consumer, thread-safe, runtime configurable buffer size.
|
|
/// When buffer is empty, consumers suspend and are resumed by producers.
|
|
/// When buffer is full, producers suspend and are resumed by consumers.
|
|
pub fn Queue(Elem: type) type {
|
|
return struct {
|
|
type_erased: TypeErasedQueue,
|
|
|
|
pub fn init(buffer: []Elem) @This() {
|
|
return .{ .type_erased = .init(@ptrCast(buffer)) };
|
|
}
|
|
|
|
pub fn close(q: *@This(), io: Io) void {
|
|
q.type_erased.close(io);
|
|
}
|
|
|
|
/// Appends elements to the end of the queue, potentially blocking if
|
|
/// there is insufficient capacity. Returns when any one of the
|
|
/// following conditions is satisfied:
|
|
///
|
|
/// * At least `target` elements have been added to the queue
|
|
/// * The queue is closed
|
|
/// * The current task is canceled
|
|
///
|
|
/// Returns how many of `elements` have been added to the queue, if any.
|
|
/// If an error is returned, no elements have been added.
|
|
///
|
|
/// If the queue is closed or the task is canceled, but some items were
|
|
/// already added before the closure or cancelation, then `put` may
|
|
/// return a number lower than `target`, in which case future calls are
|
|
/// guaranteed to return `error.Canceled` or `error.Closed`.
|
|
///
|
|
/// A return value of 0 is only possible if `target` is 0, in which case
|
|
/// the call is guaranteed to queue as many of `elements` as is possible
|
|
/// *without* blocking.
|
|
///
|
|
/// Asserts that `elements.len >= target`.
|
|
pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize {
|
|
return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem));
|
|
}
|
|
|
|
/// Same as `put` but blocks until all elements have been added to the queue.
|
|
///
|
|
/// If the queue is closed or canceled, `error.Closed` or `error.Canceled`
|
|
/// is returned, and it is unspecified how many, if any, of `elements` were
|
|
/// added to the queue prior to cancelation or closure.
|
|
pub fn putAll(q: *@This(), io: Io, elements: []const Elem) (QueueClosedError || Cancelable)!void {
|
|
const n = try q.put(io, elements, elements.len);
|
|
if (n != elements.len) {
|
|
_ = try q.put(io, elements[n..], elements.len - n);
|
|
unreachable; // partial `put` implies queue was closed or we were canceled
|
|
}
|
|
}
|
|
|
|
/// Same as `put`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) QueueClosedError!usize {
|
|
return @divExact(try q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
|
|
}
|
|
|
|
/// Appends `item` to the end of the queue, blocking if the queue is full.
|
|
pub fn putOne(q: *@This(), io: Io, item: Elem) (QueueClosedError || Cancelable)!void {
|
|
assert(try q.put(io, &.{item}, 1) == 1);
|
|
}
|
|
|
|
/// Same as `putOne`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) QueueClosedError!void {
|
|
assert(try q.putUncancelable(io, &.{item}, 1) == 1);
|
|
}
|
|
|
|
/// Receives elements from the beginning of the queue, potentially blocking
|
|
/// if there are insufficient elements currently in the queue. Returns when
|
|
/// any one of the following conditions is satisfied:
|
|
///
|
|
/// * At least `target` elements have been received from the queue
|
|
/// * The queue is closed and contains no buffered elements
|
|
/// * The current task is canceled
|
|
///
|
|
/// Returns how many elements of `buffer` have been populated, if any.
|
|
/// If an error is returned, no elements have been populated.
|
|
///
|
|
/// If the queue is closed or the task is canceled, but some items were
|
|
/// already received before the closure or cancelation, then `get` may
|
|
/// return a number lower than `target`, in which case future calls are
|
|
/// guaranteed to return `error.Canceled` or `error.Closed`.
|
|
///
|
|
/// A return value of 0 is only possible if `target` is 0, in which case
|
|
/// the call is guaranteed to fill as much of `buffer` as is possible
|
|
/// *without* blocking.
|
|
///
|
|
/// Asserts that `buffer.len >= target`.
|
|
pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize {
|
|
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem));
|
|
}
|
|
|
|
/// Same as `get`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) QueueClosedError!usize {
|
|
return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
|
|
}
|
|
|
|
/// Receives one element from the beginning of the queue, blocking if the queue is empty.
|
|
pub fn getOne(q: *@This(), io: Io) (QueueClosedError || Cancelable)!Elem {
|
|
var buf: [1]Elem = undefined;
|
|
assert(try q.get(io, &buf, 1) == 1);
|
|
return buf[0];
|
|
}
|
|
|
|
/// Same as `getOne`, except does not introduce a cancelation point.
|
|
///
|
|
/// For a description of cancelation and cancelation points, see `Future.cancel`.
|
|
pub fn getOneUncancelable(q: *@This(), io: Io) QueueClosedError!Elem {
|
|
var buf: [1]Elem = undefined;
|
|
assert(try q.getUncancelable(io, &buf, 1) == 1);
|
|
return buf[0];
|
|
}
|
|
|
|
/// Returns buffer length in `Elem` units.
|
|
pub fn capacity(q: *const @This()) usize {
|
|
return @divExact(q.type_erased.buffer.len, @sizeOf(Elem));
|
|
}
|
|
};
|
|
}
|
|
|
|
/// Calls `function` with `args`, such that the return value of the function is
|
|
/// not guaranteed to be available until `await` is called.
|
|
///
|
|
/// `function` *may* be called immediately, before `async` returns. This has
|
|
/// weaker guarantees than `concurrent`, making more portable and reusable.
|
|
///
|
|
/// When this function returns, it is guaranteed that `function` has already
|
|
/// been called and completed, or it has successfully been assigned a unit of
|
|
/// concurrency.
|
|
///
|
|
/// See also:
|
|
/// * `Group`
|
|
pub fn async(
|
|
io: Io,
|
|
function: anytype,
|
|
args: std.meta.ArgsTuple(@TypeOf(function)),
|
|
) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
|
|
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
|
|
const Args = @TypeOf(args);
|
|
const TypeErased = struct {
|
|
fn start(context: *const anyopaque, result: *anyopaque) void {
|
|
const args_casted: *const Args = @ptrCast(@alignCast(context));
|
|
const result_casted: *Result = @ptrCast(@alignCast(result));
|
|
result_casted.* = @call(.auto, function, args_casted.*);
|
|
}
|
|
};
|
|
var future: Future(Result) = undefined;
|
|
future.any_future = io.vtable.async(
|
|
io.userdata,
|
|
@ptrCast(&future.result),
|
|
.of(Result),
|
|
@ptrCast(&args),
|
|
.of(Args),
|
|
TypeErased.start,
|
|
);
|
|
return future;
|
|
}
|
|
|
|
pub const ConcurrentError = error{
|
|
/// May occur due to a temporary condition such as resource exhaustion, or
|
|
/// to the Io implementation not supporting concurrency.
|
|
ConcurrencyUnavailable,
|
|
};
|
|
|
|
/// Calls `function` with `args`, such that the return value of the function is
|
|
/// not guaranteed to be available until `await` is called, allowing the caller
|
|
/// to progress while waiting for any `Io` operations.
|
|
///
|
|
/// This has stronger guarantee than `async`, placing restrictions on what kind
|
|
/// of `Io` implementations are supported. By calling `async` instead, one
|
|
/// allows, for example, stackful single-threaded blocking I/O.
|
|
pub fn concurrent(
|
|
io: Io,
|
|
function: anytype,
|
|
args: std.meta.ArgsTuple(@TypeOf(function)),
|
|
) ConcurrentError!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
|
|
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
|
|
const Args = @TypeOf(args);
|
|
const TypeErased = struct {
|
|
fn start(context: *const anyopaque, result: *anyopaque) void {
|
|
const args_casted: *const Args = @ptrCast(@alignCast(context));
|
|
const result_casted: *Result = @ptrCast(@alignCast(result));
|
|
result_casted.* = @call(.auto, function, args_casted.*);
|
|
}
|
|
};
|
|
var future: Future(Result) = undefined;
|
|
future.any_future = try io.vtable.concurrent(
|
|
io.userdata,
|
|
@sizeOf(Result),
|
|
.of(Result),
|
|
@ptrCast(&args),
|
|
.of(Args),
|
|
TypeErased.start,
|
|
);
|
|
return future;
|
|
}
|
|
|
|
pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;
|
|
|
|
pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void {
|
|
return io.vtable.sleep(io.userdata, .{ .duration = .{
|
|
.raw = duration,
|
|
.clock = clock,
|
|
} });
|
|
}
|
|
|
|
/// Given a struct with each field a `*Future`, returns a union with the same
|
|
/// fields, each field type the future's result.
|
|
pub fn SelectUnion(S: type) type {
|
|
const struct_fields = @typeInfo(S).@"struct".fields;
|
|
var names: [struct_fields.len][]const u8 = undefined;
|
|
var types: [struct_fields.len]type = undefined;
|
|
for (struct_fields, &names, &types) |struct_field, *union_field_name, *UnionFieldType| {
|
|
const FieldFuture = @typeInfo(struct_field.type).pointer.child;
|
|
union_field_name.* = struct_field.name;
|
|
UnionFieldType.* = @FieldType(FieldFuture, "result");
|
|
}
|
|
return @Union(.auto, std.meta.FieldEnum(S), &names, &types, &@splat(.{}));
|
|
}
|
|
|
|
/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type,
|
|
/// and can be different for each field.
|
|
pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) {
|
|
const U = SelectUnion(@TypeOf(s));
|
|
const S = @TypeOf(s);
|
|
const fields = @typeInfo(S).@"struct".fields;
|
|
var futures: [fields.len]*AnyFuture = undefined;
|
|
inline for (fields, &futures) |field, *any_future| {
|
|
const future = @field(s, field.name);
|
|
any_future.* = future.any_future orelse return @unionInit(U, field.name, future.result);
|
|
}
|
|
switch (try io.vtable.select(io.userdata, &futures)) {
|
|
inline 0...(fields.len - 1) => |selected_index| {
|
|
const field_name = fields[selected_index].name;
|
|
return @unionInit(U, field_name, @field(s, field_name).await(io));
|
|
},
|
|
else => unreachable,
|
|
}
|
|
}
|
|
|
|
pub const LockedStderr = struct {
|
|
file_writer: *File.Writer,
|
|
terminal_mode: Terminal.Mode,
|
|
|
|
pub fn terminal(ls: LockedStderr) Terminal {
|
|
return .{
|
|
.writer = &ls.file_writer.interface,
|
|
.mode = ls.terminal_mode,
|
|
};
|
|
}
|
|
|
|
pub fn clear(ls: LockedStderr, buffer: []u8) Cancelable!void {
|
|
const fw = ls.file_writer;
|
|
std.Progress.clearWrittenWithEscapeCodes(fw) catch |err| switch (err) {
|
|
error.WriteFailed => switch (fw.err.?) {
|
|
error.Canceled => |e| return e,
|
|
else => {},
|
|
},
|
|
};
|
|
fw.interface.flush() catch |err| switch (err) {
|
|
error.WriteFailed => switch (fw.err.?) {
|
|
error.Canceled => |e| return e,
|
|
else => {},
|
|
},
|
|
};
|
|
fw.interface.buffer = buffer;
|
|
}
|
|
};
|
|
|
|
/// For doing application-level writes to the standard error stream.
|
|
/// Coordinates also with debug-level writes that are ignorant of Io interface
|
|
/// and implementations. When this returns, `std.process.stderr_thread_mutex`
|
|
/// will be locked.
|
|
///
|
|
/// See also:
|
|
/// * `tryLockStderr`
|
|
pub fn lockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!LockedStderr {
|
|
const ls = try io.vtable.lockStderr(io.userdata, terminal_mode);
|
|
try ls.clear(buffer);
|
|
return ls;
|
|
}
|
|
|
|
/// Same as `lockStderr` but non-blocking.
|
|
pub fn tryLockStderr(io: Io, buffer: []u8, terminal_mode: ?Terminal.Mode) Cancelable!?LockedStderr {
|
|
const ls = (try io.vtable.tryLockStderr(io.userdata, buffer, terminal_mode)) orelse return null;
|
|
try ls.clear(buffer);
|
|
return ls;
|
|
}
|
|
|
|
pub fn unlockStderr(io: Io) void {
|
|
return io.vtable.unlockStderr(io.userdata);
|
|
}
|
|
|
|
/// Obtains entropy from a cryptographically secure pseudo-random number
|
|
/// generator.
|
|
///
|
|
/// The implementation *may* store RNG state in process memory and use it to
|
|
/// fill `buffer`.
|
|
///
|
|
/// The randomness is seeded by `randomSecure`, or a less secure mechanism upon
|
|
/// failure.
|
|
///
|
|
/// Threadsafe.
|
|
///
|
|
/// See also `randomSecure`.
|
|
pub fn random(io: Io, buffer: []u8) void {
|
|
return io.vtable.random(io.userdata, buffer);
|
|
}
|
|
|
|
pub const RandomSecureError = error{EntropyUnavailable} || Cancelable;
|
|
|
|
/// Obtains cryptographically secure entropy from outside the process.
|
|
///
|
|
/// Always makes a syscall, or otherwise avoids dependency on process memory,
|
|
/// in order to obtain fresh randomness. Does not rely on stored RNG state.
|
|
///
|
|
/// Does not have any fallback mechanisms; returns `error.EntropyUnavailable`
|
|
/// if any problems occur.
|
|
///
|
|
/// Threadsafe.
|
|
///
|
|
/// See also `random`.
|
|
pub fn randomSecure(io: Io, buffer: []u8) RandomSecureError!void {
|
|
return io.vtable.randomSecure(io.userdata, buffer);
|
|
}
|