fetch: implement recompression

After fetching a package and applying the filter by deleting files that
are not part of the hash, a recompressed $GLOBAL_CACHE/p/$PKG_HASH.tar.gz
is created.

Checking this cache before fetching network URLs is not yet implemented.
Andrew Kelley 2026-02-04 21:40:06 -08:00
parent df64a3a368
commit ee21a1f988
3 changed files with 136 additions and 14 deletions
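
For context, the recompressed artifact is stored as p/<package hash>.tar.gz relative to the global cache root. Below is a minimal, self-contained sketch of that naming scheme; the hash string is hypothetical, and the real code sizes its buffer from Package.Hash.max_len, as shown in the diff below.

const std = @import("std");

pub fn main() !void {
    // Hypothetical package hash string; real hashes come from Package.Hash.
    const package_hash = "N-V-1220aabbccdd";
    var buf: [std.fs.max_path_bytes]u8 = undefined;
    // Same layout the commit builds with std.fmt.bufPrint: "p/<hash>.tar.gz",
    // resolved relative to the global cache directory.
    const sub_path = try std.fmt.bufPrint(&buf, "p/{s}.tar.gz", .{package_hash});
    std.debug.print("{s}\n", .{sub_path}); // p/N-V-1220aabbccdd.tar.gz
}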


@@ -1031,6 +1031,9 @@ pub const Group = struct {
     /// Once this function is called, there are resources associated with the
     /// group. To release those resources, `Group.await` or `Group.cancel` must
     /// eventually be called.
+    ///
+    /// If `error.Canceled` is returned from any operation this task performs,
+    /// it is asserted that `function` returns `error.Canceled`.
     pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
         const Args = @TypeOf(args);
         const TypeErased = struct {
@@ -1050,6 +1053,9 @@ pub const Group = struct {
     /// Once this function is called, there are resources associated with the
     /// group. To release those resources, `Group.await` or `Group.cancel` must
     /// eventually be called.
+    ///
+    /// If `error.Canceled` is returned from any operation this task performs,
+    /// it is asserted that `function` returns `error.Canceled`.
     pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
         const Args = @TypeOf(args);
         const TypeErased = struct {

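The new doc comment pins down a contract: a task spawned into a group must propagate error.Canceled rather than swallow it. A hedged sketch of a conforming task follows; the exact await/cancel signatures are assumed from the doc comment above, not quoted from std.

const std = @import("std");
const Io = std.Io;

// A conforming group task: any error.Canceled observed from an operation it
// performs must surface as the task's own return value.
fn fetchOne(io: Io, url: []const u8) Io.Cancelable!void {
    _ = io;
    _ = url;
    // ... cancelable I/O here; propagating errors with `try` ensures
    // error.Canceled is returned rather than swallowed ...
}

fn example(io: Io) void {
    var group: Io.Group = .init;
    group.async(io, fetchOne, .{ io, "https://example.com/pkg.tar.gz" });
    // The group now owns resources; per the doc comment, they are released by
    // `Group.await` or `Group.cancel` (signatures assumed here).
    group.await(io);
}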

@@ -267,7 +267,7 @@ pub const Options = struct {
     pub const best = level_9;
 };
 
-/// It is asserted `buffer` is at least `flate.max_history_len` bytes.
+/// It is asserted `buffer` is at least `flate.max_window_len` bytes.
 /// It is asserted `output` has a capacity of at least 8 bytes.
 pub fn init(
     output: *Writer,

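The renamed constant is exercised by the recompression code later in this same commit, which sizes the compressor's window buffer with it. A condensed, hedged sketch of that call pattern against an in-memory sink (the 0.15-era std.Io.Writer.Allocating API is assumed):

const std = @import("std");

pub fn main() !void {
    const gpa = std.heap.page_allocator;

    // An allocating in-memory sink standing in for the file writer in the diff.
    var aw: std.Io.Writer.Allocating = .init(gpa);
    defer aw.deinit();

    // The assertion this doc comment describes: `buffer` must be at least
    // flate.max_window_len bytes (formerly misdocumented as max_history_len).
    var window: [std.compress.flate.max_window_len]u8 = undefined;
    var compress = try std.compress.flate.Compress.init(&aw.writer, &window, .gzip, .level_9);

    try compress.writer.writeAll("hello hello hello hello\n");
    try compress.writer.flush();

    const bytes = try aw.toOwnedSlice();
    defer gpa.free(bytes);
    std.debug.print("gzip output: {d} bytes\n", .{bytes.len});
}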

@@ -1,28 +1,34 @@
 //! Represents one independent job whose responsibility is to:
 //!
-//! 1. Check the global zig package cache to see if the hash already exists.
+//! 1. Check the local zig package directory to see if the hash already exists.
 //!    If so, load, parse, and validate the build.zig.zon file therein, and
-//!    goto step 8. Likewise if the location is a relative path, treat this
+//!    goto step 9. Likewise if the location is a relative path, treat this
 //!    the same as a cache hit. Otherwise, proceed.
-//! 2. Fetch and unpack a URL into a temporary directory.
-//! 3. Load, parse, and validate the build.zig.zon file therein. It is allowed
+//! 2. Check the global package cache for a compressed tarball matching the
+//!    hash. If it is found, unpack the contents into a temporary directory inside
+//!    the project-local zig cache. Rename this directory into the local zig package
+//!    directory and goto step 9, skipping step 10.
+//! 3. Fetch and unpack a URL into a temporary directory.
+//! 4. Load, parse, and validate the build.zig.zon file therein. It is allowed
 //!    for the file to be missing, in which case this fetched package is considered
 //!    to be a "naked" package.
-//! 4. Apply inclusion rules of the build.zig.zon to the temporary directory by
+//! 5. Apply inclusion rules of the build.zig.zon to the temporary directory by
 //!    deleting excluded files. If any errors occurred for files that were
 //!    ultimately excluded, those errors should be ignored, such as failure to
 //!    create symlinks that weren't supposed to be included anyway.
-//! 5. Compute the package hash based on the remaining files in the temporary
+//! 6. Compute the package hash based on the remaining files in the temporary
 //!    directory.
-//! 6. Rename the temporary directory into the global zig package cache
-//!    directory. If the hash already exists, delete the temporary directory and
-//!    leave the zig package cache directory untouched as it may be in use by the
-//!    system. This is done even if the hash is invalid, in case the package with
-//!    the different hash is used in the future.
-//! 7. Validate the computed hash against the expected hash. If invalid,
+//! 7. Rename the temporary directory into the local zig package directory. If
+//!    the hash already exists, delete the temporary directory and leave the zig
+//!    package directory untouched as it may be in use. This is done even if
+//!    the hash is invalid, in case the package with the different hash is used
+//!    in the future.
+//! 8. Validate the computed hash against the expected hash. If invalid,
 //!    this job is done.
-//! 8. Spawn a new fetch job for each dependency in the manifest file. Use
+//! 9. Spawn a new fetch job for each dependency in the manifest file. Use
 //!    a mutex and a hash map so that redundant jobs do not get queued up.
+//! 10. Compress the package directory and store it into the global package
+//!     cache.
 //!
 //! All of this must be done referring only to the state inside this struct
 //! because this work will be done in a dedicated thread.
@@ -110,6 +116,7 @@ pub const JobQueue = struct {
     all_fetches: std.ArrayList(*Fetch) = .empty,
     http_client: *std.http.Client,
+    /// This tracks `Fetch` tasks as well as recompression tasks.
     group: Io.Group = .init,
     global_cache: Cache.Directory,
     local_cache: Cache.Path,
@@ -293,8 +300,109 @@ pub const JobQueue = struct {
             \\
         );
     }
+
+    fn recompress(jq: *JobQueue, package_hash: Package.Hash) Io.Cancelable!void {
+        var dest_sub_path_buffer: ["p/".len + Package.Hash.max_len + ".tar.gz".len]u8 = undefined;
+        const dest_path: Cache.Path = .{
+            .root_dir = jq.global_cache,
+            .sub_path = std.fmt.bufPrint(&dest_sub_path_buffer, "p/{s}.tar.gz", .{
+                package_hash.toSlice(),
+            }) catch unreachable,
+        };
+        const gpa = jq.http_client.allocator;
+        var arena_instance = std.heap.ArenaAllocator.init(gpa);
+        defer arena_instance.deinit();
+        const arena = arena_instance.allocator();
+        recompressFallible(jq, arena, dest_path, package_hash.toSlice()) catch |err| switch (err) {
+            error.Canceled => |e| return e,
+            error.ReadFailed => comptime unreachable,
+            error.WriteFailed => comptime unreachable,
+            else => |e| std.log.warn("failed caching recompressed tarball to {f}: {t}", .{ dest_path, e }),
+        };
+    }
+
+    fn recompressFallible(jq: *JobQueue, arena: Allocator, dest_path: Cache.Path, package_hash: []const u8) !void {
+        const gpa = jq.http_client.allocator;
+        const io = jq.io;
+
+        // We have to walk the file system up front in order to sort the file
+        // list for determinism purposes. The hash of the recompressed file is
+        // not critical because the true hash is based on the content alone.
+        // However, if we want Zig users to be able to share cached package
+        // data with each other via peer-to-peer protocols, we benefit greatly
+        // from the data being identical on everyone's computers.
+        var scanned_files: std.ArrayList([]const u8) = .empty;
+        defer scanned_files.deinit(gpa);
+
+        var pkg_dir = try jq.root_pkg_path.openDir(io, package_hash, .{ .iterate = true });
+        defer pkg_dir.close(io);
+        {
+            var walker = try pkg_dir.walk(gpa);
+            defer walker.deinit();
+            while (try walker.next(io)) |entry| {
+                switch (entry.kind) {
+                    .directory => continue,
+                    .file, .sym_link => {},
+                    else => {
+                        return error.IllegalFileType;
+                    },
+                }
+                const entry_path = try arena.dupe(u8, entry.path);
+                try scanned_files.append(gpa, entry_path);
+            }
+            std.mem.sortUnstable([]const u8, scanned_files.items, {}, stringCmp);
+        }
+
+        var atomic_file = try dest_path.root_dir.handle.createFileAtomic(io, dest_path.sub_path, .{
+            .make_path = true,
+            .replace = true,
+        });
+        defer atomic_file.deinit(io);
+
+        var file_write_buffer: [4096]u8 = undefined;
+        var file_writer = atomic_file.file.writer(io, &file_write_buffer);
+
+        var compress_buffer: [std.compress.flate.max_window_len]u8 = undefined;
+        var compress = std.compress.flate.Compress.init(&file_writer.interface, &compress_buffer, .gzip, .level_9) catch |err| switch (err) {
+            error.WriteFailed => return file_writer.err.?,
+        };
+
+        var archiver: std.tar.Writer = .{ .underlying_writer = &compress.writer };
+        archiver.prefix = package_hash;
+
+        var file_read_buffer: [4096]u8 = undefined;
+        for (scanned_files.items) |entry_path| {
+            var file = try pkg_dir.openFile(io, entry_path, .{});
+            defer file.close(io);
+            var file_reader: Io.File.Reader = .init(file, io, &file_read_buffer);
+            archiver.writeFile(entry_path, &file_reader, 0) catch |err| switch (err) {
+                error.ReadFailed => return file_reader.err.?,
+                error.WriteFailed => return file_writer.err.?,
+                else => |e| return e,
+            };
+        }
+
+        // intentionally omitting the pointless trailer
+        //try archiver.finish();
+
+        compress.writer.flush() catch |err| switch (err) {
+            error.WriteFailed => return file_writer.err.?,
+        };
+        try file_writer.flush();
+        try atomic_file.replace(io);
+    }
 };
+
+fn stringCmp(_: void, lhs: []const u8, rhs: []const u8) bool {
+    return std.mem.lessThan(u8, lhs, rhs);
+}
 
 pub const Location = union(enum) {
     remote: Remote,
     /// A directory found inside the parent package.
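The determinism comment above is the crux of this function's design: tar member order, not just file content, determines the compressed bytes, so the filesystem walk order must not leak into the output. A tiny self-contained illustration of the ordering step:

const std = @import("std");

fn stringCmp(_: void, lhs: []const u8, rhs: []const u8) bool {
    return std.mem.lessThan(u8, lhs, rhs);
}

pub fn main() void {
    // However the filesystem walk happens to order these, sorting makes the
    // archive layout, and therefore the recompressed bytes, identical on
    // every machine, which is what peer-to-peer cache sharing needs.
    var paths = [_][]const u8{ "src/main.zig", "build.zig.zon", "build.zig" };
    std.mem.sortUnstable([]const u8, &paths, {}, stringCmp);
    for (paths) |p| std.debug.print("{s}\n", .{p});
}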
@@ -477,8 +585,11 @@ fn runResource(
     remote_hash: ?Package.Hash,
 ) RunError!void {
     const job_queue = f.job_queue;
+    assert(!job_queue.read_only);
+
     const io = job_queue.io;
     defer resource.deinit(io);
+
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
     const s = fs.path.sep_str;
@@ -556,6 +667,11 @@
         ) });
         return error.FetchFailed;
     };
+
+    // Spin off a task to recompress the tarball, with filtered files deleted, into
+    // the global cache.
+    job_queue.group.async(io, JobQueue.recompress, .{ job_queue, computed_package_hash });
+
     // Remove temporary directory root if not already renamed to global cache.
     if (!package_sub_path.eql(tmp_directory_path)) {
         tmp_directory_path.root_dir.handle.deleteDir(io, tmp_directory_path.sub_path) catch |err| switch (err) {
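
The missing piece the commit message calls out, checking this cache before fetching, would slot in as step 2 of the updated doc comment. A hypothetical sketch of the shape of that check, using only names visible in this diff plus illustrative helpers; none of this is in the commit:

// Hypothetical future cache-hit check (not part of this commit). Assumes the
// imports and types of Fetch.zig: JobQueue, Package, Cache, Io.
fn hasRecompressedTarball(jq: *JobQueue, io: Io, package_hash: []const u8) !bool {
    var buf: ["p/".len + Package.Hash.max_len + ".tar.gz".len]u8 = undefined;
    const sub_path = std.fmt.bufPrint(&buf, "p/{s}.tar.gz", .{package_hash}) catch unreachable;

    var file = jq.global_cache.handle.openFile(io, sub_path, .{}) catch |err| switch (err) {
        error.FileNotFound => return false, // cache miss: fall back to the network fetch
        else => |e| return e,
    };
    file.close(io);

    // On a hit, step 2 says: gunzip and untar into a temporary directory inside
    // the project-local cache, rename it into the local package directory, and
    // skip both the network fetch and the recompression step.
    return true;
}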