From 4c4e9d054e37afe82a856f5845c548f177996bd4 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Mon, 2 Feb 2026 20:18:14 -0800 Subject: [PATCH] std.Io: add RwLock and Semaphore sync primitives and restore usage by std.debug.SelfInfo.Elf --- lib/std/Io.zig | 15 ++- lib/std/Io/RwLock.zig | 238 +++++++++++++++++++++++++++++++++ lib/std/Io/Semaphore.zig | 65 +++++++++ lib/std/Io/Threaded.zig | 8 +- lib/std/debug/SelfInfo/Elf.zig | 36 ++--- 5 files changed, 342 insertions(+), 20 deletions(-) create mode 100644 lib/std/Io/RwLock.zig create mode 100644 lib/std/Io/Semaphore.zig diff --git a/lib/std/Io.zig b/lib/std/Io.zig index ea182c6803..22ca78fd2b 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -47,6 +47,9 @@ pub const Dir = @import("Io/Dir.zig"); pub const File = @import("Io/File.zig"); pub const Terminal = @import("Io/Terminal.zig"); +pub const RwLock = @import("Io/RwLock.zig"); +pub const Semaphore = @import("Io/Semaphore.zig"); + pub const VTable = struct { /// If it returns `null` it means `result` has been already populated and /// `await` will be a no-op. @@ -882,7 +885,7 @@ pub const Timeout = union(enum) { pub const Error = error{ Timeout, UnsupportedClock }; - pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp { + pub fn toTimestamp(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp { return switch (t) { .none => null, .duration => |d| try .fromNow(io, d), @@ -890,6 +893,14 @@ pub const Timeout = union(enum) { }; } + pub fn toDeadline(t: Timeout, io: Io) Timeout { + return switch (t) { + .none => .none, + .duration => |d| .{ .deadline = Clock.Timestamp.fromNow(io, d) catch @panic("TODO") }, + .deadline => |d| .{ .deadline = d }, + }; + } + pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration { return switch (t) { .none => null, @@ -2153,5 +2164,7 @@ test { _ = Writer; _ = Evented; _ = Threaded; + _ = RwLock; + _ = Semaphore; _ = @import("Io/test.zig"); } diff --git a/lib/std/Io/RwLock.zig b/lib/std/Io/RwLock.zig new file mode 100644 index 0000000000..7be4f026e1 --- /dev/null +++ b/lib/std/Io/RwLock.zig @@ -0,0 +1,238 @@ +//! A lock that supports one writer or many readers. +const RwLock = @This(); + +const builtin = @import("builtin"); + +const std = @import("../std.zig"); +const Io = std.Io; +const assert = std.debug.assert; +const testing = std.testing; + +state: usize, +mutex: Io.Mutex, +semaphore: Io.Semaphore, + +pub const init: RwLock = .{ + .state = 0, + .mutex = .init, + .semaphore = .{}, +}; + +const is_writing: usize = 1; +const writer: usize = 1 << 1; +const reader: usize = 1 << (1 + @bitSizeOf(Count)); +const writer_mask: usize = std.math.maxInt(Count) << @ctz(writer); +const reader_mask: usize = std.math.maxInt(Count) << @ctz(reader); +const Count = @Int(.unsigned, @divFloor(@bitSizeOf(usize) - 1, 2)); + +pub fn tryLock(rl: *RwLock, io: Io) bool { + if (rl.mutex.tryLock()) { + const state = @atomicLoad(usize, &rl.state, .seq_cst); + if (state & reader_mask == 0) { + _ = @atomicRmw(usize, &rl.state, .Or, is_writing, .seq_cst); + return true; + } + + rl.mutex.unlock(io); + } + + return false; +} + +pub fn lockUncancelable(rl: *RwLock, io: Io) void { + _ = @atomicRmw(usize, &rl.state, .Add, writer, .seq_cst); + rl.mutex.lockUncancelable(io); + + const state = @atomicRmw(usize, &rl.state, .Add, is_writing -% writer, .seq_cst); + if (state & reader_mask != 0) + rl.semaphore.waitUncancelable(io); +} + +pub fn unlock(rl: *RwLock, io: Io) void { + _ = @atomicRmw(usize, &rl.state, .And, ~is_writing, .seq_cst); + rl.mutex.unlock(io); +} + +pub fn tryLockShared(rl: *RwLock, io: Io) bool { + const state = @atomicLoad(usize, &rl.state, .seq_cst); + if (state & (is_writing | writer_mask) == 0) { + _ = @cmpxchgStrong( + usize, + &rl.state, + state, + state + reader, + .seq_cst, + .seq_cst, + ) orelse return true; + } + + if (rl.mutex.tryLock()) { + _ = @atomicRmw(usize, &rl.state, .Add, reader, .seq_cst); + rl.mutex.unlock(io); + return true; + } + + return false; +} + +pub fn lockSharedUncancelable(rl: *RwLock, io: Io) void { + var state = @atomicLoad(usize, &rl.state, .seq_cst); + while (state & (is_writing | writer_mask) == 0) { + state = @cmpxchgWeak( + usize, + &rl.state, + state, + state + reader, + .seq_cst, + .seq_cst, + ) orelse return; + } + + rl.mutex.lockUncancelable(io); + _ = @atomicRmw(usize, &rl.state, .Add, reader, .seq_cst); + rl.mutex.unlock(io); +} + +pub fn unlockShared(rl: *RwLock, io: Io) void { + const state = @atomicRmw(usize, &rl.state, .Sub, reader, .seq_cst); + + if ((state & reader_mask == reader) and (state & is_writing != 0)) + rl.semaphore.post(io); +} + +test "internal state" { + const io = testing.io; + + var rl: Io.RwLock = .init; + + // The following failed prior to the fix for Issue #13163, + // where the WRITER flag was subtracted by the lock method. + + rl.lockUncancelable(io); + rl.unlock(io); + try testing.expectEqual(rl, Io.RwLock.init); +} + +test "smoke test" { + const io = testing.io; + + var rl: Io.RwLock = .init; + + rl.lockUncancelable(io); + try testing.expect(!rl.tryLock(io)); + try testing.expect(!rl.tryLockShared(io)); + rl.unlock(io); + + try testing.expect(rl.tryLock(io)); + try testing.expect(!rl.tryLock(io)); + try testing.expect(!rl.tryLockShared(io)); + rl.unlock(io); + + rl.lockSharedUncancelable(io); + try testing.expect(!rl.tryLock(io)); + try testing.expect(rl.tryLockShared(io)); + rl.unlockShared(io); + rl.unlockShared(io); + + try testing.expect(rl.tryLockShared(io)); + try testing.expect(!rl.tryLock(io)); + try testing.expect(rl.tryLockShared(io)); + rl.unlockShared(io); + rl.unlockShared(io); + + rl.lockUncancelable(io); + rl.unlock(io); +} + +test "concurrent access" { + if (builtin.single_threaded) return; + + const io = testing.io; + const num_writers: usize = 2; + const num_readers: usize = 4; + const num_writes: usize = 1000; + const num_reads: usize = 2000; + + const Runner = struct { + const Runner = @This(); + + io: Io, + + rl: Io.RwLock, + writes: usize, + reads: std.atomic.Value(usize), + + val_a: usize, + val_b: usize, + + fn reader(run: *Runner, thread_idx: usize) !void { + var prng = std.Random.DefaultPrng.init(thread_idx); + const rnd = prng.random(); + while (true) { + run.rl.lockSharedUncancelable(run.io); + defer run.rl.unlockShared(run.io); + + try testing.expect(run.writes <= num_writes); + if (run.reads.fetchAdd(1, .monotonic) >= num_reads) break; + + // We use `volatile` accesses so that we can make sure the memory is accessed either + // side of a yield, maximising chances of a race. + const a_ptr: *const volatile usize = &run.val_a; + const b_ptr: *const volatile usize = &run.val_b; + + const old_a = a_ptr.*; + if (rnd.boolean()) try std.Thread.yield(); + const old_b = b_ptr.*; + try testing.expect(old_a == old_b); + } + } + + fn writer(run: *Runner, thread_idx: usize) !void { + var prng = std.Random.DefaultPrng.init(thread_idx); + const rnd = prng.random(); + while (true) { + run.rl.lockUncancelable(run.io); + defer run.rl.unlock(run.io); + + try testing.expect(run.writes <= num_writes); + if (run.writes == num_writes) break; + + // We use `volatile` accesses so that we can make sure the memory is accessed either + // side of a yield, maximising chances of a race. + const a_ptr: *volatile usize = &run.val_a; + const b_ptr: *volatile usize = &run.val_b; + + const new_val = rnd.int(usize); + + const old_a = a_ptr.*; + a_ptr.* = new_val; + if (rnd.boolean()) try std.Thread.yield(); + const old_b = b_ptr.*; + b_ptr.* = new_val; + try testing.expect(old_a == old_b); + + run.writes += 1; + } + } + }; + + var run: Runner = .{ + .io = io, + .rl = .init, + .writes = 0, + .reads = .init(0), + .val_a = 0, + .val_b = 0, + }; + var write_threads: [num_writers]std.Thread = undefined; + var read_threads: [num_readers]std.Thread = undefined; + + for (&write_threads, 0..) |*t, i| t.* = try .spawn(.{}, Runner.writer, .{ &run, i }); + for (&read_threads, num_writers..) |*t, i| t.* = try .spawn(.{}, Runner.reader, .{ &run, i }); + + for (write_threads) |t| t.join(); + for (read_threads) |t| t.join(); + + try testing.expect(run.writes == num_writes); + try testing.expect(run.reads.raw >= num_reads); +} diff --git a/lib/std/Io/Semaphore.zig b/lib/std/Io/Semaphore.zig new file mode 100644 index 0000000000..248e6ab4d0 --- /dev/null +++ b/lib/std/Io/Semaphore.zig @@ -0,0 +1,65 @@ +//! An unsigned integer that blocks the kernel thread if the number would +//! become negative. +//! +//! This API supports static initialization and does not require deinitialization. +const Semaphore = @This(); + +const builtin = @import("builtin"); + +const std = @import("../std.zig"); +const Io = std.Io; +const testing = std.testing; + +mutex: Io.Mutex = .init, +cond: Io.Condition = .init, +/// It is OK to initialize this field to any value. +permits: usize = 0, + +pub fn wait(s: *Semaphore, io: Io) Io.Cancelable!void { + try s.mutex.lock(io); + defer s.mutex.unlock(io); + while (s.permits == 0) try s.cond.wait(io, &s.mutex); + s.permits -= 1; + if (s.permits > 0) s.cond.signal(io); +} + +pub fn waitUncancelable(s: *Semaphore, io: Io) void { + s.mutex.lockUncancelable(io); + defer s.mutex.unlock(io); + while (s.permits == 0) s.cond.waitUncancelable(io, &s.mutex); + s.permits -= 1; + if (s.permits > 0) s.cond.signal(io); +} + +pub fn post(s: *Semaphore, io: Io) void { + s.mutex.lockUncancelable(io); + defer s.mutex.unlock(io); + + s.permits += 1; + s.cond.signal(io); +} + +test Semaphore { + if (builtin.single_threaded) return error.SkipZigTest; + const io = testing.io; + + const TestContext = struct { + sem: *Semaphore, + n: *i32, + fn worker(ctx: *@This()) !void { + try ctx.sem.wait(io); + ctx.n.* += 1; + ctx.sem.post(io); + } + }; + const num_threads = 3; + var sem: Semaphore = .{ .permits = 1 }; + var threads: [num_threads]std.Thread = undefined; + var n: i32 = 0; + var ctx = TestContext{ .sem = &sem, .n = &n }; + + for (&threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx}); + for (threads) |t| t.join(); + try sem.wait(io); + try testing.expect(n == num_threads); +} diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index b2ff753384..053a87b27c 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -2655,7 +2655,7 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (is_windows) { - const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) { + const deadline: ?Io.Clock.Timestamp = timeout.toTimestamp(ioBasic(t)) catch |err| switch (err) { error.Unexpected => deadline: { recoverableOsBugDetected(); break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake }; @@ -2754,7 +2754,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout else => {}, } const t_io = ioBasic(t); - const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock; + const deadline = timeout.toTimestamp(t_io) catch return error.UnsupportedClock; while (true) { const timeout_ms: i32 = t: { if (b.completions.head != .none) { @@ -10918,7 +10918,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (timeout == .none) return; - if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t))); + if (use_parking_sleep) return parking_sleep.sleep(try timeout.toTimestamp(ioBasic(t))); if (native_os == .wasi) return sleepWasi(t, timeout); if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout); return sleepNanosleep(t, timeout); @@ -12630,7 +12630,7 @@ fn netReceivePosix( var message_i: usize = 0; var data_i: usize = 0; - const deadline = timeout.toDeadline(t_io) catch |err| return .{ err, message_i }; + const deadline = timeout.toTimestamp(t_io) catch |err| return .{ err, message_i }; recv: while (true) { if (message_buffer.len - message_i == 0) return .{ null, message_i }; diff --git a/lib/std/debug/SelfInfo/Elf.zig b/lib/std/debug/SelfInfo/Elf.zig index a2868fcee1..40841bac2c 100644 --- a/lib/std/debug/SelfInfo/Elf.zig +++ b/lib/std/debug/SelfInfo/Elf.zig @@ -1,4 +1,4 @@ -mutex: Io.Mutex, +rwlock: Io.RwLock, modules: std.ArrayList(Module), ranges: std.ArrayList(Module.Range), @@ -6,7 +6,7 @@ ranges: std.ArrayList(Module.Range), unwind_cache: if (can_unwind) ?[]Dwarf.SelfUnwinder.CacheEntry else ?noreturn, pub const init: SelfInfo = .{ - .mutex = .init, + .rwlock = .init, .modules = .empty, .ranges = .empty, .unwind_cache = null, @@ -30,7 +30,7 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void { pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol { const module = try si.findModule(gpa, io, address, .exclusive); - defer si.mutex.unlock(io); + defer si.rwlock.unlock(io); const vaddr = address - module.load_offset; @@ -75,13 +75,13 @@ pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!st } pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 { const module = try si.findModule(gpa, io, address, .shared); - defer si.mutex.unlock(io); + defer si.rwlock.unlockShared(io); if (module.name.len == 0) return error.MissingDebugInfo; return module.name; } pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize { const module = try si.findModule(gpa, io, address, .shared); - defer si.mutex.unlock(io); + defer si.rwlock.unlockShared(io); return module.load_offset; } @@ -183,8 +183,8 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex comptime assert(can_unwind); { - try si.mutex.lock(io); - defer si.mutex.unlock(io); + si.rwlock.lockSharedUncancelable(io); + defer si.rwlock.unlockShared(io); if (si.unwind_cache) |cache| { if (Dwarf.SelfUnwinder.CacheEntry.find(cache, context.pc)) |entry| { return context.next(gpa, entry); @@ -193,7 +193,7 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex } const module = try si.findModule(gpa, io, context.pc, .exclusive); - defer si.mutex.unlock(io); + defer si.rwlock.unlock(io); if (si.unwind_cache == null) { si.unwind_cache = try gpa.alloc(Dwarf.SelfUnwinder.CacheEntry, 2048); @@ -378,8 +378,8 @@ const Module = struct { fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum { shared, exclusive }) Error!*Module { // With the requested lock, scan the module ranges looking for `address`. switch (lock) { - .shared => try si.mutex.lock(io), - .exclusive => try si.mutex.lock(io), + .shared => si.rwlock.lockSharedUncancelable(io), + .exclusive => si.rwlock.lockUncancelable(io), } for (si.ranges.items) |*range| { if (address >= range.start and address < range.start + range.len) { @@ -389,12 +389,15 @@ fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum // The address wasn't in a known range. We will rebuild the module/range lists, since it's possible // a new module was loaded. Upgrade to an exclusive lock if necessary. switch (lock) { - .shared => {}, + .shared => { + si.rwlock.unlockShared(io); + si.rwlock.lockUncancelable(io); + }, .exclusive => {}, } // Rebuild module list with the exclusive lock. { - errdefer si.mutex.unlock(io); + errdefer si.rwlock.unlock(io); for (si.modules.items) |*mod| { unwind: { const u = &(mod.unwind orelse break :unwind catch break :unwind); @@ -412,7 +415,10 @@ fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum } // Downgrade the lock back to shared if necessary. switch (lock) { - .shared => {}, + .shared => { + si.rwlock.unlock(io); + si.rwlock.lockSharedUncancelable(io); + }, .exclusive => {}, } // Scan the newly rebuilt module ranges. @@ -423,8 +429,8 @@ fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum } // Still nothing; unlock and error. switch (lock) { - .shared => si.mutex.unlock(io), - .exclusive => si.mutex.unlock(io), + .shared => si.rwlock.unlockShared(io), + .exclusive => si.rwlock.unlock(io), } return error.MissingDebugInfo; }