Io.Dispatch.Mutex: fix deadlock conditions

This commit is contained in:
Jacob Young 2026-02-14 04:49:03 -05:00
parent f996d28666
commit b7f93695f9
2 changed files with 150 additions and 131 deletions

View file

@ -280,7 +280,7 @@ const Fiber = struct {
.select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) {
ev.queue.async(fiber, &Fiber.@"resume");
},
_ => |awaiting| awaiting.toCancelable().canceled(),
_ => |awaiting| awaiting.toCancelable().async(),
}
}
@ -484,7 +484,7 @@ pub fn io(ev: *Evented) Io {
pub const InitOptions = struct {
backing_allocator_needs_mutex: bool = true,
queue: ?c.dispatch.queue_t = null,
target_queue: ?c.dispatch.queue_t = .TARGET_DEFAULT,
/// Upper limit on the allowable delay in processing timeouts in order to improve power
/// consumption and system performance.
leeway: Io.Duration = .fromMilliseconds(10),
@ -499,11 +499,11 @@ pub const InitOptions = struct {
};
pub fn init(ev: *Evented, backing_allocator: Allocator, options: InitOptions) !void {
const queue = if (options.queue) |queue| queue: {
queue.as_object().retain();
break :queue queue;
} else c.dispatch.queue_create("org.ziglang.std.Io.Dispatch", .CONCURRENT()) orelse
return error.SystemResources;
const queue = c.dispatch.queue_create_with_target(
"org.ziglang.std.Io.Dispatch",
.CONCURRENT(),
options.target_queue,
) orelse return error.SystemResources;
errdefer queue.as_object().release();
const main_loop_stack = try backing_allocator.alignedAlloc(
u8,
@ -727,9 +727,9 @@ const Cancelable = struct {
const blocked: Cancelable = .{ .queue = undefined, .cancel = is_blocked };
const AwaitError = error{CancelRequested};
const RequestedError = error{CancelRequested};
fn await(cancelable: *Cancelable, fiber: *Fiber) AwaitError!void {
fn enter(cancelable: *Cancelable, fiber: *Fiber) RequestedError!void {
const function = cancelable.cancel;
assert(function != is_requested);
if (function == is_blocked) {
@ -750,13 +750,42 @@ const Cancelable = struct {
}
}
fn canceled(cancelable: *Cancelable) void {
assert(cancelable.cancel != is_blocked);
assert(cancelable.cancel != is_requested);
cancelable.queue.async(cancelable, cancelable.cancel);
/// Atomically detaches `cancelable` from `fiber.cancel_status` and reports whether a cancel
/// request was already recorded there.
///
/// Returns immediately, without touching `fiber`, when `cancelable.cancel` is `is_blocked`.
/// Returns `error.CancelRequested` when the status observed before the update had `requested` set.
fn leave(cancelable: *Cancelable, fiber: *Fiber) RequestedError!void {
const function = cancelable.cancel;
// Callers must not reach this point with the callback already swapped to `is_requested`.
assert(function != is_requested);
if (function == is_blocked) {
@branchHint(.unlikely);
return;
}
// Bitwise AND against this mask: `requested` is ANDed with `true`, so it keeps its current
// value, while `awaiting` is ANDed with `.nothing`.
// NOTE(review): presumably `.nothing` is the all-zero encoding, making this a clear of
// `awaiting` — confirm against the definition of `Fiber.CancelStatus`.
const cancel_status = @atomicRmw(Fiber.CancelStatus, &fiber.cancel_status, .And, .{
.requested = true,
.awaiting = .nothing,
}, .monotonic);
// `@atomicRmw` yields the value from before the AND; it must have named this cancelable.
assert(cancel_status.awaiting.toCancelable() == cancelable);
if (cancel_status.requested) return error.CancelRequested;
}
fn check(cancelable: *Cancelable, fiber: *Fiber) Io.Cancelable!void {
/// Submits `cancelable.cancel` to `cancelable.queue` via the queue's `async`, passing
/// `cancelable` itself as the accompanying argument.
///
/// Asserts that the callback is neither `is_blocked` nor `is_requested`.
fn async(cancelable: *Cancelable) void {
const function = cancelable.cancel;
assert(function != is_blocked and function != is_requested);
// NOTE(review): the `canceled` callbacks in this file cast their `context` parameter back to
// `*Cancelable`, so the first argument here is presumably delivered as that context.
cancelable.queue.async(cancelable, function);
}
/// Records that the cancel request targeting `cancelable` has been taken: replaces
/// `cancelable.cancel` with `is_requested` and stores `{ requested, awaiting = .nothing }`
/// into `fiber.cancel_status`.
///
/// Asserts that the callback is a real one (neither `is_blocked` nor `is_requested`) and that
/// `fiber.cancel_status` currently reads as requested while awaiting exactly this cancelable.
fn requested(cancelable: *Cancelable, fiber: *Fiber) void {
const function = cancelable.cancel;
assert(function != is_blocked and function != is_requested);
// Precondition check only; the load here and the store below are two separate monotonic
// atomic operations, not a single exchange.
assert(@atomicLoad(Fiber.CancelStatus, &fiber.cancel_status, .monotonic) == Fiber.CancelStatus{
.requested = true,
.awaiting = .fromCancelable(cancelable),
});
// The callback slot is overwritten before the status is published.
cancelable.cancel = is_requested;
@atomicStore(Fiber.CancelStatus, &fiber.cancel_status, .{
.requested = true,
.awaiting = .nothing,
}, .monotonic);
}
fn acknowledge(cancelable: *Cancelable, fiber: *Fiber) Io.Cancelable!void {
if (cancelable.cancel == is_requested) {
@branchHint(.unlikely);
fiber.cancel_protection.acknowledge();
@ -784,81 +813,97 @@ const Sleeper = struct {
};
const Mutex = struct {
/// including the locker
num_waiters: usize,
state: State,
queue: c.dispatch.queue_t,
waiters: std.DoublyLinkedList,
/// Mutex state packed into a single `usize` so it can be read and updated with one atomic
/// operation: a one-bit `locked` flag followed by the waiter count in the remaining bits.
const State = packed struct(usize) {
/// Set by the atomic OR in `tryLock` and cleared by the atomic AND in `unlock`.
locked: bool,
/// Waiter count; `Waiter.add` adds one through an atomic add on the whole `State`.
num_waiters: NumWaiters,
/// Unsigned integer one bit narrower than `usize`, leaving room for `locked`.
const NumWaiters = @Int(.unsigned, @bitSizeOf(usize) - 1);
};
const Waiter = struct {
sleeper: Sleeper = undefined,
cancelable: Cancelable,
mutex: *Mutex,
node: std.DoublyLinkedList.Node = .{},
node: std.DoublyLinkedList.Node = undefined,
fn add(context: ?*anyopaque) callconv(.c) void {
const waiter: *Waiter = @ptrCast(@alignCast(context));
waiter.tryAdd() catch |err| switch (err) {
waiter.cancelable.enter(waiter.sleeper.fiber) catch |err| switch (err) {
error.CancelRequested => return waiter.wake(),
};
var state = @atomicRmw(State, &waiter.mutex.state, .Add, .{
.locked = false,
.num_waiters = 1,
}, .monotonic);
state.num_waiters += 1;
while (!state.locked) {
@branchHint(.unlikely);
state = @cmpxchgWeak(State, &waiter.mutex.state, state, .{
.locked = true,
.num_waiters = state.num_waiters - 1,
}, .acquire, .monotonic) orelse break;
} else return waiter.mutex.waiters.append(&waiter.node);
waiter.cancelable.leave(waiter.sleeper.fiber) catch |err| switch (err) {
error.CancelRequested => {
waiter.wake();
assert(@atomicRmw(usize, &waiter.mutex.num_waiters, .Sub, 1, .monotonic) >= 1);
waiter.node.next = &waiter.node;
return;
},
};
}
fn tryAdd(waiter: *Waiter) Cancelable.AwaitError!void {
switch (@atomicLoad(usize, &waiter.mutex.num_waiters, .acquire)) {
0 => unreachable,
1 => return waiter.wake(), // already locked exclusively
else => try waiter.cancelable.await(waiter.sleeper.fiber),
}
waiter.mutex.waiters.append(&waiter.node);
waiter.wake();
}
fn canceled(context: ?*anyopaque) callconv(.c) void {
const cancelable: *Cancelable = @ptrCast(@alignCast(context));
cancelable.cancel = Cancelable.is_requested;
const waiter: *Waiter = @fieldParentPtr("cancelable", cancelable);
assert(@atomicRmw(
Fiber.CancelStatus,
&waiter.sleeper.fiber.cancel_status,
.Xchg,
.{ .requested = true, .awaiting = .nothing },
.monotonic,
) == Fiber.CancelStatus{ .requested = true, .awaiting = .fromCancelable(cancelable) });
cancelable.requested(waiter.sleeper.fiber);
const mutex = waiter.mutex;
mutex.waiters.remove(&waiter.node);
if (waiter.node.next != &waiter.node) {
@branchHint(.likely);
mutex.waiters.remove(&waiter.node);
assert(@atomicRmw(State, &mutex.state, .Sub, .{
.locked = false,
.num_waiters = 1,
}, .monotonic).num_waiters >= 1);
}
waiter.node = undefined;
waiter.wake();
assert(@atomicRmw(usize, &mutex.num_waiters, .Sub, 1, .monotonic) >= 1);
}
fn remove(context: ?*anyopaque) callconv(.c) void {
const mutex: *Mutex = @ptrCast(@alignCast(context));
var stop_node: ?*std.DoublyLinkedList.Node = null;
while (mutex.waiters.first != stop_node) {
var state = @atomicLoad(State, &mutex.state, .monotonic);
while (!state.locked and state.num_waiters > 0) {
@branchHint(.likely);
const waiter: *Waiter = @fieldParentPtr("node", mutex.waiters.popFirst().?);
if (waiter.cancelable.cancel != Cancelable.is_blocked) {
@branchHint(.likely);
const cancel_status = @atomicRmw(
Fiber.CancelStatus,
&waiter.sleeper.fiber.cancel_status,
.And,
.{ .requested = true, .awaiting = .nothing },
.monotonic,
);
assert(cancel_status.awaiting.toCancelable() == &waiter.cancelable);
if (cancel_status.requested) {
@branchHint(.unlikely);
// carefully place the hot potato out of the way
mutex.waiters.append(&waiter.node);
if (stop_node == null) stop_node = &waiter.node;
state = @cmpxchgWeak(State, &mutex.state, state, .{
.locked = true,
.num_waiters = state.num_waiters - 1,
}, .acquire, .monotonic) orelse break;
} else return;
var num_removed: State.NumWaiters = 0;
while (mutex.waiters.popFirst()) |node| {
@branchHint(.likely);
const waiter: *Waiter = @fieldParentPtr("node", node);
node.* = undefined;
waiter.cancelable.leave(waiter.sleeper.fiber) catch |err| switch (err) {
error.CancelRequested => {
num_removed += 1;
node.next = node;
continue;
}
}
waiter.wake();
return;
},
};
break;
}
if (num_removed > 0) {
@branchHint(.unlikely);
assert(@atomicRmw(State, &mutex.state, .Sub, .{
.locked = false,
.num_waiters = num_removed,
}, .monotonic).num_waiters >= num_removed);
}
// everyone is about to die, nobody will wake up ;-(
}
fn wake(waiter: *Waiter) void {
@ -868,7 +913,7 @@ const Mutex = struct {
fn init(mutex: *Mutex, queue: c.dispatch.queue_t) error{SystemResources}!void {
mutex.* = .{
.num_waiters = 0,
.state = .{ .locked = false, .num_waiters = 0 },
.queue = c.dispatch.queue_create_with_target(
"org.ziglang.std.Io.Dispatch.Mutex",
.SERIAL(),
@ -879,56 +924,48 @@ const Mutex = struct {
}
fn deinit(mutex: *Mutex) void {
assert(mutex.num_waiters == 0 and mutex.waiters.first == null and mutex.waiters.last == null);
assert(mutex.state == State{ .locked = false, .num_waiters = 0 });
assert(mutex.waiters.first == null and mutex.waiters.last == null);
mutex.queue.as_object().release();
mutex.* = undefined;
}
fn tryLock(mutex: *Mutex) bool {
if (@cmpxchgWeak(usize, &mutex.num_waiters, 0, 1, .acquire, .monotonic) == null) {
@branchHint(.likely);
return true;
const state =
@atomicRmw(State, &mutex.state, .Or, .{ .locked = true, .num_waiters = 0 }, .acquire);
if (state.locked) {
@branchHint(.unlikely);
}
return false;
return !state.locked;
}
fn lock(mutex: *Mutex, ev: *Evented) Io.Cancelable!void {
switch (@atomicRmw(usize, &mutex.num_waiters, .Add, 1, .acquire)) {
0 => {},
else => {
@branchHint(.unlikely);
var waiter: Waiter = .{
.cancelable = .{ .queue = mutex.queue, .cancel = &Mutex.Waiter.canceled },
.mutex = mutex,
};
ev.yield(.{ .mutex_wait = &waiter });
try waiter.cancelable.check(waiter.sleeper.fiber);
},
}
if (mutex.tryLock()) return;
var waiter: Waiter = .{
.cancelable = .{ .queue = mutex.queue, .cancel = &Mutex.Waiter.canceled },
.mutex = mutex,
};
ev.yield(.{ .mutex_wait = &waiter });
try waiter.cancelable.acknowledge(waiter.sleeper.fiber);
}
fn lockUncancelable(mutex: *Mutex, ev: *Evented) void {
switch (@atomicRmw(usize, &mutex.num_waiters, .Add, 1, .acquire)) {
0 => {},
else => {
@branchHint(.unlikely);
var waiter: Waiter = .{ .cancelable = .blocked, .mutex = mutex };
ev.yield(.{ .mutex_wait = &waiter });
waiter.cancelable.check(waiter.sleeper.fiber) catch |err| switch (err) {
error.Canceled => unreachable, // blocked
};
},
}
if (mutex.tryLock()) return;
var waiter: Waiter = .{ .cancelable = .blocked, .mutex = mutex };
ev.yield(.{ .mutex_wait = &waiter });
waiter.cancelable.acknowledge(waiter.sleeper.fiber) catch |err| switch (err) {
error.Canceled => unreachable, // blocked
};
}
fn unlock(mutex: *Mutex) void {
switch (@atomicRmw(usize, &mutex.num_waiters, .Sub, 1, .release)) {
0 => unreachable,
1 => {},
else => {
@branchHint(.unlikely);
mutex.queue.async(mutex, &Waiter.remove);
},
const state = @atomicRmw(State, &mutex.state, .And, .{
.locked = false,
.num_waiters = std.math.maxInt(State.NumWaiters),
}, .release);
if (state.num_waiters > 0) {
@branchHint(.unlikely);
mutex.queue.async(mutex, &Waiter.remove);
}
}
};
@ -1517,10 +1554,10 @@ const Futex = struct {
};
}
fn tryAdd(waiter: *Waiter) Cancelable.AwaitError!void {
fn tryAdd(waiter: *Waiter) Cancelable.RequestedError!void {
if (@atomicLoad(u32, waiter.ptr, .monotonic) != waiter.expected)
return error.CancelRequested;
try waiter.cancelable.await(waiter.sleeper.fiber);
try waiter.cancelable.enter(waiter.sleeper.fiber);
const futex = waiter.futex;
switch (waiter.timeout) {
.FOREVER => {},
@ -1542,46 +1579,28 @@ const Futex = struct {
fn canceled(context: ?*anyopaque) callconv(.c) void {
const cancelable: *Cancelable = @ptrCast(@alignCast(context));
cancelable.cancel = Cancelable.is_requested;
const waiter: *Waiter = @fieldParentPtr("cancelable", cancelable);
assert(@atomicRmw(
Fiber.CancelStatus,
&waiter.sleeper.fiber.cancel_status,
.Xchg,
.{ .requested = true, .awaiting = .nothing },
.monotonic,
) == Fiber.CancelStatus{ .requested = true, .awaiting = .fromCancelable(cancelable) });
cancelable.requested(waiter.sleeper.fiber);
const futex = waiter.futex;
waiter.removeUncancelable();
waiter.remove();
assert(@atomicRmw(usize, &futex.num_waiters, .Sub, 1, .monotonic) >= 1);
}
fn timedOut(context: ?*anyopaque) callconv(.c) void {
const waiter: *Waiter = @ptrCast(@alignCast(context));
const futex = waiter.futex;
waiter.remove() catch |err| switch (err) {
waiter.tryRemove() catch |err| switch (err) {
error.CancelRequested => return,
};
assert(@atomicRmw(usize, &futex.num_waiters, .Sub, 1, .monotonic) >= 1);
}
fn remove(waiter: *Waiter) Cancelable.AwaitError!void {
if (waiter.cancelable.cancel != Cancelable.is_blocked) {
@branchHint(.likely);
const cancel_status = @atomicRmw(
Fiber.CancelStatus,
&waiter.sleeper.fiber.cancel_status,
.And,
.{ .requested = true, .awaiting = .nothing },
.monotonic,
);
assert(cancel_status.awaiting.toCancelable() == &waiter.cancelable);
if (cancel_status.requested) return error.CancelRequested;
}
waiter.removeUncancelable();
/// Calls `cancelable.leave` for this waiter's fiber and, only if that succeeds, `remove`s the
/// waiter. `error.CancelRequested` from `leave` is propagated and the waiter is left in place.
fn tryRemove(waiter: *Waiter) Cancelable.RequestedError!void {
try waiter.cancelable.leave(waiter.sleeper.fiber);
waiter.remove();
}
fn removeUncancelable(waiter: *Waiter) void {
/// Unlinks this waiter's node from `futex.waiters`, then cancels the waiter's timer when it has
/// one; otherwise calls `wake` on the waiter directly.
fn remove(waiter: *Waiter) void {
waiter.futex.waiters.remove(&waiter.node);
// NOTE(review): presumably cancelling the timer leads to the wake-up on the timer path —
// confirm against the timer's cancel handler, which is not visible here.
if (waiter.timer) |timer| timer.cancel() else wake(waiter);
}
@ -1614,7 +1633,7 @@ const Futex = struct {
@branchHint(.unlikely);
continue;
}
waiter.remove() catch |err| switch (err) {
waiter.tryRemove() catch |err| switch (err) {
error.CancelRequested => continue,
};
num_removed += 1;
@ -1720,7 +1739,7 @@ fn futexWait(
.leeway = ev.leeway,
};
ev.yield(.{ .futex_wait = &waiter });
try waiter.cancelable.check(waiter.sleeper.fiber);
try waiter.cancelable.acknowledge(waiter.sleeper.fiber);
}
fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) void {
@ -1734,7 +1753,7 @@ fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32)
.leeway = ev.leeway,
};
ev.yield(.{ .futex_wait = &waiter });
waiter.cancelable.check(waiter.sleeper.fiber) catch |err| switch (err) {
waiter.cancelable.acknowledge(waiter.sleeper.fiber) catch |err| switch (err) {
error.Canceled => unreachable, // blocked
};
}

View file

@ -39,14 +39,13 @@ pub const once_t = enum(isize) {
_,
pub inline fn once(predicate: *once_t, context: ?*anyopaque, function: function_t) void {
if (@atomicLoad(once_t, predicate, .unordered) != .done) {
if (predicate.* != .done) {
@branchHint(.unlikely);
once_f(predicate, context, function);
} else asm volatile ("" ::: .{ .memory = true });
switch (builtin.mode) {
.Debug, .ReleaseSafe => {},
.ReleaseFast, .ReleaseSmall => if (@atomicLoad(once_t, predicate, .unordered) != .done)
unreachable,
.ReleaseFast, .ReleaseSmall => if (predicate.* != .done) unreachable,
}
}
};
@ -110,6 +109,7 @@ const queue_s = opaque {
pub const get_current = get_current_queue;
pub const get_main = get_main_queue;
pub const get_global = get_global_queue;
pub const TARGET_DEFAULT = TARGET_QUEUE_DEFAULT;
pub const create_with_target = queue_create_with_target;
pub const create = queue_create;
pub const get_label = queue_get_label;