diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 47ba7c2072..c56ec68866 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -1031,6 +1031,9 @@ pub const Group = struct {
     /// Once this function is called, there are resources associated with the
     /// group. To release those resources, `Group.await` or `Group.cancel` must
     /// eventually be called.
+    ///
+    /// If `error.Canceled` is returned from any operation this task performs,
+    /// it is asserted that `function` returns `error.Canceled`.
     pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
         const Args = @TypeOf(args);
         const TypeErased = struct {
@@ -1050,6 +1053,9 @@
     /// Once this function is called, there are resources associated with the
     /// group. To release those resources, `Group.await` or `Group.cancel` must
     /// eventually be called.
+    ///
+    /// If `error.Canceled` is returned from any operation this task performs,
+    /// it is asserted that `function` returns `error.Canceled`.
     pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
         const Args = @TypeOf(args);
         const TypeErased = struct {
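The added doc comment pins down a cancellation contract: a task spawned into a `Group` may not swallow `error.Canceled`. A minimal sketch of what that demands of callers; `fetchOne` and `fetchMany` are hypothetical, and the `await` call shape is assumed to mirror `async`:

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical worker: any `error.Canceled` surfaced by an operation
/// performed through `io` must be returned from here, or the assertion
/// documented on `Group.async`/`Group.concurrent` fires.
fn fetchOne(io: Io, url: []const u8) Io.Cancelable!void {
    _ = io; // ... cancelable I/O goes here, using `try` so Canceled propagates ...
    _ = url;
}

fn fetchMany(io: Io, urls: []const []const u8) void {
    var group: Io.Group = .init;
    for (urls) |url| group.async(io, fetchOne, .{ io, url });
    // Per the doc comment, the group's resources must eventually be
    // released with `Group.await` or `Group.cancel` (call shape assumed).
    group.await(io);
}
```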
diff --git a/lib/std/compress/flate/Compress.zig b/lib/std/compress/flate/Compress.zig
index 41b7d8bf04..0a85dd9d0a 100644
--- a/lib/std/compress/flate/Compress.zig
+++ b/lib/std/compress/flate/Compress.zig
@@ -267,7 +267,7 @@ pub const Options = struct {
     pub const best = level_9;
 };
 
-/// It is asserted `buffer` is least `flate.max_history_len` bytes.
+/// It is asserted `buffer` is at least `flate.max_window_len` bytes.
 /// It is asserted `output` has a capacity of at least 8 bytes.
 pub fn init(
     output: *Writer,
diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig
index a9cc86b398..e9e340c099 100644
--- a/src/Package/Fetch.zig
+++ b/src/Package/Fetch.zig
@@ -1,28 +1,34 @@
 //! Represents one independent job whose responsibility is to:
 //!
-//! 1. Check the global zig package cache to see if the hash already exists.
+//! 1. Check the local zig package directory to see if the hash already exists.
 //!    If so, load, parse, and validate the build.zig.zon file therein, and
-//!    goto step 8. Likewise if the location is a relative path, treat this
+//!    goto step 9. Likewise if the location is a relative path, treat this
 //!    the same as a cache hit. Otherwise, proceed.
-//! 2. Fetch and unpack a URL into a temporary directory.
-//! 3. Load, parse, and validate the build.zig.zon file therein. It is allowed
+//! 2. Check the global package cache for a compressed tarball matching the
+//!    hash. If it is found, unpack the contents into a temporary directory
+//!    inside the project-local zig cache. Rename this directory into the local
+//!    zig package directory and goto step 9, skipping step 10.
+//! 3. Fetch and unpack a URL into a temporary directory.
+//! 4. Load, parse, and validate the build.zig.zon file therein. It is allowed
 //!    for the file to be missing, in which case this fetched package is considered
 //!    to be a "naked" package.
-//! 4. Apply inclusion rules of the build.zig.zon to the temporary directory by
+//! 5. Apply inclusion rules of the build.zig.zon to the temporary directory by
 //!    deleting excluded files. If any files had errors for files that were
 //!    ultimately excluded, those errors should be ignored, such as failure to
 //!    create symlinks that weren't supposed to be included anyway.
-//! 5. Compute the package hash based on the remaining files in the temporary
+//! 6. Compute the package hash based on the remaining files in the temporary
 //!    directory.
-//! 6. Rename the temporary directory into the global zig package cache
-//!    directory. If the hash already exists, delete the temporary directory and
-//!    leave the zig package cache directory untouched as it may be in use by the
-//!    system. This is done even if the hash is invalid, in case the package with
-//!    the different hash is used in the future.
-//! 7. Validate the computed hash against the expected hash. If invalid,
+//! 7. Rename the temporary directory into the local zig package directory. If
+//!    the hash already exists, delete the temporary directory and leave the zig
+//!    package directory untouched as it may be in use. This is done even if
+//!    the hash is invalid, in case the package with the different hash is used
+//!    in the future.
+//! 8. Validate the computed hash against the expected hash. If invalid,
 //!    this job is done.
-//! 8. Spawn a new fetch job for each dependency in the manifest file. Use
+//! 9. Spawn a new fetch job for each dependency in the manifest file. Use
 //!    a mutex and a hash map so that redundant jobs do not get queued up.
+//! 10. Compress the package directory and store it into the global package
+//!     cache.
 //!
 //! All of this must be done with only referring to the state inside this struct
 //! because this work will be done in a dedicated thread.
@@ -110,6 +116,7 @@ pub const JobQueue = struct {
     all_fetches: std.ArrayList(*Fetch) = .empty,
 
     http_client: *std.http.Client,
+    /// This tracks `Fetch` tasks as well as recompression tasks.
     group: Io.Group = .init,
     global_cache: Cache.Directory,
     local_cache: Cache.Path,
@@ -293,8 +300,109 @@
             \\
         );
     }
+
+    fn recompress(jq: *JobQueue, package_hash: Package.Hash) Io.Cancelable!void {
+        var dest_sub_path_buffer: ["p/".len + Package.Hash.max_len + ".tar.gz".len]u8 = undefined;
+        const dest_path: Cache.Path = .{
+            .root_dir = jq.global_cache,
+            .sub_path = std.fmt.bufPrint(&dest_sub_path_buffer, "p/{s}.tar.gz", .{
+                package_hash.toSlice(),
+            }) catch unreachable,
+        };
+
+        const gpa = jq.http_client.allocator;
+
+        var arena_instance = std.heap.ArenaAllocator.init(gpa);
+        defer arena_instance.deinit();
+        const arena = arena_instance.allocator();
+
+        recompressFallible(jq, arena, dest_path, package_hash.toSlice()) catch |err| switch (err) {
+            error.Canceled => |e| return e,
+            error.ReadFailed => comptime unreachable,
+            error.WriteFailed => comptime unreachable,
+            else => |e| std.log.warn("failed caching recompressed tarball to {f}: {t}", .{ dest_path, e }),
+        };
+    }
+
+    fn recompressFallible(jq: *JobQueue, arena: Allocator, dest_path: Cache.Path, package_hash: []const u8) !void {
+        const gpa = jq.http_client.allocator;
+        const io = jq.io;
+
+        // We have to walk the file system up front in order to sort the file
+        // list for determinism purposes. The hash of the recompressed file is
+        // not critical because the true hash is based on the content alone.
+        // However, if we want Zig users to be able to share cached package
+        // data with each other via peer-to-peer protocols, we benefit greatly
+        // from the data being identical on everyone's computers.
+        var scanned_files: std.ArrayList([]const u8) = .empty;
+        defer scanned_files.deinit(gpa);
+
+        var pkg_dir = try jq.root_pkg_path.openDir(io, package_hash, .{ .iterate = true });
+        defer pkg_dir.close(io);
+
+        {
+            var walker = try pkg_dir.walk(gpa);
+            defer walker.deinit();
+
+            while (try walker.next(io)) |entry| {
+                switch (entry.kind) {
+                    .directory => continue,
+                    .file, .sym_link => {},
+                    else => {
+                        return error.IllegalFileType;
+                    },
+                }
+                const entry_path = try arena.dupe(u8, entry.path);
+                try scanned_files.append(gpa, entry_path);
+            }
+
+            std.mem.sortUnstable([]const u8, scanned_files.items, {}, stringCmp);
+        }
+
+        var atomic_file = try dest_path.root_dir.handle.createFileAtomic(io, dest_path.sub_path, .{
+            .make_path = true,
+            .replace = true,
+        });
+        defer atomic_file.deinit(io);
+
+        var file_write_buffer: [4096]u8 = undefined;
+        var file_writer = atomic_file.file.writer(io, &file_write_buffer);
+
+        var compress_buffer: [std.compress.flate.max_window_len]u8 = undefined;
+        var compress = std.compress.flate.Compress.init(&file_writer.interface, &compress_buffer, .gzip, .level_9) catch |err| switch (err) {
+            error.WriteFailed => return file_writer.err.?,
+        };
+
+        var archiver: std.tar.Writer = .{ .underlying_writer = &compress.writer };
+        archiver.prefix = package_hash;
+
+        var file_read_buffer: [4096]u8 = undefined;
+
+        for (scanned_files.items) |entry_path| {
+            var file = try pkg_dir.openFile(io, entry_path, .{});
+            defer file.close(io);
+            var file_reader: Io.File.Reader = .init(file, io, &file_read_buffer);
+            archiver.writeFile(entry_path, &file_reader, 0) catch |err| switch (err) {
+                error.ReadFailed => return file_reader.err.?,
+                error.WriteFailed => return file_writer.err.?,
+                else => |e| return e,
+            };
+        }
+
+        // intentionally omitting the pointless trailer
+        //try archiver.finish();
+        compress.writer.flush() catch |err| switch (err) {
+            error.WriteFailed => return file_writer.err.?,
+        };
+        try file_writer.flush();
+        try atomic_file.replace(io);
+    }
 };
 
+fn stringCmp(_: void, lhs: []const u8, rhs: []const u8) bool {
+    return std.mem.lessThan(u8, lhs, rhs);
+}
+
 pub const Location = union(enum) {
     remote: Remote,
     /// A directory found inside the parent package.
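The determinism comment is the whole reason `stringCmp` exists: sorting the walked paths byte-wise makes the emitted tarball byte-identical across machines, regardless of directory iteration order. A self-contained sketch of that ordering (the file names here are made up):

```zig
const std = @import("std");

fn stringCmp(_: void, lhs: []const u8, rhs: []const u8) bool {
    return std.mem.lessThan(u8, lhs, rhs);
}

test "walked paths sort into one canonical order" {
    // Whatever order the directory walk happens to yield...
    var paths = [_][]const u8{ "src/main.zig", "LICENSE", "build.zig.zon", "build.zig" };
    std.mem.sortUnstable([]const u8, &paths, {}, stringCmp);
    // ...the archiver always sees the same byte-wise ordering.
    try std.testing.expectEqualStrings("LICENSE", paths[0]);
    try std.testing.expectEqualStrings("build.zig", paths[1]);
    try std.testing.expectEqualStrings("build.zig.zon", paths[2]);
    try std.testing.expectEqualStrings("src/main.zig", paths[3]);
}
```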
@@ -477,8 +585,11 @@ fn runResource(
     remote_hash: ?Package.Hash,
 ) RunError!void {
     const job_queue = f.job_queue;
+    assert(!job_queue.read_only);
+
     const io = job_queue.io;
     defer resource.deinit(io);
+
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
     const s = fs.path.sep_str;
@@ -556,6 +667,11 @@
         ) });
         return error.FetchFailed;
     };
+
+    // Spin off a task to recompress the tarball, with filtered files deleted, into
+    // the global cache.
+    job_queue.group.async(io, JobQueue.recompress, .{ job_queue, computed_package_hash });
+
     // Remove temporary directory root if not already renamed to global cache.
     if (!package_sub_path.eql(tmp_directory_path)) {
         tmp_directory_path.root_dir.handle.deleteDir(io, tmp_directory_path.sub_path) catch |err| switch (err) {
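Note the error containment around this spawned task: `recompress` narrows everything except cancellation down to a logged warning, so failing to populate the global cache can never fail the fetch itself. A minimal sketch of that pattern; `backgroundTask`, `fallibleWork`, and `error.WorkFailed` are hypothetical names:

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical background task mirroring `recompress`: only cancellation
/// escapes; every other failure is demoted to a warning so the parent job
/// is unaffected.
fn backgroundTask(io: Io) Io.Cancelable!void {
    fallibleWork(io) catch |err| switch (err) {
        // Cooperative cancellation must propagate (see the Group doc comment).
        error.Canceled => |e| return e,
        // Anything else is logged and swallowed: the cache entry is optional.
        else => |e| std.log.warn("background work failed: {t}", .{e}),
    };
}

fn fallibleWork(io: Io) (Io.Cancelable || error{WorkFailed})!void {
    _ = io; // placeholder for real, fallible, cancelable work
}
```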