From da6cbb4a459d02111c44a67d3d0dd7e654abff23 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 13 Sep 2023 14:20:21 +0200 Subject: [PATCH] refactor(tvix/store/blobsvc): make BlobStore async We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track when we're in a context with an async runtime in the context or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async worloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream ia hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available, the easiest way for this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to at least have 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/Cargo.lock | 90 +++- tvix/Cargo.nix | 283 +++++++++-- tvix/cli/Cargo.toml | 1 + tvix/cli/src/main.rs | 9 +- tvix/cli/src/tvix_store_io.rs | 177 ++++--- tvix/store/Cargo.toml | 3 +- tvix/store/src/bin/tvix-store.rs | 21 +- tvix/store/src/blobservice/dumb_seeker.rs | 110 ---- tvix/store/src/blobservice/grpc.rs | 244 +++++---- tvix/store/src/blobservice/memory.rs | 45 +- tvix/store/src/blobservice/mod.rs | 27 +- tvix/store/src/blobservice/naive_seeker.rs | 269 ++++++++++ tvix/store/src/blobservice/sled.rs | 43 +- tvix/store/src/blobservice/tests.rs | 312 +++++++----- tvix/store/src/directoryservice/mod.rs | 2 +- tvix/store/src/fuse/mod.rs | 107 +++- tvix/store/src/fuse/tests.rs | 472 ++++++++++++------ tvix/store/src/import.rs | 18 +- tvix/store/src/nar/renderer.rs | 15 +- .../src/proto/grpc_blobservice_wrapper.rs | 53 +- .../src/proto/grpc_pathinfoservice_wrapper.rs | 10 +- tvix/store/src/proto/mod.rs | 2 - .../src/proto/sync_read_into_async_read.rs | 158 ------ tvix/store/src/tests/import.rs | 19 +- tvix/store/src/tests/nar_renderer.rs | 212 ++++---- 25 files changed, 1700 insertions(+), 1002 deletions(-) delete mode 100644 tvix/store/src/blobservice/dumb_seeker.rs create mode 100644 tvix/store/src/blobservice/naive_seeker.rs delete mode 100644 tvix/store/src/proto/sync_read_into_async_read.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index d1a31b08a..f0a73f3a3 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "1.0.1" @@ -179,6 +194,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.10.1" @@ -919,6 +949,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" + [[package]] name = "glob" version = "0.3.1" @@ -1043,7 +1079,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -1243,9 +1279,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.144" +version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" +checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" [[package]] name = "libm" @@ -1305,6 +1341,15 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.6" @@ -1407,6 +1452,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.17.1" @@ -1525,9 +1579,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pin-utils" @@ -1950,6 +2004,12 @@ dependencies = [ "text-size", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -2176,6 +2236,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ssri" version = "7.0.0" @@ -2432,18 +2502,18 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.0" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", "mio", "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.4", "tokio-macros", "windows-sys 0.48.0", ] @@ -2734,6 +2804,7 @@ dependencies = [ "smol_str", "ssri", "thiserror", + "tokio", "tracing", "tvix-eval", "tvix-store", @@ -2805,6 +2876,7 @@ dependencies = [ "lazy_static", "libc", "nix-compat", + "pin-project-lite", "prost", "prost-build", "rayon", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 4089008aa..e967b345e 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -129,6 +129,50 @@ rec { # inject test dependencies into the build crates = { + "addr2line" = rec { + crateName = "addr2line"; + version = "0.21.0"; + edition = "2018"; + sha256 = "1jx0k3iwyqr8klqbzk6kjvr496yd94aspis10vwsj5wy7gib4c4a"; + dependencies = [ + { + name = "gimli"; + packageId = "gimli"; + usesDefaultFeatures = false; + features = [ "read" ]; + } + ]; + features = { + "alloc" = [ "dep:alloc" ]; + "compiler_builtins" = [ "dep:compiler_builtins" ]; + "core" = [ "dep:core" ]; + "cpp_demangle" = [ "dep:cpp_demangle" ]; + "default" = [ "rustc-demangle" "cpp_demangle" "std-object" "fallible-iterator" "smallvec" "memmap2" ]; + "fallible-iterator" = [ "dep:fallible-iterator" ]; + "memmap2" = [ "dep:memmap2" ]; + "object" = [ "dep:object" ]; + "rustc-demangle" = [ "dep:rustc-demangle" ]; + "rustc-dep-of-std" = [ "core" "alloc" "compiler_builtins" "gimli/rustc-dep-of-std" ]; + "smallvec" = [ "dep:smallvec" ]; + "std" = [ "gimli/std" ]; + "std-object" = [ "std" "object" "object/std" "object/compression" "gimli/endian-reader" ]; + }; + }; + "adler" = rec { + crateName = "adler"; + version = "1.0.2"; + edition = "2015"; + sha256 = "1zim79cvzd5yrkzl3nyfx0avijwgk9fqv3yrscdy1cc79ih02qpj"; + authors = [ + "Jonas Schievink " + ]; + features = { + "compiler_builtins" = [ "dep:compiler_builtins" ]; + "core" = [ "dep:core" ]; + "default" = [ "std" ]; + "rustc-dep-of-std" = [ "core" "compiler_builtins" ]; + }; + }; "aho-corasick" = rec { crateName = "aho-corasick"; version = "1.0.1"; @@ -618,6 +662,67 @@ rec { "tracing" = [ "dep:tracing" ]; }; }; + "backtrace" = rec { + crateName = "backtrace"; + version = "0.3.69"; + edition = "2018"; + sha256 = "0dsq23dhw4pfndkx2nsa1ml2g31idm7ss7ljxp8d57avygivg290"; + authors = [ + "The Rust Project Developers" + ]; + dependencies = [ + { + name = "addr2line"; + packageId = "addr2line"; + usesDefaultFeatures = false; + target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor")))); + } + { + name = "cfg-if"; + packageId = "cfg-if"; + } + { + name = "libc"; + packageId = "libc"; + usesDefaultFeatures = false; + target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor")))); + } + { + name = "miniz_oxide"; + packageId = "miniz_oxide"; + usesDefaultFeatures = false; + target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor")))); + } + { + name = "object"; + packageId = "object"; + usesDefaultFeatures = false; + target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor")))); + features = [ "read_core" "elf" "macho" "pe" "unaligned" "archive" ]; + } + { + name = "rustc-demangle"; + packageId = "rustc-demangle"; + } + ]; + buildDependencies = [ + { + name = "cc"; + packageId = "cc"; + } + ]; + features = { + "cpp_demangle" = [ "dep:cpp_demangle" ]; + "default" = [ "std" ]; + "rustc-serialize" = [ "dep:rustc-serialize" ]; + "serde" = [ "dep:serde" ]; + "serialize-rustc" = [ "rustc-serialize" ]; + "serialize-serde" = [ "serde" ]; + "verify-winapi" = [ "winapi/dbghelp" "winapi/handleapi" "winapi/libloaderapi" "winapi/memoryapi" "winapi/minwindef" "winapi/processthreadsapi" "winapi/synchapi" "winapi/tlhelp32" "winapi/winbase" "winapi/winnt" ]; + "winapi" = [ "dep:winapi" ]; + }; + resolvedDefaultFeatures = [ "default" "std" ]; + }; "base64 0.10.1" = rec { crateName = "base64"; version = "0.10.1"; @@ -2586,6 +2691,23 @@ rec { }; resolvedDefaultFeatures = [ "std" ]; }; + "gimli" = rec { + crateName = "gimli"; + version = "0.28.0"; + edition = "2018"; + sha256 = "1h7hcl3chfvd2gfrrxjymnwj7anqxjslvz20kcargkvsya2dgf3g"; + features = { + "default" = [ "read-all" "write" ]; + "endian-reader" = [ "read" "dep:stable_deref_trait" ]; + "fallible-iterator" = [ "dep:fallible-iterator" ]; + "read" = [ "read-core" ]; + "read-all" = [ "read" "std" "fallible-iterator" "endian-reader" ]; + "rustc-dep-of-std" = [ "dep:core" "dep:alloc" "dep:compiler_builtins" ]; + "std" = [ "fallible-iterator?/std" "stable_deref_trait?/std" ]; + "write" = [ "dep:indexmap" ]; + }; + resolvedDefaultFeatures = [ "read" "read-core" ]; + }; "glob" = rec { crateName = "glob"; version = "0.3.1"; @@ -2925,7 +3047,7 @@ rec { } { name = "socket2"; - packageId = "socket2"; + packageId = "socket2 0.4.9"; optional = true; features = [ "all" ]; } @@ -3570,9 +3692,9 @@ rec { }; "libc" = rec { crateName = "libc"; - version = "0.2.144"; + version = "0.2.148"; edition = "2015"; - sha256 = "1qfzrwhncsradwvdzd8vsj4mc31fh0rb5rvny3884rwa48fcq01b"; + sha256 = "16rn9l8s5sj9n2jb2pw13ghqwa5nvjggkh9q3lp6vs1jfghp3p4w"; authors = [ "The Rust Project Developers" ]; @@ -3722,6 +3844,32 @@ rec { ]; }; + "miniz_oxide" = rec { + crateName = "miniz_oxide"; + version = "0.7.1"; + edition = "2018"; + sha256 = "1ivl3rbbdm53bzscrd01g60l46lz5krl270487d8lhjvwl5hx0g7"; + authors = [ + "Frommi " + "oyvindln " + ]; + dependencies = [ + { + name = "adler"; + packageId = "adler"; + usesDefaultFeatures = false; + } + ]; + features = { + "alloc" = [ "dep:alloc" ]; + "compiler_builtins" = [ "dep:compiler_builtins" ]; + "core" = [ "dep:core" ]; + "default" = [ "with-alloc" ]; + "rustc-dep-of-std" = [ "core" "alloc" "compiler_builtins" "adler/rustc-dep-of-std" ]; + "simd" = [ "simd-adler32" ]; + "simd-adler32" = [ "dep:simd-adler32" ]; + }; + }; "mio" = rec { crateName = "mio"; version = "0.8.6"; @@ -3762,7 +3910,7 @@ rec { features = { "os-ext" = [ "os-poll" "windows-sys/Win32_System_Pipes" "windows-sys/Win32_Security" ]; }; - resolvedDefaultFeatures = [ "default" "net" "os-ext" "os-poll" ]; + resolvedDefaultFeatures = [ "net" "os-ext" "os-poll" ]; }; "multimap" = rec { crateName = "multimap"; @@ -4035,6 +4183,38 @@ rec { ]; }; + "object" = rec { + crateName = "object"; + version = "0.32.1"; + edition = "2018"; + sha256 = "1c02x4kvqpnl3wn7gz9idm4jrbirbycyqjgiw6lm1g9k77fzkxcw"; + dependencies = [ + { + name = "memchr"; + packageId = "memchr"; + usesDefaultFeatures = false; + } + ]; + features = { + "all" = [ "read" "write" "std" "compression" "wasm" ]; + "alloc" = [ "dep:alloc" ]; + "compiler_builtins" = [ "dep:compiler_builtins" ]; + "compression" = [ "dep:flate2" "dep:ruzstd" "std" ]; + "core" = [ "dep:core" ]; + "default" = [ "read" "compression" ]; + "doc" = [ "read_core" "write_std" "std" "compression" "archive" "coff" "elf" "macho" "pe" "wasm" "xcoff" ]; + "pe" = [ "coff" ]; + "read" = [ "read_core" "archive" "coff" "elf" "macho" "pe" "xcoff" "unaligned" ]; + "rustc-dep-of-std" = [ "core" "compiler_builtins" "alloc" "memchr/rustc-dep-of-std" ]; + "std" = [ "memchr/std" ]; + "unstable-all" = [ "all" "unstable" ]; + "wasm" = [ "dep:wasmparser" ]; + "write" = [ "write_std" "coff" "elf" "macho" "pe" "xcoff" ]; + "write_core" = [ "dep:crc32fast" "dep:indexmap" "dep:hashbrown" ]; + "write_std" = [ "write_core" "std" "indexmap?/std" "crc32fast?/std" ]; + }; + resolvedDefaultFeatures = [ "archive" "coff" "elf" "macho" "pe" "read_core" "unaligned" ]; + }; "once_cell" = rec { crateName = "once_cell"; version = "1.17.1"; @@ -4316,9 +4496,9 @@ rec { }; "pin-project-lite" = rec { crateName = "pin-project-lite"; - version = "0.2.9"; + version = "0.2.13"; edition = "2018"; - sha256 = "05n1z851l356hpgqadw4ar64mjanaxq1qlwqsf2k05ziq8xax9z0"; + sha256 = "0n0bwr5qxlf0mhn2xkl36sy55118s9qmvx2yl5f3ixkb007lbywa"; }; "pin-utils" = rec { @@ -5518,6 +5698,20 @@ rec { "serde1" = [ "serde" "text-size/serde" ]; }; }; + "rustc-demangle" = rec { + crateName = "rustc-demangle"; + version = "0.1.23"; + edition = "2015"; + sha256 = "0xnbk2bmyzshacjm2g1kd4zzv2y2az14bw3sjccq5qkpmsfvn9nn"; + authors = [ + "Alex Crichton " + ]; + features = { + "compiler_builtins" = [ "dep:compiler_builtins" ]; + "core" = [ "dep:core" ]; + "rustc-dep-of-std" = [ "core" "compiler_builtins" ]; + }; + }; "rustc-hash" = rec { crateName = "rustc-hash"; version = "1.1.0"; @@ -6242,7 +6436,7 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; - "socket2" = rec { + "socket2 0.4.9" = rec { crateName = "socket2"; version = "0.4.9"; edition = "2018"; @@ -6267,6 +6461,31 @@ rec { features = { }; resolvedDefaultFeatures = [ "all" ]; }; + "socket2 0.5.4" = rec { + crateName = "socket2"; + version = "0.5.4"; + edition = "2021"; + sha256 = "17lqx8w2b3nysrkdbdz8y7fkikz5v77c052q57lxwajmxchfhca0"; + authors = [ + "Alex Crichton " + "Thomas de Zeeuw " + ]; + dependencies = [ + { + name = "libc"; + packageId = "libc"; + target = { target, features }: (target."unix" or false); + } + { + name = "windows-sys"; + packageId = "windows-sys 0.48.0"; + target = { target, features }: (target."windows" or false); + features = [ "Win32_Foundation" "Win32_Networking_WinSock" "Win32_System_IO" "Win32_System_Threading" "Win32_System_WindowsProgramming" ]; + } + ]; + features = { }; + resolvedDefaultFeatures = [ "all" ]; + }; "ssri" = rec { crateName = "ssri"; version = "7.0.0"; @@ -6909,13 +7128,18 @@ rec { }; "tokio" = rec { crateName = "tokio"; - version = "1.28.0"; + version = "1.32.0"; edition = "2021"; - sha256 = "0vqk7dkmvadzqrxwlgja04wlf4s8iymjk6yvcshs7r9lh6zqdiy3"; + sha256 = "1yck1349q23l22bgxcbqd3wkaffw2vmkf7z26m3wgmkcxmvn1v8p"; authors = [ "Tokio Contributors " ]; dependencies = [ + { + name = "backtrace"; + packageId = "backtrace"; + target = { target, features }: (target."tokio_taskdump" or false); + } { name = "bytes"; packageId = "bytes"; @@ -6931,6 +7155,7 @@ rec { name = "mio"; packageId = "mio"; optional = true; + usesDefaultFeatures = false; } { name = "num_cpus"; @@ -6949,9 +7174,9 @@ rec { } { name = "socket2"; - packageId = "socket2"; + packageId = "socket2 0.5.4"; optional = true; - target = { target, features }: (!(("wasm32" == target."arch") || ("wasm64" == target."arch"))); + target = { target, features }: (!(builtins.elem "wasm" target."family")); features = [ "all" ]; } { @@ -6959,12 +7184,6 @@ rec { packageId = "tokio-macros"; optional = true; } - { - name = "windows-sys"; - packageId = "windows-sys 0.48.0"; - target = { target, features }: (target."docsrs" or false); - features = [ "Win32_Foundation" "Win32_Security_Authorization" ]; - } { name = "windows-sys"; packageId = "windows-sys 0.48.0"; @@ -6972,12 +7191,6 @@ rec { target = { target, features }: (target."windows" or false); } ]; - buildDependencies = [ - { - name = "autocfg"; - packageId = "autocfg"; - } - ]; devDependencies = [ { name = "libc"; @@ -6986,8 +7199,14 @@ rec { } { name = "socket2"; - packageId = "socket2"; - target = { target, features }: (!(("wasm32" == target."arch") || ("wasm64" == target."arch"))); + packageId = "socket2 0.5.4"; + target = { target, features }: (!(builtins.elem "wasm" target."family")); + } + { + name = "windows-sys"; + packageId = "windows-sys 0.48.0"; + target = { target, features }: (target."windows" or false); + features = [ "Win32_Foundation" "Win32_Security_Authorization" ]; } ]; features = { @@ -7010,7 +7229,7 @@ rec { "tracing" = [ "dep:tracing" ]; "windows-sys" = [ "dep:windows-sys" ]; }; - resolvedDefaultFeatures = [ "bytes" "default" "io-std" "io-util" "libc" "macros" "mio" "net" "num_cpus" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ]; + resolvedDefaultFeatures = [ "bytes" "default" "fs" "io-std" "io-util" "libc" "macros" "mio" "net" "num_cpus" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ]; }; "tokio-io-timeout" = rec { crateName = "tokio-io-timeout"; @@ -8073,6 +8292,10 @@ rec { name = "thiserror"; packageId = "thiserror"; } + { + name = "tokio"; + packageId = "tokio"; + } { name = "tracing"; packageId = "tracing"; @@ -8354,6 +8577,10 @@ rec { name = "nix-compat"; packageId = "nix-compat"; } + { + name = "pin-project-lite"; + packageId = "pin-project-lite"; + } { name = "prost"; packageId = "prost"; @@ -8386,7 +8613,7 @@ rec { { name = "tokio"; packageId = "tokio"; - features = [ "net" "rt-multi-thread" "signal" ]; + features = [ "fs" "net" "rt-multi-thread" "signal" ]; } { name = "tokio-stream"; @@ -10007,7 +10234,7 @@ rec { "Win32_Web" = [ "Win32" ]; "Win32_Web_InternetExplorer" = [ "Win32_Web" ]; }; - resolvedDefaultFeatures = [ "Win32" "Win32_Foundation" "Win32_NetworkManagement" "Win32_NetworkManagement_IpHelper" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Security_Authorization" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Console" "Win32_System_Diagnostics" "Win32_System_Diagnostics_Debug" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "default" ]; + resolvedDefaultFeatures = [ "Win32" "Win32_Foundation" "Win32_NetworkManagement" "Win32_NetworkManagement_IpHelper" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Console" "Win32_System_Diagnostics" "Win32_System_Diagnostics_Debug" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "Win32_System_WindowsProgramming" "default" ]; }; "windows-targets 0.42.2" = rec { crateName = "windows-targets"; diff --git a/tvix/cli/Cargo.toml b/tvix/cli/Cargo.toml index caebb169a..a73d9be23 100644 --- a/tvix/cli/Cargo.toml +++ b/tvix/cli/Cargo.toml @@ -20,6 +20,7 @@ smol_str = "0.2.0" ssri = "7.0.0" thiserror = "1.0.38" tracing = "0.1.37" +tokio = "1.28.0" [dependencies.wu-manber] git = "https://github.com/tvlfyi/wu-manber.git" diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 1980ac173..65970e1d1 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -80,9 +80,16 @@ fn interpret(code: &str, path: Option, args: &Args, explain: bool) -> b directory_service.clone(), )); + let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); + eval.io_handle = Box::new(tvix_io::TvixIO::new( known_paths.clone(), - TvixStoreIO::new(blob_service, directory_service, path_info_service), + TvixStoreIO::new( + blob_service, + directory_service, + path_info_service, + tokio_runtime.handle().clone(), + ), )); // bundle fetchurl.nix (used in nixpkgs) by resolving to diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs index 2c6733853..1a373a705 100644 --- a/tvix/cli/src/tvix_store_io.rs +++ b/tvix/cli/src/tvix_store_io.rs @@ -2,6 +2,7 @@ use nix_compat::store_path::{self, StorePath}; use std::{io, path::Path, path::PathBuf, sync::Arc}; +use tokio::io::AsyncReadExt; use tracing::{error, instrument, warn}; use tvix_eval::{EvalIO, FileType, StdIO}; @@ -27,6 +28,7 @@ pub struct TvixStoreIO { directory_service: Arc, path_info_service: Arc, std_io: StdIO, + tokio_handle: tokio::runtime::Handle, } impl TvixStoreIO { @@ -34,12 +36,14 @@ impl TvixStoreIO { blob_service: Arc, directory_service: Arc, path_info_service: Arc, + tokio_handle: tokio::runtime::Handle, ) -> Self { Self { blob_service, directory_service, path_info_service, std_io: StdIO {}, + tokio_handle, } } @@ -86,65 +90,6 @@ impl TvixStoreIO { sub_path, )?) } - - /// Imports a given path on the filesystem into the store, and returns the - /// [PathInfo] describing the path, that was sent to - /// [PathInfoService]. - /// While not part of the [EvalIO], it's still useful for clients who - /// care about the [PathInfo]. - #[instrument(skip(self), ret, err)] - pub fn import_path_with_pathinfo(&self, path: &std::path::Path) -> Result { - // Call [import::ingest_path], which will walk over the given path and return a root_node. - let root_node = import::ingest_path( - self.blob_service.clone(), - self.directory_service.clone(), - path, - ) - .expect("error during import_path"); - - // Render the NAR - let (nar_size, nar_sha256) = calculate_size_and_sha256( - &root_node, - self.blob_service.clone(), - self.directory_service.clone(), - ) - .expect("error during nar calculation"); // TODO: handle error - - // TODO: make a path_to_name helper function? - let name = path - .file_name() - .expect("path must not be ..") - .to_str() - .expect("path must be valid unicode"); - - let output_path = store_path::build_nar_based_store_path(&nar_sha256, name); - - // assemble a new root_node with a name that is derived from the nar hash. - let root_node = root_node.rename(output_path.to_string().into_bytes().into()); - - // assemble the [PathInfo] object. - let path_info = PathInfo { - node: Some(tvix_store::proto::Node { - node: Some(root_node), - }), - // There's no reference scanning on path contents ingested like this. - references: vec![], - narinfo: Some(NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6` - // do we need this anywhere? - }), - }; - - // put into [PathInfoService], and return the [PathInfo] that we get - // back from there (it might contain additional signatures). - let path_info = self.path_info_service.put(path_info)?; - - Ok(path_info) - } } impl EvalIO for TvixStoreIO { @@ -197,24 +142,33 @@ impl EvalIO for TvixStoreIO { ) })?; - let reader = { - let resp = self.blob_service.open_read(&digest)?; - match resp { - Some(blob_reader) => blob_reader, - None => { - error!( - blob.digest = %digest, - "blob not found", - ); - Err(io::Error::new( - io::ErrorKind::NotFound, - format!("blob {} not found", &digest), - ))? - } - } - }; + let blob_service = self.blob_service.clone(); - io::read_to_string(reader) + let task = self.tokio_handle.spawn(async move { + let mut reader = { + let resp = blob_service.open_read(&digest).await?; + match resp { + Some(blob_reader) => blob_reader, + None => { + error!( + blob.digest = %digest, + "blob not found", + ); + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("blob {} not found", &digest), + ))? + } + } + }; + + let mut buf = String::new(); + + reader.read_to_string(&mut buf).await?; + Ok(buf) + }); + + self.tokio_handle.block_on(task).unwrap() } Node::Symlink(_symlink_node) => Err(io::Error::new( io::ErrorKind::Unsupported, @@ -296,7 +250,16 @@ impl EvalIO for TvixStoreIO { #[instrument(skip(self), ret, err)] fn import_path(&self, path: &std::path::Path) -> Result { - let path_info = self.import_path_with_pathinfo(path)?; + let p = path.to_owned(); + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + let path_info_service = self.path_info_service.clone(); + + let task = self.tokio_handle.spawn(async move { + import_path_with_pathinfo(blob_service, directory_service, path_info_service, &p).await + }); + + let path_info = self.tokio_handle.block_on(task).unwrap()?; // from the [PathInfo], extract the store path (as string). Ok({ @@ -320,3 +283,63 @@ impl EvalIO for TvixStoreIO { Some("/nix/store".to_string()) } } + +/// Imports a given path on the filesystem into the store, and returns the +/// [PathInfo] describing the path, that was sent to +/// [PathInfoService]. +#[instrument(skip(blob_service, directory_service, path_info_service), ret, err)] +async fn import_path_with_pathinfo( + blob_service: Arc, + directory_service: Arc, + path_info_service: Arc, + path: &std::path::Path, +) -> Result { + // Call [import::ingest_path], which will walk over the given path and return a root_node. + let root_node = import::ingest_path(blob_service.clone(), directory_service.clone(), path) + .await + .expect("error during import_path"); + + // Render the NAR. This is blocking. + let calc_task = tokio::task::spawn_blocking(move || { + let (nar_size, nar_sha256) = + calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone()) + .expect("error during nar calculation"); // TODO: handle error + (nar_size, nar_sha256, root_node) + }); + let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap(); + + // TODO: make a path_to_name helper function? + let name = path + .file_name() + .expect("path must not be ..") + .to_str() + .expect("path must be valid unicode"); + + let output_path = store_path::build_nar_based_store_path(&nar_sha256, name); + + // assemble a new root_node with a name that is derived from the nar hash. + let root_node = root_node.rename(output_path.to_string().into_bytes().into()); + + // assemble the [PathInfo] object. + let path_info = PathInfo { + node: Some(tvix_store::proto::Node { + node: Some(root_node), + }), + // There's no reference scanning on path contents ingested like this. + references: vec![], + narinfo: Some(NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6` + // do we need this anywhere? + }), + }; + + // put into [PathInfoService], and return the [PathInfo] that we get + // back from there (it might contain additional signatures). + let path_info = path_info_service.put(path_info)?; + + Ok(path_info) +} diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index dece06be8..3fabf4f84 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -17,7 +17,7 @@ sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } thiserror = "1.0.38" tokio-stream = "0.1.14" -tokio = { version = "1.28.0", features = ["net", "rt-multi-thread", "signal"] } +tokio = { version = "1.28.0", features = ["fs", "net", "rt-multi-thread", "signal"] } tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["json"] } @@ -29,6 +29,7 @@ bytes = "1.4.0" smol_str = "0.2.0" serde_json = "1.0" url = "2.4.0" +pin-project-lite = "0.2.13" [dependencies.fuser] optional = true diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index f69c2f7fa..0da264808 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,6 +6,7 @@ use nix_compat::store_path::StorePath; use std::io; use std::path::Path; use std::path::PathBuf; +use tokio::task::JoinHandle; use tracing_subscriber::prelude::*; use tvix_store::blobservice; use tvix_store::directoryservice; @@ -199,17 +200,24 @@ async fn main() -> Result<(), Box> { let directory_service = directory_service.clone(); let path_info_service = path_info_service.clone(); - let task = tokio::task::spawn_blocking(move || -> io::Result<()> { + let task: JoinHandle> = tokio::task::spawn(async move { // Ingest the path into blob and directory service. let root_node = import::ingest_path( blob_service.clone(), directory_service.clone(), &path, ) + .await .expect("failed to ingest path"); // Ask the PathInfoService for the NAR size and sha256 - let (nar_size, nar_sha256) = path_info_service.calculate_nar(&root_node)?; + let root_node_copy = root_node.clone(); + let path_info_service_clone = path_info_service.clone(); + let (nar_size, nar_sha256) = tokio::task::spawn_blocking(move || { + path_info_service_clone.calculate_nar(&root_node_copy) + }) + .await + .unwrap()?; // TODO: make a path_to_name helper function? let name = path @@ -241,7 +249,10 @@ async fn main() -> Result<(), Box> { // put into [PathInfoService], and return the PathInfo that we get back // from there (it might contain additional signatures). - let path_info = path_info_service.put(path_info)?; + let path_info = + tokio::task::spawn_blocking(move || path_info_service.put(path_info)) + .await + .unwrap()?; let node = path_info.node.unwrap().node.unwrap(); @@ -304,9 +315,9 @@ async fn main() -> Result<(), Box> { // task. tokio::task::spawn_blocking(move || -> io::Result<()> { info!("mounting tvix-store on {:?}", fuse_session.mountpoint()); - let res = fuse_session.run()?; + fuse_session.run()?; info!("unmount occured, terminating…"); - Ok(res) + Ok(()) }) .await??; } diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs deleted file mode 100644 index 6df4eb57f..000000000 --- a/tvix/store/src/blobservice/dumb_seeker.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::io; - -use tracing::{debug, instrument}; - -use super::BlobReader; - -/// This implements [io::Seek] for and [io::Read] by simply skipping over some -/// bytes, keeping track of the position. -/// It fails whenever you try to seek backwards. -pub struct DumbSeeker { - r: R, - pos: u64, -} - -impl DumbSeeker { - pub fn new(r: R) -> Self { - DumbSeeker { r, pos: 0 } - } -} - -impl io::Read for DumbSeeker { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let bytes_read = self.r.read(buf)?; - - self.pos += bytes_read as u64; - - Ok(bytes_read) - } -} - -impl io::Seek for DumbSeeker { - #[instrument(skip(self))] - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - let absolute_offset: u64 = match pos { - io::SeekFrom::Start(start_offset) => { - if start_offset < self.pos { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - format!("can't seek backwards ({} -> {})", self.pos, start_offset), - )); - } else { - start_offset - } - } - // we don't know the total size, can't support this. - io::SeekFrom::End(_end_offset) => { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - "can't seek from end", - )); - } - io::SeekFrom::Current(relative_offset) => { - if relative_offset < 0 { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - "can't seek backwards relative to current position", - )); - } else { - self.pos + relative_offset as u64 - } - } - }; - - debug!(absolute_offset=?absolute_offset, "seek"); - - // we already know absolute_offset is larger than self.pos - debug_assert!( - absolute_offset >= self.pos, - "absolute_offset {} is larger than self.pos {}", - absolute_offset, - self.pos - ); - - // calculate bytes to skip - let bytes_to_skip: u64 = absolute_offset - self.pos; - - // discard these bytes. We can't use take() as it requires ownership of - // self.r, but we only have &mut self. - let mut buf = [0; 1024]; - let mut bytes_skipped: u64 = 0; - while bytes_skipped < bytes_to_skip { - let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64); - match self.r.read(&mut buf[..len as usize]) { - Ok(0) => break, - Ok(n) => bytes_skipped += n as u64, - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - } - - // This will fail when seeking past the end of self.r - if bytes_to_skip != bytes_skipped { - return Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - format!( - "tried to skip {} bytes, but only was able to skip {} until reaching EOF", - bytes_to_skip, bytes_skipped - ), - )); - } - - self.pos = absolute_offset; - - // return the new position from the start of the stream - Ok(absolute_offset) - } -} - -/// A Cursor> can be used as a BlobReader. -impl BlobReader for DumbSeeker {} diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index c6d28860f..cea796033 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -1,22 +1,26 @@ -use super::{dumb_seeker::DumbSeeker, BlobReader, BlobService, BlobWriter}; +use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; use crate::{proto, B3Digest}; -use futures::sink::{SinkExt, SinkMapErr}; -use std::{collections::VecDeque, io}; +use futures::sink::SinkExt; +use futures::TryFutureExt; +use std::{ + collections::VecDeque, + io::{self}, + pin::pin, + task::Poll, +}; +use tokio::io::AsyncWriteExt; use tokio::{net::UnixStream, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ - io::{CopyToBytes, SinkWriter, SyncIoBridge}, + io::{CopyToBytes, SinkWriter}, sync::{PollSendError, PollSender}, }; -use tonic::{transport::Channel, Code, Status, Streaming}; +use tonic::{async_trait, transport::Channel, Code, Status}; use tracing::instrument; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::blob_service_client::BlobServiceClient, @@ -28,13 +32,11 @@ impl GRPCBlobService { pub fn from_client( grpc_client: proto::blob_service_client::BlobServiceClient, ) -> Self { - Self { - tokio_handle: tokio::runtime::Handle::current(), - grpc_client, - } + Self { grpc_client } } } +#[async_trait] impl BlobService for GRPCBlobService { /// Constructs a [GRPCBlobService] from the passed [url::Url]: /// - scheme has to match `grpc+*://`. @@ -89,22 +91,16 @@ impl BlobService for GRPCBlobService { } #[instrument(skip(self, digest), fields(blob.digest=%digest))] - fn has(&self, digest: &B3Digest) -> Result { - // Get a new handle to the gRPC client, and copy the digest. + async fn has(&self, digest: &B3Digest) -> Result { let mut grpc_client = self.grpc_client.clone(); - let digest = digest.clone(); + let resp = grpc_client + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + ..Default::default() + }) + .await; - let task: JoinHandle> = self.tokio_handle.spawn(async move { - Ok(grpc_client - .stat(proto::StatBlobRequest { - digest: digest.into(), - ..Default::default() - }) - .await? - .into_inner()) - }); - - match self.tokio_handle.block_on(task)? { + match resp { Ok(_blob_meta) => Ok(true), Err(e) if e.code() == Code::NotFound => Ok(false), Err(e) => Err(crate::Error::StorageError(e.to_string())), @@ -113,35 +109,30 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &B3Digest) -> Result>, crate::Error> { + async fn open_read( + &self, + digest: &B3Digest, + ) -> Result>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); - let digest = digest.clone(); - // Construct the task that'll send out the request and return the stream - // the gRPC client should use to send [proto::BlobChunk], or an error if - // the blob doesn't exist. - let task: JoinHandle, Status>> = - self.tokio_handle.spawn(async move { - let stream = grpc_client - .read(proto::ReadBlobRequest { - digest: digest.into(), - }) - .await? - .into_inner(); - - Ok(stream) - }); + // Get a stream of [proto::BlobChunk], or return an error if the blob + // doesn't exist. + let resp = grpc_client + .read(proto::ReadBlobRequest { + digest: digest.clone().into(), + }) + .await; // This runs the task to completion, which on success will return a stream. // On reading from it, we receive individual [proto::BlobChunk], so we // massage this to a stream of bytes, // then create an [AsyncRead], which we'll turn into a [io::Read], // that's returned from the function. - match self.tokio_handle.block_on(task)? { + match resp { Ok(stream) => { // map the stream of proto::BlobChunk to bytes. - let data_stream = stream.map(|x| { + let data_stream = stream.into_inner().map(|x| { x.map(|x| VecDeque::from(x.data.to_vec())) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); @@ -149,9 +140,7 @@ impl BlobService for GRPCBlobService { // Use StreamReader::new to convert to an AsyncRead. let data_reader = tokio_util::io::StreamReader::new(data_stream); - // Use SyncIoBridge to turn it into a sync Read. - let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader); - Ok(Some(Box::new(DumbSeeker::new(sync_reader)))) + Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) } Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), @@ -160,7 +149,7 @@ impl BlobService for GRPCBlobService { /// Returns a BlobWriter, that'll internally wrap each write in a // [proto::BlobChunk], which is send to the gRPC server. - fn open_write(&self) -> Box { + async fn open_write(&self) -> Box { let mut grpc_client = self.grpc_client.clone(); // set up an mpsc channel passing around Bytes. @@ -171,9 +160,8 @@ impl BlobService for GRPCBlobService { let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. - let task: JoinHandle> = self - .tokio_handle - .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); + let task: JoinHandle> = + tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); // The tx part of the channel is converted to a sink of byte chunks. @@ -187,43 +175,26 @@ impl BlobService for GRPCBlobService { // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item" // … which is turned into an [tokio::io::AsyncWrite]. - let async_writer = SinkWriter::new(CopyToBytes::new(sink)); - // … which is then turned into a [io::Write]. - let writer = SyncIoBridge::new(async_writer); + let writer = SinkWriter::new(CopyToBytes::new(sink)); Box::new(GRPCBlobWriter { - tokio_handle: self.tokio_handle.clone(), task_and_writer: Some((task, writer)), digest: None, }) } } -type BridgedWriter = SyncIoBridge< - SinkWriter< - CopyToBytes< - SinkMapErr, fn(PollSendError) -> io::Error>, - >, - >, ->; - -pub struct GRPCBlobWriter { - /// A handle into the active tokio runtime. Necessary to block on the task - /// containing the put request. - tokio_handle: tokio::runtime::Handle, - +pub struct GRPCBlobWriter { /// The task containing the put request, and the inner writer, if we're still writing. - task_and_writer: Option<( - JoinHandle>, - BridgedWriter, - )>, + task_and_writer: Option<(JoinHandle>, W)>, /// The digest that has been returned, if we successfully closed. digest: Option, } -impl BlobWriter for GRPCBlobWriter { - fn close(&mut self) -> Result { +#[async_trait] +impl BlobWriter for GRPCBlobWriter { + async fn close(&mut self) -> Result { if self.task_and_writer.is_none() { // if we're already closed, return the b3 digest, which must exist. // If it doesn't, we already closed and failed once, and didn't handle the error. @@ -240,12 +211,14 @@ impl BlobWriter for GRPCBlobWriter { // the channel. writer .shutdown() - .map_err(|e| crate::Error::StorageError(e.to_string()))?; + .map_err(|e| crate::Error::StorageError(e.to_string())) + .await?; // block on the RPC call to return. // This ensures all chunks are sent out, and have been received by the // backend. - match self.tokio_handle.block_on(task)? { + + match task.await? { Ok(resp) => { // return the digest from the response, and store it in self.digest for subsequent closes. let digest: B3Digest = resp.digest.try_into().map_err(|_| { @@ -262,26 +235,48 @@ impl BlobWriter for GRPCBlobWriter { } } -impl io::Write for GRPCBlobWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { +impl tokio::io::AsyncWrite for GRPCBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { match &mut self.task_and_writer { - None => Err(io::Error::new( + None => Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", - )), - Some((_, ref mut writer)) => writer.write(buf), + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_write(cx, buf) + } } } - fn flush(&mut self) -> io::Result<()> { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { match &mut self.task_and_writer { - None => Err(io::Error::new( + None => Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", - )), - Some((_, ref mut writer)) => writer.flush(), + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_flush(cx) + } } } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // TODO(raitobezarius): this might not be a graceful shutdown of the + // channel inside the gRPC connection. + Poll::Ready(Ok(())) + } } #[cfg(test)] @@ -291,7 +286,6 @@ mod tests { use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::task; use tokio::time; use tokio_stream::wrappers::UnixListenerStream; @@ -358,32 +352,23 @@ mod tests { } /// This uses the correct scheme for a unix socket, and provides a server on the other side. - #[tokio::test] - async fn test_valid_unix_path_ping_pong() { + /// This is not a tokio::test, because spawn two separate tokio runtimes and + // want to have explicit control. + #[test] + fn test_valid_unix_path_ping_pong() { let tmpdir = TempDir::new().unwrap(); let path = tmpdir.path().join("daemon"); - // let mut join_set = JoinSet::new(); - - // prepare a client - let client = { - let mut url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); - url.set_path(path.to_str().unwrap()); - GRPCBlobService::from_url(&url).expect("must succeed") - }; - - let path_copy = path.clone(); + let path_clone = path.clone(); // Spin up a server, in a thread far away, which spawns its own tokio runtime, // and blocks on the task. thread::spawn(move || { // Create the runtime let rt = tokio::runtime::Runtime::new().unwrap(); - // Get a handle from this runtime - let handle = rt.handle(); - let task = handle.spawn(async { - let uds = UnixListener::bind(path_copy).unwrap(); + let task = rt.spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); let uds_stream = UnixListenerStream::new(uds); // spin up a new server @@ -397,33 +382,46 @@ mod tests { router.serve_with_incoming(uds_stream).await }); - handle.block_on(task) + rt.block_on(task).unwrap().unwrap(); }); - // wait for the socket to be created - { - let mut socket_created = false; - for _try in 1..20 { - if path.exists() { - socket_created = true; - break; + // Now create another tokio runtime which we'll use in the main test code. + let rt = tokio::runtime::Runtime::new().unwrap(); + + let task = rt.spawn(async move { + // wait for the socket to be created + { + let mut socket_created = false; + // TODO: exponential backoff urgently + for _try in 1..20 { + if path.exists() { + socket_created = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(20)).await; } - tokio::time::sleep(time::Duration::from_millis(20)).await; + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); } - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); - } + // prepare a client + let client = { + let mut url = + url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + url.set_path(path.to_str().unwrap()); + GRPCBlobService::from_url(&url).expect("must succeed") + }; - let has = task::spawn_blocking(move || { - client + let has = client .has(&fixtures::BLOB_A_DIGEST) - .expect("must not be err") - }) - .await - .expect("must not be err"); - assert!(!has); + .await + .expect("must not be err"); + + assert!(!has); + }); + rt.block_on(task).unwrap() } } diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 893f27364..383127344 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,9 +1,11 @@ -use std::io::{self, Cursor}; +use std::io::{self, Cursor, Write}; +use std::task::Poll; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use tracing::{instrument, warn}; +use tonic::async_trait; +use tracing::instrument; use super::{BlobReader, BlobService, BlobWriter}; use crate::{B3Digest, Error}; @@ -13,6 +15,7 @@ pub struct MemoryBlobService { db: Arc>>>, } +#[async_trait] impl BlobService for MemoryBlobService { /// Constructs a [MemoryBlobService] from the passed [url::Url]: /// - scheme has to be `memory://` @@ -31,12 +34,12 @@ impl BlobService for MemoryBlobService { } #[instrument(skip(self, digest), fields(blob.digest=%digest))] - fn has(&self, digest: &B3Digest) -> Result { + async fn has(&self, digest: &B3Digest) -> Result { let db = self.db.read().unwrap(); Ok(db.contains_key(digest)) } - fn open_read(&self, digest: &B3Digest) -> Result>, Error> { + async fn open_read(&self, digest: &B3Digest) -> Result>, Error> { let db = self.db.read().unwrap(); match db.get(digest).map(|x| Cursor::new(x.clone())) { @@ -46,7 +49,7 @@ impl BlobService for MemoryBlobService { } #[instrument(skip(self))] - fn open_write(&self) -> Box { + async fn open_write(&self) -> Box { Box::new(MemoryBlobWriter::new(self.db.clone())) } } @@ -70,9 +73,13 @@ impl MemoryBlobWriter { } } } -impl std::io::Write for MemoryBlobWriter { - fn write(&mut self, b: &[u8]) -> std::io::Result { - match &mut self.writers { +impl tokio::io::AsyncWrite for MemoryBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &[u8], + ) -> std::task::Poll> { + Poll::Ready(match &mut self.writers { None => Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", @@ -81,22 +88,34 @@ impl std::io::Write for MemoryBlobWriter { let bytes_written = buf.write(b)?; hasher.write(&b[..bytes_written]) } - } + }) } - fn flush(&mut self) -> std::io::Result<()> { - match &mut self.writers { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(match self.writers { None => Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", )), Some(_) => Ok(()), - } + }) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // shutdown is "instantaneous", we only write to memory. + Poll::Ready(Ok(())) } } +#[async_trait] impl BlobWriter for MemoryBlobWriter { - fn close(&mut self) -> Result { + async fn close(&mut self) -> Result { if self.writers.is_none() { match &self.digest { Some(digest) => Ok(digest.clone()), diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index 33cfb113e..5ecf25ac1 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -1,11 +1,12 @@ use std::io; +use tonic::async_trait; use crate::{B3Digest, Error}; -mod dumb_seeker; mod from_addr; mod grpc; mod memory; +mod naive_seeker; mod sled; #[cfg(test)] @@ -21,35 +22,41 @@ pub use self::sled::SledBlobService; /// a way to get a [io::Read] to a blob, and a method to initiate writing a new /// Blob, which will return something implmenting io::Write, and providing a /// close funtion, to finalize a blob and get its digest. +#[async_trait] pub trait BlobService: Send + Sync { /// Create a new instance by passing in a connection URL. + /// TODO: check if we want to make this async, instead of lazily connecting fn from_url(url: &url::Url) -> Result where Self: Sized; /// Check if the service has the blob, by its content hash. - fn has(&self, digest: &B3Digest) -> Result; + async fn has(&self, digest: &B3Digest) -> Result; /// Request a blob from the store, by its content hash. - fn open_read(&self, digest: &B3Digest) -> Result>, Error>; + async fn open_read(&self, digest: &B3Digest) -> Result>, Error>; /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. - fn open_write(&self) -> Box; + async fn open_write(&self) -> Box; } -/// A [io::Write] that you need to close() afterwards, and get back the digest -/// of the written blob. -pub trait BlobWriter: io::Write + Send + Sync + 'static { +/// A [tokio::io::AsyncWrite] that you need to close() afterwards, and get back +/// the digest of the written blob. +#[async_trait] +pub trait BlobWriter: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static { /// Signal there's no more data to be written, and return the digest of the /// contents written. /// /// Closing a already-closed BlobWriter is a no-op. - fn close(&mut self) -> Result; + async fn close(&mut self) -> Result; } -/// A [io::Read] that also allows seeking. -pub trait BlobReader: io::Read + io::Seek + Send + 'static {} +/// A [tokio::io::AsyncRead] that also allows seeking. +pub trait BlobReader: + tokio::io::AsyncRead + tokio::io::AsyncSeek + tokio::io::AsyncBufRead + Send + Unpin + 'static +{ +} /// A [`io::Cursor>`] can be used as a BlobReader. impl BlobReader for io::Cursor> {} diff --git a/tvix/store/src/blobservice/naive_seeker.rs b/tvix/store/src/blobservice/naive_seeker.rs new file mode 100644 index 000000000..e65a82c7f --- /dev/null +++ b/tvix/store/src/blobservice/naive_seeker.rs @@ -0,0 +1,269 @@ +use super::BlobReader; +use pin_project_lite::pin_project; +use std::io; +use std::task::Poll; +use tokio::io::AsyncRead; +use tracing::{debug, instrument}; + +pin_project! { + /// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by + /// simply skipping over some bytes, keeping track of the position. + /// It fails whenever you try to seek backwards. + /// + /// ## Pinning concerns: + /// + /// [NaiveSeeker] is itself pinned by callers, and we do not need to concern + /// ourselves regarding that. + /// + /// Though, its fields as per + /// + /// can be pinned or unpinned. + /// + /// So we need to go over each field and choose our policy carefully. + /// + /// The obvious cases are the bookkeeping integers we keep in the structure, + /// those are private and not shared to anyone, we never build a + /// `Pin<&mut X>` out of them at any point, therefore, we can safely never + /// mark them as pinned. Of course, it is expected that no developer here + /// attempt to `pin!(self.pos)` to pin them because it makes no sense. If + /// they have to become pinned, they should be marked `#[pin]` and we need + /// to discuss it. + /// + /// So the bookkeeping integers are in the right state with respect to their + /// pinning status. The projection should offer direct access. + /// + /// On the `r` field, i.e. a `BufReader`, given that + /// + /// is available, even a `Pin<&mut BufReader>` can be safely moved. + /// + /// The only care we should have regards the internal reader itself, i.e. + /// the `R` instance, see that Tokio decided to `#[pin]` it too: + /// + /// + /// In general, there's no `Unpin` instance for `R: tokio::io::AsyncRead` + /// (see ). + /// + /// Therefore, we could keep it unpinned and pin it in every call site + /// whenever we need to call `poll_*` which can be confusing to the non- + /// expert developer and we have a fair share amount of situations where the + /// [BufReader] instance is naked, i.e. in its `&mut BufReader` + /// form, this is annoying because it could lead to expose the naked `R` + /// internal instance somehow and would produce a risk of making it move + /// unexpectedly. + /// + /// We choose the path of the least resistance as we have no reason to have + /// access to the raw `BufReader` instance, we just `#[pin]` it too and + /// enjoy its `poll_*` safe APIs and push the unpinning concerns to the + /// internal implementations themselves, which studied the question longer + /// than us. + pub struct NaiveSeeker { + #[pin] + r: tokio::io::BufReader, + pos: u64, + bytes_to_skip: u64, + } +} + +impl NaiveSeeker { + pub fn new(r: R) -> Self { + NaiveSeeker { + r: tokio::io::BufReader::new(r), + pos: 0, + bytes_to_skip: 0, + } + } +} + +impl tokio::io::AsyncRead for NaiveSeeker { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + // The amount of data read can be determined by the increase + // in the length of the slice returned by `ReadBuf::filled`. + let filled_before = buf.filled().len(); + let this = self.project(); + let pos: &mut u64 = this.pos; + + match this.r.poll_read(cx, buf) { + Poll::Ready(a) => { + let bytes_read = buf.filled().len() - filled_before; + *pos += bytes_read as u64; + + Poll::Ready(a) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl tokio::io::AsyncBufRead for NaiveSeeker { + fn poll_fill_buf( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().r.poll_fill_buf(cx) + } + + fn consume(self: std::pin::Pin<&mut Self>, amt: usize) { + let this = self.project(); + this.r.consume(amt); + let pos: &mut u64 = this.pos; + *pos += amt as u64; + } +} + +impl tokio::io::AsyncSeek for NaiveSeeker { + #[instrument(skip(self))] + fn start_seek( + self: std::pin::Pin<&mut Self>, + position: std::io::SeekFrom, + ) -> std::io::Result<()> { + let absolute_offset: u64 = match position { + io::SeekFrom::Start(start_offset) => { + if start_offset < self.pos { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + format!("can't seek backwards ({} -> {})", self.pos, start_offset), + )); + } else { + start_offset + } + } + // we don't know the total size, can't support this. + io::SeekFrom::End(_end_offset) => { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek from end", + )); + } + io::SeekFrom::Current(relative_offset) => { + if relative_offset < 0 { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek backwards relative to current position", + )); + } else { + self.pos + relative_offset as u64 + } + } + }; + + debug!(absolute_offset=?absolute_offset, "seek"); + + // we already know absolute_offset is larger than self.pos + debug_assert!( + absolute_offset >= self.pos, + "absolute_offset {} is larger than self.pos {}", + absolute_offset, + self.pos + ); + + // calculate bytes to skip + *self.project().bytes_to_skip = absolute_offset - self.pos; + + Ok(()) + } + + #[instrument(skip(self))] + fn poll_complete( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + if self.bytes_to_skip == 0 { + // return the new position (from the start of the stream) + return Poll::Ready(Ok(self.pos)); + } + + // discard some bytes, until pos is where we want it to be. + // We create a buffer that we'll discard later on. + let mut buf = [0; 1024]; + + // Loop until we've reached the desired seek position. This is done by issuing repeated + // `poll_read` calls. If the data is not available yet, we will yield back to the executor + // and wait to be polled again. + loop { + // calculate the length we want to skip at most, which is either a max + // buffer size, or the number of remaining bytes to read, whatever is + // smaller. + let bytes_to_skip = std::cmp::min(self.bytes_to_skip as usize, buf.len()); + + let mut read_buf = tokio::io::ReadBuf::new(&mut buf[..bytes_to_skip]); + + match self.as_mut().poll_read(cx, &mut read_buf) { + Poll::Ready(_a) => { + let bytes_read = read_buf.filled().len() as u64; + + if bytes_read == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "tried to skip {} bytes, but only was able to skip {} until reaching EOF", + bytes_to_skip, bytes_read + ), + ))); + } + + // calculate bytes to skip + let bytes_to_skip = self.bytes_to_skip - bytes_read; + + *self.as_mut().project().bytes_to_skip = bytes_to_skip; + + if bytes_to_skip == 0 { + return Poll::Ready(Ok(self.pos)); + } + } + Poll::Pending => return Poll::Pending, + }; + } + } +} + +impl BlobReader for NaiveSeeker {} + +#[cfg(test)] +mod tests { + use super::NaiveSeeker; + use std::io::{Cursor, SeekFrom}; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + /// This seek requires multiple `poll_read` as we use a 1024 bytes internal + /// buffer when doing the seek. + /// This ensures we don't hang indefinitely. + #[tokio::test] + async fn seek() { + let buf = vec![0u8; 4096]; + let reader = Cursor::new(&buf); + let mut seeker = NaiveSeeker::new(reader); + seeker.seek(SeekFrom::Start(4000)).await.unwrap(); + } + + #[tokio::test] + async fn seek_read() { + let mut buf = vec![0u8; 2048]; + buf.extend_from_slice(&[1u8; 2048]); + buf.extend_from_slice(&[2u8; 2048]); + + let reader = Cursor::new(&buf); + let mut seeker = NaiveSeeker::new(reader); + + let mut read_buf = vec![0u8; 1024]; + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[0u8; 1024]); + + seeker + .seek(SeekFrom::Current(1024)) + .await + .expect("must seek"); + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[1u8; 1024]); + + seeker + .seek(SeekFrom::Start(2 * 2048)) + .await + .expect("must seek"); + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[2u8; 1024]); + } +} diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 00291ba88..209f0b76f 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -1,9 +1,11 @@ use super::{BlobReader, BlobService, BlobWriter}; use crate::{B3Digest, Error}; use std::{ - io::{self, Cursor}, + io::{self, Cursor, Write}, path::PathBuf, + task::Poll, }; +use tonic::async_trait; use tracing::instrument; #[derive(Clone)] @@ -27,6 +29,7 @@ impl SledBlobService { } } +#[async_trait] impl BlobService for SledBlobService { /// Constructs a [SledBlobService] from the passed [url::Url]: /// - scheme has to be `sled://` @@ -57,7 +60,7 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn has(&self, digest: &B3Digest) -> Result { + async fn has(&self, digest: &B3Digest) -> Result { match self.db.contains_key(digest.to_vec()) { Ok(has) => Ok(has), Err(e) => Err(Error::StorageError(e.to_string())), @@ -65,7 +68,7 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result>, Error> { + async fn open_read(&self, digest: &B3Digest) -> Result>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), @@ -74,7 +77,7 @@ impl BlobService for SledBlobService { } #[instrument(skip(self))] - fn open_write(&self) -> Box { + async fn open_write(&self) -> Box { Box::new(SledBlobWriter::new(self.db.clone())) } } @@ -99,9 +102,13 @@ impl SledBlobWriter { } } -impl io::Write for SledBlobWriter { - fn write(&mut self, b: &[u8]) -> io::Result { - match &mut self.writers { +impl tokio::io::AsyncWrite for SledBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &[u8], + ) -> std::task::Poll> { + Poll::Ready(match &mut self.writers { None => Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", @@ -110,22 +117,34 @@ impl io::Write for SledBlobWriter { let bytes_written = buf.write(b)?; hasher.write(&b[..bytes_written]) } - } + }) } - fn flush(&mut self) -> io::Result<()> { - match &mut self.writers { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(match &mut self.writers { None => Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", )), Some(_) => Ok(()), - } + }) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // shutdown is "instantaneous", we only write to a Vec as buffer. + Poll::Ready(Ok(())) } } +#[async_trait] impl BlobWriter for SledBlobWriter { - fn close(&mut self) -> Result { + async fn close(&mut self) -> Result { if self.writers.is_none() { match &self.digest { Some(digest) => Ok(digest.clone()), diff --git a/tvix/store/src/blobservice/tests.rs b/tvix/store/src/blobservice/tests.rs index ec7a618fa..501270780 100644 --- a/tvix/store/src/blobservice/tests.rs +++ b/tvix/store/src/blobservice/tests.rs @@ -1,6 +1,9 @@ use std::io; +use std::pin::pin; use test_case::test_case; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use super::B3Digest; use super::BlobService; @@ -24,19 +27,25 @@ fn gen_sled_blob_service() -> impl BlobService { #[test_case(gen_memory_blob_service(); "memory")] #[test_case(gen_sled_blob_service(); "sled")] fn has_nonexistent_false(blob_service: impl BlobService) { - assert!(!blob_service - .has(&fixtures::BLOB_A_DIGEST) - .expect("must not fail")); + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(!blob_service + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail")); + }) } /// Trying to read a non-existing blob should return a None instead of a reader. #[test_case(gen_memory_blob_service(); "memory")] #[test_case(gen_sled_blob_service(); "sled")] fn not_found_read(blob_service: impl BlobService) { - assert!(blob_service - .open_read(&fixtures::BLOB_A_DIGEST) - .expect("must not fail") - .is_none()) + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(blob_service + .open_read(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail") + .is_none()) + }) } /// Put a blob in the store, check has, get it back. @@ -46,165 +55,192 @@ fn not_found_read(blob_service: impl BlobService) { #[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")] #[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")] fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) { - let mut w = blob_service.open_write(); + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; - let l = io::copy(&mut io::Cursor::new(blob_contents), &mut w).expect("copy must succeed"); - assert_eq!( - blob_contents.len(), - l as usize, - "written bytes must match blob length" - ); + let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w) + .await + .expect("copy must succeed"); + assert_eq!( + blob_contents.len(), + l as usize, + "written bytes must match blob length" + ); - let digest = w.close().expect("close must succeed"); + let digest = w.close().await.expect("close must succeed"); - assert_eq!(*blob_digest, digest, "returned digest must be correct"); + assert_eq!(*blob_digest, digest, "returned digest must be correct"); - assert!( - blob_service.has(blob_digest).expect("must not fail"), - "blob service should now have the blob" - ); + assert!( + blob_service.has(blob_digest).await.expect("must not fail"), + "blob service should now have the blob" + ); - let mut r = blob_service - .open_read(blob_digest) - .expect("open_read must succeed") - .expect("must be some"); + let mut r = blob_service + .open_read(blob_digest) + .await + .expect("open_read must succeed") + .expect("must be some"); - let mut buf: Vec = Vec::new(); - let l = io::copy(&mut r, &mut buf).expect("copy must succeed"); + let mut buf: Vec = Vec::new(); + let mut pinned_reader = pin!(r); + let l = tokio::io::copy(&mut pinned_reader, &mut buf) + .await + .expect("copy must succeed"); + // let l = io::copy(&mut r, &mut buf).expect("copy must succeed"); - assert_eq!( - blob_contents.len(), - l as usize, - "read bytes must match blob length" - ); + assert_eq!( + blob_contents.len(), + l as usize, + "read bytes must match blob length" + ); - assert_eq!(blob_contents, buf, "read blob contents must match"); + assert_eq!(blob_contents, buf, "read blob contents must match"); + }) } /// Put a blob in the store, and seek inside it a bit. #[test_case(gen_memory_blob_service(); "memory")] #[test_case(gen_sled_blob_service(); "sled")] fn put_seek(blob_service: impl BlobService) { - let mut w = blob_service.open_write(); + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; - io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed"); - w.close().expect("close must succeed"); + tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w) + .await + .expect("copy must succeed"); + w.close().await.expect("close must succeed"); - // open a blob for reading - let mut r = blob_service - .open_read(&fixtures::BLOB_B_DIGEST) - .expect("open_read must succeed") - .expect("must be some"); + // open a blob for reading + let mut r = blob_service + .open_read(&fixtures::BLOB_B_DIGEST) + .await + .expect("open_read must succeed") + .expect("must be some"); - let mut pos: u64 = 0; + let mut pos: u64 = 0; - // read the first 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); - - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected first 10 bytes to match" - ); - - pos += buf.len() as u64; - } - // seek by 0 bytes, using SeekFrom::Start. - let p = r.seek(io::SeekFrom::Start(pos)).expect("must not fail"); - assert_eq!(pos, p); - - // read the next 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); - - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected data to match" - ); - - pos += buf.len() as u64; - } - - // seek by 5 bytes, using SeekFrom::Start. - let p = r.seek(io::SeekFrom::Start(pos + 5)).expect("must not fail"); - pos += 5; - assert_eq!(pos, p); - - // read the next 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); - - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected data to match" - ); - - pos += buf.len() as u64; - } - - // seek by 12345 bytes, using SeekFrom:: - let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail"); - pos += 12345; - assert_eq!(pos, p); - - // read the next 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); - - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected data to match" - ); - - #[allow(unused_assignments)] + // read the first 10 bytes, they must match the data in the fixture. { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected first 10 bytes to match" + ); + pos += buf.len() as u64; } - } + // seek by 0 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos)) + .await + .expect("must not fail"); + assert_eq!(pos, p); - // seeking to the end is okay… - let p = r - .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) - .expect("must not fail"); - pos = fixtures::BLOB_B.len() as u64; - assert_eq!(pos, p); + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); - { - // but it returns no more data. - let mut buf: Vec = Vec::new(); - r.read_to_end(&mut buf).expect("must not fail"); - assert!(buf.is_empty(), "expected no more data to be read"); - } + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); - // seeking past the end… - match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) { - // should either be ok, but then return 0 bytes. - // this matches the behaviour or a Cursor>. - Ok(_pos) => { + pos += buf.len() as u64; + } + + // seek by 5 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos + 5)) + .await + .expect("must not fail"); + pos += 5; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 12345 bytes, using SeekFrom:: + let p = r + .seek(io::SeekFrom::Current(12345)) + .await + .expect("must not fail"); + pos += 12345; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + #[allow(unused_assignments)] + { + pos += buf.len() as u64; + } + } + + // seeking to the end is okay… + let p = r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) + .await + .expect("must not fail"); + pos = fixtures::BLOB_B.len() as u64; + assert_eq!(pos, p); + + { + // but it returns no more data. let mut buf: Vec = Vec::new(); - r.read_to_end(&mut buf).expect("must not fail"); + r.read_to_end(&mut buf).await.expect("must not fail"); assert!(buf.is_empty(), "expected no more data to be read"); } - // or not be okay. - Err(_) => {} - } - // TODO: this is only broken for the gRPC version - // We expect seeking backwards or relative to the end to fail. - // r.seek(io::SeekFrom::Current(-1)) - // .expect_err("SeekFrom::Current(-1) expected to fail"); + // seeking past the end… + match r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) + .await + { + // should either be ok, but then return 0 bytes. + // this matches the behaviour or a Cursor>. + Ok(_pos) => { + let mut buf: Vec = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + // or not be okay. + Err(_) => {} + } - // r.seek(io::SeekFrom::Start(pos - 1)) - // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + // TODO: this is only broken for the gRPC version + // We expect seeking backwards or relative to the end to fail. + // r.seek(io::SeekFrom::Current(-1)) + // .expect_err("SeekFrom::Current(-1) expected to fail"); - // r.seek(io::SeekFrom::End(0)) - // .expect_err("SeekFrom::End(_) expected to fail"); + // r.seek(io::SeekFrom::Start(pos - 1)) + // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + + // r.seek(io::SeekFrom::End(0)) + // .expect_err("SeekFrom::End(_) expected to fail"); + }) } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index 36e5b10d3..fea419140 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -49,7 +49,7 @@ pub trait DirectoryService: Send + Sync { /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to /// retrieve the root digest (or an error). -pub trait DirectoryPutter { +pub trait DirectoryPutter: Send { /// Put a individual [proto::Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs index 0015abb9d..978fd50e2 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fuse/mod.rs @@ -18,11 +18,12 @@ use crate::{ }; use fuser::{FileAttr, ReplyAttr, Request}; use nix_compat::store_path::StorePath; -use std::io::{self, Read, Seek}; +use std::io; use std::os::unix::ffi::OsStrExt; use std::str::FromStr; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; +use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; use tracing::{debug, info_span, warn}; use self::inode_tracker::InodeTracker; @@ -79,6 +80,8 @@ pub struct FUSE { file_handles: HashMap>, next_file_handle: u64, + + tokio_handle: tokio::runtime::Handle, } impl FUSE { @@ -100,6 +103,7 @@ impl FUSE { file_handles: Default::default(), next_file_handle: 1, + tokio_handle: tokio::runtime::Handle::current(), } } @@ -430,6 +434,7 @@ impl fuser::Filesystem for FUSE { reply.error(libc::ENOSYS); return; } + // lookup the inode match *self.inode_tracker.get(ino).unwrap() { // read is invalid on non-files. @@ -441,7 +446,16 @@ impl fuser::Filesystem for FUSE { let span = info_span!("read", blob.digest = %blob_digest); let _enter = span.enter(); - match self.blob_service.open_read(blob_digest) { + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + + let task = self + .tokio_handle + .spawn(async move { blob_service.open_read(&blob_digest).await }); + + let blob_reader = self.tokio_handle.block_on(task).unwrap(); + + match blob_reader { Ok(None) => { warn!("blob not found"); reply.error(libc::EIO); @@ -451,6 +465,7 @@ impl fuser::Filesystem for FUSE { reply.error(libc::EIO); } Ok(Some(blob_reader)) => { + debug!("add file handle {}", fh); self.file_handles.insert(fh, blob_reader); reply.opened(fh, 0); @@ -477,9 +492,14 @@ impl fuser::Filesystem for FUSE { reply: fuser::ReplyEmpty, ) { // remove and get ownership on the blob reader - let blob_reader = self.file_handles.remove(&fh).unwrap(); - // drop it, which will close it. - drop(blob_reader); + match self.file_handles.remove(&fh) { + // drop it, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + debug!("file_handle {} not found", fh); + } + } reply.ok(); } @@ -498,29 +518,70 @@ impl fuser::Filesystem for FUSE { ) { debug!("read"); - let blob_reader = self.file_handles.get_mut(&fh).unwrap(); - - // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)); - match resp { - Ok(pos) => { - debug_assert_eq!(offset as u64, pos); - } - Err(e) => { - warn!("failed to seek to offset {}: {}", offset, e); + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let mut blob_reader = match self.file_handles.remove(&fh) { + Some(blob_reader) => blob_reader, + None => { + warn!("file handle {} unknown", fh); reply.error(libc::EIO); return; } - } + }; - // now with the blobreader seeked to this location, read size of data - let data: std::io::Result> = - blob_reader.bytes().take(size.try_into().unwrap()).collect(); + let task = self.tokio_handle.spawn(async move { + // seek to the offset specified, which is relative to the start of the file. + let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; - match data { - // respond with the requested data - Ok(data) => reply.data(&data), - Err(e) => reply.error(e.raw_os_error().unwrap()), + match resp { + Ok(pos) => { + debug_assert_eq!(offset as u64, pos); + } + Err(e) => { + warn!("failed to seek to offset {}: {}", offset, e); + return Err(libc::EIO); + } + } + + // As written in the fuser docs, read should send exactly the number + // of bytes requested except on EOF or error. + + let mut buf: Vec = Vec::with_capacity(size as usize); + + while (buf.len() as u64) < size as u64 { + match blob_reader.fill_buf().await { + Ok(int_buf) => { + // copy things from the internal buffer into buf to fill it till up until size + + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } + + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); + } + Err(e) => return Err(e.raw_os_error().unwrap()), + } + } + Ok((buf, blob_reader)) + }); + + let resp = self.tokio_handle.block_on(task).unwrap(); + + match resp { + Err(e) => reply.error(e), + Ok((buf, blob_reader)) => { + reply.data(&buf); + self.file_handles.insert(fh, blob_reader); + } } } diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs index 2c99f7547..29856433b 100644 --- a/tvix/store/src/fuse/tests.rs +++ b/tvix/store/src/fuse/tests.rs @@ -1,8 +1,8 @@ -use std::fs; use std::io::Cursor; use std::os::unix::prelude::MetadataExt; use std::path::Path; use std::sync::Arc; +use std::{fs, io}; use tempfile::TempDir; @@ -21,34 +21,25 @@ const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; -fn setup_and_mount, F>( - mountpoint: P, - setup_fn: F, -) -> Result -where - F: Fn(Arc, Arc, Arc), -{ - setup_and_mount_with_listing(mountpoint, setup_fn, false) -} - -fn setup_and_mount_with_listing, F>( - mountpoint: P, - setup_fn: F, - list_root: bool, -) -> Result -where - F: Fn(Arc, Arc, Arc), -{ +fn gen_svcs() -> ( + Arc, + Arc, + Arc, +) { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone()); - setup_fn( - blob_service.clone(), - directory_service.clone(), - path_info_service.clone(), - ); + (blob_service, directory_service, path_info_service) +} +fn do_mount>( + blob_service: Arc, + directory_service: Arc, + path_info_service: Arc, + mountpoint: P, + list_root: bool, +) -> io::Result { let fs = FUSE::new( blob_service, directory_service, @@ -58,16 +49,17 @@ where fuser::spawn_mount2(fs, mountpoint, &[]) } -fn populate_blob_a( - blob_service: Arc, - _directory_service: Arc, - path_info_service: Arc, +async fn populate_blob_a( + blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, ) { // Upload BLOB_A - let mut bw = blob_service.open_write(); - std::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + .await .expect("must succeed uploading"); - bw.close().expect("must succeed closing"); + bw.close().await.expect("must succeed closing"); // Create a PathInfo for it let path_info = PathInfo { @@ -84,16 +76,17 @@ fn populate_blob_a( path_info_service.put(path_info).expect("must succeed"); } -fn populate_blob_b( - blob_service: Arc, - _directory_service: Arc, - path_info_service: Arc, +async fn populate_blob_b( + blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, ) { // Upload BLOB_B - let mut bw = blob_service.open_write(); - std::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + .await .expect("must succeed uploading"); - bw.close().expect("must succeed closing"); + bw.close().await.expect("must succeed closing"); // Create a PathInfo for it let path_info = PathInfo { @@ -111,9 +104,9 @@ fn populate_blob_b( } fn populate_symlink( - _blob_service: Arc, - _directory_service: Arc, - path_info_service: Arc, + _blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, ) { // Create a PathInfo for it let path_info = PathInfo { @@ -131,9 +124,9 @@ fn populate_symlink( /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. fn populate_symlink2( - _blob_service: Arc, - _directory_service: Arc, - path_info_service: Arc, + _blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, ) { // Create a PathInfo for it let path_info = PathInfo { @@ -148,16 +141,16 @@ fn populate_symlink2( path_info_service.put(path_info).expect("must succeed"); } -fn populate_directory_with_keep( - blob_service: Arc, - directory_service: Arc, - path_info_service: Arc, +async fn populate_directory_with_keep( + blob_service: &Arc, + directory_service: &Arc, + path_info_service: &Arc, ) { // upload empty blob - let mut bw = blob_service.open_write(); + let mut bw = blob_service.open_write().await; assert_eq!( fixtures::EMPTY_BLOB_DIGEST.to_vec(), - bw.close().expect("must succeed closing").to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), ); // upload directory @@ -182,9 +175,9 @@ fn populate_directory_with_keep( /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory /// itself. fn populate_pathinfo_without_directory( - _: Arc, - _: Arc, - path_info_service: Arc, + _: &Arc, + _: &Arc, + path_info_service: &Arc, ) { // upload pathinfo let path_info = PathInfo { @@ -202,9 +195,9 @@ fn populate_pathinfo_without_directory( /// Insert , but don't provide the blob .keep is pointing to fn populate_blob_a_without_blob( - _: Arc, - _: Arc, - path_info_service: Arc, + _: &Arc, + _: &Arc, + path_info_service: &Arc, ) { // Create a PathInfo for blob A let path_info = PathInfo { @@ -221,16 +214,16 @@ fn populate_blob_a_without_blob( path_info_service.put(path_info).expect("must succeed"); } -fn populate_directory_complicated( - blob_service: Arc, - directory_service: Arc, - path_info_service: Arc, +async fn populate_directory_complicated( + blob_service: &Arc, + directory_service: &Arc, + path_info_service: &Arc, ) { // upload empty blob - let mut bw = blob_service.open_write(); + let mut bw = blob_service.open_write().await; assert_eq!( fixtures::EMPTY_BLOB_DIGEST.to_vec(), - bw.close().expect("must succeed closing").to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), ); // upload inner directory @@ -258,8 +251,8 @@ fn populate_directory_complicated( } /// Ensure mounting itself doesn't fail -#[test] -fn mount() { +#[tokio::test] +async fn mount() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -268,14 +261,22 @@ fn mount() { let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); fuser_session.join() } /// Ensure listing the root isn't allowed -#[test] -fn root() { +#[tokio::test] +async fn root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -283,7 +284,15 @@ fn root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); { // read_dir succeeds, but getting the first element will fail. @@ -297,8 +306,8 @@ fn root() { } /// Ensure listing the root is allowed if configured explicitly -#[test] -fn root_with_listing() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn root_with_listing() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -306,8 +315,17 @@ fn root_with_listing() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount_with_listing(tmpdir.path(), populate_blob_a, true).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + true, /* allow listing */ + ) + .expect("must succeed"); { // read_dir succeeds, but getting the first element will fail. @@ -325,8 +343,8 @@ fn root_with_listing() { } /// Ensure we can stat a file at the root -#[test] -fn stat_file_at_root() { +#[tokio::test] +async fn stat_file_at_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -334,7 +352,17 @@ fn stat_file_at_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); @@ -349,8 +377,8 @@ fn stat_file_at_root() { } /// Ensure we can read a file at the root -#[test] -fn read_file_at_root() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_file_at_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -358,7 +386,17 @@ fn read_file_at_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); @@ -373,8 +411,8 @@ fn read_file_at_root() { } /// Ensure we can read a large file at the root -#[test] -fn read_large_file_at_root() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_large_file_at_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -382,7 +420,17 @@ fn read_large_file_at_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_b).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_b(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_B_NAME); { @@ -405,8 +453,8 @@ fn read_large_file_at_root() { } /// Read the target of a symlink -#[test] -fn symlink_readlink() { +#[tokio::test] +async fn symlink_readlink() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -414,7 +462,18 @@ fn symlink_readlink() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_symlink).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + let p = tmpdir.path().join(SYMLINK_NAME); let target = fs::read_link(&p).expect("must succeed"); @@ -437,8 +496,8 @@ fn symlink_readlink() { } /// Read and stat a regular file through a symlink pointing to it. -#[test] -fn read_stat_through_symlink() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_stat_through_symlink() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -446,10 +505,17 @@ fn read_stat_through_symlink() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_blob_a(bs.clone(), ds.clone(), ps.clone()); - populate_symlink(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p_symlink = tmpdir.path().join(SYMLINK_NAME); @@ -473,8 +539,8 @@ fn read_stat_through_symlink() { } /// Read a directory in the root, and validate some attributes. -#[test] -fn read_stat_directory() { +#[tokio::test] +async fn read_stat_directory() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -482,8 +548,17 @@ fn read_stat_directory() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); @@ -495,9 +570,9 @@ fn read_stat_directory() { fuser_session.join() } -#[test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Read a blob inside a directory. This ensures we successfully populate directory data. -fn read_blob_inside_dir() { +async fn read_blob_inside_dir() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -505,8 +580,17 @@ fn read_blob_inside_dir() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); @@ -522,10 +606,10 @@ fn read_blob_inside_dir() { fuser_session.join() } -#[test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Read a blob inside a directory inside a directory. This ensures we properly /// populate directories as we traverse down the structure. -fn read_blob_deep_inside_dir() { +async fn read_blob_deep_inside_dir() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -533,8 +617,17 @@ fn read_blob_deep_inside_dir() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir .path() @@ -555,8 +648,8 @@ fn read_blob_deep_inside_dir() { } /// Ensure readdir works. -#[test] -fn readdir() { +#[tokio::test] +async fn readdir() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -564,8 +657,17 @@ fn readdir() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); @@ -601,9 +703,9 @@ fn readdir() { fuser_session.join() } -#[test] +#[tokio::test] /// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. -fn readdir_deep() { +async fn readdir_deep() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -611,8 +713,17 @@ fn readdir_deep() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); @@ -636,8 +747,8 @@ fn readdir_deep() { } /// Check attributes match how they show up in /nix/store normally. -#[test] -fn check_attributes() { +#[tokio::test] +async fn check_attributes() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -645,11 +756,18 @@ fn check_attributes() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_blob_a(bs.clone(), ds.clone(), ps.clone()); - populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone()); - populate_symlink(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p_file = tmpdir.path().join(BLOB_A_NAME); @@ -689,10 +807,10 @@ fn check_attributes() { fuser_session.join() } -#[test] +#[tokio::test] /// Ensure we allocate the same inodes for the same directory contents. /// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. -fn compare_inodes_directories() { +async fn compare_inodes_directories() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -700,10 +818,17 @@ fn compare_inodes_directories() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone()); - populate_directory_complicated(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); @@ -720,8 +845,8 @@ fn compare_inodes_directories() { /// Ensure we allocate the same inodes for the same directory contents. /// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep -#[test] -fn compare_inodes_files() { +#[tokio::test] +async fn compare_inodes_files() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -729,8 +854,17 @@ fn compare_inodes_files() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); let p_keep2 = tmpdir @@ -750,8 +884,8 @@ fn compare_inodes_files() { /// Ensure we allocate the same inode for symlinks pointing to the same targets. /// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. -#[test] -fn compare_inodes_symlinks() { +#[tokio::test] +async fn compare_inodes_symlinks() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -759,10 +893,17 @@ fn compare_inodes_symlinks() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_directory_complicated(bs.clone(), ds.clone(), ps.clone()); - populate_symlink2(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + populate_symlink2(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); @@ -778,8 +919,8 @@ fn compare_inodes_symlinks() { } /// Check we match paths exactly. -#[test] -fn read_wrong_paths_in_root() { +#[tokio::test] +async fn read_wrong_paths_in_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -787,7 +928,17 @@ fn read_wrong_paths_in_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); // wrong name assert!(!tmpdir @@ -817,8 +968,8 @@ fn read_wrong_paths_in_root() { } /// Make sure writes are not allowed -#[test] -fn disallow_writes() { +#[tokio::test] +async fn disallow_writes() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -827,7 +978,16 @@ fn disallow_writes() { let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); let e = std::fs::File::create(p).expect_err("must fail"); @@ -837,17 +997,26 @@ fn disallow_writes() { fuser_session.join() } -#[test] +#[tokio::test] /// Ensure we get an IO error if the directory service does not have the Directory object. -fn missing_directory() { +async fn missing_directory() { if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); return; } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_pathinfo_without_directory).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); @@ -871,17 +1040,26 @@ fn missing_directory() { fuser_session.join() } -#[test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Ensure we get an IO error if the blob service does not have the blob -fn missing_blob() { +async fn missing_blob() { if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); return; } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_blob_a_without_blob).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index cd3dc01cf..6764eaddb 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -6,8 +6,6 @@ use std::sync::Arc; use std::{ collections::HashMap, fmt::Debug, - fs::File, - io, os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; @@ -57,7 +55,7 @@ impl From for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry( +async fn process_entry( blob_service: Arc, directory_putter: &mut Box, entry: &walkdir::DirEntry, @@ -102,16 +100,17 @@ fn process_entry( .metadata() .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?; - let mut file = File::open(entry.path()) + let mut file = tokio::fs::File::open(entry.path()) + .await .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?; - let mut writer = blob_service.open_write(); + let mut writer = blob_service.open_write().await; - if let Err(e) = io::copy(&mut file, &mut writer) { + if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { return Err(Error::UnableToRead(entry.path().to_path_buf(), e)); }; - let digest = writer.close()?; + let digest = writer.close().await?; return Ok(proto::node::Node::File(proto::FileNode { name: entry.file_name().as_bytes().to_vec().into(), @@ -137,7 +136,7 @@ fn process_entry( /// caller to possibly register it somewhere (and potentially rename it based on /// some naming scheme. #[instrument(skip(blob_service, directory_service), fields(path=?p))] -pub fn ingest_path + Debug>( +pub async fn ingest_path + Debug>( blob_service: Arc, directory_service: Arc, p: P, @@ -175,7 +174,8 @@ pub fn ingest_path + Debug>( &mut directory_putter, &entry, maybe_directory, - )?; + ) + .await?; if entry.depth() == 0 { return Ok(node); diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index cc5af488a..4255148fc 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -7,10 +7,8 @@ use crate::{ use count_write::CountWrite; use nix_compat::nar; use sha2::{Digest, Sha256}; -use std::{ - io::{self, BufReader}, - sync::Arc, -}; +use std::{io, sync::Arc}; +use tokio::io::BufReader; use tracing::warn; /// Invoke [write_nar], and return the size and sha256 digest of the produced @@ -75,8 +73,11 @@ fn walk_node( )) })?; - let mut blob_reader = match blob_service - .open_read(&digest) + // HACK: blob_service is async, but this function isn't async yet.. + let tokio_handle = tokio::runtime::Handle::current(); + + let blob_reader = match tokio_handle + .block_on(async { blob_service.open_read(&digest).await }) .map_err(RenderError::StoreError)? { Some(blob_reader) => Ok(BufReader::new(blob_reader)), @@ -90,7 +91,7 @@ fn walk_node( .file( proto_file_node.executable, proto_file_node.size.into(), - &mut blob_reader, + &mut tokio_util::io::SyncIoBridge::new(blob_reader), ) .map_err(RenderError::NARWriterError)?; } diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 2d8c39653..8bd3083c1 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,4 +1,6 @@ -use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead}; +use crate::blobservice::BlobService; +use core::pin::pin; +use futures::TryFutureExt; use std::{ collections::VecDeque, io, @@ -6,7 +8,6 @@ use std::{ pin::Pin, sync::Arc, }; -use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; @@ -103,7 +104,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { return Err(Status::internal("not implemented")); } - match self.blob_service.has(&req_digest) { + match self.blob_service.has(&req_digest).await { Ok(true) => Ok(Response::new(super::BlobMeta::default())), Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Err(e) => Err(e.into()), @@ -122,13 +123,8 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - match self.blob_service.open_read(&req_digest) { + match self.blob_service.open_read(&req_digest).await { Ok(Some(reader)) => { - let async_reader: SyncReadIntoAsyncRead< - _, - BytesMutWithDefaultCapacity<{ 100 * 1024 }>, - > = reader.into(); - fn stream_mapper( x: Result, ) -> Result { @@ -138,7 +134,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { } } - let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + let chunks_stream = ReaderStream::new(reader).map(stream_mapper); Ok(Response::new(Box::pin(chunks_stream))) } Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), @@ -158,35 +154,28 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); - let data_reader = tokio_util::io::StreamReader::new(data_stream); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); - // prepare a writer, which we'll use in the blocking task below. - let mut writer = self.blob_service.open_write(); + let mut blob_writer = pin!(self.blob_service.open_write().await); - let result = task::spawn_blocking(move || -> Result { - // construct a sync reader to the data - let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); - - io::copy(&mut reader, &mut writer).map_err(|e| { + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { warn!("error copying: {}", e); Status::internal("error copying") })?; - let digest = writer - .close() - .map_err(|e| { - warn!("error closing stream: {}", e); - Status::internal("error closing stream") - })? - .to_vec(); - - Ok(super::PutBlobResponse { - digest: digest.into(), + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") }) - }) - .await - .map_err(|_| Status::internal("failed to wait for task"))??; + .await? + .to_vec(); - Ok(Response::new(result)) + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) } } diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 16a2fd51d..33861d9ff 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -71,10 +71,12 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra match request.into_inner().node { None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { - let (nar_size, nar_sha256) = self - .path_info_service - .calculate_nar(&root_node) - .expect("error during nar calculation"); // TODO: handle error + let path_info_service = self.path_info_service.clone(); + let (nar_size, nar_sha256) = + task::spawn_blocking(move || path_info_service.calculate_nar(&root_node)) + .await + .unwrap() + .expect("error during nar calculation"); // TODO: handle error Ok(Response::new(proto::CalculateNarResponse { nar_size, diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 044769ce5..97a2694ac 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -12,8 +12,6 @@ mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; mod grpc_pathinfoservice_wrapper; -mod sync_read_into_async_read; - pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs deleted file mode 100644 index 0a0ef0197..000000000 --- a/tvix/store/src/proto/sync_read_into_async_read.rs +++ /dev/null @@ -1,158 +0,0 @@ -use bytes::Buf; -use core::task::Poll::Ready; -use futures::ready; -use futures::Future; -use std::io; -use std::io::Read; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::runtime::Handle; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; - -#[derive(Debug)] -enum State { - Idle(Option), - Busy(JoinHandle<(io::Result, Buf)>), -} - -use State::{Busy, Idle}; - -/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a -/// synchronous API. -#[derive(Debug)] -pub struct SyncReadIntoAsyncRead { - state: Mutex>, - reader: Arc>, - rt: Handle, -} - -impl SyncReadIntoAsyncRead { - /// This must be called from within a Tokio runtime context, or else it will panic. - #[track_caller] - pub fn new(rt: Handle, reader: R) -> Self { - Self { - rt, - state: State::Idle(None).into(), - reader: Arc::new(reader.into()), - } - } - - /// This must be called from within a Tokio runtime context, or else it will panic. - pub fn new_with_reader(readable: R) -> Self { - Self::new(Handle::current(), readable) - } -} - -/// Repeats operations that are interrupted. -macro_rules! uninterruptibly { - ($e:expr) => {{ - loop { - match $e { - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - res => break res, - } - } - }}; -} - -impl< - R: Read + Send + 'static + std::marker::Unpin, - Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, - > AsyncRead for SyncReadIntoAsyncRead -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - dst: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let me = self.get_mut(); - // Do we need this mutex? - let state = me.state.get_mut(); - - loop { - match state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap_or_default(); - - if buf.has_remaining() { - // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` - // The `rest` is stuffed into the `buf_cell` for further poll_read. - // The other is completely consumed into the unfilled destination. - // `rest` can be empty. - let mut adjusted_src = - buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); - let copied_size = adjusted_src.remaining(); - adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); - dst.set_filled(copied_size); - *buf_cell = Some(buf); - return Ready(Ok(())); - } - - let reader = me.reader.clone(); - *state = Busy(me.rt.spawn_blocking(move || { - let result = uninterruptibly!(reader.blocking_lock().read( - // SAFETY: `reader.read` will *ONLY* write initialized bytes - // and never *READ* uninitialized bytes - // inside this buffer. - // - // Furthermore, casting the slice as `*mut [u8]` - // is safe because it has the same layout. - // - // Finally, the pointer obtained is valid and owned - // by `buf` only as we have a valid mutable reference - // to it, it is valid for write. - // - // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 - unsafe { - &mut *(buf.chunk_mut().as_uninit_slice_mut() - as *mut [std::mem::MaybeUninit] - as *mut [u8]) - } - )); - - if let Ok(n) = result { - // SAFETY: given we initialize `n` bytes, we can move `n` bytes - // forward. - unsafe { - buf.advance_mut(n); - } - } - - (result, buf) - })); - } - Busy(ref mut rx) => { - let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; - - match result { - Ok(n) => { - if n > 0 { - let remaining = std::cmp::min(n, dst.remaining()); - let mut adjusted_src = buf.copy_to_bytes(remaining); - adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); - dst.advance(remaining); - } - *state = Idle(Some(buf)); - return Ready(Ok(())); - } - Err(e) => { - *state = Idle(None); - return Ready(Err(e)); - } - } - } - } - } - } -} - -impl From for SyncReadIntoAsyncRead { - /// This must be called from within a Tokio runtime context, or else it will panic. - fn from(value: R) -> Self { - Self::new_with_reader(value) - } -} diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index ccaa4f4e4..45b9c3440 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -9,8 +9,8 @@ use tempfile::TempDir; use std::os::unix::ffi::OsStrExt; #[cfg(target_family = "unix")] -#[test] -fn symlink() { +#[tokio::test] +async fn symlink() { let tmpdir = TempDir::new().unwrap(); std::fs::create_dir_all(&tmpdir).unwrap(); @@ -25,6 +25,7 @@ fn symlink() { gen_directory_service(), tmpdir.path().join("doesntmatter"), ) + .await .expect("must succeed"); assert_eq!( @@ -36,8 +37,8 @@ fn symlink() { ) } -#[test] -fn single_file() { +#[tokio::test] +async fn single_file() { let tmpdir = TempDir::new().unwrap(); std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap(); @@ -49,6 +50,7 @@ fn single_file() { gen_directory_service(), tmpdir.path().join("root"), ) + .await .expect("must succeed"); assert_eq!( @@ -62,12 +64,12 @@ fn single_file() { ); // ensure the blob has been uploaded - assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).unwrap()); + assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap()); } #[cfg(target_family = "unix")] -#[test] -fn complicated() { +#[tokio::test] +async fn complicated() { let tmpdir = TempDir::new().unwrap(); // File ``.keep` @@ -87,6 +89,7 @@ fn complicated() { directory_service.clone(), tmpdir.path(), ) + .await .expect("must succeed"); // ensure root_node matched expectations @@ -116,5 +119,5 @@ fn complicated() { .is_some()); // ensure EMPTY_BLOB_CONTENTS has been uploaded - assert!(blob_service.has(&EMPTY_BLOB_DIGEST).unwrap()); + assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap()); } diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 102bf5e7c..22dbd7bcb 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -28,22 +28,26 @@ fn single_symlink() { } /// Make sure the NARRenderer fails if a referred blob doesn't exist. -#[test] -fn single_file_missing_blob() { +#[tokio::test] +async fn single_file_missing_blob() { let mut buf: Vec = vec![]; - let e = write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - executable: false, - }), - // the blobservice is empty intentionally, to provoke the error. - gen_blob_service(), - gen_directory_service(), - ) + let e = tokio::task::spawn_blocking(move || { + write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + // the blobservice is empty intentionally, to provoke the error. + gen_blob_service(), + gen_directory_service(), + ) + }) + .await + .unwrap() .expect_err("must fail"); match e { @@ -56,34 +60,43 @@ fn single_file_missing_blob() { /// Make sure the NAR Renderer fails if the returned blob meta has another size /// than specified in the proto node. -#[test] -fn single_file_wrong_blob_size() { +#[tokio::test] +async fn single_file_wrong_blob_size() { let blob_service = gen_blob_service(); // insert blob into the store - let mut writer = blob_service.open_write(); - io::copy( + let mut writer = blob_service.open_write().await; + tokio::io::copy( &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), &mut writer, ) + .await .unwrap(); - assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap()); + assert_eq!( + HELLOWORLD_BLOB_DIGEST.clone(), + writer.close().await.unwrap() + ); + let bs = blob_service.clone(); // Test with a root FileNode of a too big size { let mut buf: Vec = vec![]; - let e = write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: 42, // <- note the wrong size here! - executable: false, - }), - blob_service.clone(), - gen_directory_service(), - ) + let e = tokio::task::spawn_blocking(move || { + write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 42, // <- note the wrong size here! + executable: false, + }), + bs, + gen_directory_service(), + ) + }) + .await + .unwrap() .expect_err("must fail"); match e { @@ -94,22 +107,27 @@ fn single_file_wrong_blob_size() { } } + let bs = blob_service.clone(); // Test with a root FileNode of a too small size { let mut buf: Vec = vec![]; - let e = write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: 2, // <- note the wrong size here! - executable: false, - }), - blob_service, - gen_directory_service(), - ) - .expect_err("must fail"); + let e = tokio::task::spawn_blocking(move || { + write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 2, // <- note the wrong size here! + executable: false, + }), + bs, + gen_directory_service(), + ) + .expect_err("must fail") + }) + .await + .unwrap(); match e { crate::nar::RenderError::NARWriterError(e) => { @@ -120,51 +138,63 @@ fn single_file_wrong_blob_size() { } } -#[test] -fn single_file() { +#[tokio::test] +async fn single_file() { let blob_service = gen_blob_service(); // insert blob into the store - let mut writer = blob_service.open_write(); - io::copy( + let mut writer = blob_service.open_write().await; + tokio::io::copy( &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.clone()), &mut writer, ) + .await .unwrap(); - assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap()); + + assert_eq!( + HELLOWORLD_BLOB_DIGEST.clone(), + writer.close().await.unwrap() + ); let mut buf: Vec = vec![]; - write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - executable: false, - }), - blob_service, - gen_directory_service(), - ) - .expect("must succeed"); + let buf = tokio::task::spawn_blocking(move || { + write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + blob_service, + gen_directory_service(), + ) + .expect("must succeed"); + + buf + }) + .await + .unwrap(); assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec()); } -#[test] -fn test_complicated() { +#[tokio::test] +async fn test_complicated() { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); // put all data into the stores. // insert blob into the store - let mut writer = blob_service.open_write(); - io::copy( + let mut writer = blob_service.open_write().await; + tokio::io::copy( &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.clone()), &mut writer, ) + .await .unwrap(); - assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().unwrap()); + assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap()); directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap(); directory_service @@ -173,30 +203,44 @@ fn test_complicated() { let mut buf: Vec = vec![]; - write_nar( - &mut buf, - &crate::proto::node::Node::Directory(DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), - blob_service.clone(), - directory_service.clone(), - ) - .expect("must succeed"); + let bs = blob_service.clone(); + let ds = directory_service.clone(); + + let buf = tokio::task::spawn_blocking(move || { + write_nar( + &mut buf, + &crate::proto::node::Node::Directory(DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + bs, + ds, + ) + .expect("must succeed"); + buf + }) + .await + .unwrap(); assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec()); // ensure calculate_nar does return the correct sha256 digest and sum. - let (nar_size, nar_digest) = calculate_size_and_sha256( - &crate::proto::node::Node::Directory(DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), - blob_service, - directory_service, - ) + let bs = blob_service.clone(); + let ds = directory_service.clone(); + let (nar_size, nar_digest) = tokio::task::spawn_blocking(move || { + calculate_size_and_sha256( + &crate::proto::node::Node::Directory(DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + bs, + ds, + ) + }) + .await + .unwrap() .expect("must succeed"); assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);