std.Io: add new cancelation APIs

Also, better document how cancelation actually works.
This commit is contained in:
Matthew Lugg 2025-12-20 14:46:29 +00:00
parent 9bf65f6e05
commit fa7e818e14
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
3 changed files with 251 additions and 13 deletions

View file

@ -650,6 +650,10 @@ pub const VTable = struct {
groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
recancel: *const fn (?*anyopaque) void,
swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection,
checkCancel: *const fn (?*anyopaque) Cancelable!void,
/// Blocks until one of the futures from the list has a result ready, such
/// that awaiting it will not block. Returns that index.
select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,
@ -982,7 +986,14 @@ pub fn Future(Result: type) type {
any_future: ?*AnyFuture,
result: Result,
/// Equivalent to `await` but places a cancellation request.
/// Equivalent to `await` but places a cancellation request. This causes the task to receive
/// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
/// call to a function in `Io` which can return `error.Canceled`.
///
/// After cancelation of a task is requested, only the next cancelation point in that task
/// will return `error.Canceled`: future points will not re-signal the cancelation. As such,
/// it is usually a bug to ignore `error.Canceled`. However, to defer handling cancelation
/// requests, see also `recancel` and `CancelProtection`.
///
/// Idempotent. Not threadsafe.
pub fn cancel(f: *@This(), io: Io) Result {
@ -1079,6 +1090,8 @@ pub const Group = struct {
/// Equivalent to `wait` but immediately requests cancellation on all
/// members of the group.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
///
/// Idempotent. Not threadsafe.
pub fn cancel(g: *Group, io: Io) void {
const token = g.token orelse return;
@ -1087,6 +1100,61 @@ pub const Group = struct {
}
};
/// Asserts that `error.Canceled` was returned from a prior cancelation point, and "re-arms" the
/// cancelation request, so that `error.Canceled` will be returned again from the next cancelation
/// point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn recancel(io: Io) void {
io.vtable.recancel(io.userdata);
}
/// In rare cases, it is desirable to completely block cancelation notification, so that a region
/// of code can run uninterrupted before `error.Canceled` is potentially observed. Therefore, every
/// task has a "cancel protection" state which indicates whether or not `Io` functions can introduce
/// cancelation points.
///
/// To modify a task's cancel protection state, see `swapCancelProtection`.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub const CancelProtection = enum {
/// Any call to an `Io` function with `error.Canceled` in its error set is a cancelation point.
///
/// This is the default state, which all tasks are created in.
unblocked,
/// No `Io` function introduces a cancelation point (`error.Canceled` will never be returned).
blocked,
};
/// Updates the current task's cancel protection state (see `CancelProtection`).
///
/// The typical usage for this function is to protect a block of code from cancelation:
/// ```
/// const old_cancel_protect = io.swapCancelProtection(.blocked);
/// defer _ = io.swapCancelProtection(old_cancel_protect);
/// doSomeWork() catch |err| switch (err) {
/// error.Canceled => unreachable,
/// };
/// ```
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn swapCancelProtection(io: Io, new: CancelProtection) CancelProtection {
return io.vtable.swapCancelProtection(io.userdata, new);
}
/// This function acts as a pure cancelation point (subject to protection; see `CancelProtection`)
/// and does nothing else. In other words, it returns `error.Canceled` if there is an outstanding
/// non-blocked cancelation request, but otherwise is a no-op.
///
/// It is rarely necessary to call this function. The primary use case is in long-running CPU-bound
/// tasks which may need to respond to cancelation before completing. Short tasks, or those which
/// perform other `Io` operations (and hence have other cancelation points), will typically already
/// respond quickly to cancelation requests.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn checkCancel(io: Io) Cancelable!void {
return io.vtable.checkCancel(io.userdata);
}
pub fn Select(comptime U: type) type {
return struct {
io: Io,
@ -1160,6 +1228,8 @@ pub fn Select(comptime U: type) type {
/// Equivalent to `wait` but requests cancellation on all remaining
/// tasks owned by the select.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
///
/// It is illegal to call `wait` after this.
///
/// Idempotent. Not threadsafe.
@ -1193,7 +1263,9 @@ pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) con
const expected_raw: *align(1) const u32 = @ptrCast(&expected);
return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_raw.*, timeout);
}
/// Same as `futexWait`, except is not affected by task cancelation.
/// Same as `futexWait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void {
comptime assert(@sizeOf(T) == @sizeOf(u32));
const expected_raw: *align(1) const u32 = @ptrCast(&expected);
@ -1247,6 +1319,9 @@ pub const Mutex = struct {
}
}
/// Same as `lock`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn lockUncancelable(m: *Mutex, io: Io) void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
@ -1296,6 +1371,9 @@ pub const Condition = struct {
try waitInner(cond, io, mutex, false);
}
/// Same as `wait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
waitInner(cond, io, mutex, true) catch |err| switch (err) {
error.Canceled => unreachable,
@ -1424,7 +1502,9 @@ pub const Event = enum(u32) {
}
}
/// Same as `wait` except uninterruptible.
/// Same as `wait`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(event: *Event, io: Io) void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
@ -1531,7 +1611,9 @@ pub const TypeErasedQueue = struct {
return q.putLocked(io, elements, min, false);
}
/// Same as `put` but cannot be canceled.
/// Same as `put`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
assert(elements.len >= min);
if (elements.len == 0) return 0;
@ -1602,7 +1684,10 @@ pub const TypeErasedQueue = struct {
return q.getLocked(io, buffer, min, false);
}
pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
/// Same as `get`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
q.mutex.lockUncancelable(io);
@ -1722,7 +1807,9 @@ pub fn Queue(Elem: type) type {
assert(try q.put(io, elements, elements.len) == elements.len);
}
/// Same as `put` but cannot be interrupted.
/// Same as `put`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
@ -1731,6 +1818,9 @@ pub fn Queue(Elem: type) type {
assert(try q.put(io, &.{item}, 1) == 1);
}
/// Same as `putOne`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
assert(q.putUncancelable(io, &.{item}, 1) == 1);
}
@ -1746,8 +1836,11 @@ pub fn Queue(Elem: type) type {
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `get`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
@ -1756,6 +1849,9 @@ pub fn Queue(Elem: type) type {
return buf[0];
}
/// Same as `getOne`, except does not introduce a cancelation point.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
var buf: [1]Elem = undefined;
assert(q.getUncancelable(io, &buf, 1) == 1);
@ -1846,10 +1942,6 @@ pub fn concurrent(
return future;
}
pub fn cancelRequested(io: Io) bool {
return io.vtable.cancelRequested(io.userdata);
}
pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;
pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void {

View file

@ -86,7 +86,9 @@ const Thread = struct {
/// The value that needs to be passed to pthread_kill or tgkill in order to
/// send a signal.
signal_id: SignaleeId,
current_closure: ?*Closure = null,
current_closure: ?*Closure,
/// Only populated if `current_closure != null`. Indicates the current cancel protection mode.
cancel_protection: Io.CancelProtection,
const SignaleeId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id;
@ -98,6 +100,12 @@ const Thread = struct {
fn checkCancel(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
switch (thread.cancel_protection) {
.unblocked => {},
.blocked => return,
}
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
@ -115,6 +123,11 @@ const Thread = struct {
fn beginSyscall(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
switch (thread.cancel_protection) {
.unblocked => {},
.blocked => return,
}
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
@ -135,6 +148,12 @@ const Thread = struct {
fn endSyscall(thread: *Thread) void {
const closure = thread.current_closure orelse return;
switch (thread.cancel_protection) {
.unblocked => {},
.blocked => return,
}
_ = @cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
@ -512,6 +531,8 @@ pub fn init(
.have_signal_handler = false,
.main_thread = .{
.signal_id = Thread.currentSignalId(),
.current_closure = null,
.cancel_protection = undefined,
},
};
@ -546,7 +567,11 @@ pub const init_single_threaded: Threaded = .{
.old_sig_io = undefined,
.old_sig_pipe = undefined,
.have_signal_handler = false,
.main_thread = .{ .signal_id = undefined },
.main_thread = .{
.signal_id = undefined,
.current_closure = null,
.cancel_protection = undefined,
},
};
pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
@ -581,6 +606,8 @@ fn join(t: *Threaded) void {
fn worker(t: *Threaded) void {
var thread: Thread = .{
.signal_id = Thread.currentSignalId(),
.current_closure = null,
.cancel_protection = undefined,
};
Thread.current = &thread;
@ -617,6 +644,10 @@ pub fn io(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
.recancel = recancel,
.swapCancelProtection = swapCancelProtection,
.checkCancel = checkCancel,
.futexWait = futexWait,
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
@ -709,6 +740,10 @@ pub fn ioBasic(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
.recancel = recancel,
.swapCancelProtection = swapCancelProtection,
.checkCancel = checkCancel,
.futexWait = futexWait,
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
@ -794,9 +829,14 @@ const AsyncClosure = struct {
fn start(closure: *Closure, t: *Threaded) void {
const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
const current_thread = Thread.getCurrent(t);
current_thread.current_closure = closure;
current_thread.cancel_protection = .unblocked;
ac.func(ac.contextPointer(), ac.resultPointer());
current_thread.current_closure = null;
current_thread.cancel_protection = undefined;
if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| {
assert(select_event != done_event);
@ -978,9 +1018,14 @@ const GroupClosure = struct {
const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const event: *Io.Event = @ptrCast(&group.context);
current_thread.current_closure = closure;
current_thread.cancel_protection = .unblocked;
gc.func(group, gc.contextPointer());
current_thread.current_closure = null;
current_thread.cancel_protection = undefined;
const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel);
assert((prev_state / sync_one_pending) > 0);
@ -1201,6 +1246,32 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
}
}
fn recancel(userdata: ?*anyopaque) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const current_thread: *Thread = .getCurrent(t);
const cancel_status = &current_thread.current_closure.?.cancel_status;
switch (@atomicLoad(CancelStatus, cancel_status, .monotonic)) {
.none => unreachable, // called `recancel` when not canceled
.requested => unreachable, // called `recancel` when cancelation was already outstanding
.acknowledged => {},
_ => unreachable, // invalid state: not in a syscall
}
@atomicStore(CancelStatus, cancel_status, .requested, .monotonic);
}
fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.CancelProtection {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const current_thread: *Thread = .getCurrent(t);
const old = current_thread.cancel_protection;
current_thread.cancel_protection = new;
return old;
}
fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
return Thread.getCurrent(t).checkCancel();
}
fn await(
userdata: ?*anyopaque,
any_future: *Io.AnyFuture,

View file

@ -291,3 +291,78 @@ test "Event" {
try std.testing.expectError(error.Canceled, future.cancel(io));
}
}
test "recancel" {
const global = struct {
fn worker(io: Io) Io.Cancelable!void {
var dummy_event: Io.Event = .unset;
if (dummy_event.wait(io)) {
return;
} else |err| switch (err) {
error.Canceled => io.recancel(),
}
// Now we expect to see `error.Canceled` again.
return dummy_event.wait(io);
}
};
const io = std.testing.io;
var future = io.concurrent(global.worker, .{io}) catch |err| switch (err) {
error.ConcurrencyUnavailable => return error.SkipZigTest,
};
if (future.cancel(io)) {
return error.UnexpectedSuccess; // both `wait` calls should have returned `error.Canceled`
} else |err| switch (err) {
error.Canceled => {},
}
}
test "swapCancelProtection" {
const global = struct {
fn waitTwice(
io: Io,
event: *Io.Event,
) error{ Canceled, CanceledWhileProtected }!void {
// Wait for `event` while protected from cancelation.
{
const old_prot = io.swapCancelProtection(.blocked);
defer _ = io.swapCancelProtection(old_prot);
event.wait(io) catch |err| switch (err) {
error.Canceled => return error.CanceledWhileProtected,
};
}
// Reset the event (it will never be set again), and this time wait for it without protection.
event.reset();
_ = try event.wait(io);
}
fn sleepThenSet(io: Io, event: *Io.Event) !void {
// Give `waitTwice` a chance to get canceled.
try io.sleep(.fromMilliseconds(200), .awake);
event.set(io);
}
};
const io = std.testing.io;
var event: Io.Event = .unset;
var wait_future = io.concurrent(global.waitTwice, .{ io, &event }) catch |err| switch (err) {
error.ConcurrencyUnavailable => return error.SkipZigTest,
};
defer wait_future.cancel(io) catch {};
var set_future = try io.concurrent(global.sleepThenSet, .{ io, &event });
defer set_future.cancel(io) catch {};
if (wait_future.cancel(io)) {
return error.UnexpectedSuccess; // there was no `set` call to unblock the second `wait`
} else |err| switch (err) {
error.Canceled => {},
error.CanceledWhileProtected => |e| return e,
}
// Because it reached the `set`, it should be too late for `sleepThenSet` to see `error.Canceled`.
try set_future.cancel(io);
}