From 312f50f51f6f25d7e1499c0fe3e2558a32856d75 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 10 Sep 2025 16:00:30 -0700 Subject: [PATCH 1/7] rebuild lock --- Cargo.lock | 373 ++---------- deny.toml | 1 - python/Cargo.lock | 332 +---------- python/python/lance/tf/tfrecord.py | 5 - python/python/tests/test_tf.py | 225 +------ python/src/executor.rs | 20 - python/src/lib.rs | 131 +--- rust/lance-core/src/utils/testing.rs | 2 + rust/lance/Cargo.toml | 4 +- rust/lance/src/dataset/transaction.rs | 194 +----- rust/lance/src/dataset/write/commit.rs | 9 - rust/lance/src/utils.rs | 2 - rust/lance/src/utils/tfrecord.rs | 792 ------------------------- 13 files changed, 58 insertions(+), 2032 deletions(-) delete mode 100644 python/python/lance/tf/tfrecord.py delete mode 100644 rust/lance/src/utils/tfrecord.rs diff --git a/Cargo.lock b/Cargo.lock index cf1437085cd..f238def2738 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -413,17 +413,6 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - [[package]] name = "async-channel" version = "2.3.1" @@ -436,54 +425,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-executor" -version = "1.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb812ffb58524bdd10860d7d974e2f01cc0950c2438a74ee5ec2e2280c6c4ffa" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "pin-project-lite", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.3.1", - "async-executor", - "async-io", - "async-lock", - "blocking", - "futures-lite", - "once_cell", -] - -[[package]] -name = "async-io" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1237c0ae75a0f3765f58910ff9cdd0a12eeb39ab2f4c7de23262f337f0aacbb3" -dependencies = [ - "async-lock", - "cfg-if", - "concurrent-queue", - "futures-io", - "futures-lite", - "parking", - "polling", - "rustix 1.0.7", - "slab", - "tracing", - "windows-sys 0.59.0", -] - [[package]] name = "async-lock" version = "3.4.0" @@ -515,38 +456,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "async-std" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730294c1c08c2e0f85759590518f6333f0d5a0a766a27d519c1b244c3dfd8a24" -dependencies = [ - "async-channel 1.9.0", - "async-global-executor", - "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" - [[package]] name = "async-trait" version = "0.1.88" @@ -1246,19 +1155,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" -dependencies = [ - "async-channel 2.3.1", - "async-task", - "futures-io", - "futures-lite", - "piper", -] - [[package]] name = "bon" version = "3.6.4" @@ -2584,7 +2480,7 @@ dependencies = [ "itertools 0.14.0", "object_store", "pbjson-types", - "prost 0.13.5", + "prost", "substrait 0.58.0", "tokio", "url", @@ -3016,12 +2912,6 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca81e6b4777c89fd810c25a4be2b1bd93ea034fbe58e6a75216a34c6b82c539b" -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "event-listener" version = "4.0.3" @@ -3106,12 +2996,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "fixedbitset" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" - [[package]] name = "fixedbitset" version = "0.5.7" @@ -3272,19 +3156,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-lite" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.31" @@ -3548,17 +3419,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "hostname" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" -dependencies = [ - "libc", - "match_cfg", - "winapi", -] - [[package]] name = "htmlescape" version = "0.3.1" @@ -3999,12 +3859,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "integer-encoding" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a" - [[package]] name = "io-uring" version = "0.7.8" @@ -4228,15 +4082,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lance" version = "0.35.0" @@ -4300,9 +4145,8 @@ dependencies = [ "pin-project", "pprof", "pretty_assertions", - "prost 0.12.6", - "prost 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-types", "rand 0.9.1", "roaring", "rstest", @@ -4312,7 +4156,6 @@ dependencies = [ "tantivy", "tempfile", "test-log", - "tfrecord", "tokio", "tokio-stream", "tracing", @@ -4374,7 +4217,7 @@ dependencies = [ "object_store", "pin-project", "proptest", - "prost 0.13.5", + "prost", "rand 0.9.1", "roaring", "serde_json", @@ -4410,7 +4253,7 @@ dependencies = [ "lance-datagen", "log", "pin-project", - "prost 0.13.5", + "prost", "snafu", "substrait-expr", "tempfile", @@ -4468,9 +4311,9 @@ dependencies = [ "num-traits", "pprof", "proptest", - "prost 0.13.5", - "prost-build 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-build", + "prost-types", "protobuf-src", "rand 0.9.1", "rand_xoshiro", @@ -4540,9 +4383,9 @@ dependencies = [ "pprof", "pretty_assertions", "proptest", - "prost 0.13.5", - "prost-build 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-build", + "prost-types", "protobuf-src", "rand 0.9.1", "roaring", @@ -4564,7 +4407,7 @@ dependencies = [ "arrow-ord", "arrow-schema", "arrow-select", - "async-channel 2.3.1", + "async-channel", "async-recursion", "async-trait", "bitpacking", @@ -4603,9 +4446,9 @@ dependencies = [ "num-traits", "object_store", "pprof", - "prost 0.13.5", - "prost-build 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-build", + "prost-types", "protobuf-src", "rand 0.9.1", "rayon", @@ -4655,7 +4498,7 @@ dependencies = [ "path_abs", "pin-project", "pprof", - "prost 0.13.5", + "prost", "rand 0.9.1", "rstest", "serde", @@ -4725,9 +4568,9 @@ dependencies = [ "pprof", "pretty_assertions", "proptest", - "prost 0.13.5", - "prost-build 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-build", + "prost-types", "protobuf-src", "rand 0.9.1", "rangemap", @@ -5120,9 +4963,6 @@ name = "log" version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" -dependencies = [ - "value-bag", -] [[package]] name = "loom" @@ -5207,12 +5047,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" -[[package]] -name = "match_cfg" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" - [[package]] name = "matchers" version = "0.2.0" @@ -5412,15 +5246,6 @@ dependencies = [ "libc", ] -[[package]] -name = "noisy_float" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978fe6e6ebc0bf53de533cd456ca2d9de13de13856eda1518a285d7705a213af" -dependencies = [ - "num-traits", -] - [[package]] name = "nom" version = "7.1.3" @@ -5924,8 +5749,8 @@ checksum = "6eea3058763d6e656105d1403cb04e0a41b7bbac6362d413e7c33be0c32279c9" dependencies = [ "heck", "itertools 0.13.0", - "prost 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-types", ] [[package]] @@ -5938,8 +5763,8 @@ dependencies = [ "chrono", "pbjson", "pbjson-build", - "prost 0.13.5", - "prost-build 0.13.5", + "prost", + "prost-build", "serde", ] @@ -5984,23 +5809,13 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df202b0b0f5b8e389955afd5f27b007b00fb948162953f1db9c70d2c7e3157d7" -[[package]] -name = "petgraph" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" -dependencies = [ - "fixedbitset 0.4.2", - "indexmap", -] - [[package]] name = "petgraph" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ - "fixedbitset 0.5.7", + "fixedbitset", "indexmap", ] @@ -6010,7 +5825,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54acf3a685220b533e437e264e4d932cfbdc4cc7ec0cd232ed73c08d03b8a7ca" dependencies = [ - "fixedbitset 0.5.7", + "fixedbitset", "hashbrown 0.15.3", "indexmap", "serde", @@ -6125,17 +5940,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" -dependencies = [ - "atomic-waker", - "fastrand", - "futures-io", -] - [[package]] name = "pkcs1" version = "0.7.5" @@ -6218,21 +6022,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "polling" -version = "3.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50" -dependencies = [ - "cfg-if", - "concurrent-queue", - "hermit-abi", - "pin-project-lite", - "rustix 1.0.7", - "tracing", - "windows-sys 0.59.0", -] - [[package]] name = "portable-atomic" version = "1.11.0" @@ -6403,16 +6192,6 @@ dependencies = [ "unarray", ] -[[package]] -name = "prost" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" -dependencies = [ - "bytes", - "prost-derive 0.12.6", -] - [[package]] name = "prost" version = "0.13.5" @@ -6420,28 +6199,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive 0.13.5", -] - -[[package]] -name = "prost-build" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" -dependencies = [ - "bytes", - "heck", - "itertools 0.12.1", - "log", - "multimap", - "once_cell", - "petgraph 0.6.5", - "prettyplease", - "prost 0.12.6", - "prost-types 0.12.6", - "regex", - "syn 2.0.106", - "tempfile", + "prost-derive", ] [[package]] @@ -6457,26 +6215,13 @@ dependencies = [ "once_cell", "petgraph 0.7.1", "prettyplease", - "prost 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-types", "regex", "syn 2.0.106", "tempfile", ] -[[package]] -name = "prost-derive" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" -dependencies = [ - "anyhow", - "itertools 0.12.1", - "proc-macro2", - "quote", - "syn 2.0.106", -] - [[package]] name = "prost-derive" version = "0.13.5" @@ -6490,22 +6235,13 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "prost-types" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" -dependencies = [ - "prost 0.12.6", -] - [[package]] name = "prost-types" version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" dependencies = [ - "prost 0.13.5", + "prost", ] [[package]] @@ -7760,9 +7496,9 @@ checksum = "b1772d041c37cc7e6477733c76b2acf4ee36bd52b2ae4d9ea0ec9c87d003db32" dependencies = [ "heck", "prettyplease", - "prost 0.13.5", - "prost-build 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-build", + "prost-types", "regress", "schemars", "semver", @@ -7785,9 +7521,9 @@ dependencies = [ "pbjson-build", "pbjson-types", "prettyplease", - "prost 0.13.5", - "prost-build 0.13.5", - "prost-types 0.13.5", + "prost", + "prost-build", + "prost-types", "regress", "schemars", "semver", @@ -7806,7 +7542,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d091cf06bc7808bd81eb01f5f5b77b2b14288bb022501a2dcad78633c65262f" dependencies = [ "once_cell", - "prost 0.13.5", + "prost", "substrait 0.50.4", "substrait-expr-funcgen", "substrait-expr-macros", @@ -8142,35 +7878,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "tfrecord" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7036e822a1d906b8a49620e524a6fe21ab956583ac77f1427e908c61499a1f78" -dependencies = [ - "anyhow", - "async-std", - "bytemuck", - "crc", - "flate2", - "futures", - "glob", - "hex", - "hostname", - "integer-encoding 4.0.2", - "itertools 0.11.0", - "noisy_float", - "num", - "num-traits", - "once_cell", - "pin-project", - "prost 0.12.6", - "prost-build 0.12.6", - "tar", - "thiserror 1.0.69", - "ureq", -] - [[package]] name = "thiserror" version = "1.0.69" @@ -8228,7 +7935,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", - "integer-encoding 3.0.4", + "integer-encoding", "ordered-float 2.10.1", ] @@ -8837,12 +8544,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "value-bag" -version = "1.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/deny.toml b/deny.toml index dda717373ae..3605d1e7cc1 100644 --- a/deny.toml +++ b/deny.toml @@ -84,7 +84,6 @@ ignore = [ { id = "RUSTSEC-2024-0384", reason = "`instant` is used by tantivy" }, { id = "RUSTSEC-2024-0370", reason = "`proc-macro-error` is used by jieba-rs via include-flate" }, { id = "RUSTSEC-2024-0436", reason = "`paste` is used by datafusion" }, - { id = "RUSTSEC-2025-0052", reason = "`async-std` is used by tfrecord" }, { id = "RUSTSEC-2023-0071", reason = "`rsa` is used by opendal via reqsign" }, ] # If this is true, then cargo deny will use the git executable to fetch advisory database. diff --git a/python/Cargo.lock b/python/Cargo.lock index 13cf6802e3e..bd2a2b5add5 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -456,17 +456,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - [[package]] name = "async-channel" version = "2.4.0" @@ -496,20 +485,6 @@ dependencies = [ "zstd-safe", ] -[[package]] -name = "async-executor" -version = "1.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb812ffb58524bdd10860d7d974e2f01cc0950c2438a74ee5ec2e2280c6c4ffa" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "pin-project-lite", - "slab", -] - [[package]] name = "async-ffi" version = "0.5.0" @@ -519,40 +494,6 @@ dependencies = [ "abi_stable", ] -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.4.0", - "async-executor", - "async-io", - "async-lock", - "blocking", - "futures-lite", - "once_cell", -] - -[[package]] -name = "async-io" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1237c0ae75a0f3765f58910ff9cdd0a12eeb39ab2f4c7de23262f337f0aacbb3" -dependencies = [ - "async-lock", - "cfg-if", - "concurrent-queue", - "futures-io", - "futures-lite", - "parking", - "polling", - "rustix 1.0.7", - "slab", - "tracing", - "windows-sys 0.59.0", -] - [[package]] name = "async-lock" version = "3.4.0" @@ -584,38 +525,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "async-std" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730294c1c08c2e0f85759590518f6333f0d5a0a766a27d519c1b244c3dfd8a24" -dependencies = [ - "async-channel 1.9.0", - "async-global-executor", - "async-io", - "async-lock", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" - [[package]] name = "async-trait" version = "0.1.88" @@ -1215,19 +1124,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" -dependencies = [ - "async-channel 2.4.0", - "async-task", - "futures-io", - "futures-lite", - "piper", -] - [[package]] name = "bon" version = "3.6.4" @@ -1601,21 +1497,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32c" version = "0.6.8" @@ -2738,12 +2619,6 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca81e6b4777c89fd810c25a4be2b1bd93ea034fbe58e6a75216a34c6b82c539b" -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "event-listener" version = "4.0.3" @@ -2947,19 +2822,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-lite" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.31" @@ -3197,17 +3059,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "hostname" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" -dependencies = [ - "libc", - "match_cfg", - "winapi", -] - [[package]] name = "htmlescape" version = "0.3.1" @@ -3363,7 +3214,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.2", "tower-service", - "webpki-roots 1.0.1", + "webpki-roots", ] [[package]] @@ -3605,12 +3456,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "integer-encoding" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a" - [[package]] name = "io-uring" version = "0.7.8" @@ -3814,15 +3659,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lance" version = "0.35.0" @@ -3870,7 +3706,6 @@ dependencies = [ "object_store", "permutation", "pin-project", - "prost 0.12.6", "prost 0.13.5", "prost-types 0.13.5", "rand 0.9.1", @@ -3880,7 +3715,6 @@ dependencies = [ "snafu", "tantivy", "tempfile", - "tfrecord", "tokio", "tokio-stream", "tracing", @@ -4076,7 +3910,7 @@ dependencies = [ "arrow-ord", "arrow-schema", "arrow-select", - "async-channel 2.4.0", + "async-channel", "async-recursion", "async-trait", "bitpacking", @@ -4562,9 +4396,6 @@ name = "log" version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" -dependencies = [ - "value-bag", -] [[package]] name = "loom" @@ -4633,12 +4464,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "match_cfg" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" - [[package]] name = "matchers" version = "0.1.0" @@ -4766,15 +4591,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2195bf6aa996a481483b29d62a7663eed3fe39600c460e323f8ff41e90bdd89b" -[[package]] -name = "noisy_float" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978fe6e6ebc0bf53de533cd456ca2d9de13de13856eda1518a285d7705a213af" -dependencies = [ - "num-traits", -] - [[package]] name = "nom" version = "7.1.3" @@ -5393,17 +5209,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" -dependencies = [ - "atomic-waker", - "fastrand", - "futures-io", -] - [[package]] name = "pkcs1" version = "0.7.5" @@ -5448,21 +5253,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "polling" -version = "3.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50" -dependencies = [ - "cfg-if", - "concurrent-queue", - "hermit-abi", - "pin-project-lite", - "rustix 1.0.7", - "tracing", - "windows-sys 0.59.0", -] - [[package]] name = "portable-atomic" version = "1.11.1" @@ -5565,16 +5355,6 @@ dependencies = [ "prost-derive 0.11.9", ] -[[package]] -name = "prost" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" -dependencies = [ - "bytes", - "prost-derive 0.12.6", -] - [[package]] name = "prost" version = "0.13.5" @@ -5607,27 +5387,6 @@ dependencies = [ "which", ] -[[package]] -name = "prost-build" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" -dependencies = [ - "bytes", - "heck 0.5.0", - "itertools 0.11.0", - "log", - "multimap", - "once_cell", - "petgraph 0.6.5", - "prettyplease 0.2.35", - "prost 0.12.6", - "prost-types 0.12.6", - "regex", - "syn 2.0.104", - "tempfile", -] - [[package]] name = "prost-build" version = "0.13.5" @@ -5661,19 +5420,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "prost-derive" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" -dependencies = [ - "anyhow", - "itertools 0.11.0", - "proc-macro2", - "quote", - "syn 2.0.104", -] - [[package]] name = "prost-derive" version = "0.13.5" @@ -5696,15 +5442,6 @@ dependencies = [ "prost 0.11.9", ] -[[package]] -name = "prost-types" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" -dependencies = [ - "prost 0.12.6", -] - [[package]] name = "prost-types" version = "0.13.5" @@ -6231,7 +5968,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 1.0.1", + "webpki-roots", ] [[package]] @@ -6378,7 +6115,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" dependencies = [ "aws-lc-rs", - "log", "once_cell", "ring", "rustls-pki-types", @@ -7199,35 +6935,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "tfrecord" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7036e822a1d906b8a49620e524a6fe21ab956583ac77f1427e908c61499a1f78" -dependencies = [ - "anyhow", - "async-std", - "bytemuck", - "crc", - "flate2", - "futures", - "glob", - "hex", - "hostname", - "integer-encoding 4.0.2", - "itertools 0.11.0", - "noisy_float", - "num", - "num-traits", - "once_cell", - "pin-project", - "prost 0.12.6", - "prost-build 0.12.6", - "tar", - "thiserror 1.0.69", - "ureq", -] - [[package]] name = "thiserror" version = "1.0.69" @@ -7284,7 +6991,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", - "integer-encoding 3.0.4", + "integer-encoding", "ordered-float 2.10.1", ] @@ -7699,22 +7406,6 @@ version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" -[[package]] -name = "ureq" -version = "2.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" -dependencies = [ - "base64 0.22.1", - "flate2", - "log", - "once_cell", - "rustls 0.23.28", - "rustls-pki-types", - "url", - "webpki-roots 0.26.11", -] - [[package]] name = "url" version = "2.5.4" @@ -7768,12 +7459,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "value-bag" -version = "1.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" - [[package]] name = "version_check" version = "0.9.5" @@ -7930,15 +7615,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-roots" -version = "0.26.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" -dependencies = [ - "webpki-roots 1.0.1", -] - [[package]] name = "webpki-roots" version = "1.0.1" diff --git a/python/python/lance/tf/tfrecord.py b/python/python/lance/tf/tfrecord.py deleted file mode 100644 index ef9d1235e4b..00000000000 --- a/python/python/lance/tf/tfrecord.py +++ /dev/null @@ -1,5 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright The Lance Authors - -from lance.lance import infer_tfrecord_schema as infer_tfrecord_schema -from lance.lance import read_tfrecord as read_tfrecord diff --git a/python/python/tests/test_tf.py b/python/python/tests/test_tf.py index 878a8b29425..87a01aa5e25 100644 --- a/python/python/tests/test_tf.py +++ b/python/python/tests/test_tf.py @@ -5,12 +5,11 @@ import warnings import lance -import ml_dtypes import numpy as np import pandas as pd import pyarrow as pa import pytest -from lance.arrow import BFloat16Type, ImageArray, bfloat16_array +from lance.arrow import ImageArray from lance.fragment import LanceFragment pytest.skip("Skip tensorflow tests", allow_module_level=True) @@ -32,7 +31,6 @@ lance_fragments, lance_take_batches, ) -from lance.tf.tfrecord import infer_tfrecord_schema, read_tfrecord # noqa: E402 @pytest.fixture @@ -313,224 +311,3 @@ def test_image_types(tmp_path): assert batch["tensor_images"].shape == (3, 1, 1, 4) assert batch["tensor_images"].dtype == tf.uint8 assert batch["tensor_images"].numpy().tolist() == tensors.to_numpy().tolist() - - -@pytest.fixture -def sample_tf_example(): - # Create a TFRecord with a string, float, int, and a tensor - tensor = tf.constant(np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=np.float32)) - tensor_bf16 = tf.constant( - np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=ml_dtypes.bfloat16) - ) - - feature = { - "1_int": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])), - "2_int_list": tf.train.Feature(int64_list=tf.train.Int64List(value=[1, 2, 3])), - "3_float": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])), - "4_float_list": tf.train.Feature( - float_list=tf.train.FloatList(value=[1.0, 2.0, 3.0]) - ), - "5_bytes": tf.train.Feature( - bytes_list=tf.train.BytesList(value=[b"Hello, TensorFlow!"]) - ), - "6_bytes_list": tf.train.Feature( - bytes_list=tf.train.BytesList( - value=[b"Hello, TensorFlow!", b"Hello, Lance!"] - ) - ), - "7_string": tf.train.Feature( - bytes_list=tf.train.BytesList(value=[b"Hello, TensorFlow!"]) - ), - "8_tensor": tf.train.Feature( - bytes_list=tf.train.BytesList( - value=[tf.io.serialize_tensor(tensor).numpy()] - ) - ), - "9_tensor_bf16": tf.train.Feature( - bytes_list=tf.train.BytesList( - value=[tf.io.serialize_tensor(tensor_bf16).numpy()] - ) - ), - } - - return tf.train.Example(features=tf.train.Features(feature=feature)) - - -def test_tfrecord_parsing(tmp_path, sample_tf_example): - serialized = sample_tf_example.SerializeToString() - - path = tmp_path / "test.tfrecord" - with tf.io.TFRecordWriter(str(path)) as writer: - writer.write(serialized) - - inferred_schema = infer_tfrecord_schema(str(path)) - - assert inferred_schema == pa.schema( - { - "1_int": pa.int64(), - "2_int_list": pa.list_(pa.int64()), - "3_float": pa.float32(), - "4_float_list": pa.list_(pa.float32()), - "5_bytes": pa.binary(), - "6_bytes_list": pa.list_(pa.binary()), - # tensors and strings assumed binary - "7_string": pa.binary(), - "8_tensor": pa.binary(), - "9_tensor_bf16": pa.binary(), - } - ) - - inferred_schema = infer_tfrecord_schema( - str(path), - tensor_features=["8_tensor", "9_tensor_bf16"], - string_features=["7_string"], - ) - assert inferred_schema == pa.schema( - { - "1_int": pa.int64(), - "2_int_list": pa.list_(pa.int64()), - "3_float": pa.float32(), - "4_float_list": pa.list_(pa.float32()), - "5_bytes": pa.binary(), - "6_bytes_list": pa.list_(pa.binary()), - "7_string": pa.string(), - "8_tensor": pa.fixed_shape_tensor(pa.float32(), [2, 3]), - "9_tensor_bf16": pa.fixed_shape_tensor(BFloat16Type(), [2, 3]), - } - ) - - reader = read_tfrecord(str(path), inferred_schema) - assert reader.schema == inferred_schema - table = reader.read_all() - - assert table.schema == inferred_schema - - tensor_type = pa.fixed_shape_tensor(pa.float32(), [2, 3]) - inner = pa.array([float(x) for x in range(1, 7)], pa.float32()) - storage = pa.FixedSizeListArray.from_arrays(inner, 6) - f32_array = pa.ExtensionArray.from_storage(tensor_type, storage) - - tensor_type = pa.fixed_shape_tensor(BFloat16Type(), [2, 3]) - bf16_array = bfloat16_array([float(x) for x in range(1, 7)]) - storage = pa.FixedSizeListArray.from_arrays(bf16_array, 6) - bf16_array = pa.ExtensionArray.from_storage(tensor_type, storage) - - expected_data = pa.table( - { - "1_int": pa.array([1]), - "2_int_list": pa.array([[1, 2, 3]]), - "3_float": pa.array([1.0], pa.float32()), - "4_float_list": pa.array([[1.0, 2.0, 3.0]], pa.list_(pa.float32())), - "5_bytes": pa.array([b"Hello, TensorFlow!"]), - "6_bytes_list": pa.array([[b"Hello, TensorFlow!", b"Hello, Lance!"]]), - "7_string": pa.array(["Hello, TensorFlow!"]), - "8_tensor": f32_array, - "9_tensor_bf16": bf16_array, - } - ) - - assert table == expected_data - - -def test_tfrecord_roundtrip(tmp_path, sample_tf_example): - serialized = sample_tf_example.SerializeToString() - - path = tmp_path / "test.tfrecord" - with tf.io.TFRecordWriter(str(path)) as writer: - writer.write(serialized) - - schema = infer_tfrecord_schema( - str(path), - tensor_features=["8_tensor", "9_tensor_bf16"], - string_features=["7_string"], - ) - - table = read_tfrecord(str(path), schema).read_all() - - # Can roundtrip to lance - dataset_uri = tmp_path / "dataset" - dataset = lance.write_dataset(table, dataset_uri) - assert dataset.schema == table.schema - assert dataset.to_table() == table - - # TODO: validate we can roundtrip with from_lance() - # tf_ds = from_lance(dataset, batch_size=1) - - -def test_tfrecord_parsing_nulls(tmp_path): - # Make sure we don't trip up on missing values - tensor = tf.constant(np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=np.float32)) - - features = [ - { - "a": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])), - "b": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])), - "c": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])), - "d": tf.train.Feature( - bytes_list=tf.train.BytesList( - value=[tf.io.serialize_tensor(tensor).numpy()] - ) - ), - }, - { - "a": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])), - }, - { - "a": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])), - "b": tf.train.Feature(int64_list=tf.train.Int64List(value=[1, 2, 3])), - "c": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])), - }, - ] - - path = tmp_path / "test.tfrecord" - with tf.io.TFRecordWriter(str(path)) as writer: - for feature in features: - example_proto = tf.train.Example( - features=tf.train.Features(feature=feature) - ) - serialized = example_proto.SerializeToString() - writer.write(serialized) - - inferred_schema = infer_tfrecord_schema(str(path), tensor_features=["d"]) - assert inferred_schema == pa.schema( - { - "a": pa.int64(), - "b": pa.list_(pa.int64()), - "c": pa.float32(), - "d": pa.fixed_shape_tensor(pa.float32(), [2, 3]), - } - ) - - tensor_type = pa.fixed_shape_tensor(pa.float32(), [2, 3]) - inner = pa.array([float(x) for x in range(1, 7)] + [None] * 12, pa.float32()) - storage = pa.FixedSizeListArray.from_arrays(inner, 6) - f32_array = pa.ExtensionArray.from_storage(tensor_type, storage) - - data = read_tfrecord(str(path), inferred_schema).read_all() - expected = pa.table( - { - "a": pa.array([1, 1, 1]), - "b": pa.array([[1], [], [1, 2, 3]]), - "c": pa.array([1.0, None, 1.0], pa.float32()), - "d": f32_array, - } - ) - - assert data == expected - - # can do projection - read_schema = pa.schema( - { - "a": pa.int64(), - "c": pa.float32(), - } - ) - expected = pa.table( - { - "a": pa.array([1, 1, 1]), - "c": pa.array([1.0, None, 1.0], pa.float32()), - } - ) - - data = read_tfrecord(str(path), read_schema).read_all() - assert data == expected diff --git a/python/src/executor.rs b/python/src/executor.rs index d4be02d8e12..eeb2371fa0c 100644 --- a/python/src/executor.rs +++ b/python/src/executor.rs @@ -102,26 +102,6 @@ impl BackgroundExecutor { } } - /// Spawn a task in the background - pub fn spawn_background(&self, py: Option>, task: T) - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - if let Some(py) = py { - py.allow_threads(|| { - self.runtime.spawn(task); - }) - } else { - // Python::with_gil is a no-op if the GIL is already held by the thread. - Python::with_gil(|py| { - py.allow_threads(|| { - self.runtime.spawn(task); - }) - }) - } - } - /// Block on a future and wait for it to complete. /// /// This helper method also frees the GIL before blocking. diff --git a/python/src/lib.rs b/python/src/lib.rs index cdf20c95fd1..00b4317ece8 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -27,15 +27,10 @@ use std::sync::{Arc, LazyLock}; use std::ffi::CString; -use ::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; use ::arrow::pyarrow::PyArrowType; use ::arrow_schema::Schema as ArrowSchema; use ::lance::arrow::json::ArrowJsonExt; use ::lance::datafusion::LanceTableProvider; - -use arrow_array::{RecordBatch, RecordBatchIterator}; -use arrow_schema::ArrowError; -use datafusion::error::Result; use datafusion_ffi::table_provider::FFI_TableProvider; #[cfg(feature = "datagen")] use datagen::register_datagen; @@ -50,10 +45,9 @@ use file::{ LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader, LanceFileStatistics, LanceFileWriter, LancePageMetadata, }; -use futures::StreamExt; use lance_index::DatasetIndexExt; use log::Level; -use pyo3::exceptions::{PyIOError, PyValueError}; +use pyo3::exceptions::PyIOError; use pyo3::prelude::*; use pyo3::types::{PyAny, PyAnyMethods, PyCapsule}; use scanner::ScanStatistics; @@ -197,8 +191,6 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(write_fragments_transaction))?; m.add_wrapped(wrap_pyfunction!(schema_to_json))?; m.add_wrapped(wrap_pyfunction!(json_to_schema))?; - m.add_wrapped(wrap_pyfunction!(infer_tfrecord_schema))?; - m.add_wrapped(wrap_pyfunction!(read_tfrecord))?; m.add_wrapped(wrap_pyfunction!(trace_to_chrome))?; m.add_wrapped(wrap_pyfunction!(capture_trace_events))?; m.add_wrapped(wrap_pyfunction!(shutdown_tracing))?; @@ -261,127 +253,6 @@ pub fn language_model_home() -> PyResult { Ok(String::from(pstr)) } -/// Infer schema from tfrecord file -/// -/// Parameters -/// ---------- -/// uri: str -/// URI of the tfrecord file -/// tensor_features: Optional[List[str]] -/// Names of features that should be treated as tensors. Currently only -/// fixed-shape tensors are supported. -/// string_features: Optional[List[str]] -/// Names of features that should be treated as strings. Otherwise they -/// will be treated as binary. -/// batch_size: Optional[int], default None -/// Number of records to read to infer the schema. If None, will read the -/// entire file. -/// -/// Returns -/// ------- -/// pyarrow.Schema -/// An Arrow schema inferred from the tfrecord file. The schema is -/// alphabetically sorted by field names, since TFRecord doesn't have -/// a concept of field order. -#[pyfunction] -#[pyo3(signature = (uri, *, tensor_features = None, string_features = None, num_rows = None))] -fn infer_tfrecord_schema( - uri: &str, - tensor_features: Option>, - string_features: Option>, - num_rows: Option, -) -> PyResult> { - let tensor_features = tensor_features.unwrap_or_default(); - let tensor_features = tensor_features - .iter() - .map(|s| s.as_str()) - .collect::>(); - let string_features = string_features.unwrap_or_default(); - let string_features = string_features - .iter() - .map(|s| s.as_str()) - .collect::>(); - let schema = RT - .runtime - .block_on(::lance::utils::tfrecord::infer_tfrecord_schema( - uri, - &tensor_features, - &string_features, - num_rows, - )) - .map_err(|err| PyIOError::new_err(err.to_string()))?; - Ok(PyArrowType(schema)) -} - -/// Read tfrecord file as an Arrow stream -/// -/// Parameters -/// ---------- -/// uri: str -/// URI of the tfrecord file -/// schema: pyarrow.Schema -/// Arrow schema of the tfrecord file. Use :py:func:`infer_tfrecord_schema` -/// to infer the schema. The schema is allowed to be a subset of fields; the -/// reader will only parse the fields that are present in the schema. -/// batch_size: int, default 10k -/// Number of records to read per batch. -/// -/// Returns -/// ------- -/// pyarrow.RecordBatchReader -/// An Arrow reader, which can be passed directly to -/// :py:func:`lance.write_dataset`. The output schema will match the schema -/// provided, including field order. -#[pyfunction] -#[pyo3(signature = (uri, schema, *, batch_size = 10_000))] -fn read_tfrecord( - uri: String, - schema: PyArrowType, - batch_size: usize, -) -> PyResult> { - let schema = Arc::new(schema.0); - - let (init_sender, init_receiver) = std::sync::mpsc::channel::>(); - let (batch_sender, batch_receiver) = - std::sync::mpsc::channel::>(); - - let schema_ref = schema.clone(); - RT.spawn_background(None, async move { - let mut stream = - match ::lance::utils::tfrecord::read_tfrecord(&uri, schema_ref, Some(batch_size)).await - { - Ok(stream) => { - init_sender.send(Ok(())).unwrap(); - stream - } - Err(err) => { - init_sender.send(Err(err)).unwrap(); - return; - } - }; - - while let Some(batch) = stream.next().await { - let batch = batch.map_err(|err| ArrowError::ExternalError(Box::new(err))); - batch_sender.send(batch).unwrap(); - } - }); - - // Verify initialization happened successfully - init_receiver.recv().unwrap().map_err(|err| { - PyIOError::new_err(format!("Failed to initialize tfrecord reader: {}", err)) - })?; - - let batch_reader = RecordBatchIterator::new(batch_receiver, schema); - - // TODO: this should be handled by upstream - let stream = FFI_ArrowArrayStream::new(Box::new(batch_reader)); - let stream_reader = ArrowArrayStreamReader::try_new(stream).map_err(|err| { - PyValueError::new_err(format!("Failed to export record batch reader: {}", err)) - })?; - - Ok(PyArrowType(stream_reader)) -} - #[pyfunction] #[pyo3(signature = (dataset,))] fn manifest_needs_migration(dataset: &Bound<'_, PyAny>) -> PyResult { diff --git a/rust/lance-core/src/utils/testing.rs b/rust/lance-core/src/utils/testing.rs index e02c20a362a..f83282d0602 100644 --- a/rust/lance-core/src/utils/testing.rs +++ b/rust/lance-core/src/utils/testing.rs @@ -3,6 +3,8 @@ //! Testing utilities +#![allow(deprecated)] // For PutMultipartOpts which is deprecated but PutMultipartOpts doesn't exist yet + use crate::Result; use async_trait::async_trait; use bytes::Bytes; diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index b259ce7bdf5..4da0fcf4219 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -73,8 +73,6 @@ serde = { workspace = true } moka.workspace = true permutation = { version = "0.4.0" } tantivy.workspace = true -tfrecord = { version = "0.15.0", optional = true, features = ["async"] } -prost_old = { version = "0.12.6", package = "prost", optional = true } aws-sdk-dynamodb = { workspace = true, optional = true } tempfile.workspace = true tracing.workspace = true @@ -113,7 +111,7 @@ default = ["aws", "azure", "gcp", "oss"] fp16kernels = ["lance-linalg/fp16kernels"] # Prevent dynamic linking of lzma, which comes from datafusion cli = ["dep:clap", "lzma-sys/static"] -tensorflow = ["dep:tfrecord", "dep:prost_old"] +tensorflow = [] dynamodb = ["lance-table/dynamodb", "dep:aws-sdk-dynamodb"] dynamodb_tests = ["dynamodb"] substrait = ["lance-datafusion/substrait"] diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 8f46c45872b..f6cf734d476 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -171,8 +171,6 @@ pub enum Operation { replacements: Vec, }, /// Merge a new column in - /// 'fragments' is the final fragments include all data files, the new fragments must align with old ones at rows. - /// 'schema' is not forced to include existed columns, which means we could use Merge to drop column data Merge { fragments: Vec, schema: Schema, @@ -2674,15 +2672,16 @@ pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> ( None, Operation::Overwrite { - fragments, schema, .. + fragments, + schema, + config_upsert_values: None, }, ) => { // Validate here because we are going to return early. - schema_fragments_valid(None, schema, fragments)?; + schema_fragments_valid(schema, fragments)?; return Ok(()); } - (None, Operation::Clone { .. }) => return Ok(()), (Some(manifest), _) => manifest, (None, _) => { return Err(Error::invalid_input( @@ -2698,60 +2697,33 @@ pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> match operation { Operation::Append { fragments } => { // Fragments must contain all fields in the schema - schema_fragments_valid(Some(manifest), &manifest.schema, fragments) + schema_fragments_valid(&manifest.schema, fragments) } Operation::Project { schema } => { - schema_fragments_valid(Some(manifest), schema, manifest.fragments.as_ref()) - } - Operation::Merge { fragments, schema } => { - merge_fragments_valid(manifest, fragments)?; - schema_fragments_valid(Some(manifest), schema, fragments) + schema_fragments_valid(schema, manifest.fragments.as_ref()) } - Operation::Overwrite { + Operation::Merge { fragments, schema } + | Operation::Overwrite { fragments, schema, config_upsert_values: None, - } => schema_fragments_valid(Some(manifest), schema, fragments), + } => schema_fragments_valid(schema, fragments), Operation::Update { updated_fragments, new_fragments, .. } => { - schema_fragments_valid(Some(manifest), &manifest.schema, updated_fragments)?; - schema_fragments_valid(Some(manifest), &manifest.schema, new_fragments) + schema_fragments_valid(&manifest.schema, updated_fragments)?; + schema_fragments_valid(&manifest.schema, new_fragments) } _ => Ok(()), } } -fn schema_fragments_valid( - manifest: Option<&Manifest>, - schema: &Schema, - fragments: &[Fragment], -) -> Result<()> { - if let Some(manifest) = manifest { - if manifest.data_storage_format.lance_file_version()? == LanceFileVersion::Legacy { - return schema_fragments_legacy_valid(schema, fragments); - } - } - // validate that each data file at least contains one field. - for fragment in fragments { - for data_file in &fragment.files { - if data_file.fields.iter().len() == 0 { - return Err(Error::invalid_input( - format!("Datafile {} does not contain any fields", data_file.path), - location!(), - )); - } - } - } - Ok(()) -} - /// Check that each fragment contains all fields in the schema. /// It is not required that the schema contains all fields in the fragment. /// There may be masked fields. -fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Result<()> { +fn schema_fragments_valid(schema: &Schema, fragments: &[Fragment]) -> Result<()> { // TODO: add additional validation. Consider consolidating with various // validate() methods in the codebase. for fragment in fragments { @@ -2775,70 +2747,6 @@ fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Res Ok(()) } -/// Validate that Merge operations preserve all original fragments. -/// Merge operations should only add columns or rows, not reduce fragments. -/// This ensures fragments correspond at one-to-one with the original fragment list. -fn merge_fragments_valid(manifest: &Manifest, new_fragments: &[Fragment]) -> Result<()> { - let original_fragments = manifest.fragments.as_ref(); - - // Additional validation: ensure we're not accidentally reducing the fragment count - if new_fragments.len() < original_fragments.len() { - return Err(Error::invalid_input( - format!( - "Merge operation reduced fragment count from {} to {}. \ - Merge operations should only add columns, not reduce fragments.", - original_fragments.len(), - new_fragments.len() - ), - location!(), - )); - } - - // Collect new fragment IDs - let new_fragment_map: HashMap = - new_fragments.iter().map(|f| (f.id, f)).collect(); - - // Check that all original fragments are preserved in the new fragments list - // Validate that each original fragment's metadata is preserved - let mut missing_fragments: Vec = Vec::new(); - for original_fragment in original_fragments { - if let Some(new_fragment) = new_fragment_map.get(&original_fragment.id) { - // Validate physical_rows (row count) hasn't changed - if original_fragment.physical_rows != new_fragment.physical_rows { - return Err(Error::invalid_input( - format!( - "Merge operation changed row count for fragment {}. \ - Original: {:?}, New: {:?}. \ - Merge operations should preserve fragment row counts and only add new columns.", - original_fragment.id, - original_fragment.physical_rows, - new_fragment.physical_rows - ), - location!(), - )); - } - } else { - missing_fragments.push(original_fragment.id); - } - } - - if !missing_fragments.is_empty() { - return Err(Error::invalid_input( - format!( - "Merge operation is missing original fragments: {:?}. \ - Merge operations should preserve all original fragments and only add new columns. \ - Expected fragments: {:?}, but got: {:?}", - missing_fragments, - original_fragments.iter().map(|f| f.id).collect::>(), - new_fragment_map.keys().copied().collect::>() - ), - location!(), - )); - } - - Ok(()) -} - #[cfg(test)] mod tests { use super::*; @@ -2894,84 +2802,6 @@ mod tests { assert_eq!(final_fragments, expected_fragments); } - #[test] - fn test_merge_fragments_valid() { - use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; - use lance_core::datatypes::Schema as LanceSchema; - use lance_table::format::Manifest; - use std::sync::Arc; - - // Create a simple schema for testing - let schema = ArrowSchema::new(vec![ - ArrowField::new("id", DataType::Int32, false), - ArrowField::new("name", DataType::Utf8, false), - ]); - - // Create original fragments - let original_fragments = vec![Fragment::new(1), Fragment::new(2), Fragment::new(3)]; - - // Create a manifest with original fragments - let manifest = Manifest::new( - LanceSchema::try_from(&schema).unwrap(), - Arc::new(original_fragments), - DataStorageFormat::new(LanceFileVersion::V2_0), - None, - HashMap::new(), - ); - - // Test 1: Empty fragments should fail - let empty_fragments = vec![]; - let result = merge_fragments_valid(&manifest, &empty_fragments); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("reduced fragment count")); - - // Test 2: Missing original fragments should fail - let missing_fragments = vec![ - Fragment::new(1), - Fragment::new(2), - // Fragment 3 is missing - Fragment::new(4), // New fragment - ]; - let result = merge_fragments_valid(&manifest, &missing_fragments); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("missing original fragments")); - - // Test 3: Reduced fragment count should fail - let reduced_fragments = vec![ - Fragment::new(1), - Fragment::new(2), - // Fragment 3 is missing, no new fragments added - ]; - let result = merge_fragments_valid(&manifest, &reduced_fragments); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("reduced fragment count")); - - // Test 4: Valid merge with all original fragments plus new ones should succeed - let valid_fragments = vec![ - Fragment::new(1), - Fragment::new(2), - Fragment::new(3), - Fragment::new(4), // New fragment - Fragment::new(5), // Another new fragment - ]; - let result = merge_fragments_valid(&manifest, &valid_fragments); - assert!(result.is_ok()); - - // Test 5: Same fragments (no new ones) should succeed - let same_fragments = vec![Fragment::new(1), Fragment::new(2), Fragment::new(3)]; - let result = merge_fragments_valid(&manifest, &same_fragments); - assert!(result.is_ok()); - } - #[test] fn test_remove_tombstoned_data_files() { // Create a fragment with mixed data files: some normal, some fully tombstoned diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index abfac37d415..c42ed7e3815 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -26,7 +26,6 @@ use crate::{ }; use super::{resolve_commit_handler, WriteDestination}; -use crate::dataset::transaction::validate_operation; use lance_core::utils::tracing::{DATASET_COMMITTED_EVENT, TRACE_DATASET_EVENTS}; use tracing::info; @@ -256,14 +255,6 @@ impl<'a> CommitBuilder<'a> { }); } - // Validate the operation before proceeding with the commit - // This ensures that operations like Merge have proper validation for data integrity - if let Some(dataset) = dest.dataset() { - validate_operation(Some(&dataset.manifest), &transaction.operation)?; - } else { - validate_operation(None, &transaction.operation)?; - } - let (metadata_cache, index_cache) = match &dest { WriteDestination::Dataset(ds) => (ds.metadata_cache.clone(), ds.index_cache.clone()), WriteDestination::Uri(uri) => ( diff --git a/rust/lance/src/utils.rs b/rust/lance/src/utils.rs index 79c79c90f5d..518fcae2417 100644 --- a/rust/lance/src/utils.rs +++ b/rust/lance/src/utils.rs @@ -7,8 +7,6 @@ pub(crate) mod future; pub(crate) mod temporal; #[cfg(test)] pub(crate) mod test; -#[cfg(feature = "tensorflow")] -pub mod tfrecord; // Re-export pub use lance_datafusion::sql; diff --git a/rust/lance/src/utils/tfrecord.rs b/rust/lance/src/utils/tfrecord.rs deleted file mode 100644 index 1731076a0c1..00000000000 --- a/rust/lance/src/utils/tfrecord.rs +++ /dev/null @@ -1,792 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -//! Reading TFRecord files into Arrow data -//! -//! Use [infer_tfrecord_schema] to infer the schema of a TFRecord file, then use -//! [read_tfrecord] to read the file into an Arrow record batch stream. - -use arrow::buffer::OffsetBuffer; -use arrow_array::builder::PrimitiveBuilder; -use arrow_array::{ArrayRef, FixedSizeListArray, ListArray}; -use arrow_buffer::ArrowNativeType; -use arrow_buffer::ScalarBuffer; -use datafusion::error::DataFusionError; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::SendableRecordBatchStream; -use futures::{StreamExt, TryStreamExt}; -use half::{bf16, f16}; -use lance_arrow::bfloat16::BFLOAT16_EXT_NAME; -use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY}; -use prost_old::Message; -use std::collections::HashMap; -use std::sync::Arc; - -use crate::error::{Error, Result}; -use crate::io::ObjectStore; -use arrow::record_batch::RecordBatch; -use arrow_schema::{ - DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, -}; -use snafu::{location, Location}; -use tfrecord::protobuf::feature::Kind; -use tfrecord::protobuf::{DataType as TensorDataType, TensorProto}; -use tfrecord::record_reader::RecordStream; -use tfrecord::{Example, Feature}; - -trait OldProstResultExt { - fn map_prost_err(self, location: Location) -> Result; -} - -impl OldProstResultExt for std::result::Result { - fn map_prost_err(self, location: Location) -> Result { - self.map_err(|err| Error::IO { - source: Box::new(err), - location, - }) - } -} - -/// Infer the Arrow schema from a TFRecord file. -/// -/// The featured named by `tensor_features` will be assumed to be binary fields -/// containing serialized tensors (TensorProto messages). Currently only -/// fixed-shape tensors are supported. -/// -/// The features named by `string_features` will be assumed to be UTF-8 encoded -/// strings. -/// -/// `num_rows` determines the number of rows to read from the file to infer the -/// schema. If `None`, the entire file will be read. -pub async fn infer_tfrecord_schema( - uri: &str, - tensor_features: &[&str], - string_features: &[&str], - num_rows: Option, -) -> Result { - let mut columns: HashMap = HashMap::new(); - - let (store, path) = ObjectStore::from_uri(uri).await?; - // TODO: should we avoid reading the entire file into memory? - let data = store - .inner - .get(&path) - .await? - .into_stream() - .map_err(std::io::Error::other) - .into_async_read(); - let mut records = RecordStream::::from_reader(data, Default::default()); - let mut i = 0; - while let Some(record) = records.next().await { - let record = record.map_err(|err| Error::io(err.to_string(), location!()))?; - - if let Some(features) = record.features { - for (name, feature) in features.feature { - if let Some(entry) = columns.get_mut(&name) { - entry.try_update(&feature)?; - } else { - columns.insert( - name.clone(), - FeatureMeta::try_new( - &feature, - tensor_features.contains(&name.as_str()), - string_features.contains(&name.as_str()), - )?, - ); - } - } - } - - i += 1; - if let Some(num_rows) = num_rows { - if i >= num_rows { - break; - } - } - } - - let mut fields = columns - .iter() - .map(|(name, meta)| make_field(name, meta)) - .collect::>>()?; - - // To guarantee some sort of deterministic order, we sort the fields by name - fields.sort_by(|a, b| a.name().cmp(b.name())); - Ok(ArrowSchema::new(fields)) -} - -/// Read a TFRecord file into an Arrow record batch stream. -/// -/// Reads `batch_size` rows at a time. If `batch_size` is `None`, a default -/// batch size of 10,000 is used. -/// -/// The schema may be a partial schema, in which case only the fields present in -/// the schema will be read. -pub async fn read_tfrecord( - uri: &str, - schema: ArrowSchemaRef, - batch_size: Option, -) -> Result { - let batch_size = batch_size.unwrap_or(10_000); - - let (store, path) = ObjectStore::from_uri(uri).await?; - let data = store - .inner - .get(&path) - .await? - .into_stream() - .map_err(std::io::Error::other) - .into_async_read(); - let schema_ref = schema.clone(); - let batch_stream = RecordStream::::from_reader(data, Default::default()) - .try_chunks(batch_size) - .map(move |chunk| { - let chunk = chunk.map_err(|err| DataFusionError::External(Box::new(err.1)))?; - let batch = convert_batch(chunk, &schema_ref)?; - Ok(batch) - }); - - Ok(Box::pin(RecordBatchStreamAdapter::new( - schema, - batch_stream, - ))) -} - -/// Check if a feature has more than 1 value. -fn feature_is_repeated(feature: &tfrecord::Feature) -> bool { - match feature.kind.as_ref().unwrap() { - Kind::BytesList(bytes_list) => bytes_list.value.len() > 1, - Kind::FloatList(float_list) => float_list.value.len() > 1, - Kind::Int64List(int64_list) => int64_list.value.len() > 1, - } -} - -/// Simplified representation of a features data type. -#[derive(Clone, PartialEq, Debug)] -enum FeatureType { - Integer, - Float, - Binary, - String, - Tensor { - shape: Vec, - dtype: TensorDataType, - }, -} - -/// General type information about a single feature. -struct FeatureMeta { - /// Whether the feature contains multiple values per example. Ones that do - /// will be converted to Arrow lists. Otherwise they will be primitive arrays. - repeated: bool, - feature_type: FeatureType, -} - -impl FeatureMeta { - /// Create a new FeatureMeta from a single example. - pub fn try_new(feature: &Feature, is_tensor: bool, is_string: bool) -> Result { - let feature_type = match feature.kind.as_ref().unwrap() { - Kind::BytesList(data) => { - if is_tensor { - Self::extract_tensor(data.value[0].as_slice())? - } else if is_string { - FeatureType::String - } else { - FeatureType::Binary - } - } - Kind::FloatList(_) => FeatureType::Float, - Kind::Int64List(_) => FeatureType::Integer, - }; - Ok(Self { - repeated: feature_is_repeated(feature), - feature_type, - }) - } - - /// Update the FeatureMeta with a new example, or return an error if the - /// example is inconsistent with the existing FeatureMeta. - pub fn try_update(&mut self, feature: &Feature) -> Result<()> { - let feature_type = match feature.kind.as_ref().unwrap() { - Kind::BytesList(data) => match self.feature_type { - FeatureType::String => FeatureType::String, - FeatureType::Binary => FeatureType::Binary, - FeatureType::Tensor { .. } => Self::extract_tensor(data.value[0].as_slice())?, - _ => { - return Err(Error::io( - format!( - "Data type mismatch: expected {:?}, got {:?}", - self.feature_type, - feature.kind.as_ref().unwrap() - ), - location!(), - )) - } - }, - Kind::FloatList(_) => FeatureType::Float, - Kind::Int64List(_) => FeatureType::Integer, - }; - if self.feature_type != feature_type { - return Err(Error::io( - format!("inconsistent feature type for field {:?}", feature_type), - location!(), - )); - } - if feature_is_repeated(feature) { - self.repeated = true; - } - Ok(()) - } - - fn extract_tensor(data: &[u8]) -> Result { - let tensor_proto = TensorProto::decode(data).map_prost_err(location!())?; - Ok(FeatureType::Tensor { - shape: tensor_proto - .tensor_shape - .as_ref() - .unwrap() - .dim - .iter() - .map(|d| d.size) - .collect(), - dtype: tensor_proto.dtype(), - }) - } -} - -/// Metadata for a fixed-shape tensor. -#[derive(serde::Serialize)] -struct ArrowTensorMetadata { - shape: Vec, -} - -fn tensor_dtype_to_arrow(tensor_dtype: &TensorDataType) -> Result { - Ok(match tensor_dtype { - TensorDataType::DtBfloat16 => DataType::FixedSizeBinary(2), - TensorDataType::DtHalf => DataType::Float16, - TensorDataType::DtFloat => DataType::Float32, - TensorDataType::DtDouble => DataType::Float64, - _ => { - return Err(Error::io( - format!("unsupported tensor data type {:?}", tensor_dtype), - location!(), - )); - } - }) -} - -fn make_field(name: &str, feature_meta: &FeatureMeta) -> Result { - let data_type = match &feature_meta.feature_type { - FeatureType::Integer => DataType::Int64, - FeatureType::Float => DataType::Float32, - FeatureType::Binary => DataType::Binary, - FeatureType::String => DataType::Utf8, - FeatureType::Tensor { shape, dtype } => { - let list_size = shape.iter().map(|x| *x as i32).product(); - let inner_type = tensor_dtype_to_arrow(dtype)?; - - let inner_meta = match dtype { - TensorDataType::DtBfloat16 => Some( - [(ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME)] - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect::>(), - ), - _ => None, - }; - let mut inner_field = ArrowField::new("item", inner_type, true); - if let Some(metadata) = inner_meta { - inner_field.set_metadata(metadata); - } - - DataType::FixedSizeList(Arc::new(inner_field), list_size) - } - }; - - // This metadata marks the field as a tensor column, which PyArrow can - // recognize. - let metadata = match &feature_meta.feature_type { - FeatureType::Tensor { shape, dtype: _ } => { - let mut metadata = HashMap::new(); - let tensor_metadata = ArrowTensorMetadata { - shape: shape.clone(), - }; - metadata.insert( - ARROW_EXT_NAME_KEY.to_string(), - "arrow.fixed_shape_tensor".to_string(), - ); - metadata.insert( - ARROW_EXT_META_KEY.to_string(), - serde_json::to_string(&tensor_metadata)?, - ); - Some(metadata) - } - _ => None, - }; - - let mut field = if feature_meta.repeated { - ArrowField::new("item", data_type, true) - } else { - ArrowField::new(name, data_type, true) - }; - if let Some(metadata) = metadata { - field.set_metadata(metadata); - } - - let field = if feature_meta.repeated { - ArrowField::new(name, DataType::List(Arc::new(field)), true) - } else { - field - }; - - Ok(field) -} - -/// Convert a vector of TFRecord examples into an Arrow record batch. -fn convert_batch(records: Vec, schema: &ArrowSchema) -> Result { - // TODO: do this in parallel? - let columns = schema - .fields - .iter() - .map(|field| convert_column(&records, field)) - .collect::>>()?; - - let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?; - Ok(batch) -} - -/// Convert a single column of TFRecord examples into an Arrow array. -fn convert_column(records: &[Example], field: &ArrowField) -> Result { - let type_info = parse_type(field.data_type()); - // Make leaf type - let (mut column, offsets) = convert_leaf(records, field.name(), &type_info)?; - - if let Some(fsl_size) = &type_info.fsl_size { - let mut field = ArrowField::new("item", type_info.leaf_type.clone(), true); - if matches!(&type_info.leaf_type, DataType::FixedSizeBinary(2)) { - field.set_metadata( - [ - ( - ARROW_EXT_NAME_KEY.to_string(), - BFLOAT16_EXT_NAME.to_string(), - ), - (ARROW_EXT_META_KEY.to_string(), "".to_string()), - ] - .into_iter() - .collect(), - ); - } - // Wrap in a FSL - column = Arc::new(FixedSizeListArray::try_new( - Arc::new(field), - *fsl_size, - column, - None, - )?); - } - - if type_info.in_list { - column = Arc::new(ListArray::try_new( - Arc::new(ArrowField::new("item", column.data_type().clone(), true)), - offsets.unwrap(), - column, - None, - )?); - } - - Ok(column) -} - -/// Representation of a field in the TFRecord file. It can be a leaf type, a -/// tensor, or a list of either. -struct TypeInfo { - leaf_type: DataType, - fsl_size: Option, - in_list: bool, -} - -fn parse_type(data_type: &DataType) -> TypeInfo { - match data_type { - DataType::FixedSizeList(inner_field, list_size) => { - let inner_type = parse_type(inner_field.data_type()); - TypeInfo { - leaf_type: inner_type.leaf_type, - fsl_size: Some(*list_size), - in_list: false, - } - } - DataType::List(inner_field) => { - let inner_type = parse_type(inner_field.data_type()); - TypeInfo { - leaf_type: inner_type.leaf_type, - fsl_size: inner_type.fsl_size, - in_list: true, - } - } - _ => TypeInfo { - leaf_type: data_type.clone(), - fsl_size: None, - in_list: false, - }, - } -} - -fn convert_leaf( - records: &[Example], - name: &str, - type_info: &TypeInfo, -) -> Result<(ArrayRef, Option>)> { - use arrow::array::*; - let features: Vec> = records - .iter() - .map(|record| { - let features = record.features.as_ref().unwrap(); - features.feature.get(name) - }) - .collect(); - let (values, offsets): (ArrayRef, Option>) = match type_info { - // First, the Non-tensor leaf types - TypeInfo { - leaf_type: DataType::Int64, - fsl_size: None, - in_list, - } => { - let mut values = Int64Builder::with_capacity(features.len()); - for feature in features.iter() { - if let Some(Feature { - kind: Some(Kind::Int64List(list)), - }) = feature - { - values.append_slice(&list.value); - } else if !type_info.in_list { - values.append_null(); - } - } - let offsets = if *in_list { - Some(compute_offsets(&features, type_info)) - } else { - None - }; - (Arc::new(values.finish()), offsets) - } - TypeInfo { - leaf_type: DataType::Float32, - fsl_size: None, - in_list, - } => { - let mut values = Float32Builder::with_capacity(features.len()); - for feature in features.iter() { - if let Some(Feature { - kind: Some(Kind::FloatList(list)), - }) = feature - { - values.append_slice(&list.value); - } else if !type_info.in_list { - values.append_null(); - } - } - let offsets = if *in_list { - Some(compute_offsets(&features, type_info)) - } else { - None - }; - (Arc::new(values.finish()), offsets) - } - TypeInfo { - leaf_type: DataType::Binary, - fsl_size: None, - in_list, - } => { - let mut values = BinaryBuilder::with_capacity(features.len(), 1024); - for feature in features.iter() { - if let Some(Feature { - kind: Some(Kind::BytesList(list)), - }) = feature - { - for value in &list.value { - values.append_value(value); - } - } else if !type_info.in_list { - values.append_null(); - } - } - let offsets = if *in_list { - Some(compute_offsets(&features, type_info)) - } else { - None - }; - (Arc::new(values.finish()), offsets) - } - TypeInfo { - leaf_type: DataType::Utf8, - fsl_size: None, - in_list, - } => { - let mut values = StringBuilder::with_capacity(features.len(), 1024); - for feature in features.iter() { - if let Some(Feature { - kind: Some(Kind::BytesList(list)), - }) = feature - { - for value in &list.value { - values.append_value(String::from_utf8_lossy(value)); - } - } else if !type_info.in_list { - values.append_null(); - } - } - let offsets = if *in_list { - Some(compute_offsets(&features, type_info)) - } else { - None - }; - (Arc::new(values.finish()), offsets) - } - // Now, handle tensors - TypeInfo { - fsl_size: Some(_), .. - } => convert_fixedshape_tensor(&features, type_info)?, - _ => Err(Error::io( - format!("unsupported type {:?}", type_info.leaf_type), - location!(), - ))?, - }; - - Ok((values, offsets)) -} - -fn compute_offsets(features: &[Option<&Feature>], type_info: &TypeInfo) -> OffsetBuffer { - let mut offsets: Vec = Vec::with_capacity(features.len() + 1); - offsets.push(0); - - let mut current = 0; - for feature in features.iter() { - if let Some(feature) = feature { - match ( - type_info.fsl_size.is_some(), - &type_info.leaf_type, - feature.kind.as_ref().unwrap(), - ) { - (true, _, Kind::BytesList(list)) => { - current += list.value.len() as i32; - } - (false, DataType::Binary, Kind::BytesList(list)) => { - current += list.value.len() as i32; - } - (false, DataType::Utf8, Kind::BytesList(list)) => { - current += list.value.len() as i32; - } - (false, DataType::Float32, Kind::FloatList(list)) => { - current += list.value.len() as i32; - } - (false, DataType::Int64, Kind::Int64List(list)) => { - current += list.value.len() as i32; - } - _ => {} // Ignore mismatched types - } - } - offsets.push(current); - } - - OffsetBuffer::new(ScalarBuffer::from(offsets)) -} - -// /// Convert TensorProto message into an element of a FixedShapeTensor array and -// /// append it to the builder. -// /// -// /// TensorProto definition: -// /// https://github.com/tensorflow/tensorboard/blob/master/tensorboard/compat/proto/tensor.proto -// /// -// /// FixedShapeTensor definition: -// /// https://arrow.apache.org/docs/format/CanonicalExtensions.html#fixed-shape-tensor -fn convert_fixedshape_tensor( - features: &[Option<&Feature>], - type_info: &TypeInfo, -) -> Result<(ArrayRef, Option>)> { - use arrow::array::*; - let tensor_iter = features.iter().map(|maybe_feature| { - if let Some(feature) = maybe_feature { - if let Some(Kind::BytesList(list)) = &feature.kind { - list.value - .iter() - .map(|val| TensorProto::decode(val.as_slice())) - .collect::, _>>() - .map(Some) - } else { - Ok(None) - } - } else { - Ok(None) - } - }); - - let offsets = if type_info.in_list { - Some(compute_offsets(features, type_info)) - } else { - None - }; - - let list_size = type_info.fsl_size.unwrap() as usize; - - let values: ArrayRef = match type_info.leaf_type { - DataType::Float16 => { - let mut values = Float16Builder::with_capacity(features.len()); - for tensors in tensor_iter { - if let Some(tensors) = tensors.map_prost_err(location!())? { - for tensor in tensors { - validate_tensor(&tensor, type_info)?; - if tensor.half_val.is_empty() { - append_primitive_from_slice( - &mut values, - tensor.tensor_content.as_slice(), - |bytes| f16::from_le_bytes(bytes.try_into().unwrap()), - ) - } else { - // The individual values have padding (they are stored as i32) - // because protobuf has no 2-byte type - for value in tensor.half_val { - values.append_value(f16::from_bits(value as u16)); - } - } - } - } else { - values.append_nulls(list_size); - } - } - Arc::new(values.finish()) - } - // BFloat16 - DataType::FixedSizeBinary(2) => { - let mut values = FixedSizeBinaryBuilder::with_capacity(features.len(), 2); - - for tensors in tensor_iter { - if let Some(tensors) = tensors.map_prost_err(location!())? { - for tensor in tensors { - validate_tensor(&tensor, type_info)?; - if tensor.half_val.is_empty() { - // Just directly move the bytes - for bytes in tensor.tensor_content.as_slice().chunks_exact(2) { - values.append_value(bytes)?; - } - } else { - // The individual values have padding (they are stored as i32) - // because protobuf has no 2-byte type - for value in tensor.half_val { - let bf16_value = bf16::from_bits(value as u16); - values.append_value(bf16_value.to_le_bytes())?; - } - } - } - } else { - for _ in 0..list_size { - values.append_null(); - } - } - } - Arc::new(values.finish()) - } - DataType::Float32 => { - let mut values = Float32Builder::with_capacity(features.len()); - for tensors in tensor_iter { - if let Some(tensors) = tensors.map_prost_err(location!())? { - for tensor in tensors { - validate_tensor(&tensor, type_info)?; - if tensor.float_val.is_empty() { - append_primitive_from_slice( - &mut values, - tensor.tensor_content.as_slice(), - |bytes| f32::from_le_bytes(bytes.try_into().unwrap()), - ) - } else { - values.append_slice(tensor.float_val.as_slice()); - } - } - } else { - values.append_nulls(list_size); - } - } - Arc::new(values.finish()) - } - DataType::Float64 => { - let mut values = Float64Builder::with_capacity(features.len()); - for tensors in tensor_iter { - if let Some(tensors) = tensors.map_prost_err(location!())? { - for tensor in tensors { - validate_tensor(&tensor, type_info)?; - if tensor.float_val.is_empty() { - append_primitive_from_slice( - &mut values, - tensor.tensor_content.as_slice(), - |bytes| f64::from_le_bytes(bytes.try_into().unwrap()), - ) - } else { - values.append_slice(tensor.double_val.as_slice()) - }; - } - } else { - values.append_nulls(list_size); - } - } - Arc::new(values.finish()) - } - _ => Err(Error::io( - format!("unsupported type {:?}", type_info.leaf_type), - location!(), - ))?, - }; - - Ok((values, offsets)) -} - -fn validate_tensor(tensor: &TensorProto, type_info: &TypeInfo) -> Result<()> { - let tensor_shape = tensor.tensor_shape.as_ref().unwrap(); - let length = tensor_shape.dim.iter().map(|d| d.size as i32).product(); - if type_info.fsl_size != Some(length) { - return Err(Error::io( - format!( - "tensor length mismatch: expected {}, got {}", - type_info.fsl_size.unwrap(), - length - ), - location!(), - )); - } - - let data_type = tensor_dtype_to_arrow(&tensor.dtype())?; - if data_type != type_info.leaf_type { - return Err(Error::io( - format!( - "tensor type mismatch: expected {:?}, got {:?}", - type_info.leaf_type, - tensor.dtype() - ), - location!(), - )); - } - - Ok(()) -} - -/// Given a potentially unaligned slice, append the slice to the builder. -fn append_primitive_from_slice( - builder: &mut PrimitiveBuilder, - slice: &[u8], - parse_val: impl Fn(&[u8]) -> T::Native, -) where - T: arrow::datatypes::ArrowPrimitiveType, -{ - // Safety: we are trusting that the data in the buffer are valid for the - // datatype T::Native, as claimed by the file. There isn't anywhere for - // TensorProto to tell us the original endianness, so it's possible there - // could be a mismatch here. - let (prefix, middle, suffix) = unsafe { slice.align_to::() }; - for val in prefix.chunks_exact(T::Native::get_byte_width()) { - builder.append_value(parse_val(val)); - } - - builder.append_slice(middle); - - for val in suffix.chunks_exact(T::Native::get_byte_width()) { - builder.append_value(parse_val(val)); - } -} From 50f566fc6686ea0f161d93ca935be621d5ae88fc Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 19 Nov 2025 17:31:29 +0800 Subject: [PATCH 2/7] cleanup Signed-off-by: Xuanwo --- .github/workflows/rust.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 10ec656c6d6..db5655d15af 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -174,13 +174,13 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: Build tests run: | - cargo test --profile ci --locked --features fp16kernels,cli,tensorflow,dynamodb,substrait --no-run + cargo test --profile ci --locked --features fp16kernels,cli,dynamodb,substrait --no-run - name: Run tests run: | - cargo test --profile ci --features fp16kernels,cli,tensorflow,dynamodb,substrait + cargo test --profile ci --features fp16kernels,cli,dynamodb,substrait - name: Check benchmarks run: | - cargo check --profile ci --benches --features fp16kernels,cli,tensorflow,dynamodb,substrait + cargo check --profile ci --benches --features fp16kernels,cli,dynamodb,substrait windows-build: runs-on: windows-latest defaults: From fa233a24ac84ae1fbec5ebd29e1fd8beddbd26ea Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 19 Nov 2025 17:42:39 +0800 Subject: [PATCH 3/7] revert unexpected changes Signed-off-by: Xuanwo --- python/src/executor.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/src/executor.rs b/python/src/executor.rs index 26d2236e409..25a983c80ce 100644 --- a/python/src/executor.rs +++ b/python/src/executor.rs @@ -102,6 +102,26 @@ impl BackgroundExecutor { } } + /// Spawn a task in the background. + pub fn spawn_background(&self, py: Option>, task: T) + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + if let Some(py) = py { + py.allow_threads(|| { + self.runtime.spawn(task); + }) + } else { + // Python::with_gil is a no-op if the GIL is already held by the thread. + Python::with_gil(|py| { + py.allow_threads(|| { + self.runtime.spawn(task); + }) + }) + } + } + /// Block on a future and wait for it to complete. /// /// This helper method also frees the GIL before blocking. From b03ab19def7c4f9a0d5b26540d53b422e5485c18 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 19 Nov 2025 18:44:36 +0800 Subject: [PATCH 4/7] revert not related changes Signed-off-by: Xuanwo --- deny.toml | 1 + python/src/executor.rs | 2 +- rust/lance-core/src/utils/testing.rs | 2 -- rust/lance/src/dataset/transaction.rs | 7 +++---- rust/lance/src/dataset/write/commit.rs | 4 ++-- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/deny.toml b/deny.toml index e192267b97b..4c67b0767c9 100644 --- a/deny.toml +++ b/deny.toml @@ -83,6 +83,7 @@ ignore = [ { id = "RUSTSEC-2021-0153", reason = "`encoding` is used by lindera" }, { id = "RUSTSEC-2024-0370", reason = "`proc-macro-error` is used by jieba-rs via include-flate" }, { id = "RUSTSEC-2024-0436", reason = "`paste` is used by datafusion" }, + { id = "RUSTSEC-2025-0052", reason = "`async-std` is used by tfrecord" }, { id = "RUSTSEC-2023-0071", reason = "`rsa` is used by opendal via reqsign" }, { id = "RUSTSEC-2025-0119", reason = "`number_prefix` used by hf-hub in examples" } ] diff --git a/python/src/executor.rs b/python/src/executor.rs index 25a983c80ce..6e446bef377 100644 --- a/python/src/executor.rs +++ b/python/src/executor.rs @@ -102,7 +102,7 @@ impl BackgroundExecutor { } } - /// Spawn a task in the background. + /// Spawn a task in the background pub fn spawn_background(&self, py: Option>, task: T) where T: Future + Send + 'static, diff --git a/rust/lance-core/src/utils/testing.rs b/rust/lance-core/src/utils/testing.rs index 3b81b9de785..f19d66f67e2 100644 --- a/rust/lance-core/src/utils/testing.rs +++ b/rust/lance-core/src/utils/testing.rs @@ -3,8 +3,6 @@ //! Testing utilities -#![allow(deprecated)] // For PutMultipartOpts which is deprecated but PutMultipartOpts doesn't exist yet - use crate::Result; use async_trait::async_trait; use bytes::Bytes; diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index c89c1a36c65..364b5fe800b 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -205,6 +205,8 @@ pub enum Operation { replacements: Vec, }, /// Merge a new column in + /// 'fragments' is the final fragments include all data files, the new fragments must align with old ones at rows. + /// 'schema' is not forced to include existed columns, which means we could use Merge to drop column data Merge { fragments: Vec, schema: Schema, @@ -3348,10 +3350,7 @@ pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> ( None, Operation::Overwrite { - fragments, - schema, - config_upsert_values: None, - initial_bases: _, + fragments, schema, .. }, ) => { // Validate here because we are going to return early. diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 21a11248ba5..ba7bf04a5df 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -260,8 +260,8 @@ impl<'a> CommitBuilder<'a> { }); } - // Validate the operation before proceeding with the commit to ensure - // manifest invariants stay intact (e.g., Merge validation). + // Validate the operation before proceeding with the commit + // This ensures that operations like Merge have proper validation for data integrity if let Some(dataset) = dest.dataset() { validate_operation(Some(&dataset.manifest), &transaction.operation)?; } else { From a7632d9258ba846eeb3277414643b589e8e3a816 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 19 Nov 2025 18:45:49 +0800 Subject: [PATCH 5/7] Revert utils Signed-off-by: Xuanwo --- rust/lance/src/utils.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/utils.rs b/rust/lance/src/utils.rs index 518fcae2417..4ae847cfb9c 100644 --- a/rust/lance/src/utils.rs +++ b/rust/lance/src/utils.rs @@ -7,7 +7,5 @@ pub(crate) mod future; pub(crate) mod temporal; #[cfg(test)] pub(crate) mod test; - -// Re-export -pub use lance_datafusion::sql; -pub use lance_index::vector::kmeans; +#[cfg(feature = "tensorflow")] +pub mod tfrecord; From e3c9c894352886d878ea9034b093c6f84ee6ce46 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 19 Nov 2025 19:27:33 +0800 Subject: [PATCH 6/7] Address utils Signed-off-by: Xuanwo --- rust/lance/src/utils.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/lance/src/utils.rs b/rust/lance/src/utils.rs index 4ae847cfb9c..b2997d58e63 100644 --- a/rust/lance/src/utils.rs +++ b/rust/lance/src/utils.rs @@ -7,5 +7,3 @@ pub(crate) mod future; pub(crate) mod temporal; #[cfg(test)] pub(crate) mod test; -#[cfg(feature = "tensorflow")] -pub mod tfrecord; From 76239b71ab4e4add676426472ab8ee3b717dafaf Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 19 Nov 2025 19:28:12 +0800 Subject: [PATCH 7/7] Fix ci Signed-off-by: Xuanwo --- deny.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/deny.toml b/deny.toml index 4c67b0767c9..e192267b97b 100644 --- a/deny.toml +++ b/deny.toml @@ -83,7 +83,6 @@ ignore = [ { id = "RUSTSEC-2021-0153", reason = "`encoding` is used by lindera" }, { id = "RUSTSEC-2024-0370", reason = "`proc-macro-error` is used by jieba-rs via include-flate" }, { id = "RUSTSEC-2024-0436", reason = "`paste` is used by datafusion" }, - { id = "RUSTSEC-2025-0052", reason = "`async-std` is used by tfrecord" }, { id = "RUSTSEC-2023-0071", reason = "`rsa` is used by opendal via reqsign" }, { id = "RUSTSEC-2025-0119", reason = "`number_prefix` used by hf-hub in examples" } ]