Merge pull request 'std.Thread sync primitives roundup' (#31084) from sync-cleanup into master

Reviewed-on: https://codeberg.org/ziglang/zig/pulls/31084
This commit is contained in:
Andrew Kelley 2026-02-03 07:52:02 +01:00
commit e7e700334d
29 changed files with 602 additions and 2957 deletions

View file

@ -408,8 +408,6 @@ set(ZIG_STAGE2_SOURCES
lib/std/Target/wasm.zig
lib/std/Target/x86.zig
lib/std/Thread.zig
lib/std/Thread/Futex.zig
lib/std/Thread/Mutex.zig
lib/std/array_hash_map.zig
lib/std/array_list.zig
lib/std/ascii.zig

View file

@ -30,14 +30,6 @@ pub fn main(init: process.Init.Minimal) !void {
defer _ = debug_gpa_state.deinit();
const gpa = debug_gpa_state.allocator();
// ...but we'll back our arena by `std.heap.page_allocator` for efficiency.
var single_threaded_arena: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
defer single_threaded_arena.deinit();
var thread_safe_arena: std.heap.ThreadSafeAllocator = .{ .child_allocator = single_threaded_arena.allocator() };
const arena = thread_safe_arena.allocator();
const args = try init.args.toSlice(arena);
var threaded: std.Io.Threaded = .init(gpa, .{
.environ = init.environ,
.argv0 = .init(init.args),
@ -45,6 +37,17 @@ pub fn main(init: process.Init.Minimal) !void {
defer threaded.deinit();
const io = threaded.io();
// ...but we'll back our arena by `std.heap.page_allocator` for efficiency.
var single_threaded_arena: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
defer single_threaded_arena.deinit();
var thread_safe_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = single_threaded_arena.allocator(),
.io = io,
};
const arena = thread_safe_arena.allocator();
const args = try init.args.toSlice(arena);
// skip my own exe name
var arg_idx: usize = 1;

View file

@ -147,7 +147,8 @@ const ObjectArray = struct {
// It provides thread-safety for on-demand storage of Thread Objects.
const current_thread_storage = struct {
var key: std.c.pthread_key_t = undefined;
var init_once = std.once(current_thread_storage.init);
var init_mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER;
var init_done: bool = false;
/// Return a per thread ObjectArray with at least the expected index.
pub fn getArray(index: usize) *ObjectArray {
@ -183,9 +184,13 @@ const current_thread_storage = struct {
/// Initialize pthread_key_t.
/// Thread-safe one-time initialization guarded by `init_mutex`.
fn init() void {
    // Fast path: already initialized. `.acquire` pairs with the `.release`
    // store below so this thread also observes the created `key` value.
    if (@atomicLoad(bool, &init_done, .acquire)) return;
    _ = std.c.pthread_mutex_lock(&init_mutex);
    defer _ = std.c.pthread_mutex_unlock(&init_mutex);
    // Re-check under the mutex: another thread may have finished
    // initialization between our fast-path load and acquiring the lock.
    // Without this re-check, pthread_key_create could run twice,
    // overwriting `key` and orphaning values stored under the first key.
    if (@atomicLoad(bool, &init_done, .monotonic)) return;
    if (std.c.pthread_key_create(&current_thread_storage.key, current_thread_storage.deinit) != .SUCCESS) {
        abort();
    }
    @atomicStore(bool, &init_done, true, .release);
}
/// Invoked by pthread specific destructor. the passed argument is the ObjectArray pointer.
@ -283,7 +288,7 @@ const emutls_control = extern struct {
/// Get the pointer on allocated storage for emutls variable.
pub fn getPointer(self: *emutls_control) *anyopaque {
// ensure current_thread_storage initialization is done
current_thread_storage.init_once.call();
current_thread_storage.init();
const index = self.getIndex();
var array = current_thread_storage.getArray(index);

View file

@ -632,7 +632,7 @@ export fn fuzzer_main(limit_kind: abi.LimitKind, amount: u64) void {
export fn fuzzer_unslide_address(addr: usize) usize {
const si = std.debug.getSelfDebugInfo() catch @compileError("unsupported");
const slide = si.getModuleSlide(std.debug.getDebugInfoAllocator(), addr) catch |err| {
const slide = si.getModuleSlide(std.debug.getDebugInfoAllocator(), io, addr) catch |err| {
std.debug.panic("failed to find virtual address slide: {t}", .{err});
};
return addr - slide;

View file

@ -47,6 +47,9 @@ pub const Dir = @import("Io/Dir.zig");
pub const File = @import("Io/File.zig");
pub const Terminal = @import("Io/Terminal.zig");
pub const RwLock = @import("Io/RwLock.zig");
pub const Semaphore = @import("Io/Semaphore.zig");
pub const VTable = struct {
/// If it returns `null` it means `result` has been already populated and
/// `await` will be a no-op.
@ -882,7 +885,7 @@ pub const Timeout = union(enum) {
pub const Error = error{ Timeout, UnsupportedClock };
pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
pub fn toTimestamp(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
return switch (t) {
.none => null,
.duration => |d| try .fromNow(io, d),
@ -890,6 +893,14 @@ pub const Timeout = union(enum) {
};
}
/// Normalizes a timeout into its `.deadline` representation: `.none` stays
/// `.none`, a `.duration` becomes an absolute timestamp relative to now,
/// and an existing `.deadline` passes through unchanged.
pub fn toDeadline(t: Timeout, io: Io) Timeout {
return switch (t) {
.none => .none,
// FIXME: clock errors are currently fatal here (`@panic("TODO")`);
// callers cannot observe `Clock.Error` through this path.
.duration => |d| .{ .deadline = Clock.Timestamp.fromNow(io, d) catch @panic("TODO") },
.deadline => |d| .{ .deadline = d },
};
}
pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration {
return switch (t) {
.none => null,
@ -2153,5 +2164,7 @@ test {
_ = Writer;
_ = Evented;
_ = Threaded;
_ = RwLock;
_ = Semaphore;
_ = @import("Io/test.zig");
}

238
lib/std/Io/RwLock.zig Normal file
View file

@ -0,0 +1,238 @@
//! A lock that supports one writer or many readers.
const RwLock = @This();
const builtin = @import("builtin");
const std = @import("../std.zig");
const Io = std.Io;
const assert = std.debug.assert;
const testing = std.testing;
/// Packed word: bit 0 is the `is_writing` flag, the next `Count` bits hold
/// the pending-writer count, and the top `Count` bits hold the active
/// reader count (see the mask constants below).
state: usize,
// Serializes writers (and the reader slow path).
mutex: Io.Mutex,
// Writers block here until the last active reader posts.
semaphore: Io.Semaphore,
/// Statically-initialized unlocked state; no deinit is required.
pub const init: RwLock = .{
.state = 0,
.mutex = .init,
.semaphore = .{},
};
// Bit-layout helpers for `state`.
const is_writing: usize = 1;
const writer: usize = 1 << 1;
const reader: usize = 1 << (1 + @bitSizeOf(Count));
const writer_mask: usize = std.math.maxInt(Count) << @ctz(writer);
const reader_mask: usize = std.math.maxInt(Count) << @ctz(reader);
// Each counter gets half of the bits remaining after the is_writing flag.
const Count = @Int(.unsigned, @divFloor(@bitSizeOf(usize) - 1, 2));
/// Attempts to take exclusive (writer) ownership without blocking.
/// Returns true on success; the caller must later call `unlock`.
pub fn tryLock(rl: *RwLock, io: Io) bool {
if (rl.mutex.tryLock()) {
// Holding the mutex excludes other writers; now check for readers.
const state = @atomicLoad(usize, &rl.state, .seq_cst);
if (state & reader_mask == 0) {
// No active readers: publish the write lock and keep the mutex held.
_ = @atomicRmw(usize, &rl.state, .Or, is_writing, .seq_cst);
return true;
}
// Readers are active; give the mutex back and fail.
rl.mutex.unlock(io);
}
return false;
}
/// Acquires exclusive (writer) ownership, blocking until available.
/// Not cancelable; pair with `unlock`.
pub fn lockUncancelable(rl: *RwLock, io: Io) void {
// Announce a pending writer so new readers take the slow path.
_ = @atomicRmw(usize, &rl.state, .Add, writer, .seq_cst);
rl.mutex.lockUncancelable(io);
// `is_writing -% writer` sets the is_writing flag while removing our
// pending-writer tick in a single atomic add.
const state = @atomicRmw(usize, &rl.state, .Add, is_writing -% writer, .seq_cst);
// If readers are still active, wait for the last one to post.
if (state & reader_mask != 0)
rl.semaphore.waitUncancelable(io);
}
/// Releases exclusive ownership obtained via `tryLock` or
/// `lockUncancelable`: clears the is_writing flag, then releases the mutex.
pub fn unlock(rl: *RwLock, io: Io) void {
_ = @atomicRmw(usize, &rl.state, .And, ~is_writing, .seq_cst);
rl.mutex.unlock(io);
}
/// Attempts to take shared (reader) ownership without blocking.
/// Returns true on success; the caller must later call `unlockShared`.
pub fn tryLockShared(rl: *RwLock, io: Io) bool {
const state = @atomicLoad(usize, &rl.state, .seq_cst);
// Fast path: no writer active or pending; try to bump the reader count.
if (state & (is_writing | writer_mask) == 0) {
_ = @cmpxchgStrong(
usize,
&rl.state,
state,
state + reader,
.seq_cst,
.seq_cst,
) orelse return true;
}
// Slow path: the cmpxchg raced or a writer was observed. If the mutex is
// free we can still register as a reader — any writer must acquire the
// mutex first and will then wait on the semaphore for us to finish.
if (rl.mutex.tryLock()) {
_ = @atomicRmw(usize, &rl.state, .Add, reader, .seq_cst);
rl.mutex.unlock(io);
return true;
}
return false;
}
/// Acquires shared (reader) ownership, blocking while a writer is active
/// or pending. Not cancelable; pair with `unlockShared`.
pub fn lockSharedUncancelable(rl: *RwLock, io: Io) void {
var state = @atomicLoad(usize, &rl.state, .seq_cst);
// Fast path: no writer involved; CAS-increment the reader count.
while (state & (is_writing | writer_mask) == 0) {
state = @cmpxchgWeak(
usize,
&rl.state,
state,
state + reader,
.seq_cst,
.seq_cst,
) orelse return;
}
// Slow path: queue behind the writer(s) on the mutex, then register as a
// reader while briefly holding it.
rl.mutex.lockUncancelable(io);
_ = @atomicRmw(usize, &rl.state, .Add, reader, .seq_cst);
rl.mutex.unlock(io);
}
/// Releases shared ownership. If this was the last active reader and a
/// writer is waiting (is_writing set), wake it via the semaphore.
pub fn unlockShared(rl: *RwLock, io: Io) void {
const state = @atomicRmw(usize, &rl.state, .Sub, reader, .seq_cst);
if ((state & reader_mask == reader) and (state & is_writing != 0))
rl.semaphore.post(io);
}
test "internal state" {
    const io = testing.io;
    var rl: Io.RwLock = .init;

    // The following failed prior to the fix for Issue #13163,
    // where the WRITER flag was subtracted by the lock method.
    rl.lockUncancelable(io);
    rl.unlock(io);

    // `testing.expectEqual` takes the expected value first; with the
    // arguments in this order a failure correctly reports the
    // freshly-initialized state as "expected" and the post-unlock
    // state as "actual".
    try testing.expectEqual(Io.RwLock.init, rl);
}
test "smoke test" {
const io = testing.io;
var rl: Io.RwLock = .init;
// Exclusive lock blocks both writers and readers.
rl.lockUncancelable(io);
try testing.expect(!rl.tryLock(io));
try testing.expect(!rl.tryLockShared(io));
rl.unlock(io);
// tryLock succeeds on an unlocked lock and then excludes everyone else.
try testing.expect(rl.tryLock(io));
try testing.expect(!rl.tryLock(io));
try testing.expect(!rl.tryLockShared(io));
rl.unlock(io);
// Shared lock excludes writers but admits additional readers.
rl.lockSharedUncancelable(io);
try testing.expect(!rl.tryLock(io));
try testing.expect(rl.tryLockShared(io));
rl.unlockShared(io);
rl.unlockShared(io);
try testing.expect(rl.tryLockShared(io));
try testing.expect(!rl.tryLock(io));
try testing.expect(rl.tryLockShared(io));
rl.unlockShared(io);
rl.unlockShared(io);
// Fully released: exclusive lock is available again.
rl.lockUncancelable(io);
rl.unlock(io);
}
test "concurrent access" {
if (builtin.single_threaded) return;
const io = testing.io;
const num_writers: usize = 2;
const num_readers: usize = 4;
const num_writes: usize = 1000;
const num_reads: usize = 2000;
const Runner = struct {
const Runner = @This();
io: Io,
rl: Io.RwLock,
// Written only under the exclusive lock.
writes: usize,
reads: std.atomic.Value(usize),
// val_a and val_b are kept equal by writers; readers verify that
// invariant, which the RwLock must uphold.
val_a: usize,
val_b: usize,
fn reader(run: *Runner, thread_idx: usize) !void {
var prng = std.Random.DefaultPrng.init(thread_idx);
const rnd = prng.random();
while (true) {
run.rl.lockSharedUncancelable(run.io);
defer run.rl.unlockShared(run.io);
try testing.expect(run.writes <= num_writes);
if (run.reads.fetchAdd(1, .monotonic) >= num_reads) break;
// We use `volatile` accesses so that we can make sure the memory is accessed either
// side of a yield, maximising chances of a race.
const a_ptr: *const volatile usize = &run.val_a;
const b_ptr: *const volatile usize = &run.val_b;
const old_a = a_ptr.*;
if (rnd.boolean()) try std.Thread.yield();
const old_b = b_ptr.*;
// Under the shared lock no writer may intervene, so both reads
// must observe the same value.
try testing.expect(old_a == old_b);
}
}
fn writer(run: *Runner, thread_idx: usize) !void {
var prng = std.Random.DefaultPrng.init(thread_idx);
const rnd = prng.random();
while (true) {
run.rl.lockUncancelable(run.io);
defer run.rl.unlock(run.io);
try testing.expect(run.writes <= num_writes);
if (run.writes == num_writes) break;
// We use `volatile` accesses so that we can make sure the memory is accessed either
// side of a yield, maximising chances of a race.
const a_ptr: *volatile usize = &run.val_a;
const b_ptr: *volatile usize = &run.val_b;
const new_val = rnd.int(usize);
const old_a = a_ptr.*;
a_ptr.* = new_val;
if (rnd.boolean()) try std.Thread.yield();
const old_b = b_ptr.*;
b_ptr.* = new_val;
// Holding the exclusive lock means no other writer could have
// changed val_b between our two reads.
try testing.expect(old_a == old_b);
run.writes += 1;
}
}
};
var run: Runner = .{
.io = io,
.rl = .init,
.writes = 0,
.reads = .init(0),
.val_a = 0,
.val_b = 0,
};
var write_threads: [num_writers]std.Thread = undefined;
var read_threads: [num_readers]std.Thread = undefined;
// Distinct thread_idx values give every thread a distinct PRNG seed.
for (&write_threads, 0..) |*t, i| t.* = try .spawn(.{}, Runner.writer, .{ &run, i });
for (&read_threads, num_writers..) |*t, i| t.* = try .spawn(.{}, Runner.reader, .{ &run, i });
for (write_threads) |t| t.join();
for (read_threads) |t| t.join();
try testing.expect(run.writes == num_writes);
try testing.expect(run.reads.raw >= num_reads);
}

65
lib/std/Io/Semaphore.zig Normal file
View file

@ -0,0 +1,65 @@
//! An unsigned integer that blocks the kernel thread if the number would
//! become negative.
//!
//! This API supports static initialization and does not require deinitialization.
const Semaphore = @This();
const builtin = @import("builtin");
const std = @import("../std.zig");
const Io = std.Io;
const testing = std.testing;
// Protects `permits`; paired with `cond` for blocking waits.
mutex: Io.Mutex = .init,
cond: Io.Condition = .init,
/// It is OK to initialize this field to any value.
permits: usize = 0,
/// Decrements the permit count, blocking while it is zero.
/// Propagates cancellation from `mutex.lock` / `cond.wait`.
pub fn wait(s: *Semaphore, io: Io) Io.Cancelable!void {
try s.mutex.lock(io);
defer s.mutex.unlock(io);
while (s.permits == 0) try s.cond.wait(io, &s.mutex);
s.permits -= 1;
// Pass the signal along so another blocked waiter can consume a
// remaining permit without waiting for the next post.
if (s.permits > 0) s.cond.signal(io);
}
/// Same as `wait` but cannot be canceled; always consumes a permit.
pub fn waitUncancelable(s: *Semaphore, io: Io) void {
s.mutex.lockUncancelable(io);
defer s.mutex.unlock(io);
while (s.permits == 0) s.cond.waitUncancelable(io, &s.mutex);
s.permits -= 1;
// Pass the signal along so another blocked waiter can consume a
// remaining permit without waiting for the next post.
if (s.permits > 0) s.cond.signal(io);
}
/// Increments the permit count and wakes one waiter, if any.
/// Uses the uncancelable lock so posting always succeeds.
pub fn post(s: *Semaphore, io: Io) void {
s.mutex.lockUncancelable(io);
defer s.mutex.unlock(io);
s.permits += 1;
s.cond.signal(io);
}
test Semaphore {
if (builtin.single_threaded) return error.SkipZigTest;
const io = testing.io;
const TestContext = struct {
sem: *Semaphore,
n: *i32,
fn worker(ctx: *@This()) !void {
// Each worker waits for the single permit, increments the shared
// counter while effectively exclusive, then hands the permit on.
try ctx.sem.wait(io);
ctx.n.* += 1;
ctx.sem.post(io);
}
};
const num_threads = 3;
// One initial permit: the semaphore acts as a mutex over `n`.
var sem: Semaphore = .{ .permits = 1 };
var threads: [num_threads]std.Thread = undefined;
var n: i32 = 0;
var ctx = TestContext{ .sem = &sem, .n = &n };
for (&threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
for (threads) |t| t.join();
// Reclaim the final permit before checking every worker ran once.
try sem.wait(io);
try testing.expect(n == num_threads);
}

View file

@ -1126,6 +1126,25 @@ const Thread = struct {
return @ptrFromInt(@as(usize, @bitCast(split)));
}
};
/// Same as `Io.Mutex.lock` but avoids the VTable.
/// Returns when the mutex is acquired; `try` propagates cancellation
/// surfaced by `Thread.futexWait`.
fn mutexLock(m: *Io.Mutex) Io.Cancelable!void {
// Uncontended fast path: unlocked -> locked_once via a single weak CAS.
const initial_state = m.state.cmpxchgWeak(
.unlocked,
.locked_once,
.acquire,
.monotonic,
) orelse {
@branchHint(.likely);
return;
};
// Observed contention: sleep on the futex before entering the swap loop.
if (initial_state == .contended) {
try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
}
// Acquire by swapping in `contended`; whoever swaps out `unlocked` owns
// the lock. Storing `contended` pessimistically ensures a later unlock
// knows it must wake futex waiters.
while (m.state.swap(.contended, .acquire) != .unlocked) {
try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
}
}
};
const Syscall = struct {
@ -1486,8 +1505,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded;
pub const global_single_threaded: *Threaded = &global_single_threaded_instance;
pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
t.async_limit = new_limit;
}
@ -1508,8 +1527,8 @@ pub fn deinit(t: *Threaded) void {
fn join(t: *Threaded) void {
if (builtin.single_threaded) return;
{
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
t.join_requested = true;
}
condBroadcast(&t.cond);
@ -1574,16 +1593,16 @@ fn worker(t: *Threaded) void {
defer t.wait_group.finish();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
while (true) {
while (t.run_queue.popFirst()) |runnable_node| {
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
thread.cancel_protection = .unblocked;
const runnable: *Runnable = @fieldParentPtr("node", runnable_node);
runnable.startFn(runnable, &thread, t);
mutexLockUncancelable(&t.mutex);
mutexLockInternal(&t.mutex);
t.busy_count -= 1;
}
if (t.join_requested) break;
@ -2004,12 +2023,12 @@ fn async(
},
};
mutexLockUncancelable(&t.mutex);
mutexLockInternal(&t.mutex);
const busy_count = t.busy_count;
if (busy_count >= @intFromEnum(t.async_limit)) {
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
future.destroy(gpa);
start(context.ptr, result.ptr);
return null;
@ -2023,7 +2042,7 @@ fn async(
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
t.wait_group.finish();
t.busy_count = busy_count;
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
future.destroy(gpa);
start(context.ptr, result.ptr);
return null;
@ -2033,7 +2052,7 @@ fn async(
t.run_queue.prepend(&future.runnable.node);
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
condSignal(&t.cond);
return @ptrCast(future);
}
@ -2056,8 +2075,8 @@ fn concurrent(
};
errdefer future.destroy(gpa);
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
const busy_count = t.busy_count;
@ -2101,12 +2120,12 @@ fn groupAsync(
error.OutOfMemory => return groupAsyncEager(start, context.ptr),
};
mutexLockUncancelable(&t.mutex);
mutexLockInternal(&t.mutex);
const busy_count = t.busy_count;
if (busy_count >= @intFromEnum(t.async_limit)) {
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
task.destroy(gpa);
return groupAsyncEager(start, context.ptr);
}
@ -2119,7 +2138,7 @@ fn groupAsync(
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
t.wait_group.finish();
t.busy_count = busy_count;
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
task.destroy(gpa);
return groupAsyncEager(start, context.ptr);
};
@ -2136,7 +2155,7 @@ fn groupAsync(
}, .monotonic);
t.run_queue.prepend(&task.runnable.node);
mutexUnlock(&t.mutex);
mutexUnlockInternal(&t.mutex);
condSignal(&t.cond);
}
fn groupAsyncEager(
@ -2201,8 +2220,8 @@ fn groupConcurrent(
};
errdefer task.destroy(gpa);
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
const busy_count = t.busy_count;
@ -2636,7 +2655,7 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) {
const deadline: ?Io.Clock.Timestamp = timeout.toTimestamp(ioBasic(t)) catch |err| switch (err) {
error.Unexpected => deadline: {
recoverableOsBugDetected();
break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake };
@ -2735,7 +2754,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
else => {},
}
const t_io = ioBasic(t);
const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
const deadline = timeout.toTimestamp(t_io) catch return error.UnsupportedClock;
while (true) {
const timeout_ms: i32 = t: {
if (b.completions.head != .none) {
@ -3838,8 +3857,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION {
if (!t.system_basic_information.initialized.load(.acquire)) {
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
switch (windows.ntdll.NtQuerySystemInformation(
.SystemBasicInformation,
@ -10899,7 +10918,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (timeout == .none) return;
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t)));
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toTimestamp(ioBasic(t)));
if (native_os == .wasi) return sleepWasi(t, timeout);
if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout);
return sleepNanosleep(t, timeout);
@ -12611,7 +12630,7 @@ fn netReceivePosix(
var message_i: usize = 0;
var data_i: usize = 0;
const deadline = timeout.toDeadline(t_io) catch |err| return .{ err, message_i };
const deadline = timeout.toTimestamp(t_io) catch |err| return .{ err, message_i };
recv: while (true) {
if (message_buffer.len - message_i == 0) return .{ null, message_i };
@ -14373,8 +14392,8 @@ const WindowsEnvironStrings = struct {
};
fn scanEnviron(t: *Threaded) void {
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
if (t.environ.initialized) return;
t.environ.initialized = true;
@ -14729,8 +14748,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
fn getDevNullFd(t: *Threaded) !posix.fd_t {
{
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
if (t.null_file.fd != -1) return t.null_file.fd;
}
const mode: u32 = 0;
@ -14741,8 +14760,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t {
.SUCCESS => {
syscall.finish();
const fresh_fd: posix.fd_t = @intCast(rc);
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex); // Another thread might have won the race.
defer mutexUnlockInternal(&t.mutex);
if (t.null_file.fd != -1) {
posix.close(fresh_fd);
return t.null_file.fd;
@ -15402,8 +15421,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
{
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
if (t.random_file.handle) |handle| return handle;
}
@ -15437,8 +15456,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
)) {
.SUCCESS => {
syscall.finish();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex); // Another thread might have won the race.
defer mutexUnlockInternal(&t.mutex);
if (t.random_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
@ -15458,8 +15477,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
fn getNulHandle(t: *Threaded) !windows.HANDLE {
{
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
if (t.null_file.handle) |handle| return handle;
}
@ -15505,8 +15524,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
)) {
.SUCCESS => {
syscall.finish();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex); // Another thread might have won the race.
defer mutexUnlockInternal(&t.mutex);
if (t.null_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
@ -16551,15 +16570,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void {
}
fn randomMainThread(t: *Threaded, buffer: []u8) void {
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
if (!t.csprng.isInitialized()) {
@branchHint(.unlikely);
var seed: [Csprng.seed_len]u8 = undefined;
{
mutexUnlock(&t.mutex);
defer mutexLockUncancelable(&t.mutex);
mutexUnlockInternal(&t.mutex);
defer mutexLockInternal(&t.mutex);
const prev = swapCancelProtection(t, .blocked);
defer _ = swapCancelProtection(t, prev);
@ -16744,8 +16763,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void {
fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
{
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex);
defer mutexUnlockInternal(&t.mutex);
if (t.random_file.fd == -2) return error.EntropyUnavailable;
if (t.random_file.fd != -1) return t.random_file.fd;
@ -16785,8 +16804,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
.SUCCESS => {
syscall.finish();
if (!statx.mask.TYPE) return error.EntropyUnavailable;
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex); // Another thread might have won the race.
defer mutexUnlockInternal(&t.mutex);
if (t.random_file.fd >= 0) {
posix.close(fd);
return t.random_file.fd;
@ -16813,8 +16832,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
switch (posix.errno(fstat_sym(fd, &stat))) {
.SUCCESS => {
syscall.finish();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
mutexLockInternal(&t.mutex); // Another thread might have won the race.
defer mutexUnlockInternal(&t.mutex);
if (t.random_file.fd >= 0) {
posix.close(fd);
return t.random_file.fd;
@ -16947,8 +16966,8 @@ const parking_futex = struct {
var status_buf: std.atomic.Value(Thread.Status) = undefined;
{
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
mutexLockInternal(&bucket.mutex);
defer mutexUnlockInternal(&bucket.mutex);
_ = bucket.num_waiters.fetchAdd(1, .acquire);
@ -17017,8 +17036,8 @@ const parking_futex = struct {
.parked => {
// We saw a timeout and updated our own status from `.parked` to `.none`. It is
// our responsibility to remove `waiter` from `bucket`.
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
mutexLockInternal(&bucket.mutex);
defer mutexUnlockInternal(&bucket.mutex);
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
},
@ -17057,8 +17076,8 @@ const parking_futex = struct {
// of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
var waking_head: ?*std.DoublyLinkedList.Node = null;
{
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
mutexLockInternal(&bucket.mutex);
defer mutexUnlockInternal(&bucket.mutex);
var num_removed: u32 = 0;
var it = bucket.waiters.first;
@ -17113,8 +17132,8 @@ const parking_futex = struct {
fn removeCanceledWaiter(waiter: *Waiter) void {
const bucket = bucketForAddress(waiter.address);
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
mutexLockInternal(&bucket.mutex);
defer mutexUnlockInternal(&bucket.mutex);
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
waiter.done.store(true, .release); // potentially invalidates `waiter.*`
@ -18163,8 +18182,8 @@ fn condWait(cond: *Condition, mutex: *Mutex) void {
assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters
}
mutexUnlock(mutex);
defer mutexLockUncancelable(mutex);
mutexUnlockInternal(mutex);
defer mutexLockInternal(mutex);
while (true) {
Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null);
@ -18189,28 +18208,13 @@ const Mutex = if (!is_windows) Io.Mutex else struct {
const init: @This() = .{ .srwlock = .{} };
};
/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
fn mutexLock(m: *Io.Mutex) Io.Cancelable!void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
.locked_once,
.acquire,
.monotonic,
) orelse {
@branchHint(.likely);
return;
};
if (initial_state == .contended) {
try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
}
while (m.state.swap(.contended, .acquire) != .unlocked) {
try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
}
fn mutexLockInternal(m: *Mutex) void {
if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock);
return mutexLock(m);
}
/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
fn mutexLockUncancelable(m: *Mutex) void {
if (is_windows) return windows.ntdll.RtlAcquireSRWLockExclusive(&m.srwlock);
pub fn mutexLock(m: *Io.Mutex) void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
.locked_once,
@ -18228,9 +18232,13 @@ fn mutexLockUncancelable(m: *Mutex) void {
}
}
fn mutexUnlockInternal(m: *Mutex) void {
if (is_windows) return windows.ntdll.RtlReleaseSRWLockExclusive(&m.srwlock);
return mutexUnlock(m);
}
/// Same as `Io.Mutex.unlock` but avoids the VTable.
fn mutexUnlock(m: *Mutex) void {
if (is_windows) return windows.ntdll.RtlReleaseSRWLockExclusive(&m.srwlock);
pub fn mutexUnlock(m: *Io.Mutex) void {
switch (m.state.swap(.unlocked, .release)) {
.unlocked => unreachable,
.locked_once => {},

View file

@ -14,13 +14,9 @@ const posix = std.posix;
const windows = std.os.windows;
const testing = std.testing;
pub const Futex = @import("Thread/Futex.zig");
pub const Mutex = @import("Thread/Mutex.zig");
pub const Semaphore = @import("Thread/Semaphore.zig");
pub const Condition = @import("Thread/Condition.zig");
pub const RwLock = @import("Thread/RwLock.zig");
pub const Pool = @compileError("deprecated; consider using 'std.Io.Group' with 'std.Io.Threaded'");
pub const Mutex = struct {
pub const Recursive = @import("Thread/Mutex/Recursive.zig");
};
pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc;
@ -1609,11 +1605,7 @@ test "setName, getName" {
}
test {
_ = Futex;
_ = Mutex;
_ = Semaphore;
_ = Condition;
_ = RwLock;
}
fn testIncrementNotify(io: Io, value: *usize, event: *Io.Event) void {

View file

@ -1,683 +0,0 @@
//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur.
//! It does this by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex.
//! Condition can be statically initialized and is at most `@sizeOf(u64)` large.
//!
//! Example:
//! ```
//! var m = Mutex{};
//! var c = Condition{};
//! var predicate = false;
//!
//! fn consumer() void {
//! m.lock();
//! defer m.unlock();
//!
//! while (!predicate) {
//! c.wait(&m);
//! }
//! }
//!
//! fn producer() void {
//! {
//! m.lock();
//! defer m.unlock();
//! predicate = true;
//! }
//! c.signal();
//! }
//!
//! const thread = try std.Thread.spawn(.{}, producer, .{});
//! consumer();
//! thread.join();
//! ```
//!
//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex.
//! This means that the following is allowed to deadlock:
//! ```
//! thread-1: mutex.lock()
//! thread-1: condition.wait(&mutex)
//!
//! thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1)
//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called while holding the mutex)
//! thread-2: condition.signal()
//! ```
const std = @import("../std.zig");
const builtin = @import("builtin");
const Condition = @This();
const Mutex = std.Thread.Mutex;
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Futex = std.Thread.Futex;
impl: Impl = .{},
/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
///
/// The Mutex must be locked by the caller's thread when this function is called.
/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
///
/// A blocking call to wait() is unblocked from one of the following conditions:
/// - a spurious ("at random") wake up occurs
/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`.
///
/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously
/// irrespective of any notifications from `signal()` or `broadcast()`.
pub fn wait(self: *Condition, mutex: *Mutex) void {
self.impl.wait(mutex, null) catch |err| switch (err) {
// A null timeout means the impl can only return on an actual wakeup.
error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
};
}
/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
///
/// The Mutex must be locked by the caller's thread when this function is called.
/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite.
/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
///
/// A blocking call to `timedWait()` is unblocked from one of the following conditions:
/// - a spurious ("at random") wake occurs
/// - the caller was blocked for around `timeout_ns` nanoseconds, in which `error.Timeout` is returned.
/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`.
///
/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously
/// irrespective of any notifications from `signal()` or `broadcast()`.
pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void {
// Delegates to the platform impl; a non-null timeout enables error.Timeout.
return self.impl.wait(mutex, timeout_ns);
}
/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex.
/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
/// `signal()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads.
pub fn signal(self: *Condition) void {
    // Post a single wake-up to the platform implementation.
    return self.impl.wake(.one);
}
/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex.
/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
/// `broadcast()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads.
pub fn broadcast(self: *Condition) void {
    // Post a wake-up for every currently-observable waiter.
    return self.impl.wake(.all);
}
/// Selects the platform implementation at comptime:
/// - single-threaded builds use a deadlock-detecting stub,
/// - Windows uses the native CONDITION_VARIABLE,
/// - futex-backed targets use the futex implementation,
/// - otherwise fall back to pthread condition variables when available.
const Impl = Impl: {
    if (builtin.single_threaded) break :Impl SingleThreadedImpl;
    if (builtin.os.tag == .windows) break :Impl WindowsImpl;
    // Consistently use `builtin.os.tag` (an alias of `builtin.target.os.tag`)
    // as the branch above already does.
    if (builtin.os.tag.isDarwin() or
        builtin.os.tag == .linux or
        builtin.os.tag == .freebsd or
        builtin.os.tag == .openbsd or
        builtin.os.tag == .dragonfly or
        builtin.target.cpu.arch.isWasm())
    {
        // Futex is the system's synchronization primitive; use that.
        break :Impl FutexImpl;
    }
    if (std.Thread.use_pthreads) {
        // This system doesn't have a futex primitive, so `std.Thread.Futex` is using `PosixImpl`,
        // which implements futex *on top of* pthread mutexes and conditions. Therefore, instead
        // of going through that long inefficient path, just use pthread condition variable directly.
        break :Impl PosixImpl;
    }
    break :Impl FutexImpl;
};
/// How many blocked threads a `wake()` call should unblock.
const Notify = enum {
    one, // wake up only one thread
    all, // wake up all threads
};
const SingleThreadedImpl = struct {
    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
        _ = self;
        _ = mutex;
        // With no other threads in existence nobody can ever signal us,
        // so an untimed wait would sleep forever.
        if (timeout == null) unreachable; // Deadlock detected.
        return error.Timeout;
    }

    fn wake(self: *Impl, comptime notify: Notify) void {
        // Nothing to do: no other thread can be waiting.
        _ = self;
        _ = notify;
    }
};
/// Backed by the Win32 CONDITION_VARIABLE, which pairs with the SRWLOCK
/// used by the Windows Mutex implementation.
const WindowsImpl = struct {
    condition: os.windows.CONDITION_VARIABLE = .{},

    /// Sleeps on the condition variable, releasing `mutex` while blocked.
    /// `timeout` is in nanoseconds; null means wait indefinitely.
    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
        var timeout_overflowed = false;
        var timeout_ms: os.windows.DWORD = os.windows.INFINITE;

        if (timeout) |timeout_ns| {
            // Round the nanoseconds to the nearest millisecond,
            // then saturating cast it to windows DWORD for use in kernel32 call.
            const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
            timeout_ms = std.math.cast(os.windows.DWORD, ms) orelse std.math.maxInt(os.windows.DWORD);

            // Track if the timeout overflowed into INFINITE and make sure not to wait forever.
            if (timeout_ms == os.windows.INFINITE) {
                timeout_overflowed = true;
                timeout_ms -= 1;
            }
        }

        if (builtin.mode == .Debug) {
            // The internal state of the DebugMutex needs to be handled here as well.
            // We are about to release the lock inside the kernel call below.
            mutex.impl.locking_thread.store(0, .unordered);
        }
        const rc = os.windows.kernel32.SleepConditionVariableSRW(
            &self.condition,
            if (builtin.mode == .Debug) &mutex.impl.impl.srwlock else &mutex.impl.srwlock,
            timeout_ms,
            0, // the srwlock is assumed to be acquired in exclusive mode, not shared
        );
        if (builtin.mode == .Debug) {
            // The internal state of the DebugMutex needs to be handled here as well:
            // the SRW lock has been re-acquired on our behalf, so reclaim ownership.
            mutex.impl.locking_thread.store(std.Thread.getCurrentId(), .unordered);
        }

        // Return error.Timeout if we know the timeout elapsed correctly
        // (i.e. the caller-supplied timeout did not overflow the DWORD range).
        if (rc == os.windows.FALSE) {
            assert(os.windows.GetLastError() == .TIMEOUT);
            if (!timeout_overflowed) return error.Timeout;
        }
    }

    fn wake(self: *Impl, comptime notify: Notify) void {
        switch (notify) {
            .one => os.windows.ntdll.RtlWakeConditionVariable(&self.condition),
            .all => os.windows.ntdll.RtlWakeAllConditionVariable(&self.condition),
        }
    }
};
/// Futex-based condition variable. `state` packs a 16-bit waiter count in
/// the low half and a 16-bit pending-signal count in the high half; `epoch`
/// is the futex word waiters actually sleep on.
const FutexImpl = struct {
    state: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
    epoch: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),

    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;

    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
        // Observe the epoch, then check the state again to see if we should wake up.
        // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
        //
        // - T1: s = LOAD(&state)
        // - T2: UPDATE(&s, signal)
        // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
        // - T1: e = LOAD(&epoch) (was reordered after the state load)
        // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
        //
        // Acquire barrier to ensure the epoch load happens before the state load.
        var epoch = self.epoch.load(.acquire);
        var state = self.state.fetchAdd(one_waiter, .monotonic);
        assert(state & waiter_mask != waiter_mask); // waiter count must not overflow 16 bits
        state += one_waiter;

        // Release the caller's mutex for the duration of the sleep,
        // re-acquiring it on every exit path out of this function.
        mutex.unlock();
        defer mutex.lock();

        var futex_deadline = Futex.Deadline.init(timeout);

        while (true) {
            futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) {
                // On timeout, we must decrement the waiter we added above.
                error.Timeout => {
                    while (true) {
                        // If there's a signal when we're timing out, consume it and report being woken up instead.
                        // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
                        while (state & signal_mask != 0) {
                            const new_state = state - one_waiter - one_signal;
                            state = self.state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
                        }

                        // Remove the waiter we added and officially return timed out.
                        const new_state = state - one_waiter;
                        state = self.state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err;
                    }
                },
            };

            // Re-sample both words before deciding whether this was a
            // wake-up meant for us or a spurious epoch change.
            epoch = self.epoch.load(.acquire);
            state = self.state.load(.monotonic);

            // Try to wake up by consuming a signal and decrementing the waiter we added previously.
            // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
            while (state & signal_mask != 0) {
                const new_state = state - one_waiter - one_signal;
                state = self.state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
            }
        }
    }

    fn wake(self: *Impl, comptime notify: Notify) void {
        var state = self.state.load(.monotonic);
        while (true) {
            const waiters = (state & waiter_mask) / one_waiter;
            const signals = (state & signal_mask) / one_signal;

            // Reserves which waiters to wake up by incrementing the signals count.
            // Therefore, the signals count is always less than or equal to the waiters count.
            // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters.
            const wakeable = waiters - signals;
            if (wakeable == 0) {
                return;
            }

            const to_wake = switch (notify) {
                .one => 1,
                .all => wakeable,
            };

            // Reserve the amount of waiters to wake by incrementing the signals count.
            // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
            const new_state = state + (one_signal * to_wake);
            state = self.state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
                // Wake up the waiting threads we reserved above by changing the epoch value.
                // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
                // This is very unlikely due to the precise amount of Futex.wake() calls that would have to land inside the waiting thread's potential preemption window.
                //
                // Release barrier ensures the signal being added to the state happens before the epoch is changed.
                // If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
                //
                // - T2: UPDATE(&epoch, 1) (reordered before the state change)
                // - T1: e = LOAD(&epoch)
                // - T1: s = LOAD(&state)
                // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
                // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
                _ = self.epoch.fetchAdd(1, .release);
                Futex.wake(&self.epoch, to_wake);
                return;
            };
        }
    }
};
/// Backed by a pthread condition variable for systems without a futex.
const PosixImpl = struct {
    cond: std.c.pthread_cond_t = .{},

    fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
        if (builtin.mode == .Debug) {
            // The DebugMutex bookkeeping must reflect that we release the
            // lock while sleeping on the condition variable.
            mutex.impl.locking_thread.store(0, .unordered);
        }
        defer if (builtin.mode == .Debug) {
            mutex.impl.locking_thread.store(std.Thread.getCurrentId(), .unordered);
        };
        const mtx = if (builtin.mode == .Debug) &mutex.impl.impl.mutex else &mutex.impl.mutex;
        if (timeout) |t| {
            // POSIX defines the third argument of pthread_cond_timedwait as
            // an *absolute* CLOCK_REALTIME deadline, not a relative duration.
            // Passing the relative `t` directly would name a point in time
            // shortly after the 1970 epoch, making every timed wait return
            // error.Timeout immediately. Build the absolute deadline instead.
            var now: std.c.timespec = undefined;
            assert(std.c.clock_gettime(.REALTIME, &now) == 0);
            var deadline: std.c.timespec = .{
                .sec = now.sec +| @as(@TypeOf(now.sec), @intCast(@divFloor(t, std.time.ns_per_s))),
                .nsec = now.nsec + @as(@TypeOf(now.nsec), @intCast(@mod(t, std.time.ns_per_s))),
            };
            // Normalize: tv_nsec must stay below one second.
            if (deadline.nsec >= std.time.ns_per_s) {
                deadline.sec +|= 1;
                deadline.nsec -= std.time.ns_per_s;
            }
            switch (std.c.pthread_cond_timedwait(&self.cond, mtx, &deadline)) {
                .SUCCESS => return,
                .TIMEDOUT => return error.Timeout,
                else => unreachable, // cond/mutex are initialized and used correctly
            }
        }
        assert(std.c.pthread_cond_wait(&self.cond, mtx) == .SUCCESS);
    }

    fn wake(self: *Impl, comptime notify: Notify) void {
        assert(switch (notify) {
            .one => std.c.pthread_cond_signal(&self.cond),
            .all => std.c.pthread_cond_broadcast(&self.cond),
        } == .SUCCESS);
    }
};
// Exercises every public entry point from a single thread; nothing may block.
test "smoke test" {
    var mutex = Mutex{};
    var cond = Condition{};

    // Try to wake outside the mutex
    defer cond.signal();
    defer cond.broadcast();

    mutex.lock();
    defer mutex.unlock();

    // Try to wait with a timeout (should not deadlock)
    try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0));
    try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms));

    // Try to wake inside the mutex.
    cond.signal();
    cond.broadcast();
}
// Inspired from: https://github.com/Amanieu/parking_lot/pull/129
// Spawns several waiters and releases them via a chain of signal() calls:
// each woken thread signals the next before exiting.
test "wait and signal" {
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const io = testing.io;
    const num_threads = 4;

    const MultiWait = struct {
        mutex: Mutex = .{},
        cond: Condition = .{},
        threads: [num_threads]std.Thread = undefined,
        spawn_count: std.math.IntFittingRange(0, num_threads) = 0,

        fn run(self: *@This()) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.spawn_count += 1;
            self.cond.wait(&self.mutex);
            self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {};
            // Pass the wake-up on to the next waiter.
            self.cond.signal();
        }
    };

    var multi_wait = MultiWait{};
    for (&multi_wait.threads) |*t| {
        t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait});
    }

    while (true) {
        try std.Io.Clock.Duration.sleep(.{ .clock = .awake, .raw = .fromMilliseconds(100) }, io);

        multi_wait.mutex.lock();
        defer multi_wait.mutex.unlock();

        // Make sure all of the threads have finished spawning to avoid a deadlock.
        if (multi_wait.spawn_count == num_threads) break;
    }

    multi_wait.cond.signal();
    for (multi_wait.threads) |t| {
        t.join();
    }
}
// Verifies signal() wakes exactly one thread at a time: the main thread wakes
// one waiter, and each waiter then signals the next inside the lock.
test signal {
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const io = testing.io;
    const num_threads = 4;

    const SignalTest = struct {
        mutex: Mutex = .{},
        cond: Condition = .{},
        notified: bool = false,
        threads: [num_threads]std.Thread = undefined,
        spawn_count: std.math.IntFittingRange(0, num_threads) = 0,

        fn run(self: *@This()) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.spawn_count += 1;

            // Use timedWait() a few times before using wait()
            // to test multiple threads timing out frequently.
            var i: usize = 0;
            while (!self.notified) : (i +%= 1) {
                if (i < 5) {
                    self.cond.timedWait(&self.mutex, 1) catch {};
                } else {
                    self.cond.wait(&self.mutex);
                }
            }

            // Once we received the signal, notify another thread (inside the lock).
            assert(self.notified);
            self.cond.signal();
        }
    };

    var signal_test = SignalTest{};
    for (&signal_test.threads) |*t| {
        t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test});
    }

    while (true) {
        try std.Io.Clock.Duration.sleep(.{ .clock = .awake, .raw = .fromMilliseconds(10) }, io);

        signal_test.mutex.lock();
        defer signal_test.mutex.unlock();

        // Make sure at least one thread has finished spawning to avoid testing nothing.
        if (signal_test.spawn_count > 0) break;
    }

    {
        // Wake up one of them (outside the lock) after setting notified=true.
        defer signal_test.cond.signal();

        signal_test.mutex.lock();
        defer signal_test.mutex.unlock();

        try testing.expect(!signal_test.notified);
        signal_test.notified = true;
    }

    for (signal_test.threads) |t| {
        t.join();
    }
}
// Threads arranged in a ring, each waiting for its paddle to be "hit" and
// then hitting the next one, for a fixed number of rounds.
test "multi signal" {
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_iterations = 4;

    const Paddle = struct {
        mutex: Mutex = .{},
        cond: Condition = .{},
        value: u32 = 0,

        // Bumps `value` under the lock, then signals the waiting owner.
        fn hit(self: *@This()) void {
            defer self.cond.signal();
            self.mutex.lock();
            defer self.mutex.unlock();
            self.value += 1;
        }

        fn run(self: *@This(), hit_to: *@This()) !void {
            self.mutex.lock();
            defer self.mutex.unlock();

            var current: u32 = 0;
            while (current < num_iterations) : (current += 1) {
                // Wait for the value to change from hit()
                while (self.value == current) {
                    self.cond.wait(&self.mutex);
                }

                // hit the next paddle
                // (std.testing convention: expected value comes first)
                try testing.expectEqual(current + 1, self.value);
                hit_to.hit();
            }
        }
    };

    var paddles = [_]Paddle{.{}} ** num_threads;
    var threads = [_]std.Thread{undefined} ** num_threads;

    // Create a circle of paddles which hit each other
    for (&threads, 0..) |*t, i| {
        const paddle = &paddles[i];
        const hit_to = &paddles[(i + 1) % paddles.len];
        t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
    }

    // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
    paddles[0].hit();
    for (threads) |t| t.join();

    // The first paddle will be hit one last time by the last paddle.
    for (paddles, 0..) |p, i| {
        const expected = @as(u32, num_iterations) + @intFromBool(i == 0);
        try testing.expectEqual(expected, p.value);
    }
}
// Verifies broadcast() releases every waiter: all threads block on `cond`
// until the main thread resets `count` and broadcasts once.
test broadcast {
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 10;

    const BroadcastTest = struct {
        mutex: Mutex = .{},
        cond: Condition = .{},
        completed: Condition = .{},
        count: usize = 0,
        threads: [num_threads]std.Thread = undefined,

        fn run(self: *@This()) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            // The last broadcast thread to start tells the main test thread it's completed.
            self.count += 1;
            if (self.count == num_threads) {
                self.completed.signal();
            }

            // Waits for the count to reach zero after the main test thread observes it at num_threads.
            // Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out.
            var i: usize = 0;
            while (self.count != 0) : (i +%= 1) {
                if (i < 10) {
                    self.cond.timedWait(&self.mutex, 1) catch {};
                } else {
                    self.cond.wait(&self.mutex);
                }
            }
        }
    };

    var broadcast_test = BroadcastTest{};
    for (&broadcast_test.threads) |*t| {
        t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test});
    }

    {
        broadcast_test.mutex.lock();
        defer broadcast_test.mutex.unlock();

        // Wait for all the broadcast threads to spawn.
        // timedWait() to detect any potential deadlocks.
        while (broadcast_test.count != num_threads) {
            broadcast_test.completed.timedWait(
                &broadcast_test.mutex,
                1 * std.time.ns_per_s,
            ) catch {};
        }

        // Reset the counter and wake all the threads to exit.
        broadcast_test.count = 0;
        broadcast_test.cond.broadcast();
    }

    for (broadcast_test.threads) |t| {
        t.join();
    }
}
// Regression test: every broadcast() must wake all waiters so the chain of
// `thread_id_to_wake` increments can make progress.
test "broadcasting - wake all threads" {
    // Tests issue #12877
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    var num_runs: usize = 1;
    const num_threads = 10;

    while (num_runs > 0) : (num_runs -= 1) {
        const BroadcastTest = struct {
            mutex: Mutex = .{},
            cond: Condition = .{},
            completed: Condition = .{},
            count: usize = 0,
            thread_id_to_wake: usize = 0,
            threads: [num_threads]std.Thread = undefined,
            wakeups: usize = 0,

            fn run(self: *@This(), thread_id: usize) void {
                self.mutex.lock();
                defer self.mutex.unlock();

                // The last broadcast thread to start tells the main test thread it's completed.
                self.count += 1;
                if (self.count == num_threads) {
                    self.completed.signal();
                }

                // Block until it is this thread's turn; timedWait guards
                // against a missed broadcast hanging the test.
                while (self.thread_id_to_wake != thread_id) {
                    self.cond.timedWait(&self.mutex, 1 * std.time.ns_per_s) catch {};
                    self.wakeups += 1;
                }
                if (self.thread_id_to_wake <= num_threads) {
                    // Signal next thread to wake up.
                    self.thread_id_to_wake += 1;
                    self.cond.broadcast();
                }
            }
        };

        var broadcast_test = BroadcastTest{};
        var thread_id: usize = 1;
        for (&broadcast_test.threads) |*t| {
            t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{ &broadcast_test, thread_id });
            thread_id += 1;
        }

        {
            broadcast_test.mutex.lock();
            defer broadcast_test.mutex.unlock();

            // Wait for all the broadcast threads to spawn.
            // timedWait() to detect any potential deadlocks.
            while (broadcast_test.count != num_threads) {
                broadcast_test.completed.timedWait(
                    &broadcast_test.mutex,
                    1 * std.time.ns_per_s,
                ) catch {};
            }

            // Signal thread 1 to wake up
            broadcast_test.thread_id_to_wake = 1;
            broadcast_test.cond.broadcast();
        }

        for (broadcast_test.threads) |t| {
            t.join();
        }
    }
}

File diff suppressed because it is too large Load diff

View file

@ -1,367 +0,0 @@
//! Mutex is a synchronization primitive which enforces atomic access to a
//! shared region of code known as the "critical section".
//!
//! It does this by ensuring only one thread is in the critical
//! section at any given point in time, blocking the others.
//!
//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large.
//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it.
const std = @import("../std.zig");
const builtin = @import("builtin");
const Mutex = @This();
const assert = std.debug.assert;
const testing = std.testing;
const Thread = std.Thread;
const Futex = Thread.Futex;
impl: Impl = .{},
pub const Recursive = @import("Mutex/Recursive.zig");
/// Tries to acquire the mutex without blocking the caller's thread.
/// Returns `false` if the calling thread would have to block to acquire it.
/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it.
pub fn tryLock(self: *Mutex) bool {
    // Non-blocking attempt; on success the caller owns the lock.
    const acquired = self.impl.tryLock();
    return acquired;
}
/// Acquires the mutex, blocking the caller's thread until it can.
/// It is undefined behavior if the mutex is already held by the caller's thread.
/// Once acquired, call `unlock()` on the Mutex to release it.
pub fn lock(self: *Mutex) void {
    // Block until the platform implementation grants ownership.
    return self.impl.lock();
}
/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`.
/// It is undefined behavior if the mutex is unlocked from a different thread that it was locked from.
pub fn unlock(self: *Mutex) void {
    // Release ownership back to the platform implementation.
    return self.impl.unlock();
}
/// In Debug builds with threads enabled, wrap the release implementation
/// with owner-thread bookkeeping to catch deadlocks and wrong-thread unlocks.
const Impl = if (builtin.mode == .Debug and !builtin.single_threaded)
    DebugImpl
else
    ReleaseImpl;
/// Comptime selection of the non-debug mutex implementation for the target.
const ReleaseImpl = Impl: {
    if (builtin.single_threaded) break :Impl SingleThreadedImpl;
    if (builtin.os.tag == .windows) break :Impl WindowsImpl;
    if (builtin.os.tag.isDarwin()) break :Impl DarwinImpl;
    if (builtin.target.os.tag == .linux or
        builtin.target.os.tag == .freebsd or
        builtin.target.os.tag == .openbsd or
        builtin.target.os.tag == .dragonfly or
        builtin.target.cpu.arch.isWasm())
    {
        // Futex is the system's synchronization primitive; use that.
        break :Impl FutexImpl;
    }
    if (std.Thread.use_pthreads) {
        // This system doesn't have a futex primitive, so `std.Thread.Futex` is using `PosixImpl`,
        // which implements futex *on top of* pthread mutexes and conditions. Therefore, instead
        // of going through that long inefficient path, just use pthread mutex directly.
        break :Impl PosixImpl;
    }
    break :Impl FutexImpl;
};
/// Wraps `ReleaseImpl` with the id of the owning thread so recursive locking
/// and unlocking from the wrong thread can be detected in Debug builds.
const DebugImpl = struct {
    locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked.
    impl: ReleaseImpl = .{},

    inline fn tryLock(self: *@This()) bool {
        const locking = self.impl.tryLock();
        if (locking) {
            // Record ownership only after actually acquiring the lock.
            self.locking_thread.store(Thread.getCurrentId(), .unordered);
        }
        return locking;
    }

    inline fn lock(self: *@This()) void {
        const current_id = Thread.getCurrentId();
        // Re-locking from the owner would block forever; report it instead.
        if (self.locking_thread.load(.unordered) == current_id and current_id != 0) {
            @panic("Deadlock detected");
        }
        self.impl.lock();
        self.locking_thread.store(current_id, .unordered);
    }

    inline fn unlock(self: *@This()) void {
        // Only the owning thread may release the mutex.
        assert(self.locking_thread.load(.unordered) == Thread.getCurrentId());
        self.locking_thread.store(0, .unordered);
        self.impl.unlock();
    }
};
/// With a single thread there is no contention; just track the locked flag
/// so misuse (deadlock, unbalanced unlock) is still caught.
const SingleThreadedImpl = struct {
    is_locked: bool = false,

    fn tryLock(self: *@This()) bool {
        // Available exactly when not already held.
        const was_locked = self.is_locked;
        self.is_locked = true;
        return !was_locked;
    }

    fn lock(self: *@This()) void {
        const acquired = self.tryLock();
        if (!acquired) {
            unreachable; // deadlock detected
        }
    }

    fn unlock(self: *@This()) void {
        if (!self.is_locked) unreachable; // unlock without a matching lock
        self.is_locked = false;
    }
};
/// SRWLOCK on windows is almost always faster than Futex solution.
/// It also implements an efficient Condition with requeue support for us.
const WindowsImpl = struct {
    srwlock: windows.SRWLOCK = .{},

    fn tryLock(self: *@This()) bool {
        // Nonzero BOOLEAN return means the exclusive lock was acquired.
        return windows.ntdll.RtlTryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE;
    }

    fn lock(self: *@This()) void {
        windows.ntdll.RtlAcquireSRWLockExclusive(&self.srwlock);
    }

    fn unlock(self: *@This()) void {
        windows.ntdll.RtlReleaseSRWLockExclusive(&self.srwlock);
    }

    const windows = std.os.windows;
};
/// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions.
const DarwinImpl = struct {
    oul: c.os_unfair_lock = .{},

    fn tryLock(self: *@This()) bool {
        // Returns true when the lock was acquired without blocking.
        return c.os_unfair_lock_trylock(&self.oul);
    }

    fn lock(self: *@This()) void {
        c.os_unfair_lock_lock(&self.oul);
    }

    fn unlock(self: *@This()) void {
        c.os_unfair_lock_unlock(&self.oul);
    }

    const c = std.c;
};
/// Futex-backed mutex. The 32-bit state word is `unlocked` (0b00),
/// `locked` (0b01), or `contended` (0b11: locked with threads sleeping).
const FutexImpl = struct {
    state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),

    const unlocked: u32 = 0b00;
    const locked: u32 = 0b01;
    const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below

    fn lock(self: *@This()) void {
        // Fast path inline; fall through to the cold slow path on contention.
        if (!self.tryLock())
            self.lockSlow();
    }

    fn tryLock(self: *@This()) bool {
        // On x86, use `lock bts` instead of `lock cmpxchg` as:
        // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
        // - `lock bts` is smaller instruction-wise which makes it better for inlining
        if (builtin.target.cpu.arch.isX86()) {
            const locked_bit = @ctz(locked);
            return self.state.bitSet(locked_bit, .acquire) == 0;
        }

        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        return self.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null;
    }

    fn lockSlow(self: *@This()) void {
        @branchHint(.cold);

        // Avoid doing an atomic swap below if we already know the state is contended.
        // An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily.
        if (self.state.load(.monotonic) == contended) {
            Futex.wait(&self.state, contended);
        }

        // Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
        //
        // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
        // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock.
        // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake
        // but this is better than having to wake all waiting threads on mutex unlock.
        //
        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        while (self.state.swap(contended, .acquire) != unlocked) {
            Futex.wait(&self.state, contended);
        }
    }

    fn unlock(self: *@This()) void {
        // Unlock the mutex and wake up a waiting thread if any.
        //
        // A waiting thread will acquire with `contended` instead of `locked`
        // which ensures that it wakes up another thread on the next unlock().
        //
        // Release barrier ensures the critical section happens before we let go of the lock
        // and that our critical section happens before the next lock holder grabs the lock.
        const state = self.state.swap(unlocked, .release);
        assert(state != unlocked); // unlock of an unlocked mutex

        if (state == contended) {
            Futex.wake(&self.state, 1);
        }
    }
};
/// Thin wrapper around a statically-initialized pthread mutex; error codes
/// that cannot occur for correct usage are asserted unreachable.
const PosixImpl = struct {
    mutex: std.c.pthread_mutex_t = .{},

    fn tryLock(impl: *PosixImpl) bool {
        switch (std.c.pthread_mutex_trylock(&impl.mutex)) {
            .SUCCESS => return true,
            .BUSY => return false, // held by another thread
            .INVAL => unreachable, // mutex is initialized correctly
            else => unreachable,
        }
    }

    fn lock(impl: *PosixImpl) void {
        switch (std.c.pthread_mutex_lock(&impl.mutex)) {
            .SUCCESS => return,
            .INVAL => unreachable, // mutex is initialized correctly
            .DEADLK => unreachable, // not an error checking mutex
            else => unreachable,
        }
    }

    fn unlock(impl: *PosixImpl) void {
        switch (std.c.pthread_mutex_unlock(&impl.mutex)) {
            .SUCCESS => return,
            .INVAL => unreachable, // mutex is initialized correctly
            .PERM => unreachable, // not an error checking mutex
            else => unreachable,
        }
    }
};
test "smoke test" {
    var mutex: Mutex = .{};

    // A free mutex is acquirable exactly once via tryLock.
    try testing.expect(mutex.tryLock());
    try testing.expect(!mutex.tryLock());
    mutex.unlock();

    // A blocking lock() also excludes tryLock until released.
    mutex.lock();
    try testing.expect(!mutex.tryLock());
    mutex.unlock();
}
// A counter which is incremented without atomic instructions, used by the
// tests below to detect data races as torn/lost increments.
const NonAtomicCounter = struct {
    // direct u128 could maybe use xmm ops on x86 which are atomic
    value: [2]u64 = [_]u64{ 0, 0 },

    // Reassembles the two halves into the full 128-bit count.
    fn get(self: NonAtomicCounter) u128 {
        return @as(u128, @bitCast(self.value));
    }

    // Increments the count, storing each 64-bit half through a volatile
    // pointer so the write is performed as two separate stores.
    fn inc(self: *NonAtomicCounter) void {
        for (@as([2]u64, @bitCast(self.get() + 1)), 0..) |v, i| {
            @as(*volatile u64, @ptrCast(&self.value[i])).* = v;
        }
    }
};
// Each thread increments its own counter under its own mutex: no contention,
// but every increment still goes through lock/unlock.
test "many uncontended" {
    // This test requires spawning threads.
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_increments = 1000;

    const Runner = struct {
        mutex: Mutex = .{},
        thread: Thread = undefined,
        counter: NonAtomicCounter = .{},

        fn run(self: *@This()) void {
            var i: usize = num_increments;
            while (i > 0) : (i -= 1) {
                self.mutex.lock();
                defer self.mutex.unlock();

                self.counter.inc();
            }
        }
    };

    var runners = [_]Runner{.{}} ** num_threads;
    for (&runners) |*r| r.thread = try Thread.spawn(.{}, Runner.run, .{r});
    for (runners) |r| r.thread.join();
    for (runners) |r| try testing.expectEqual(r.counter.get(), num_increments);
}
// All threads hammer one shared mutex/counter pair; a racy mutex would show
// up as a final count below num_increments * num_threads.
test "many contended" {
    // This test requires spawning threads.
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_increments = 1000;

    const Runner = struct {
        mutex: Mutex = .{},
        counter: NonAtomicCounter = .{},

        fn run(self: *@This()) void {
            var i: usize = num_increments;
            while (i > 0) : (i -= 1) {
                // Occasionally hint to let another thread run.
                defer if (i % 100 == 0) Thread.yield() catch {};

                self.mutex.lock();
                defer self.mutex.unlock();

                self.counter.inc();
            }
        }
    };

    var runner = Runner{};
    var threads: [num_threads]Thread = undefined;
    for (&threads) |*t| t.* = try Thread.spawn(.{}, Runner.run, .{&runner});
    for (threads) |t| t.join();

    try testing.expectEqual(runner.counter.get(), num_increments * num_threads);
}
// https://github.com/ziglang/zig/issues/19295
//test @This() {
// var m: Mutex = .{};
//
// {
// m.lock();
// defer m.unlock();
// // ... critical section code
// }
//
// if (m.tryLock()) {
// defer m.unlock();
// // ... critical section code
// }
//}

View file

@ -7,18 +7,18 @@
//! A recursive mutex is an abstraction layer on top of a regular mutex;
//! therefore it is recommended to use instead `std.Mutex` unless there is a
//! specific reason a recursive mutex is warranted.
const Recursive = @This();
const std = @import("../../std.zig");
const Recursive = @This();
const Mutex = std.Thread.Mutex;
const Io = std.Io;
const assert = std.debug.assert;
mutex: Mutex,
mutex: Io.Mutex,
thread_id: std.Thread.Id,
lock_count: usize,
pub const init: Recursive = .{
.mutex = .{},
.mutex = .init,
.thread_id = invalid_thread_id,
.lock_count = 0,
};
@ -49,7 +49,7 @@ pub fn tryLock(r: *Recursive) bool {
pub fn lock(r: *Recursive) void {
const current_thread_id = std.Thread.getCurrentId();
if (@atomicLoad(std.Thread.Id, &r.thread_id, .unordered) != current_thread_id) {
r.mutex.lock();
Io.Threaded.mutexLock(&r.mutex);
assert(r.lock_count == 0);
@atomicStore(std.Thread.Id, &r.thread_id, current_thread_id, .unordered);
}
@ -64,7 +64,7 @@ pub fn unlock(r: *Recursive) void {
r.lock_count -= 1;
if (r.lock_count == 0) {
@atomicStore(std.Thread.Id, &r.thread_id, invalid_thread_id, .unordered);
r.mutex.unlock();
Io.Threaded.mutexUnlock(&r.mutex);
}
}

View file

@ -1,386 +0,0 @@
//! A lock that supports one writer or many readers.
//! This API is for kernel threads, not evented I/O.
//! This API requires being initialized at runtime, and initialization
//! can fail. Once initialized, the core operations cannot fail.
impl: Impl = .{},
const RwLock = @This();
const std = @import("../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const testing = std.testing;
/// Platform selection: a deadlock-checking no-op lock for single-threaded
/// builds, the pthread rwlock when pthreads are in use, otherwise the
/// mutex+semaphore based default implementation.
pub const Impl = if (builtin.single_threaded)
    SingleThreadedRwLock
else if (std.Thread.use_pthreads)
    PthreadRwLock
else
    DefaultRwLock;
/// Attempts to obtain exclusive lock ownership.
/// Returns `true` if the lock is obtained, `false` otherwise.
pub fn tryLock(rwl: *RwLock) bool {
    // Forward the non-blocking exclusive attempt to the implementation.
    const acquired = rwl.impl.tryLock();
    return acquired;
}
/// Blocks until exclusive lock ownership is acquired.
pub fn lock(rwl: *RwLock) void {
    // Block until the implementation grants exclusive ownership.
    rwl.impl.lock();
}
/// Releases a held exclusive lock.
/// Asserts the lock is held exclusively.
pub fn unlock(rwl: *RwLock) void {
    // Hand the exclusive release to the implementation.
    rwl.impl.unlock();
}
/// Attempts to obtain shared lock ownership.
/// Returns `true` if the lock is obtained, `false` otherwise.
pub fn tryLockShared(rwl: *RwLock) bool {
    // Forward the non-blocking shared attempt to the implementation.
    const acquired = rwl.impl.tryLockShared();
    return acquired;
}
/// Obtains shared lock ownership.
/// Blocks if another thread has exclusive ownership.
/// May block if another thread is attempting to get exclusive ownership.
pub fn lockShared(rwl: *RwLock) void {
    // Block until the implementation grants shared ownership.
    rwl.impl.lockShared();
}
/// Releases a held shared lock.
pub fn unlockShared(rwl: *RwLock) void {
    // Hand the shared release to the implementation.
    rwl.impl.unlockShared();
}
/// Single-threaded applications use this for deadlock checks in
/// debug mode, and no-ops in release modes.
/// Single-threaded applications use this for deadlock checks in
/// debug mode, and no-ops in release modes.
pub const SingleThreadedRwLock = struct {
    state: enum { unlocked, locked_exclusive, locked_shared } = .unlocked,
    shared_count: usize = 0,

    /// Attempts to obtain exclusive lock ownership.
    /// Returns `true` if the lock is obtained, `false` otherwise.
    pub fn tryLock(rwl: *SingleThreadedRwLock) bool {
        if (rwl.state != .unlocked) return false;
        assert(rwl.shared_count == 0);
        rwl.state = .locked_exclusive;
        return true;
    }

    /// Blocks until exclusive lock ownership is acquired.
    pub fn lock(rwl: *SingleThreadedRwLock) void {
        if (rwl.state != .unlocked) unreachable; // deadlock detected
        if (rwl.shared_count != 0) unreachable; // corrupted state detected
        rwl.state = .locked_exclusive;
    }

    /// Releases a held exclusive lock.
    /// Asserts the lock is held exclusively.
    pub fn unlock(rwl: *SingleThreadedRwLock) void {
        if (rwl.state != .locked_exclusive) unreachable;
        if (rwl.shared_count != 0) unreachable; // corrupted state detected
        rwl.state = .unlocked;
    }

    /// Attempts to obtain shared lock ownership.
    /// Returns `true` if the lock is obtained, `false` otherwise.
    pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool {
        switch (rwl.state) {
            .locked_exclusive => return false,
            .unlocked => {
                assert(rwl.shared_count == 0);
                rwl.state = .locked_shared;
                rwl.shared_count = 1;
            },
            .locked_shared => rwl.shared_count += 1,
        }
        return true;
    }

    /// Blocks until shared lock ownership is acquired.
    pub fn lockShared(rwl: *SingleThreadedRwLock) void {
        switch (rwl.state) {
            .locked_exclusive => unreachable, // deadlock detected
            .unlocked => {
                assert(rwl.shared_count == 0);
                rwl.state = .locked_shared;
                rwl.shared_count = 1;
            },
            .locked_shared => rwl.shared_count += 1,
        }
    }

    /// Releases a held shared lock.
    pub fn unlockShared(rwl: *SingleThreadedRwLock) void {
        switch (rwl.state) {
            .unlocked => unreachable, // too many calls to `unlockShared`
            .locked_exclusive => unreachable, // exclusively held lock
            .locked_shared => {},
        }
        rwl.shared_count -= 1;
        if (rwl.shared_count == 0) rwl.state = .unlocked;
    }
};
/// Maps directly onto `pthread_rwlock_t`. Return codes other than SUCCESS
/// (or BUSY for the try variants) are asserted away, since the lock is
/// statically initialized and used according to its contract.
pub const PthreadRwLock = struct {
    rwlock: std.c.pthread_rwlock_t = .{},

    pub fn tryLock(rwl: *PthreadRwLock) bool {
        return std.c.pthread_rwlock_trywrlock(&rwl.rwlock) == .SUCCESS;
    }

    pub fn lock(rwl: *PthreadRwLock) void {
        const rc = std.c.pthread_rwlock_wrlock(&rwl.rwlock);
        assert(rc == .SUCCESS);
    }

    pub fn unlock(rwl: *PthreadRwLock) void {
        const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
        assert(rc == .SUCCESS);
    }

    pub fn tryLockShared(rwl: *PthreadRwLock) bool {
        return std.c.pthread_rwlock_tryrdlock(&rwl.rwlock) == .SUCCESS;
    }

    pub fn lockShared(rwl: *PthreadRwLock) void {
        const rc = std.c.pthread_rwlock_rdlock(&rwl.rwlock);
        assert(rc == .SUCCESS);
    }

    pub fn unlockShared(rwl: *PthreadRwLock) void {
        // Note: pthread uses the same unlock call for read and write holders.
        const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
        assert(rc == .SUCCESS);
    }
};
/// Default cross-platform reader-writer lock, built from a `Mutex`, a
/// `Semaphore`, and one bit-packed atomic `state` word.
pub const DefaultRwLock = struct {
    /// Bit layout, low to high: IS_WRITING flag, pending-writer count
    /// (`WRITER_MASK`), active-reader count (`READER_MASK`).
    state: usize = 0,
    /// Serializes writers; blocked readers also take it briefly so that
    /// they queue behind pending writers.
    mutex: std.Thread.Mutex = .{},
    /// Posted by the last departing reader to wake the writer blocked in `lock`.
    semaphore: std.Thread.Semaphore = .{},

    /// Lowest bit: set while a writer owns the lock.
    const IS_WRITING: usize = 1;
    /// One unit in the pending-writer count field.
    const WRITER: usize = 1 << 1;
    /// One unit in the active-reader count field.
    const READER: usize = 1 << (1 + @bitSizeOf(Count));
    const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(WRITER);
    const READER_MASK: usize = std.math.maxInt(Count) << @ctz(READER);
    /// The two count fields split the bits remaining after IS_WRITING.
    const Count = std.meta.Int(.unsigned, @divFloor(@bitSizeOf(usize) - 1, 2));

    /// Attempts exclusive ownership without blocking; returns whether it succeeded.
    pub fn tryLock(rwl: *DefaultRwLock) bool {
        if (rwl.mutex.tryLock()) {
            const state = @atomicLoad(usize, &rwl.state, .seq_cst);
            if (state & READER_MASK == 0) {
                // No active readers: mark the lock write-owned while keeping
                // the mutex held (it is released later by `unlock`).
                _ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .seq_cst);
                return true;
            }
            // Readers are active; back out instead of blocking.
            rwl.mutex.unlock();
        }
        return false;
    }

    /// Blocks until exclusive ownership is acquired.
    pub fn lock(rwl: *DefaultRwLock) void {
        // Announce a pending writer so new readers stop taking the fast path.
        _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .seq_cst);
        rwl.mutex.lock();
        // Exchange the pending-writer unit for the IS_WRITING flag in a
        // single atomic add (wrapping subtraction of WRITER).
        const state = @atomicRmw(usize, &rwl.state, .Add, IS_WRITING -% WRITER, .seq_cst);
        if (state & READER_MASK != 0)
            // Readers still hold the lock; the last one out posts the semaphore.
            rwl.semaphore.wait();
    }

    /// Releases a held exclusive lock.
    pub fn unlock(rwl: *DefaultRwLock) void {
        _ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .seq_cst);
        rwl.mutex.unlock();
    }

    /// Attempts shared ownership without blocking; returns whether it succeeded.
    pub fn tryLockShared(rwl: *DefaultRwLock) bool {
        const state = @atomicLoad(usize, &rwl.state, .seq_cst);
        if (state & (IS_WRITING | WRITER_MASK) == 0) {
            // Fast path: no writer active or pending, so try to bump the
            // reader count directly.
            _ = @cmpxchgStrong(
                usize,
                &rwl.state,
                state,
                state + READER,
                .seq_cst,
                .seq_cst,
            ) orelse return true;
        }
        // Slow path: only succeeds when the mutex is free (no writer inside).
        if (rwl.mutex.tryLock()) {
            _ = @atomicRmw(usize, &rwl.state, .Add, READER, .seq_cst);
            rwl.mutex.unlock();
            return true;
        }
        return false;
    }

    /// Blocks until shared ownership is acquired.
    pub fn lockShared(rwl: *DefaultRwLock) void {
        var state = @atomicLoad(usize, &rwl.state, .seq_cst);
        // Fast path: CAS the reader count up while no writer is active or pending.
        while (state & (IS_WRITING | WRITER_MASK) == 0) {
            state = @cmpxchgWeak(
                usize,
                &rwl.state,
                state,
                state + READER,
                .seq_cst,
                .seq_cst,
            ) orelse return;
        }
        // Slow path: queue behind writers on the mutex, then register as a reader.
        rwl.mutex.lock();
        _ = @atomicRmw(usize, &rwl.state, .Add, READER, .seq_cst);
        rwl.mutex.unlock();
    }

    /// Releases a held shared lock.
    pub fn unlockShared(rwl: *DefaultRwLock) void {
        const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .seq_cst);
        // If this was the last reader and a writer is parked in `lock`, wake it.
        if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
            rwl.semaphore.post();
    }
};
test "DefaultRwLock - internal state" {
    // Regression test for Issue #13163: `lock` previously left the WRITER
    // unit in `state` after a lock/unlock cycle, so the struct did not
    // return to its default-initialized value.
    var rw = DefaultRwLock{};
    rw.lock();
    rw.unlock();
    try testing.expectEqual(rw, DefaultRwLock{});
}
test "smoke test" {
    var rwl = RwLock{};

    // Exclusive ownership blocks every other acquisition attempt.
    rwl.lock();
    try testing.expect(!rwl.tryLock());
    try testing.expect(!rwl.tryLockShared());
    rwl.unlock();

    // `tryLock` grants the same exclusivity as `lock`.
    try testing.expect(rwl.tryLock());
    try testing.expect(!rwl.tryLock());
    try testing.expect(!rwl.tryLockShared());
    rwl.unlock();

    // Shared ownership excludes writers but admits additional readers.
    rwl.lockShared();
    try testing.expect(!rwl.tryLock());
    try testing.expect(rwl.tryLockShared());
    rwl.unlockShared();
    rwl.unlockShared();

    // Same invariants when the first reader arrives via `tryLockShared`.
    try testing.expect(rwl.tryLockShared());
    try testing.expect(!rwl.tryLock());
    try testing.expect(rwl.tryLockShared());
    rwl.unlockShared();
    rwl.unlockShared();

    // Fully released: exclusive acquisition works again.
    rwl.lock();
    rwl.unlock();
}
test "concurrent access" {
    if (builtin.single_threaded)
        return;

    // Several writers and readers hammer a single RwLock until the target
    // numbers of writes and reads have completed.
    const num_writers: usize = 2;
    const num_readers: usize = 4;
    const num_writes: usize = 1000;
    const num_reads: usize = 2000;

    const Runner = struct {
        const Runner = @This();

        rwl: RwLock,
        // Mutated only under the exclusive lock; one increment per write.
        writes: usize,
        // Incremented by readers; atomic because many readers run
        // concurrently under the shared lock.
        reads: std.atomic.Value(usize),
        // `val_a` and `val_b` are always updated together under the
        // exclusive lock; readers verify they never observe them out of sync.
        val_a: usize,
        val_b: usize,

        fn reader(run: *Runner, thread_idx: usize) !void {
            var prng = std.Random.DefaultPrng.init(thread_idx);
            const rnd = prng.random();
            while (true) {
                run.rwl.lockShared();
                defer run.rwl.unlockShared();
                try testing.expect(run.writes <= num_writes);
                if (run.reads.fetchAdd(1, .monotonic) >= num_reads) break;
                // We use `volatile` accesses so that we can make sure the memory is accessed either
                // side of a yield, maximising chances of a race.
                const a_ptr: *const volatile usize = &run.val_a;
                const b_ptr: *const volatile usize = &run.val_b;
                const old_a = a_ptr.*;
                if (rnd.boolean()) try std.Thread.yield();
                const old_b = b_ptr.*;
                // With the shared lock held, no writer can have updated one
                // value but not yet the other.
                try testing.expect(old_a == old_b);
            }
        }
        fn writer(run: *Runner, thread_idx: usize) !void {
            var prng = std.Random.DefaultPrng.init(thread_idx);
            const rnd = prng.random();
            while (true) {
                run.rwl.lock();
                defer run.rwl.unlock();
                try testing.expect(run.writes <= num_writes);
                if (run.writes == num_writes) break;
                // We use `volatile` accesses so that we can make sure the memory is accessed either
                // side of a yield, maximising chances of a race.
                const a_ptr: *volatile usize = &run.val_a;
                const b_ptr: *volatile usize = &run.val_b;
                const new_val = rnd.int(usize);
                const old_a = a_ptr.*;
                a_ptr.* = new_val;
                if (rnd.boolean()) try std.Thread.yield();
                const old_b = b_ptr.*;
                b_ptr.* = new_val;
                // With the exclusive lock held, no other writer can have
                // interleaved between the two stores.
                try testing.expect(old_a == old_b);
                run.writes += 1;
            }
        }
    };

    var run: Runner = .{
        .rwl = .{},
        .writes = 0,
        .reads = .init(0),
        .val_a = 0,
        .val_b = 0,
    };
    var write_threads: [num_writers]std.Thread = undefined;
    var read_threads: [num_readers]std.Thread = undefined;
    // Thread indices double as PRNG seeds, so readers start at num_writers
    // to keep every seed distinct.
    for (&write_threads, 0..) |*t, i| t.* = try .spawn(.{}, Runner.writer, .{ &run, i });
    for (&read_threads, num_writers..) |*t, i| t.* = try .spawn(.{}, Runner.reader, .{ &run, i });
    for (write_threads) |t| t.join();
    for (read_threads) |t| t.join();
    try testing.expect(run.writes == num_writes);
    try testing.expect(run.reads.raw >= num_reads);
}

View file

@ -1,111 +0,0 @@
//! A semaphore is an unsigned integer that blocks the kernel thread if
//! the number would become negative.
//! This API supports static initialization and does not require deinitialization.
//!
//! Example:
//! ```
//! var s = Semaphore{};
//!
//! fn consumer() void {
//! s.wait();
//! }
//!
//! fn producer() void {
//! s.post();
//! }
//!
//! const thread = try std.Thread.spawn(.{}, producer, .{});
//! consumer();
//! thread.join();
//! ```
mutex: Mutex = .{},
cond: Condition = .{},
/// It is OK to initialize this field to any value.
permits: usize = 0,
const Semaphore = @This();
const std = @import("../std.zig");
const Mutex = std.Thread.Mutex;
const Condition = std.Thread.Condition;
const builtin = @import("builtin");
const testing = std.testing;
pub fn wait(sem: *Semaphore) void {
sem.mutex.lock();
defer sem.mutex.unlock();
while (sem.permits == 0)
sem.cond.wait(&sem.mutex);
sem.permits -= 1;
if (sem.permits > 0)
sem.cond.signal();
}
pub fn timedWait(sem: *Semaphore, timeout_ns: u64) error{Timeout}!void {
var timeout_timer = std.time.Timer.start() catch unreachable;
sem.mutex.lock();
defer sem.mutex.unlock();
while (sem.permits == 0) {
const elapsed = timeout_timer.read();
if (elapsed > timeout_ns)
return error.Timeout;
const local_timeout_ns = timeout_ns - elapsed;
try sem.cond.timedWait(&sem.mutex, local_timeout_ns);
}
sem.permits -= 1;
if (sem.permits > 0)
sem.cond.signal();
}
pub fn post(sem: *Semaphore) void {
sem.mutex.lock();
defer sem.mutex.unlock();
sem.permits += 1;
sem.cond.signal();
}
test Semaphore {
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const TestContext = struct {
sem: *Semaphore,
n: *i32,
fn worker(ctx: *@This()) void {
ctx.sem.wait();
ctx.n.* += 1;
ctx.sem.post();
}
};
const num_threads = 3;
var sem = Semaphore{ .permits = 1 };
var threads: [num_threads]std.Thread = undefined;
var n: i32 = 0;
var ctx = TestContext{ .sem = &sem, .n = &n };
for (&threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
for (threads) |t| t.join();
sem.wait();
try testing.expect(n == num_threads);
}
test timedWait {
var sem = Semaphore{};
try testing.expectEqual(0, sem.permits);
try testing.expectError(error.Timeout, sem.timedWait(1));
sem.post();
try testing.expectEqual(1, sem.permits);
try sem.timedWait(1);
try testing.expectEqual(0, sem.permits);
}

View file

@ -1,3 +1,10 @@
const builtin = @import("builtin");
const std = @import("std.zig");
const AtomicOrder = std.builtin.AtomicOrder;
const testing = std.testing;
const assert = std.debug.assert;
/// This is a thin wrapper around a primitive value to prevent accidental data races.
pub fn Value(comptime T: type) type {
return extern struct {
@ -496,7 +503,17 @@ test "current CPU has a cache line size" {
_ = cache_line;
}
const std = @import("std.zig");
const builtin = @import("builtin");
const AtomicOrder = std.builtin.AtomicOrder;
const testing = std.testing;
/// A lock-free single-owner resource.
pub const Mutex = enum(u8) {
unlocked,
locked,
pub fn tryLock(m: *Mutex) bool {
return @cmpxchgWeak(Mutex, m, .unlocked, .locked, .acquire, .monotonic) == null;
}
pub fn unlock(m: *Mutex) void {
assert(m.* == .locked);
@atomicStore(Mutex, m, .unlocked, .release);
}
};

View file

@ -696,7 +696,7 @@ pub noinline fn writeCurrentStackTrace(options: StackUnwindOptions, t: Io.Termin
.useless, .unsafe => {},
.safe, .ideal => continue, // no need to even warn
}
const module_name = di.getModuleName(di_gpa, unwind_error.address) catch "???";
const module_name = di.getModuleName(di_gpa, io, unwind_error.address) catch "???";
const caption: []const u8 = switch (unwind_error.err) {
error.MissingDebugInfo => "unwind info unavailable",
error.InvalidDebugInfo => "unwind info invalid",
@ -1141,7 +1141,7 @@ fn printSourceAtAddress(
symbol.source_location,
address,
symbol.name orelse "???",
symbol.compile_unit_name orelse debug_info.getModuleName(gpa, address) catch "???",
symbol.compile_unit_name orelse debug_info.getModuleName(gpa, io, address) catch "???",
);
}
fn printLineInfo(
@ -1356,7 +1356,10 @@ pub fn getDebugInfoAllocator() Allocator {
// Otherwise, use a global arena backed by the page allocator
const S = struct {
var arena: std.heap.ArenaAllocator = .init(std.heap.page_allocator);
var ts_arena: std.heap.ThreadSafeAllocator = .{ .child_allocator = arena.allocator() };
var ts_arena: std.heap.ThreadSafeAllocator = .{
.child_allocator = arena.allocator(),
.io = std.Options.debug_io,
};
};
return S.ts_arena.allocator();
}

View file

@ -1,11 +1,12 @@
const Coverage = @This();
const std = @import("../std.zig");
const Io = std.Io;
const Allocator = std.mem.Allocator;
const Hash = std.hash.Wyhash;
const Dwarf = std.debug.Dwarf;
const assert = std.debug.assert;
const Coverage = @This();
/// Provides a globally-scoped integer index for directories.
///
/// As opposed to, for example, a directory index that is compilation-unit
@ -23,12 +24,12 @@ directories: std.ArrayHashMapUnmanaged(String, void, String.MapContext, false),
files: std.ArrayHashMapUnmanaged(File, void, File.MapContext, false),
string_bytes: std.ArrayList(u8),
/// Protects the other fields.
mutex: std.Thread.Mutex,
mutex: Io.Mutex,
pub const init: Coverage = .{
.directories = .{},
.files = .{},
.mutex = .{},
.mutex = .init,
.string_bytes = .{},
};
@ -140,11 +141,12 @@ pub fn stringAt(cov: *Coverage, index: String) [:0]const u8 {
return span(cov.string_bytes.items[@intFromEnum(index)..]);
}
pub const ResolveAddressesDwarfError = Dwarf.ScanError;
pub const ResolveAddressesDwarfError = Dwarf.ScanError || Io.Cancelable;
pub fn resolveAddressesDwarf(
cov: *Coverage,
gpa: Allocator,
io: Io,
endian: std.builtin.Endian,
/// Asserts the addresses are in ascending order.
sorted_pc_addrs: []const u64,
@ -161,8 +163,8 @@ pub fn resolveAddressesDwarf(
var prev_pc: u64 = 0;
var prev_cu: ?*std.debug.Dwarf.CompileUnit = null;
// Protects directories and files tables from other threads.
cov.mutex.lock();
defer cov.mutex.unlock();
try cov.mutex.lock(io);
defer cov.mutex.unlock(io);
next_pc: for (sorted_pc_addrs, output) |pc, *out| {
assert(pc >= prev_pc);
prev_pc = pc;
@ -183,8 +185,8 @@ pub fn resolveAddressesDwarf(
if (cu != prev_cu) {
prev_cu = cu;
if (cu.src_loc_cache == null) {
cov.mutex.unlock();
defer cov.mutex.lock();
cov.mutex.unlock(io);
defer cov.mutex.lockUncancelable(io);
d.populateSrcLocCache(gpa, endian, cu) catch |err| switch (err) {
error.MissingDebugInfo, error.InvalidDebugInfo => {
out.* = SourceLocation.invalid;

View file

@ -93,7 +93,7 @@ pub fn resolveAddresses(
) ResolveAddressesError!void {
assert(sorted_pc_addrs.len == output.len);
switch (info.impl) {
.elf => |*ef| return info.coverage.resolveAddressesDwarf(gpa, ef.endian, sorted_pc_addrs, output, &ef.dwarf.?),
.elf => |*ef| return info.coverage.resolveAddressesDwarf(gpa, io, ef.endian, sorted_pc_addrs, output, &ef.dwarf.?),
.macho => |*mf| {
// Resolving all of the addresses at once unfortunately isn't so easy in Mach-O binaries
// due to split debug information. For now, we'll just resolve the addreses one by one.
@ -112,7 +112,7 @@ pub fn resolveAddresses(
else => |e| return e,
};
}
try info.coverage.resolveAddressesDwarf(gpa, .little, &.{dwarf_pc_addr}, src_loc[0..1], dwarf);
try info.coverage.resolveAddressesDwarf(gpa, io, .little, &.{dwarf_pc_addr}, src_loc[0..1], dwarf);
}
},
}

View file

@ -1,4 +1,4 @@
rwlock: std.Thread.RwLock,
rwlock: Io.RwLock,
modules: std.ArrayList(Module),
ranges: std.ArrayList(Module.Range),
@ -6,7 +6,7 @@ ranges: std.ArrayList(Module.Range),
unwind_cache: if (can_unwind) ?[]Dwarf.SelfUnwinder.CacheEntry else ?noreturn,
pub const init: SelfInfo = .{
.rwlock = .{},
.rwlock = .init,
.modules = .empty,
.ranges = .empty,
.unwind_cache = null,
@ -29,8 +29,8 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
}
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
const module = try si.findModule(gpa, address, .exclusive);
defer si.rwlock.unlock();
const module = try si.findModule(gpa, io, address, .exclusive);
defer si.rwlock.unlock(io);
const vaddr = address - module.load_offset;
@ -73,15 +73,15 @@ pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!st
error.OutOfMemory => |e| return e,
};
}
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]const u8 {
const module = try si.findModule(gpa, address, .shared);
defer si.rwlock.unlockShared();
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
const module = try si.findModule(gpa, io, address, .shared);
defer si.rwlock.unlockShared(io);
if (module.name.len == 0) return error.MissingDebugInfo;
return module.name;
}
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, address: usize) Error!usize {
const module = try si.findModule(gpa, address, .shared);
defer si.rwlock.unlockShared();
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
const module = try si.findModule(gpa, io, address, .shared);
defer si.rwlock.unlockShared(io);
return module.load_offset;
}
@ -183,8 +183,8 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
comptime assert(can_unwind);
{
si.rwlock.lockShared();
defer si.rwlock.unlockShared();
si.rwlock.lockSharedUncancelable(io);
defer si.rwlock.unlockShared(io);
if (si.unwind_cache) |cache| {
if (Dwarf.SelfUnwinder.CacheEntry.find(cache, context.pc)) |entry| {
return context.next(gpa, entry);
@ -192,8 +192,8 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
}
}
const module = try si.findModule(gpa, context.pc, .exclusive);
defer si.rwlock.unlock();
const module = try si.findModule(gpa, io, context.pc, .exclusive);
defer si.rwlock.unlock(io);
if (si.unwind_cache == null) {
si.unwind_cache = try gpa.alloc(Dwarf.SelfUnwinder.CacheEntry, 2048);
@ -375,11 +375,11 @@ const Module = struct {
}
};
fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared, exclusive }) Error!*Module {
fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum { shared, exclusive }) Error!*Module {
// With the requested lock, scan the module ranges looking for `address`.
switch (lock) {
.shared => si.rwlock.lockShared(),
.exclusive => si.rwlock.lock(),
.shared => si.rwlock.lockSharedUncancelable(io),
.exclusive => si.rwlock.lockUncancelable(io),
}
for (si.ranges.items) |*range| {
if (address >= range.start and address < range.start + range.len) {
@ -390,14 +390,14 @@ fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared
// a new module was loaded. Upgrade to an exclusive lock if necessary.
switch (lock) {
.shared => {
si.rwlock.unlockShared();
si.rwlock.lock();
si.rwlock.unlockShared(io);
si.rwlock.lockUncancelable(io);
},
.exclusive => {},
}
// Rebuild module list with the exclusive lock.
{
errdefer si.rwlock.unlock();
errdefer si.rwlock.unlock(io);
for (si.modules.items) |*mod| {
unwind: {
const u = &(mod.unwind orelse break :unwind catch break :unwind);
@ -416,8 +416,8 @@ fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared
// Downgrade the lock back to shared if necessary.
switch (lock) {
.shared => {
si.rwlock.unlock();
si.rwlock.lockShared();
si.rwlock.unlock(io);
si.rwlock.lockSharedUncancelable(io);
},
.exclusive => {},
}
@ -429,8 +429,8 @@ fn findModule(si: *SelfInfo, gpa: Allocator, address: usize, lock: enum { shared
}
// Still nothing; unlock and error.
switch (lock) {
.shared => si.rwlock.unlockShared(),
.exclusive => si.rwlock.unlock(),
.shared => si.rwlock.unlockShared(io),
.exclusive => si.rwlock.unlock(io),
}
return error.MissingDebugInfo;
}

View file

@ -1,9 +1,9 @@
mutex: std.Thread.Mutex,
mutex: Io.Mutex,
/// Accessed through `Module.Adapter`.
modules: std.ArrayHashMapUnmanaged(Module, void, Module.Context, false),
pub const init: SelfInfo = .{
.mutex = .{},
.mutex = .init,
.modules = .empty,
};
pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
@ -21,8 +21,8 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
}
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
const module = try si.findModule(gpa, address);
defer si.mutex.unlock();
const module = try si.findModule(gpa, io, address);
defer si.mutex.unlock(io);
const file = try module.getFile(gpa, io);
@ -76,9 +76,10 @@ pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!st
) catch null,
};
}
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]const u8 {
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
_ = si;
_ = gpa;
_ = io;
// This function is marked as deprecated; however, it is significantly more
// performant than `dladdr` (since the latter also does a very slow symbol
// lookup), so let's use it since it's still available.
@ -86,9 +87,9 @@ pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]cons
@ptrFromInt(address),
) orelse return error.MissingDebugInfo);
}
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, address: usize) Error!usize {
const module = try si.findModule(gpa, address);
defer si.mutex.unlock();
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
const module = try si.findModule(gpa, io, address);
defer si.mutex.unlock(io);
const header: *std.macho.mach_header_64 = @ptrFromInt(module.text_base);
const raw_macho: [*]u8 = @ptrCast(header);
var it = macho.LoadCommandIterator.init(header, raw_macho[@sizeOf(macho.mach_header_64)..][0..header.sizeofcmds]) catch unreachable;
@ -107,8 +108,7 @@ pub const UnwindContext = std.debug.Dwarf.SelfUnwinder;
/// If the compact encoding can't encode a way to unwind a frame, it will
/// defer unwinding to DWARF, in which case `__eh_frame` will be used if available.
pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContext) Error!usize {
_ = io;
return unwindFrameInner(si, gpa, context) catch |err| switch (err) {
return unwindFrameInner(si, gpa, io, context) catch |err| switch (err) {
error.InvalidDebugInfo,
error.MissingDebugInfo,
error.UnsupportedDebugInfo,
@ -134,9 +134,9 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
=> return error.InvalidDebugInfo,
};
}
fn unwindFrameInner(si: *SelfInfo, gpa: Allocator, context: *UnwindContext) !usize {
const module = try si.findModule(gpa, context.pc);
defer si.mutex.unlock();
fn unwindFrameInner(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContext) !usize {
const module = try si.findModule(gpa, io, context.pc);
defer si.mutex.unlock(io);
const unwind: *Module.Unwind = try module.getUnwindInfo(gpa);
@ -430,15 +430,15 @@ fn unwindFrameInner(si: *SelfInfo, gpa: Allocator, context: *UnwindContext) !usi
}
/// Acquires the mutex on success.
fn findModule(si: *SelfInfo, gpa: Allocator, address: usize) Error!*Module {
fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!*Module {
// This function is marked as deprecated; however, it is significantly more
// performant than `dladdr` (since the latter also does a very slow symbol
// lookup), so let's use it since it's still available.
const text_base = std.c._dyld_get_image_header_containing_address(
@ptrFromInt(address),
) orelse return error.MissingDebugInfo;
si.mutex.lock();
errdefer si.mutex.unlock();
try si.mutex.lock(io);
errdefer si.mutex.unlock(io);
const gop = try si.modules.getOrPutAdapted(gpa, @intFromPtr(text_base), Module.Adapter{});
errdefer comptime unreachable;
if (!gop.found_existing) gop.key_ptr.* = .{

View file

@ -1,9 +1,9 @@
mutex: std.Thread.Mutex,
mutex: Io.Mutex,
modules: std.ArrayList(Module),
module_name_arena: std.heap.ArenaAllocator.State,
pub const init: SelfInfo = .{
.mutex = .{},
.mutex = .init,
.modules = .empty,
.module_name_arena = .{},
};
@ -21,21 +21,21 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
}
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
si.mutex.lock();
defer si.mutex.unlock();
try si.mutex.lock(io);
defer si.mutex.unlock(io);
const module = try si.findModule(gpa, address);
const di = try module.getDebugInfo(gpa, io);
return di.getSymbol(gpa, address - module.base_address);
}
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, address: usize) Error![]const u8 {
si.mutex.lock();
defer si.mutex.unlock();
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
try si.mutex.lock(io);
defer si.mutex.unlock(io);
const module = try si.findModule(gpa, address);
return module.name;
}
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, address: usize) Error!usize {
si.mutex.lock();
defer si.mutex.unlock();
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
try si.mutex.lock(io);
defer si.mutex.unlock(io);
const module = try si.findModule(gpa, address);
return module.base_address;
}

View file

@ -62,7 +62,7 @@ const Thread = struct {
///
/// Threads lock this before accessing their own state in order
/// to support freelist reclamation.
mutex: std.Thread.Mutex = .{},
mutex: std.atomic.Mutex = .unlocked,
/// For each size class, tracks the next address to be returned from
/// `alloc` when the freelist is empty.

View file

@ -1,7 +1,14 @@
//! Wraps a non-thread-safe allocator and makes it thread-safe.
//! Deprecated. Thread safety should be built into each Allocator instance
//! directly rather than trying to do this "composable allocators" thing.
const ThreadSafeAllocator = @This();
const std = @import("../std.zig");
const Io = std.Io;
const Allocator = std.mem.Allocator;
child_allocator: Allocator,
mutex: std.Thread.Mutex = .{},
io: Io,
mutex: Io.Mutex = .init,
pub fn allocator(self: *ThreadSafeAllocator) Allocator {
return .{
@ -17,39 +24,39 @@ pub fn allocator(self: *ThreadSafeAllocator) Allocator {
fn alloc(ctx: *anyopaque, n: usize, alignment: std.mem.Alignment, ra: usize) ?[*]u8 {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(ctx));
self.mutex.lock();
defer self.mutex.unlock();
const io = self.io;
self.mutex.lockUncancelable(io);
defer self.mutex.unlock(io);
return self.child_allocator.rawAlloc(n, alignment, ra);
}
fn resize(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) bool {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(ctx));
const io = self.io;
self.mutex.lock();
defer self.mutex.unlock();
self.mutex.lockUncancelable(io);
defer self.mutex.unlock(io);
return self.child_allocator.rawResize(buf, alignment, new_len, ret_addr);
}
fn remap(context: *anyopaque, memory: []u8, alignment: std.mem.Alignment, new_len: usize, return_address: usize) ?[*]u8 {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(context));
const io = self.io;
self.mutex.lock();
defer self.mutex.unlock();
self.mutex.lockUncancelable(io);
defer self.mutex.unlock(io);
return self.child_allocator.rawRemap(memory, alignment, new_len, return_address);
}
fn free(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, ret_addr: usize) void {
const self: *ThreadSafeAllocator = @ptrCast(@alignCast(ctx));
const io = self.io;
self.mutex.lock();
defer self.mutex.unlock();
self.mutex.lockUncancelable(io);
defer self.mutex.unlock(io);
return self.child_allocator.rawFree(buf, alignment, ret_addr);
}
const std = @import("../std.zig");
const ThreadSafeAllocator = @This();
const Allocator = std.mem.Allocator;

View file

@ -126,16 +126,6 @@ pub const Config = struct {
/// Whether the allocator may be used simultaneously from multiple threads.
thread_safe: bool = !builtin.single_threaded,
/// What type of mutex you'd like to use, for thread safety.
/// when specified, the mutex type must have the same shape as `std.Thread.Mutex` and
/// `DummyMutex`, and have no required fields. Specifying this field causes
/// the `thread_safe` field to be ignored.
///
/// when null (default):
/// * the mutex type defaults to `std.Thread.Mutex` when thread_safe is enabled.
/// * the mutex type defaults to `DummyMutex` otherwise.
MutexType: ?type = null,
/// This is a temporary debugging trick you can use to turn segfaults into more helpful
/// logged error messages with stack trace details. The downside is that every allocation
/// will be leaked, unless used with retain_metadata!
@ -204,17 +194,8 @@ pub fn DebugAllocator(comptime config: Config) type {
const total_requested_bytes_init = if (config.enable_memory_limit) @as(usize, 0) else {};
const requested_memory_limit_init = if (config.enable_memory_limit) @as(usize, math.maxInt(usize)) else {};
const mutex_init = if (config.MutexType) |T|
T{}
else if (config.thread_safe)
std.Thread.Mutex{}
else
DummyMutex{};
const DummyMutex = struct {
inline fn lock(_: DummyMutex) void {}
inline fn unlock(_: DummyMutex) void {}
};
const have_mutex = config.thread_safe;
const mutex_init = if (have_mutex) std.Io.Mutex.init else {};
const stack_n = config.stack_trace_frames;
const one_trace_size = @sizeOf(usize) * stack_n;
@ -737,8 +718,8 @@ pub fn DebugAllocator(comptime config: Config) type {
fn alloc(context: *anyopaque, len: usize, alignment: mem.Alignment, ret_addr: usize) ?[*]u8 {
const self: *Self = @ptrCast(@alignCast(context));
self.mutex.lock();
defer self.mutex.unlock();
if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
if (config.enable_memory_limit) {
const new_req_bytes = self.total_requested_bytes + len;
@ -850,8 +831,8 @@ pub fn DebugAllocator(comptime config: Config) type {
return_address: usize,
) bool {
const self: *Self = @ptrCast(@alignCast(context));
self.mutex.lock();
defer self.mutex.unlock();
if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
const size_class_index: usize = @max(@bitSizeOf(usize) - @clz(memory.len - 1), @intFromEnum(alignment));
if (size_class_index >= self.buckets.len) {
@ -869,8 +850,8 @@ pub fn DebugAllocator(comptime config: Config) type {
return_address: usize,
) ?[*]u8 {
const self: *Self = @ptrCast(@alignCast(context));
self.mutex.lock();
defer self.mutex.unlock();
if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
const size_class_index: usize = @max(@bitSizeOf(usize) - @clz(memory.len - 1), @intFromEnum(alignment));
if (size_class_index >= self.buckets.len) {
@ -887,8 +868,8 @@ pub fn DebugAllocator(comptime config: Config) type {
return_address: usize,
) void {
const self: *Self = @ptrCast(@alignCast(context));
self.mutex.lock();
defer self.mutex.unlock();
if (have_mutex) std.Io.Threaded.mutexLock(&self.mutex);
defer if (have_mutex) std.Io.Threaded.mutexUnlock(&self.mutex);
const size_class_index: usize = @max(@bitSizeOf(usize) - @clz(old_memory.len - 1), @intFromEnum(alignment));
if (size_class_index >= self.buckets.len) {
@ -1331,18 +1312,6 @@ test "realloc large object to small object" {
try std.testing.expect(slice[16] == 0x34);
}
test "overridable mutexes" {
var gpa = DebugAllocator(.{ .MutexType = std.Thread.Mutex }){
.backing_allocator = std.testing.allocator,
.mutex = std.Thread.Mutex{},
};
defer std.testing.expect(gpa.deinit() == .ok) catch @panic("leak");
const allocator = gpa.allocator();
const ptr = try allocator.create(i32);
defer allocator.destroy(ptr);
}
test "non-page-allocator backing allocator" {
var gpa: DebugAllocator(.{
.backing_allocator_zeroes = false,

View file

@ -1,5 +1,7 @@
const std = @import("../std.zig");
const builtin = @import("builtin");
const std = @import("../std.zig");
const Io = std.Io;
const math = std.math;
const Allocator = std.mem.Allocator;
const mem = std.mem;
@ -39,12 +41,12 @@ pub fn SbrkAllocator(comptime sbrk: *const fn (n: usize) usize) type {
var big_frees = [1]usize{0} ** big_size_class_count;
// TODO don't do the naive locking strategy
var lock: std.Thread.Mutex = .{};
var mutex: Io.Mutex = .{};
fn alloc(ctx: *anyopaque, len: usize, alignment: mem.Alignment, return_address: usize) ?[*]u8 {
_ = ctx;
_ = return_address;
lock.lock();
defer lock.unlock();
Io.Threaded.mutexLock(&mutex);
defer Io.Threaded.mutexUnlock(&mutex);
// Make room for the freelist next pointer.
const actual_len = @max(len +| @sizeOf(usize), alignment.toByteUnits());
const slot_size = math.ceilPowerOfTwo(usize, actual_len) catch return null;
@ -88,8 +90,8 @@ pub fn SbrkAllocator(comptime sbrk: *const fn (n: usize) usize) type {
) bool {
_ = ctx;
_ = return_address;
lock.lock();
defer lock.unlock();
Io.Threaded.mutexLock(&mutex);
defer Io.Threaded.mutexUnlock(&mutex);
// We don't want to move anything from one size class to another, but we
// can recover bytes in between powers of two.
const buf_align = alignment.toByteUnits();
@ -127,8 +129,8 @@ pub fn SbrkAllocator(comptime sbrk: *const fn (n: usize) usize) type {
) void {
_ = ctx;
_ = return_address;
lock.lock();
defer lock.unlock();
Io.Threaded.mutexLock(&mutex);
defer Io.Threaded.mutexUnlock(&mutex);
const buf_align = alignment.toByteUnits();
const actual_len = @max(buf.len + @sizeOf(usize), buf_align);
const slot_size = math.ceilPowerOfTwoAssert(usize, actual_len);

View file

@ -3,22 +3,25 @@
//! Connections are opened in a thread-safe manner, but individual Requests are not.
//!
//! TLS support may be disabled via `std.options.http_disable_tls`.
//!
//! TODO all the lockUncancelable in this file should be changed to regular lock and
//! `error.Canceled` added to more error sets.
const Client = @This();
const builtin = @import("builtin");
const std = @import("../std.zig");
const builtin = @import("builtin");
const Io = std.Io;
const testing = std.testing;
const http = std.http;
const mem = std.mem;
const Uri = std.Uri;
const Allocator = mem.Allocator;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const Io = std.Io;
const Writer = std.Io.Writer;
const Reader = std.Io.Reader;
const HostName = std.Io.net.HostName;
const Client = @This();
pub const disable_tls = std.options.http_disable_tls;
/// Used for all client allocations. Must be thread-safe.
@ -27,7 +30,7 @@ allocator: Allocator,
io: Io,
ca_bundle: if (disable_tls) void else std.crypto.Certificate.Bundle = if (disable_tls) {} else .{},
ca_bundle_mutex: std.Thread.Mutex = .{},
ca_bundle_mutex: Io.Mutex = .init,
/// Used both for the reader and writer buffers.
tls_buffer_size: if (disable_tls) u0 else usize = if (disable_tls) 0 else std.crypto.tls.Client.min_buffer_len,
/// If non-null, ssl secrets are logged to a stream. Creating such a stream
@ -62,7 +65,7 @@ https_proxy: ?*Proxy = null,
/// A Least-Recently-Used cache of open connections to be reused.
pub const ConnectionPool = struct {
mutex: std.Thread.Mutex = .{},
mutex: Io.Mutex = .init,
/// Open connections that are currently in use.
used: std.DoublyLinkedList = .{},
/// Open connections that are not currently in use.
@ -81,9 +84,9 @@ pub const ConnectionPool = struct {
/// If no connection is found, null is returned.
///
/// Threadsafe.
pub fn findConnection(pool: *ConnectionPool, criteria: Criteria) ?*Connection {
pool.mutex.lock();
defer pool.mutex.unlock();
pub fn findConnection(pool: *ConnectionPool, io: Io, criteria: Criteria) ?*Connection {
pool.mutex.lockUncancelable(io);
defer pool.mutex.unlock(io);
var next = pool.free.last;
while (next) |node| : (next = node.prev) {
@ -110,9 +113,9 @@ pub const ConnectionPool = struct {
}
/// Acquires an existing connection from the connection pool. This function is threadsafe.
pub fn acquire(pool: *ConnectionPool, connection: *Connection) void {
pool.mutex.lock();
defer pool.mutex.unlock();
pub fn acquire(pool: *ConnectionPool, io: Io, connection: *Connection) void {
pool.mutex.lockUncancelable(io);
defer pool.mutex.unlock(io);
return pool.acquireUnsafe(connection);
}
@ -122,8 +125,8 @@ pub const ConnectionPool = struct {
///
/// Threadsafe.
pub fn release(pool: *ConnectionPool, connection: *Connection, io: Io) void {
pool.mutex.lock();
defer pool.mutex.unlock();
pool.mutex.lockUncancelable(io);
defer pool.mutex.unlock(io);
pool.used.remove(&connection.pool_node);
@ -147,9 +150,9 @@ pub const ConnectionPool = struct {
}
/// Adds a newly created node to the pool of used connections. This function is threadsafe.
pub fn addUsed(pool: *ConnectionPool, connection: *Connection) void {
pool.mutex.lock();
defer pool.mutex.unlock();
pub fn addUsed(pool: *ConnectionPool, io: Io, connection: *Connection) void {
pool.mutex.lockUncancelable(io);
defer pool.mutex.unlock(io);
pool.used.append(&connection.pool_node);
}
@ -159,9 +162,9 @@ pub const ConnectionPool = struct {
/// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size.
///
/// Threadsafe.
pub fn resize(pool: *ConnectionPool, allocator: Allocator, new_size: usize) void {
pool.mutex.lock();
defer pool.mutex.unlock();
pub fn resize(pool: *ConnectionPool, io: Io, allocator: Allocator, new_size: usize) void {
pool.mutex.lockUncancelable(io);
defer pool.mutex.unlock(io);
const next = pool.free.first;
_ = next;
@ -182,7 +185,7 @@ pub const ConnectionPool = struct {
///
/// Threadsafe.
pub fn deinit(pool: *ConnectionPool, io: Io) void {
pool.mutex.lock();
pool.mutex.lockUncancelable(io);
var next = pool.free.first;
while (next) |node| {
@ -1308,9 +1311,11 @@ pub fn deinit(client: *Client) void {
/// Uses `arena` for a few small allocations that must outlive the client, or
/// at least until those fields are set to different values.
pub fn initDefaultProxies(client: *Client, arena: Allocator, environ_map: *std.process.Environ.Map) !void {
const io = client.io;
// Prevent any new connections from being created.
client.connection_pool.mutex.lock();
defer client.connection_pool.mutex.unlock();
client.connection_pool.mutex.lockUncancelable(io);
defer client.connection_pool.mutex.unlock(io);
assert(client.connection_pool.used.first == null); // There are active requests.
@ -1437,7 +1442,7 @@ pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcp
const proxied_host = options.proxied_host orelse host;
const proxied_port = options.proxied_port orelse port;
if (client.connection_pool.findConnection(.{
if (client.connection_pool.findConnection(io, .{
.host = proxied_host,
.port = proxied_port,
.protocol = protocol,
@ -1455,12 +1460,12 @@ pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcp
error.Canceled => |e| return e,
else => return error.TlsInitializationFailed,
};
client.connection_pool.addUsed(&tc.connection);
client.connection_pool.addUsed(io, &tc.connection);
return &tc.connection;
},
.plain => {
const pc = try Connection.Plain.create(client, proxied_host, proxied_port, stream);
client.connection_pool.addUsed(&pc.connection);
client.connection_pool.addUsed(io, &pc.connection);
return &pc.connection;
},
}
@ -1474,7 +1479,7 @@ pub const ConnectUnixError = Allocator.Error || std.posix.SocketError || error{N
pub fn connectUnix(client: *Client, path: []const u8) ConnectUnixError!*Connection {
const io = client.io;
if (client.connection_pool.findConnection(.{
if (client.connection_pool.findConnection(io, .{
.host = path,
.port = 0,
.protocol = .plain,
@ -1516,7 +1521,7 @@ pub fn connectProxied(
const io = client.io;
if (!proxy.supports_connect) return error.TunnelNotSupported;
if (client.connection_pool.findConnection(.{
if (client.connection_pool.findConnection(io, .{
.host = proxied_host,
.port = proxied_port,
.protocol = proxy.protocol,
@ -1691,8 +1696,8 @@ pub fn request(
if (protocol == .tls) {
if (disable_tls) unreachable;
{
client.ca_bundle_mutex.lock();
defer client.ca_bundle_mutex.unlock();
client.ca_bundle_mutex.lockUncancelable(io);
defer client.ca_bundle_mutex.unlock(io);
if (client.now == null) {
const now = try Io.Clock.real.now(io);

View file

@ -1,71 +0,0 @@
const std = @import("std.zig");
const builtin = @import("builtin");
const testing = std.testing;
pub fn once(comptime f: fn () void) Once(f) {
return Once(f){};
}
/// An object that executes the function `f` just once.
/// It is undefined behavior if `f` re-enters the same Once instance.
pub fn Once(comptime f: fn () void) type {
return struct {
done: bool = false,
mutex: std.Thread.Mutex = std.Thread.Mutex{},
/// Call the function `f`.
/// If `call` is invoked multiple times `f` will be executed only the
/// first time.
/// The invocations are thread-safe.
pub fn call(self: *@This()) void {
if (@atomicLoad(bool, &self.done, .acquire))
return;
return self.callSlow();
}
fn callSlow(self: *@This()) void {
@branchHint(.cold);
self.mutex.lock();
defer self.mutex.unlock();
// The first thread to acquire the mutex gets to run the initializer
if (!self.done) {
f();
@atomicStore(bool, &self.done, true, .release);
}
}
};
}
var global_number: i32 = 0;
var global_once = once(incr);
fn incr() void {
global_number += 1;
}
test "Once executes its function just once" {
if (builtin.single_threaded) {
global_once.call();
global_once.call();
} else {
var threads: [10]std.Thread = undefined;
var thread_count: usize = 0;
defer for (threads[0..thread_count]) |handle| handle.join();
for (&threads) |*handle| {
handle.* = try std.Thread.spawn(.{}, struct {
fn thread_fn(x: u8) void {
_ = x;
global_once.call();
if (global_number != 1) @panic("memory ordering bug");
}
}.thread_fn, .{0});
thread_count += 1;
}
}
try testing.expectEqual(@as(i32, 1), global_number);
}

View file

@ -86,7 +86,6 @@ pub const math = @import("math.zig");
pub const mem = @import("mem.zig");
pub const meta = @import("meta.zig");
pub const os = @import("os.zig");
pub const once = @import("once.zig").once;
pub const pdb = @import("pdb.zig");
pub const pie = @import("pie.zig");
pub const posix = @import("posix.zig");