diff --git a/lib/std/Io/Dispatch.zig b/lib/std/Io/Dispatch.zig index 49df71ef94..81b9d207f3 100644 --- a/lib/std/Io/Dispatch.zig +++ b/lib/std/Io/Dispatch.zig @@ -280,7 +280,7 @@ const Fiber = struct { .select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) { ev.queue.async(fiber, &Fiber.@"resume"); }, - _ => |awaiting| awaiting.toCancelable().canceled(), + _ => |awaiting| awaiting.toCancelable().async(), } } @@ -484,7 +484,7 @@ pub fn io(ev: *Evented) Io { pub const InitOptions = struct { backing_allocator_needs_mutex: bool = true, - queue: ?c.dispatch.queue_t = null, + target_queue: ?c.dispatch.queue_t = .TARGET_DEFAULT, /// Upper limit on the allowable delay in processing timeouts in order to improve power /// consumption and system performance. leeway: Io.Duration = .fromMilliseconds(10), @@ -499,11 +499,11 @@ pub const InitOptions = struct { }; pub fn init(ev: *Evented, backing_allocator: Allocator, options: InitOptions) !void { - const queue = if (options.queue) |queue| queue: { - queue.as_object().retain(); - break :queue queue; - } else c.dispatch.queue_create("org.ziglang.std.Io.Dispatch", .CONCURRENT()) orelse - return error.SystemResources; + const queue = c.dispatch.queue_create_with_target( + "org.ziglang.std.Io.Dispatch", + .CONCURRENT(), + options.target_queue, + ) orelse return error.SystemResources; errdefer queue.as_object().release(); const main_loop_stack = try backing_allocator.alignedAlloc( u8, @@ -727,9 +727,9 @@ const Cancelable = struct { const blocked: Cancelable = .{ .queue = undefined, .cancel = is_blocked }; - const AwaitError = error{CancelRequested}; + const RequestedError = error{CancelRequested}; - fn await(cancelable: *Cancelable, fiber: *Fiber) AwaitError!void { + fn enter(cancelable: *Cancelable, fiber: *Fiber) RequestedError!void { const function = cancelable.cancel; assert(function != is_requested); if (function == is_blocked) { @@ -750,13 +750,42 @@ const Cancelable = struct { } } - fn canceled(cancelable: *Cancelable) void { - assert(cancelable.cancel != is_blocked); - assert(cancelable.cancel != is_requested); - cancelable.queue.async(cancelable, cancelable.cancel); + fn leave(cancelable: *Cancelable, fiber: *Fiber) RequestedError!void { + const function = cancelable.cancel; + assert(function != is_requested); + if (function == is_blocked) { + @branchHint(.unlikely); + return; + } + const cancel_status = @atomicRmw(Fiber.CancelStatus, &fiber.cancel_status, .And, .{ + .requested = true, + .awaiting = .nothing, + }, .monotonic); + assert(cancel_status.awaiting.toCancelable() == cancelable); + if (cancel_status.requested) return error.CancelRequested; } - fn check(cancelable: *Cancelable, fiber: *Fiber) Io.Cancelable!void { + fn async(cancelable: *Cancelable) void { + const function = cancelable.cancel; + assert(function != is_blocked and function != is_requested); + cancelable.queue.async(cancelable, function); + } + + fn requested(cancelable: *Cancelable, fiber: *Fiber) void { + const function = cancelable.cancel; + assert(function != is_blocked and function != is_requested); + assert(@atomicLoad(Fiber.CancelStatus, &fiber.cancel_status, .monotonic) == Fiber.CancelStatus{ + .requested = true, + .awaiting = .fromCancelable(cancelable), + }); + cancelable.cancel = is_requested; + @atomicStore(Fiber.CancelStatus, &fiber.cancel_status, .{ + .requested = true, + .awaiting = .nothing, + }, .monotonic); + } + + fn acknowledge(cancelable: *Cancelable, fiber: *Fiber) Io.Cancelable!void { if (cancelable.cancel == is_requested) { @branchHint(.unlikely); fiber.cancel_protection.acknowledge(); @@ -784,81 +813,97 @@ const Sleeper = struct { }; const Mutex = struct { - /// including the locker - num_waiters: usize, + state: State, queue: c.dispatch.queue_t, waiters: std.DoublyLinkedList, + const State = packed struct(usize) { + locked: bool, + num_waiters: NumWaiters, + + const NumWaiters = @Int(.unsigned, @bitSizeOf(usize) - 1); + }; + const Waiter = struct { sleeper: Sleeper = undefined, cancelable: Cancelable, mutex: *Mutex, - node: std.DoublyLinkedList.Node = .{}, + node: std.DoublyLinkedList.Node = undefined, fn add(context: ?*anyopaque) callconv(.c) void { const waiter: *Waiter = @ptrCast(@alignCast(context)); - waiter.tryAdd() catch |err| switch (err) { + waiter.cancelable.enter(waiter.sleeper.fiber) catch |err| switch (err) { + error.CancelRequested => return waiter.wake(), + }; + var state = @atomicRmw(State, &waiter.mutex.state, .Add, .{ + .locked = false, + .num_waiters = 1, + }, .monotonic); + state.num_waiters += 1; + while (!state.locked) { + @branchHint(.unlikely); + state = @cmpxchgWeak(State, &waiter.mutex.state, state, .{ + .locked = true, + .num_waiters = state.num_waiters - 1, + }, .acquire, .monotonic) orelse break; + } else return waiter.mutex.waiters.append(&waiter.node); + waiter.cancelable.leave(waiter.sleeper.fiber) catch |err| switch (err) { error.CancelRequested => { - waiter.wake(); - assert(@atomicRmw(usize, &waiter.mutex.num_waiters, .Sub, 1, .monotonic) >= 1); + waiter.node.next = &waiter.node; + return; }, }; - } - - fn tryAdd(waiter: *Waiter) Cancelable.AwaitError!void { - switch (@atomicLoad(usize, &waiter.mutex.num_waiters, .acquire)) { - 0 => unreachable, - 1 => return waiter.wake(), // already locked exclusively - else => try waiter.cancelable.await(waiter.sleeper.fiber), - } - waiter.mutex.waiters.append(&waiter.node); + waiter.wake(); } fn canceled(context: ?*anyopaque) callconv(.c) void { const cancelable: *Cancelable = @ptrCast(@alignCast(context)); - cancelable.cancel = Cancelable.is_requested; const waiter: *Waiter = @fieldParentPtr("cancelable", cancelable); - assert(@atomicRmw( - Fiber.CancelStatus, - &waiter.sleeper.fiber.cancel_status, - .Xchg, - .{ .requested = true, .awaiting = .nothing }, - .monotonic, - ) == Fiber.CancelStatus{ .requested = true, .awaiting = .fromCancelable(cancelable) }); + cancelable.requested(waiter.sleeper.fiber); const mutex = waiter.mutex; - mutex.waiters.remove(&waiter.node); + if (waiter.node.next != &waiter.node) { + @branchHint(.likely); + mutex.waiters.remove(&waiter.node); + assert(@atomicRmw(State, &mutex.state, .Sub, .{ + .locked = false, + .num_waiters = 1, + }, .monotonic).num_waiters >= 1); + } + waiter.node = undefined; waiter.wake(); - assert(@atomicRmw(usize, &mutex.num_waiters, .Sub, 1, .monotonic) >= 1); } fn remove(context: ?*anyopaque) callconv(.c) void { const mutex: *Mutex = @ptrCast(@alignCast(context)); - var stop_node: ?*std.DoublyLinkedList.Node = null; - while (mutex.waiters.first != stop_node) { + var state = @atomicLoad(State, &mutex.state, .monotonic); + while (!state.locked and state.num_waiters > 0) { @branchHint(.likely); - const waiter: *Waiter = @fieldParentPtr("node", mutex.waiters.popFirst().?); - if (waiter.cancelable.cancel != Cancelable.is_blocked) { - @branchHint(.likely); - const cancel_status = @atomicRmw( - Fiber.CancelStatus, - &waiter.sleeper.fiber.cancel_status, - .And, - .{ .requested = true, .awaiting = .nothing }, - .monotonic, - ); - assert(cancel_status.awaiting.toCancelable() == &waiter.cancelable); - if (cancel_status.requested) { - @branchHint(.unlikely); - // carefully place the hot potato out of the way - mutex.waiters.append(&waiter.node); - if (stop_node == null) stop_node = &waiter.node; + state = @cmpxchgWeak(State, &mutex.state, state, .{ + .locked = true, + .num_waiters = state.num_waiters - 1, + }, .acquire, .monotonic) orelse break; + } else return; + var num_removed: State.NumWaiters = 0; + while (mutex.waiters.popFirst()) |node| { + @branchHint(.likely); + const waiter: *Waiter = @fieldParentPtr("node", node); + node.* = undefined; + waiter.cancelable.leave(waiter.sleeper.fiber) catch |err| switch (err) { + error.CancelRequested => { + num_removed += 1; + node.next = node; continue; - } - } - waiter.wake(); - return; + }, + }; + break; + } + if (num_removed > 0) { + @branchHint(.unlikely); + assert(@atomicRmw(State, &mutex.state, .Sub, .{ + .locked = false, + .num_waiters = num_removed, + }, .monotonic).num_waiters >= num_removed); } - // everyone is about to die, nobody will wake up ;-( } fn wake(waiter: *Waiter) void { @@ -868,7 +913,7 @@ const Mutex = struct { fn init(mutex: *Mutex, queue: c.dispatch.queue_t) error{SystemResources}!void { mutex.* = .{ - .num_waiters = 0, + .state = .{ .locked = false, .num_waiters = 0 }, .queue = c.dispatch.queue_create_with_target( "org.ziglang.std.Io.Dispatch.Mutex", .SERIAL(), @@ -879,56 +924,48 @@ const Mutex = struct { } fn deinit(mutex: *Mutex) void { - assert(mutex.num_waiters == 0 and mutex.waiters.first == null and mutex.waiters.last == null); + assert(mutex.state == State{ .locked = false, .num_waiters = 0 }); + assert(mutex.waiters.first == null and mutex.waiters.last == null); mutex.queue.as_object().release(); mutex.* = undefined; } fn tryLock(mutex: *Mutex) bool { - if (@cmpxchgWeak(usize, &mutex.num_waiters, 0, 1, .acquire, .monotonic) == null) { - @branchHint(.likely); - return true; + const state = + @atomicRmw(State, &mutex.state, .Or, .{ .locked = true, .num_waiters = 0 }, .acquire); + if (state.locked) { + @branchHint(.unlikely); } - return false; + return !state.locked; } fn lock(mutex: *Mutex, ev: *Evented) Io.Cancelable!void { - switch (@atomicRmw(usize, &mutex.num_waiters, .Add, 1, .acquire)) { - 0 => {}, - else => { - @branchHint(.unlikely); - var waiter: Waiter = .{ - .cancelable = .{ .queue = mutex.queue, .cancel = &Mutex.Waiter.canceled }, - .mutex = mutex, - }; - ev.yield(.{ .mutex_wait = &waiter }); - try waiter.cancelable.check(waiter.sleeper.fiber); - }, - } + if (mutex.tryLock()) return; + var waiter: Waiter = .{ + .cancelable = .{ .queue = mutex.queue, .cancel = &Mutex.Waiter.canceled }, + .mutex = mutex, + }; + ev.yield(.{ .mutex_wait = &waiter }); + try waiter.cancelable.acknowledge(waiter.sleeper.fiber); } fn lockUncancelable(mutex: *Mutex, ev: *Evented) void { - switch (@atomicRmw(usize, &mutex.num_waiters, .Add, 1, .acquire)) { - 0 => {}, - else => { - @branchHint(.unlikely); - var waiter: Waiter = .{ .cancelable = .blocked, .mutex = mutex }; - ev.yield(.{ .mutex_wait = &waiter }); - waiter.cancelable.check(waiter.sleeper.fiber) catch |err| switch (err) { - error.Canceled => unreachable, // blocked - }; - }, - } + if (mutex.tryLock()) return; + var waiter: Waiter = .{ .cancelable = .blocked, .mutex = mutex }; + ev.yield(.{ .mutex_wait = &waiter }); + waiter.cancelable.acknowledge(waiter.sleeper.fiber) catch |err| switch (err) { + error.Canceled => unreachable, // blocked + }; } fn unlock(mutex: *Mutex) void { - switch (@atomicRmw(usize, &mutex.num_waiters, .Sub, 1, .release)) { - 0 => unreachable, - 1 => {}, - else => { - @branchHint(.unlikely); - mutex.queue.async(mutex, &Waiter.remove); - }, + const state = @atomicRmw(State, &mutex.state, .And, .{ + .locked = false, + .num_waiters = std.math.maxInt(State.NumWaiters), + }, .release); + if (state.num_waiters > 0) { + @branchHint(.unlikely); + mutex.queue.async(mutex, &Waiter.remove); } } }; @@ -1517,10 +1554,10 @@ const Futex = struct { }; } - fn tryAdd(waiter: *Waiter) Cancelable.AwaitError!void { + fn tryAdd(waiter: *Waiter) Cancelable.RequestedError!void { if (@atomicLoad(u32, waiter.ptr, .monotonic) != waiter.expected) return error.CancelRequested; - try waiter.cancelable.await(waiter.sleeper.fiber); + try waiter.cancelable.enter(waiter.sleeper.fiber); const futex = waiter.futex; switch (waiter.timeout) { .FOREVER => {}, @@ -1542,46 +1579,28 @@ const Futex = struct { fn canceled(context: ?*anyopaque) callconv(.c) void { const cancelable: *Cancelable = @ptrCast(@alignCast(context)); - cancelable.cancel = Cancelable.is_requested; const waiter: *Waiter = @fieldParentPtr("cancelable", cancelable); - assert(@atomicRmw( - Fiber.CancelStatus, - &waiter.sleeper.fiber.cancel_status, - .Xchg, - .{ .requested = true, .awaiting = .nothing }, - .monotonic, - ) == Fiber.CancelStatus{ .requested = true, .awaiting = .fromCancelable(cancelable) }); + cancelable.requested(waiter.sleeper.fiber); const futex = waiter.futex; - waiter.removeUncancelable(); + waiter.remove(); assert(@atomicRmw(usize, &futex.num_waiters, .Sub, 1, .monotonic) >= 1); } fn timedOut(context: ?*anyopaque) callconv(.c) void { const waiter: *Waiter = @ptrCast(@alignCast(context)); const futex = waiter.futex; - waiter.remove() catch |err| switch (err) { + waiter.tryRemove() catch |err| switch (err) { error.CancelRequested => return, }; assert(@atomicRmw(usize, &futex.num_waiters, .Sub, 1, .monotonic) >= 1); } - fn remove(waiter: *Waiter) Cancelable.AwaitError!void { - if (waiter.cancelable.cancel != Cancelable.is_blocked) { - @branchHint(.likely); - const cancel_status = @atomicRmw( - Fiber.CancelStatus, - &waiter.sleeper.fiber.cancel_status, - .And, - .{ .requested = true, .awaiting = .nothing }, - .monotonic, - ); - assert(cancel_status.awaiting.toCancelable() == &waiter.cancelable); - if (cancel_status.requested) return error.CancelRequested; - } - waiter.removeUncancelable(); + fn tryRemove(waiter: *Waiter) Cancelable.RequestedError!void { + try waiter.cancelable.leave(waiter.sleeper.fiber); + waiter.remove(); } - fn removeUncancelable(waiter: *Waiter) void { + fn remove(waiter: *Waiter) void { waiter.futex.waiters.remove(&waiter.node); if (waiter.timer) |timer| timer.cancel() else wake(waiter); } @@ -1614,7 +1633,7 @@ const Futex = struct { @branchHint(.unlikely); continue; } - waiter.remove() catch |err| switch (err) { + waiter.tryRemove() catch |err| switch (err) { error.CancelRequested => continue, }; num_removed += 1; @@ -1720,7 +1739,7 @@ fn futexWait( .leeway = ev.leeway, }; ev.yield(.{ .futex_wait = &waiter }); - try waiter.cancelable.check(waiter.sleeper.fiber); + try waiter.cancelable.acknowledge(waiter.sleeper.fiber); } fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) void { @@ -1734,7 +1753,7 @@ fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) .leeway = ev.leeway, }; ev.yield(.{ .futex_wait = &waiter }); - waiter.cancelable.check(waiter.sleeper.fiber) catch |err| switch (err) { + waiter.cancelable.acknowledge(waiter.sleeper.fiber) catch |err| switch (err) { error.Canceled => unreachable, // blocked }; } diff --git a/lib/std/c/darwin/dispatch.zig b/lib/std/c/darwin/dispatch.zig index 2bd44b9f01..68b0b48782 100644 --- a/lib/std/c/darwin/dispatch.zig +++ b/lib/std/c/darwin/dispatch.zig @@ -39,14 +39,13 @@ pub const once_t = enum(isize) { _, pub inline fn once(predicate: *once_t, context: ?*anyopaque, function: function_t) void { - if (@atomicLoad(once_t, predicate, .unordered) != .done) { + if (predicate.* != .done) { @branchHint(.unlikely); once_f(predicate, context, function); } else asm volatile ("" ::: .{ .memory = true }); switch (builtin.mode) { .Debug, .ReleaseSafe => {}, - .ReleaseFast, .ReleaseSmall => if (@atomicLoad(once_t, predicate, .unordered) != .done) - unreachable, + .ReleaseFast, .ReleaseSmall => if (predicate.* != .done) unreachable, } } }; @@ -110,6 +109,7 @@ const queue_s = opaque { pub const get_current = get_current_queue; pub const get_main = get_main_queue; pub const get_global = get_global_queue; + pub const TARGET_DEFAULT = TARGET_QUEUE_DEFAULT; pub const create_with_target = queue_create_with_target; pub const create = queue_create; pub const get_label = queue_get_label;