diff --git a/Cargo.toml b/Cargo.toml index 54f2f203fcdcb..ffddbb6c095e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,20 +64,28 @@ version = "38.0.0" ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } -arrow = { version = "51.0.0", features = ["prettyprint"] } -arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] } -arrow-buffer = { version = "51.0.0", default-features = false } -arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] } -arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4"] } -arrow-ord = { version = "51.0.0", default-features = false } -arrow-schema = { version = "51.0.0", default-features = false } -arrow-string = { version = "51.0.0", default-features = false } +arrow = { version = "52.0.0", features = [ + "prettyprint", +] } +arrow-array = { version = "52.0.0", default-features = false, features = [ + "chrono-tz", +] } +arrow-buffer = { version = "52.0.0", default-features = false } +arrow-flight = { version = "52.0.0", features = [ + "flight-sql-experimental", +] } +arrow-ipc = { version = "52.0.0", default-features = false, features = [ + "lz4", +] } +arrow-ord = { version = "52.0.0", default-features = false } +arrow-schema = { version = "52.0.0", default-features = false } +arrow-string = { version = "52.0.0", default-features = false } async-trait = "0.1.73" bigdecimal = "=0.4.1" bytes = "1.4" chrono = { version = "0.4.34", default-features = false } ctor = "0.2.0" -dashmap = "5.4.0" +dashmap = "5.5.0" datafusion = { path = "datafusion/core", version = "38.0.0", default-features = false } datafusion-common = { path = "datafusion/common", version = "38.0.0", default-features = false } datafusion-common-runtime = { path = "datafusion/common-runtime", version = "38.0.0" } @@ -104,9 +112,13 @@ indexmap = "2.0.0" itertools = "0.12" log = "^0.4" num_cpus = "1.13.0" -object_store = { version = "0.9.1", default-features = false } +object_store = { version = "0.10.1", default-features = false } parking_lot = "0.12" -parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } +parquet = { version = "52.0.0", default-features = false, features = [ + "arrow", + "async", + "object_store", +] } rand = "0.8" regex = "1.8" rstest = "0.21.0" diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index b165070c60605..566b90b45478d 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -130,9 +130,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658" +checksum = "7ae9728f104939be6d8d9b368a354b4929b0569160ea1641f0721b55a861ce38" dependencies = [ "arrow-arith", "arrow-array", @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0" +checksum = "a7029a5b3efbeafbf4a12d12dc16b8f9e9bff20a410b8c25c5d28acc089e1043" dependencies = [ "arrow-array", "arrow-buffer", @@ -166,9 +166,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea" +checksum = "d33238427c60271710695f17742f45b1a5dc5bcfc5c15331c25ddfe7abf70d97" dependencies = [ "ahash", "arrow-buffer", @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27" +checksum = "fe9b95e825ae838efaf77e366c00d3fc8cca78134c9db497d6bda425f2e7b7c1" dependencies = [ "bytes", "half", @@ -194,9 +194,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba" +checksum = "87cf8385a9d5b5fcde771661dd07652b79b9139fea66193eda6a88664400ccab" dependencies = [ "arrow-array", "arrow-buffer", @@ -215,9 +215,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2" +checksum = "cea5068bef430a86690059665e40034625ec323ffa4dd21972048eebb0127adc" dependencies = [ "arrow-array", "arrow-buffer", @@ -234,9 +234,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5" +checksum = "cb29be98f987bcf217b070512bb7afba2f65180858bca462edf4a39d84a23e10" dependencies = [ "arrow-buffer", "arrow-schema", @@ -246,9 +246,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d" +checksum = "ffc68f6523970aa6f7ce1dc9a33a7d9284cfb9af77d4ad3e617dbe5d79cc6ec8" dependencies = [ "arrow-array", "arrow-buffer", @@ -261,9 +261,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49" +checksum = "2041380f94bd6437ab648e6c2085a045e45a0c44f91a1b9a4fe3fed3d379bfb1" dependencies = [ "arrow-array", "arrow-buffer", @@ -281,9 +281,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412" +checksum = "fcb56ed1547004e12203652f12fe12e824161ff9d1e5cf2a7dc4ff02ba94f413" dependencies = [ "arrow-array", "arrow-buffer", @@ -296,9 +296,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228" +checksum = "575b42f1fc588f2da6977b94a5ca565459f5ab07b60545e17243fb9a7ed6d43e" dependencies = [ "ahash", "arrow-array", @@ -311,15 +311,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528" +checksum = "32aae6a60458a2389c0da89c9de0b7932427776127da1a738e2efc21d32f3393" [[package]] name = "arrow-select" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830" +checksum = "de36abaef8767b4220d7b4a8c2fe5ffc78b47db81b03d77e2136091c3ba39102" dependencies = [ "ahash", "arrow-array", @@ -331,9 +331,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb" +checksum = "e435ada8409bcafc910bc3e0077f532a4daa20e99060a496685c0e3e53cc2597" dependencies = [ "arrow-array", "arrow-buffer", @@ -375,8 +375,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.13.1", - "zstd-safe 7.1.0", + "zstd 0.13.0", + "zstd-safe 7.0.0", ] [[package]] @@ -399,6 +399,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -436,8 +442,8 @@ dependencies = [ "bytes", "fastrand 1.9.0", "hex", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "ring 0.16.20", "time", "tokio", @@ -469,7 +475,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", - "http", + "http 0.2.12", "regex", "tracing", ] @@ -485,8 +491,8 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "lazy_static", "percent-encoding", "pin-project-lite", @@ -511,7 +517,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", + "http 0.2.12", "regex", "tokio-stream", "tower", @@ -538,7 +544,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "http", + "http 0.2.12", "regex", "tower", "tracing", @@ -554,7 +560,7 @@ dependencies = [ "aws-sigv4", "aws-smithy-http", "aws-types", - "http", + "http 0.2.12", "tracing", ] @@ -568,7 +574,7 @@ dependencies = [ "form_urlencoded", "hex", "hmac", - "http", + "http 0.2.12", "once_cell", "percent-encoding", "regex", @@ -601,9 +607,9 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand 1.9.0", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", "hyper-rustls 0.23.2", "lazy_static", "pin-project-lite", @@ -623,9 +629,9 @@ dependencies = [ "bytes", "bytes-utils", "futures-core", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.29", "once_cell", "percent-encoding", "pin-project-lite", @@ -642,8 +648,8 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "bytes", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "pin-project-lite", "tower", "tracing", @@ -701,7 +707,7 @@ dependencies = [ "aws-smithy-client", "aws-smithy-http", "aws-smithy-types", - "http", + "http 0.2.12", "rustc_version", "tracing", ] @@ -788,9 +794,9 @@ dependencies = [ [[package]] name = "brotli" -version = "3.5.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d640d25bc63c50fb1f0b545ffd80207d2e10a4c965530809b40ba3386825c391" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -799,9 +805,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "2.5.1" +version = "4.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -899,9 +905,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.6" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" dependencies = [ "chrono", "chrono-tz-build", @@ -910,9 +916,9 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" dependencies = [ "parse-zoneinfo", "phf", @@ -1166,7 +1172,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.13.1", + "zstd 0.13.0", ] [[package]] @@ -1250,6 +1256,7 @@ dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-buffer", "chrono", "datafusion-common", "paste", @@ -1500,15 +1507,6 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "endian-type" version = "0.1.2" @@ -1588,9 +1586,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "23.5.26" +version = "24.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -1769,7 +1767,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", "indexmap 2.2.6", "slab", "tokio", @@ -1857,6 +1874,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1864,7 +1892,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1896,9 +1947,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1910,33 +1961,76 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.29", "log", "rustls 0.20.9", - "rustls-native-certs", + "rustls-native-certs 0.6.3", "tokio", "tokio-rustls 0.23.4", ] [[package]] name = "hyper-rustls" -version = "0.24.2" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http", - "hyper", - "rustls 0.21.12", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls 0.22.4", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2", "tokio", - "tokio-rustls 0.24.1", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -2395,17 +2489,17 @@ dependencies = [ [[package]] name = "object_store" -version = "0.9.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" dependencies = [ "async-trait", - "base64 0.21.7", + "base64 0.22.1", "bytes", "chrono", "futures", "humantime", - "hyper", + "hyper 1.3.1", "itertools", "md-5", "parking_lot", @@ -2482,9 +2576,9 @@ dependencies = [ [[package]] name = "parquet" -version = "51.0.0" +version = "52.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "096795d4f47f65fd3ee1ec5a98b77ab26d602f2cc785b0e4be5443add17ecc32" +checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab" dependencies = [ "ahash", "arrow-array", @@ -2512,7 +2606,8 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd 0.13.1", + "zstd 0.13.0", + "zstd-sys", ] [[package]] @@ -2819,20 +2914,21 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "reqwest" -version = "0.11.27" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls 0.24.2", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.26.0", + "hyper-util", "ipnet", "js-sys", "log", @@ -2840,16 +2936,16 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.12", - "rustls-native-certs", - "rustls-pemfile 1.0.4", + "rustls 0.22.4", + "rustls-native-certs 0.7.0", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", - "system-configuration", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls 0.25.0", "tokio-util", "tower-service", "url", @@ -2964,14 +3060,16 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", "ring 0.17.8", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", ] [[package]] @@ -2986,6 +3084,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -3013,11 +3124,12 @@ checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" -version = "0.101.7" +version = "0.102.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ "ring 0.17.8", + "rustls-pki-types", "untrusted 0.9.0", ] @@ -3373,27 +3485,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" -[[package]] -name = "system-configuration" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" -dependencies = [ - "bitflags 1.3.2", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tempfile" version = "3.10.1" @@ -3555,11 +3646,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.24.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.21.12", + "rustls 0.22.4", + "rustls-pki-types", "tokio", ] @@ -4093,9 +4185,9 @@ checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winreg" -version = "0.50.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" dependencies = [ "cfg-if", "windows-sys 0.48.0", @@ -4153,11 +4245,11 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" dependencies = [ - "zstd-safe 7.1.0", + "zstd-safe 7.0.0", ] [[package]] @@ -4172,18 +4264,18 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.1.0" +version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.10+zstd.1.5.6" +version = "2.0.9+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" dependencies = [ "cc", "pkg-config", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 4e3d800cfe978..5578d7fe5839e 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -30,7 +30,7 @@ rust-version = "1.73" readme = "README.md" [dependencies] -arrow = "51.0.0" +arrow = { version = "52.0.0" } async-trait = "0.1.41" aws-config = "0.55" aws-credential-types = "0.55" @@ -49,9 +49,9 @@ dirs = "4.0.0" env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", default-features = false } -object_store = { version = "0.9.0", features = ["aws", "gcp", "http"] } +object_store = { version = "0.10.1", features = ["aws", "gcp", "http"] } parking_lot = { version = "0.12" } -parquet = { version = "51.0.0", default-features = false } +parquet = { version = "52.0.0", default-features = false } regex = "1.8" rustyline = "11.0" tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] } diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index e32fb9b096302..7bc431c5c5eef 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { {"num":2,"str":"hello"} {"num":4,"str":"foo"}"#, ); - object_store.put(&path, data).await.unwrap(); + object_store.put(&path, data.into()).await.unwrap(); let schema = Arc::new(Schema::new(vec![ Field::new("num", DataType::Int64, false), diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 7085732b562e9..62ea85a4a33d7 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -59,7 +59,7 @@ libc = "0.2.140" num_cpus = { workspace = true } object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } -pyo3 = { version = "0.20.0", optional = true } +pyo3 = { version = "0.21.0", optional = true } sqlparser = { workspace = true } [target.'cfg(target_family = "wasm")'.dependencies] diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 9819fc7b344d7..7eecdec8abef5 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -24,6 +24,8 @@ use arrow::array::*; use arrow::datatypes::*; use arrow::row::Rows; use arrow::{downcast_dictionary_array, downcast_primitive_array}; +use arrow_buffer::IntervalDayTime; +use arrow_buffer::IntervalMonthDayNano; use crate::cast::{ as_boolean_array, as_fixed_size_list_array, as_generic_binary_array, @@ -72,7 +74,7 @@ macro_rules! hash_value { }; } hash_value!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64); -hash_value!(bool, str, [u8]); +hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano); macro_rules! hash_float_value { ($(($t:ty, $i:ty)),+) => { diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs index f4356477532f4..87254a499fb11 100644 --- a/datafusion/common/src/pyarrow.rs +++ b/datafusion/common/src/pyarrow.rs @@ -22,8 +22,8 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use arrow_array::Array; use pyo3::exceptions::PyException; use pyo3::prelude::PyErr; -use pyo3::types::PyList; -use pyo3::{FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python}; +use pyo3::types::{PyAnyMethods, PyList}; +use pyo3::{Bound, FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python}; use crate::{DataFusionError, ScalarValue}; @@ -34,18 +34,18 @@ impl From for PyErr { } impl FromPyArrow for ScalarValue { - fn from_pyarrow(value: &PyAny) -> PyResult { + fn from_pyarrow_bound(value: &pyo3::Bound<'_, pyo3::PyAny>) -> PyResult { let py = value.py(); let typ = value.getattr("type")?; let val = value.call_method0("as_py")?; // construct pyarrow array from the python value and pyarrow type - let factory = py.import("pyarrow")?.getattr("array")?; - let args = PyList::new(py, [val]); + let factory = py.import_bound("pyarrow")?.getattr("array")?; + let args = PyList::new_bound(py, [val]); let array = factory.call1((args, typ))?; // convert the pyarrow array to rust array using C data interface - let array = arrow::array::make_array(ArrayData::from_pyarrow(array)?); + let array = arrow::array::make_array(ArrayData::from_pyarrow_bound(&array)?); let scalar = ScalarValue::try_from_array(&array, 0)?; Ok(scalar) @@ -64,8 +64,8 @@ impl ToPyArrow for ScalarValue { } impl<'source> FromPyObject<'source> for ScalarValue { - fn extract(value: &'source PyAny) -> PyResult { - Self::from_pyarrow(value) + fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult { + Self::from_pyarrow_bound(value) } } @@ -86,19 +86,19 @@ mod tests { fn init_python() { prepare_freethreaded_python(); Python::with_gil(|py| { - if py.run("import pyarrow", None, None).is_err() { - let locals = PyDict::new(py); - py.run( + if py.run_bound("import pyarrow", None, None).is_err() { + let locals = PyDict::new_bound(py); + py.run_bound( "import sys; executable = sys.executable; python_path = sys.path", None, - Some(locals), + Some(&locals), ) .expect("Couldn't get python info"); - let executable = locals.get_item("executable").unwrap().unwrap(); + let executable = locals.get_item("executable").unwrap(); let executable: String = executable.extract().unwrap(); - let python_path = locals.get_item("python_path").unwrap().unwrap(); - let python_path: Vec<&str> = python_path.extract().unwrap(); + let python_path = locals.get_item("python_path").unwrap(); + let python_path: Vec = python_path.extract().unwrap(); panic!("pyarrow not found\nExecutable: {executable}\nPython path: {python_path:?}\n\ HINT: try `pip install pyarrow`\n\ @@ -125,9 +125,10 @@ mod tests { Python::with_gil(|py| { for scalar in example_scalars.iter() { - let result = - ScalarValue::from_pyarrow(scalar.to_pyarrow(py).unwrap().as_ref(py)) - .unwrap(); + let result = ScalarValue::from_pyarrow_bound( + scalar.to_pyarrow(py).unwrap().bind(py), + ) + .unwrap(); assert_eq!(scalar, &result); } }); diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index ba006247cd708..8073b21cdde0d 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -52,7 +52,7 @@ use arrow::{ UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION, }, }; -use arrow_buffer::Buffer; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, ScalarBuffer}; use arrow_schema::{UnionFields, UnionMode}; use half::f16; @@ -266,11 +266,11 @@ pub enum ScalarValue { IntervalYearMonth(Option), /// Number of elapsed days and milliseconds (no leap seconds) /// stored as 2 contiguous 32-bit signed integers - IntervalDayTime(Option), + IntervalDayTime(Option), /// A triple of the number of elapsed months, days, and nanoseconds. /// Months and days are encoded as 32-bit signed integers. /// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds). - IntervalMonthDayNano(Option), + IntervalMonthDayNano(Option), /// Duration in seconds DurationSecond(Option), /// Duration in milliseconds @@ -988,10 +988,10 @@ impl ScalarValue { ScalarValue::IntervalYearMonth(Some(0)) } DataType::Interval(IntervalUnit::DayTime) => { - ScalarValue::IntervalDayTime(Some(0)) + ScalarValue::IntervalDayTime(Some(IntervalDayTime::ZERO)) } DataType::Interval(IntervalUnit::MonthDayNano) => { - ScalarValue::IntervalMonthDayNano(Some(0)) + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::ZERO)) } DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(Some(0)), DataType::Duration(TimeUnit::Millisecond) => { @@ -2151,9 +2151,8 @@ impl ScalarValue { ), ScalarValue::Union(value, fields, _mode) => match value { Some((v_id, value)) => { - let mut field_type_ids = Vec::::with_capacity(fields.len()); - let mut child_arrays = - Vec::<(Field, ArrayRef)>::with_capacity(fields.len()); + let mut new_fields = Vec::with_capacity(fields.len()); + let mut child_arrays = Vec::::with_capacity(fields.len()); for (f_id, field) in fields.iter() { let ar = if f_id == *v_id { value.to_array_of_size(size)? @@ -2162,14 +2161,14 @@ impl ScalarValue { new_null_array(dt, size) }; let field = (**field).clone(); - child_arrays.push((field, ar)); - field_type_ids.push(f_id); + child_arrays.push(ar); + new_fields.push(field.clone()); } - let type_ids = repeat(*v_id).take(size).collect::>(); - let type_ids = Buffer::from_slice_ref(type_ids); - let value_offsets: Option = None; + let type_ids = repeat(*v_id).take(size); + let type_ids = ScalarBuffer::::from_iter(type_ids); + let value_offsets: Option> = None; let ar = UnionArray::try_new( - field_type_ids.as_slice(), + fields.clone(), type_ids, value_offsets, child_arrays, @@ -3219,9 +3218,13 @@ impl fmt::Display for ScalarValue { ScalarValue::Time32Millisecond(e) => format_option!(f, e)?, ScalarValue::Time64Microsecond(e) => format_option!(f, e)?, ScalarValue::Time64Nanosecond(e) => format_option!(f, e)?, - ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?, - ScalarValue::IntervalMonthDayNano(e) => format_option!(f, e)?, + ScalarValue::IntervalMonthDayNano(e) => { + format_option!(f, e.map(|v| format!("{v:?}")))? + } + ScalarValue::IntervalDayTime(e) => { + format_option!(f, e.map(|v| format!("{v:?}")))?; + } ScalarValue::DurationSecond(e) => format_option!(f, e)?, ScalarValue::DurationMillisecond(e) => format_option!(f, e)?, ScalarValue::DurationMicrosecond(e) => format_option!(f, e)?, @@ -3447,6 +3450,7 @@ mod tests { use arrow::buffer::OffsetBuffer; use arrow::compute::{is_null, kernels}; use arrow::util::pretty::pretty_format_columns; + use arrow_buffer::Buffer; use arrow_schema::Fields; use chrono::NaiveDate; use rand::Rng; @@ -3988,7 +3992,11 @@ mod tests { #[test] fn test_interval_add_timestamp() -> Result<()> { - let interval = ScalarValue::IntervalMonthDayNano(Some(123)); + let interval = ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 1, + days: 2, + nanoseconds: 3, + })); let timestamp = ScalarValue::TimestampNanosecond(Some(123), None); let result = interval.add(×tamp)?; let expect = timestamp.add(&interval)?; @@ -4000,7 +4008,10 @@ mod tests { let expect = timestamp.add(&interval)?; assert_eq!(result, expect); - let interval = ScalarValue::IntervalDayTime(Some(123)); + let interval = ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 1, + milliseconds: 23, + })); let timestamp = ScalarValue::TimestampNanosecond(Some(123), None); let result = interval.add(×tamp)?; let expect = timestamp.add(&interval)?; @@ -4650,6 +4661,17 @@ mod tests { let str_vals = [Some("foo"), None, Some("bar")]; + let interval_dt_vals = [ + Some(IntervalDayTime::MINUS_ONE), + None, + Some(IntervalDayTime::ONE), + ]; + let interval_mdn_vals = [ + Some(IntervalMonthDayNano::MINUS_ONE), + None, + Some(IntervalMonthDayNano::ONE), + ]; + /// Test each value in `scalar` with the corresponding element /// at `array`. Assumes each element is unique (aka not equal /// with all other indexes) @@ -4795,7 +4817,12 @@ mod tests { Some("UTC".into()) ), make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth), - make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime), + make_test_case!(interval_dt_vals, IntervalDayTimeArray, IntervalDayTime), + make_test_case!( + interval_mdn_vals, + IntervalMonthDayNanoArray, + IntervalMonthDayNano + ), make_str_dict_test_case!(str_vals, Int8Type), make_str_dict_test_case!(str_vals, Int16Type), make_str_dict_test_case!(str_vals, Int32Type), diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3946758ff9370..7533e2cff1984 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -134,6 +134,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] +arrow-buffer = { workspace = true } async-trait = { workspace = true } bigdecimal = { workspace = true } criterion = { version = "0.5", features = ["async_tokio"] } diff --git a/datafusion/core/benches/sql_query_with_io.rs b/datafusion/core/benches/sql_query_with_io.rs index 916f48ce40c67..aef39a04e47e8 100644 --- a/datafusion/core/benches/sql_query_with_io.rs +++ b/datafusion/core/benches/sql_query_with_io.rs @@ -96,7 +96,7 @@ async fn setup_files(store: Arc) { let location = Path::from(format!( "{table_name}/partition={partition}/{file}.parquet" )); - store.put(&location, data).await.unwrap(); + store.put(&location, data.into()).await.unwrap(); } } } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 7cc3421ebb480..9462cde436103 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -121,10 +121,9 @@ pub(crate) mod test_util { use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ - GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, PutOptions, - PutResult, + Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, + PutMultipartOpts, PutOptions, PutPayload, PutResult, }; - use tokio::io::AsyncWrite; pub async fn scan_format( state: &SessionState, @@ -185,25 +184,17 @@ pub(crate) mod test_util { async fn put_opts( &self, _location: &Path, - _bytes: Bytes, + _payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { unimplemented!() } - async fn put_multipart( + async fn put_multipart_opts( &self, _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> - { - unimplemented!() - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { + _opts: PutMultipartOpts, + ) -> object_store::Result> { unimplemented!() } @@ -229,6 +220,7 @@ pub(crate) mod test_util { version: None, }, range: Default::default(), + attributes: Attributes::default(), }) } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 39e6900ed53a1..99c38d3f09808 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1115,7 +1115,6 @@ mod tests { use arrow::array::{Array, ArrayRef, StringArray}; use arrow_schema::Field; use async_trait::async_trait; - use bytes::Bytes; use datafusion_common::cast::{ as_binary_array, as_boolean_array, as_float32_array, as_float64_array, as_int32_array, as_timestamp_nanosecond_array, @@ -1129,7 +1128,8 @@ mod tests { use log::error; use object_store::local::LocalFileSystem; use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, PutOptions, PutResult, + GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOpts, PutOptions, + PutPayload, PutResult, }; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::ParquetRecordBatchStreamBuilder; @@ -1252,25 +1252,17 @@ mod tests { async fn put_opts( &self, _location: &Path, - _bytes: Bytes, + _payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { Err(object_store::Error::NotImplemented) } - async fn put_multipart( + async fn put_multipart_opts( &self, _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> - { - Err(object_store::Error::NotImplemented) - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { + _opts: PutMultipartOpts, + ) -> object_store::Result> { Err(object_store::Error::NotImplemented) } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index a1e43b20a2daf..8203e414de97b 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -965,7 +965,7 @@ mod tests { let data = bytes::Bytes::from("a,b\n1,2\n3,4"); let path = object_store::path::Path::from("a.csv"); - store.put(&path, data).await.unwrap(); + store.put(&path, data.into()).await.unwrap(); let url = Url::parse("memory://").unwrap(); session_ctx.register_object_store(&url, Arc::new(store)); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index e2548412cc9de..20656634c472e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1376,7 +1376,7 @@ mod tests { }; let in_memory = object_store::memory::InMemory::new(); in_memory - .put(&object_meta.location, data) + .put(&object_meta.location, data.into()) .await .expect("put parquet file into in memory object store"); diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index 9d714df331c30..9ce47153ba4a8 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -19,6 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, Int32Array}; +use arrow_buffer::IntervalDayTime; use chrono::{DateTime, TimeZone, Utc}; use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; use datafusion_common::cast::as_int32_array; @@ -281,7 +282,10 @@ fn select_date_plus_interval() -> Result<()> { let date_plus_interval_expr = to_timestamp_expr(ts_string) .cast_to(&DataType::Date32, schema)? - + Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32))); + + Expr::Literal(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 123, + milliseconds: 0, + }))); let plan = LogicalPlanBuilder::from(table_scan.clone()) .project(vec![date_plus_interval_expr])? @@ -289,7 +293,7 @@ fn select_date_plus_interval() -> Result<()> { // Note that constant folder runs and folds the entire // expression down to a single constant (true) - let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408") + let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("IntervalDayTime { days: 123, milliseconds: 0 }") TableScan: test"#; let actual = get_optimized_plan_formatted(plan, &time); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 0e515fd4647ba..7c1e199ceb95a 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -192,7 +192,7 @@ async fn store_parquet_in_memory( let mut objects = Vec::with_capacity(parquet_batches.len()); for (meta, bytes) in parquet_batches { in_memory - .put(&meta.location, bytes) + .put(&meta.location, bytes.into()) .await .expect("put parquet file into in memory object store"); objects.push(meta); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index ce71c890698e4..bfc5b59f0952e 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -45,10 +45,10 @@ use bytes::Bytes; use chrono::{TimeZone, Utc}; use futures::stream::{self, BoxStream}; use object_store::{ - path::Path, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, - ObjectMeta, ObjectStore, PutOptions, PutResult, + path::Path, GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, + ObjectStore, PutOptions, PutResult, }; -use tokio::io::AsyncWrite; +use object_store::{Attributes, MultipartUpload, PutMultipartOpts, PutPayload}; use url::Url; #[tokio::test] @@ -631,24 +631,17 @@ impl ObjectStore for MirroringObjectStore { async fn put_opts( &self, _location: &Path, - _bytes: Bytes, + _put_payload: PutPayload, _opts: PutOptions, ) -> object_store::Result { unimplemented!() } - async fn put_multipart( + async fn put_multipart_opts( &self, _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> { - unimplemented!() - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { + _opts: PutMultipartOpts, + ) -> object_store::Result> { unimplemented!() } @@ -673,6 +666,7 @@ impl ObjectStore for MirroringObjectStore { range: 0..meta.size, payload: GetResultPayload::File(file, path), meta, + attributes: Attributes::default(), }) } diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index df91d93137466..1b6878b6f49e8 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -41,6 +41,7 @@ path = "src/lib.rs" ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } paste = "^1.0" diff --git a/datafusion/expr/src/interval_arithmetic.rs b/datafusion/expr/src/interval_arithmetic.rs index c4890b97e7489..18f92334ff14d 100644 --- a/datafusion/expr/src/interval_arithmetic.rs +++ b/datafusion/expr/src/interval_arithmetic.rs @@ -17,13 +17,13 @@ //! Interval arithmetic library +use crate::type_coercion::binary::get_result_type; +use crate::Operator; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use std::borrow::Borrow; use std::fmt::{self, Display, Formatter}; use std::ops::{AddAssign, SubAssign}; -use crate::type_coercion::binary::get_result_type; -use crate::Operator; - use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::DataType; use arrow::datatypes::{IntervalUnit, TimeUnit}; @@ -71,10 +71,10 @@ macro_rules! get_extreme_value { ScalarValue::IntervalYearMonth(Some(i32::$extreme)) } DataType::Interval(IntervalUnit::DayTime) => { - ScalarValue::IntervalDayTime(Some(i64::$extreme)) + ScalarValue::IntervalDayTime(Some(IntervalDayTime::$extreme)) } DataType::Interval(IntervalUnit::MonthDayNano) => { - ScalarValue::IntervalMonthDayNano(Some(i128::$extreme)) + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::$extreme)) } _ => unreachable!(), } @@ -119,8 +119,14 @@ macro_rules! value_transition { IntervalYearMonth(Some(value)) if value == i32::$bound => { IntervalYearMonth(None) } - IntervalDayTime(Some(value)) if value == i64::$bound => IntervalDayTime(None), - IntervalMonthDayNano(Some(value)) if value == i128::$bound => { + IntervalDayTime(Some(value)) + if value == arrow_buffer::IntervalDayTime::$bound => + { + IntervalDayTime(None) + } + IntervalMonthDayNano(Some(value)) + if value == arrow_buffer::IntervalMonthDayNano::$bound => + { IntervalMonthDayNano(None) } _ => next_value_helper::<$direction>($value), @@ -1013,6 +1019,25 @@ macro_rules! impl_OneTrait{ } impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, i128} +impl OneTrait for IntervalDayTime { + fn one() -> Self { + IntervalDayTime { + days: 0, + milliseconds: 1, + } + } +} + +impl OneTrait for IntervalMonthDayNano { + fn one() -> Self { + IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 1, + } + } +} + /// This function either increments or decrements its argument, depending on /// the `INC` value (where a `true` value corresponds to the increment). fn increment_decrement( @@ -1075,11 +1100,15 @@ fn next_value_helper(value: ScalarValue) -> ScalarValue { IntervalYearMonth(Some(val)) => { IntervalYearMonth(Some(increment_decrement::(val))) } - IntervalDayTime(Some(val)) => { - IntervalDayTime(Some(increment_decrement::(val))) - } + IntervalDayTime(Some(val)) => IntervalDayTime(Some(increment_decrement::< + INC, + arrow_buffer::IntervalDayTime, + >(val))), IntervalMonthDayNano(Some(val)) => { - IntervalMonthDayNano(Some(increment_decrement::(val))) + IntervalMonthDayNano(Some(increment_decrement::< + INC, + arrow_buffer::IntervalMonthDayNano, + >(val))) } _ => value, // Unbounded values return without change. } diff --git a/datafusion/functions-array/src/range.rs b/datafusion/functions-array/src/range.rs index 8c73bd8213466..269eaa5602305 100644 --- a/datafusion/functions-array/src/range.rs +++ b/datafusion/functions-array/src/range.rs @@ -22,7 +22,9 @@ use arrow::array::{Array, ArrayRef, Int64Array, ListArray}; use arrow::datatypes::{DataType, Field}; use arrow_array::types::{Date32Type, IntervalMonthDayNanoType}; use arrow_array::{Date32Array, NullArray}; -use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; +use arrow_buffer::{ + BooleanBufferBuilder, IntervalMonthDayNano, NullBuffer, OffsetBuffer, +}; use arrow_schema::DataType::{Date32, Int64, Interval, List}; use arrow_schema::IntervalUnit::MonthDayNano; use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array}; @@ -314,7 +316,13 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result { for (idx, stop) in stop_array.iter().enumerate() { let mut stop = stop.unwrap_or(0); let start = start_array.as_ref().map(|x| x.value(idx)).unwrap_or(0); - let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or(1); + let step = step_array.as_ref().map(|arr| arr.value(idx)).unwrap_or( + IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 1, + }, + ); let (months, days, _) = IntervalMonthDayNanoType::to_parts(step); let neg = months < 0 || days < 0; if !include_upper { diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index efc12e71a9ac3..20d6cbc374595 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -87,6 +87,7 @@ uuid = { version = "1.7", features = ["v4"], optional = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } +arrow-buffer = { workspace = true } criterion = "0.5" rand = { workspace = true } rstest = { workspace = true } diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index a5404532ace64..e777e5ea95d07 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -445,6 +445,7 @@ mod tests { use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::{DataType, TimeUnit}; + use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion_common::ScalarValue; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; @@ -453,7 +454,10 @@ mod tests { #[test] fn test_date_bin() { let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -461,21 +465,33 @@ mod tests { let timestamps = Arc::new((1..6).map(Some).collect::()); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Array(timestamps), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); assert!(res.is_ok()); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); assert!(res.is_ok()); // stride supports month-day-nano let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some( + IntervalMonthDayNano { + months: 0, + days: 0, + nanoseconds: 1, + }, + ))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -486,8 +502,12 @@ mod tests { // // invalid number of arguments - let res = DateBinFunc::new() - .invoke(&[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1)))]); + let res = DateBinFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + })), + )]); assert_eq!( res.err().unwrap().strip_backtrace(), "Execution error: DATE_BIN expected two or three arguments" @@ -506,7 +526,10 @@ mod tests { // stride: invalid value let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(0))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 0, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -517,7 +540,9 @@ mod tests { // stride: overflow of day-time interval let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(i64::MAX))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some( + IntervalDayTime::MAX, + ))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); @@ -550,7 +575,10 @@ mod tests { // origin: invalid type let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)), ]); @@ -560,14 +588,26 @@ mod tests { ); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ]); assert!(res.is_ok()); // unsupported array type for stride - let intervals = Arc::new((1..6).map(Some).collect::()); + let intervals = Arc::new( + (1..6) + .map(|x| { + Some(IntervalDayTime { + days: 0, + milliseconds: x, + }) + }) + .collect::(), + ); let res = DateBinFunc::new().invoke(&[ ColumnarValue::Array(intervals), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), @@ -581,7 +621,10 @@ mod tests { // unsupported array type for origin let timestamps = Arc::new((1..6).map(Some).collect::()); let res = DateBinFunc::new().invoke(&[ - ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(1))), + ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 1, + }))), ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)), ColumnarValue::Array(timestamps), ]); diff --git a/datafusion/functions/src/datetime/to_timestamp.rs b/datafusion/functions/src/datetime/to_timestamp.rs index af878b4505bc7..7c6f2e42605a2 100644 --- a/datafusion/functions/src/datetime/to_timestamp.rs +++ b/datafusion/functions/src/datetime/to_timestamp.rs @@ -591,7 +591,7 @@ mod tests { ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef); let expected_err = - "Arrow error: Parser error: Invalid timezone \"ZZ\": 'ZZ' is not a valid timezone"; + "Arrow error: Parser error: Invalid timezone \"ZZ\": failed to parse timezone"; match to_timestamp(&[string_array]) { Ok(_) => panic!("Expected error but got success"), Err(e) => { diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 67d5c9b23b74b..e703250c92e15 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -51,7 +51,9 @@ indexmap = { workspace = true } itertools = { workspace = true } log = { workspace = true } regex-syntax = "0.8.0" + [dev-dependencies] +arrow-buffer = { workspace = true } ctor = { workspace = true } datafusion-sql = { workspace = true } env_logger = { workspace = true } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 31dc9028b915f..0c8e4ae34a90a 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1082,13 +1082,13 @@ mod test { #[test] fn binary_op_date32_op_interval() -> Result<()> { - //CAST(Utf8("1998-03-18") AS Date32) + IntervalDayTime("386547056640") + // CAST(Utf8("1998-03-18") AS Date32) + IntervalDayTime("...") let expr = cast(lit("1998-03-18"), DataType::Date32) - + lit(ScalarValue::IntervalDayTime(Some(386547056640))); + + lit(ScalarValue::new_interval_dt(123, 456)); let empty = empty(); let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?); let expected = - "Projection: CAST(Utf8(\"1998-03-18\") AS Date32) + IntervalDayTime(\"386547056640\")\n EmptyRelation"; + "Projection: CAST(Utf8(\"1998-03-18\") AS Date32) + IntervalDayTime(\"IntervalDayTime { days: 123, milliseconds: 456 }\")\n EmptyRelation"; assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?; Ok(()) } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index dd61fc802441f..a36ec9c8ebdc7 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -33,6 +33,7 @@ use arrow::compute::take; use arrow::datatypes::*; use arrow::util::bit_iterator::BitIndexIterator; use arrow::{downcast_dictionary_array, downcast_primitive_array}; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion_common::cast::{ as_boolean_array, as_generic_binary_array, as_string_array, }; @@ -258,6 +259,7 @@ macro_rules! is_equal { } is_equal!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64); is_equal!(bool, str, [u8]); +is_equal!(IntervalDayTime, IntervalMonthDayNano); macro_rules! is_equal_float { ($($t:ty),+) => { diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 0c25e26d17aa3..5ba628e7ce404 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -723,6 +723,7 @@ mod tests { use crate::intervals::test_utils::gen_conjunctive_numerical_expr; use arrow::datatypes::TimeUnit; + use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::Field; use datafusion_common::ScalarValue; @@ -1390,9 +1391,17 @@ mod tests { )?; let right_child = Interval::try_new( // 1 day 321 ns - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), // 1 day 321 ns - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), )?; let children = vec![&left_child, &right_child]; let result = expression @@ -1415,9 +1424,17 @@ mod tests { )?, Interval::try_new( // 1 day 321 ns in Duration type - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), // 1 day 321 ns in Duration type - ScalarValue::IntervalMonthDayNano(Some(0x1_0000_0000_0000_0141)), + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months: 0, + days: 1, + nanoseconds: 321, + })), )? ], result @@ -1446,10 +1463,16 @@ mod tests { ScalarValue::TimestampMillisecond(Some(1_603_188_672_000), None), )?; let left_child = Interval::try_new( - // 2 days - ScalarValue::IntervalDayTime(Some(172_800_000)), - // 10 days - ScalarValue::IntervalDayTime(Some(864_000_000)), + // 2 days in millisecond + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 172_800_000, + })), + // 10 days in millisecond + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 864_000_000, + })), )?; let children = vec![&left_child, &right_child]; let result = expression @@ -1459,10 +1482,16 @@ mod tests { assert_eq!( vec![ Interval::try_new( - // 2 days - ScalarValue::IntervalDayTime(Some(172_800_000)), + // 2 days in millisecond + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 172_800_000, + })), // 6 days - ScalarValue::IntervalDayTime(Some(518_400_000)), + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days: 0, + milliseconds: 518_400_000, + })), )?, Interval::try_new( // 10.10.2020 - 10:11:12 AM diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index e188b2d56baeb..b426a656fba9e 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -24,15 +24,12 @@ use crate::{ PhysicalExpr, }; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::{DataType, SchemaRef}; -use datafusion_common::{internal_datafusion_err, internal_err, Result, ScalarValue}; +use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::Operator; -const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000; -const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF; -const DT_MS_MASK: i64 = 0xFFFF_FFFF; - /// Indicates whether interval arithmetic is supported for the given expression. /// Currently, we do not support all [`PhysicalExpr`]s for interval calculations. /// We do not support every type of [`Operator`]s either. Over time, this check @@ -172,15 +169,9 @@ fn convert_duration_bound_to_interval( /// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part. /// Otherwise, it returns an error. -fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { - let months = mdn >> 96; - let days = (mdn & MDN_DAY_MASK) >> 64; - let nanoseconds = mdn & MDN_NS_MASK; - - if months == 0 && days == 0 { - nanoseconds - .try_into() - .map_err(|_| internal_datafusion_err!("Resulting duration exceeds i64::MAX")) +fn interval_mdn_to_duration_ns(mdn: &IntervalMonthDayNano) -> Result { + if mdn.months == 0 && mdn.days == 0 { + Ok(mdn.nanoseconds) } else { internal_err!( "The interval cannot have a non-zero month or day value for duration convertibility" @@ -190,12 +181,10 @@ fn interval_mdn_to_duration_ns(mdn: &i128) -> Result { /// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part. /// Otherwise, it returns an error. -fn interval_dt_to_duration_ms(dt: &i64) -> Result { - let days = dt >> 32; - let milliseconds = dt & DT_MS_MASK; - - if days == 0 { - Ok(milliseconds) +fn interval_dt_to_duration_ms(dt: &IntervalDayTime) -> Result { + if dt.days == 0 { + // Safe to cast i32 to i64 + Ok(dt.milliseconds as i64) } else { internal_err!( "The interval cannot have a non-zero day value for duration convertibility" diff --git a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs index 18d20f3c47e68..d5b7f1b11ac55 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs @@ -23,6 +23,7 @@ use arrow::datatypes::i256; use arrow::record_batch::RecordBatch; use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray}; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; @@ -53,6 +54,7 @@ macro_rules! hash_integer { } hash_integer!(i8, i16, i32, i64, i128, i256); hash_integer!(u8, u16, u32, u64); +hash_integer!(IntervalDayTime, IntervalMonthDayNano); macro_rules! hash_float { ($($t:ty),+) => { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2bb95852ff437..356eb7c86a695 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1484,6 +1484,7 @@ mod tests { ))]; let task_ctx = if spill { + // set to an appropriate value to trigger spill new_spill_ctx(2, 1600) } else { Arc::new(TaskContext::default()) @@ -1545,8 +1546,13 @@ mod tests { input_schema, )?); - let result = - common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; + let task_ctx = if spill { + // enlarge memory limit to let the final aggregation finish + new_spill_ctx(2, 2600) + } else { + task_ctx.clone() + }; + let result = common::collect(merged_aggregate.execute(0, task_ctx)?).await?; let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 2); assert_eq!(batch.num_rows(), 3); @@ -1941,8 +1947,13 @@ mod tests { for use_coalesce_batches in [false, true] { for is_first_acc in [false, true] { for spill in [false, true] { - first_last_multi_partitions(use_coalesce_batches, is_first_acc, spill) - .await? + first_last_multi_partitions( + use_coalesce_batches, + is_first_acc, + spill, + 4200, + ) + .await? } } } @@ -2030,9 +2041,10 @@ mod tests { use_coalesce_batches: bool, is_first_acc: bool, spill: bool, + max_memory: usize, ) -> Result<()> { let task_ctx = if spill { - new_spill_ctx(2, 3200) + new_spill_ctx(2, max_memory) } else { Arc::new(TaskContext::default()) }; diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index bae4c6133b9f7..2b02fff1f5737 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -26,6 +26,7 @@ use arrow_array::cast::AsArray; use arrow_array::{ downcast_primitive, Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray, StringArray, }; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -363,6 +364,7 @@ macro_rules! has_integer { has_integer!(i8, i16, i32, i64, i128, i256); has_integer!(u8, u16, u32, u64); +has_integer!(IntervalDayTime, IntervalMonthDayNano); hash_float!(f16, f32, f64); pub fn new_hash_table(limit: usize, kt: DataType) -> Result> { diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs b/datafusion/physical-plan/src/aggregates/topk/heap.rs index 41826ed728539..51593f5c28cef 100644 --- a/datafusion/physical-plan/src/aggregates/topk/heap.rs +++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs @@ -20,6 +20,7 @@ use arrow::datatypes::i256; use arrow_array::cast::AsArray; use arrow_array::{downcast_primitive, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -431,6 +432,7 @@ macro_rules! compare_integer { compare_integer!(i8, i16, i32, i64, i128, i256); compare_integer!(u8, u16, u32, u64); +compare_integer!(IntervalDayTime, IntervalMonthDayNano); compare_float!(f16, f32, f64); pub fn new_heap(limit: usize, desc: bool, vt: DataType) -> Result> { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 449c42d697976..7b4d790479b14 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -2290,8 +2290,8 @@ mod tests { )] join_type: JoinType, #[values( - (4, 5), - (12, 17), + (4, 5), + (12, 17), )] cardinality: (i32, i32), #[values(0, 1, 2)] case_expr: usize, @@ -2375,8 +2375,8 @@ mod tests { )] join_type: JoinType, #[values( - (4, 5), - (12, 17), + (4, 5), + (12, 17), )] cardinality: (i32, i32), ) -> Result<()> { @@ -2452,8 +2452,8 @@ mod tests { )] join_type: JoinType, #[values( - (4, 5), - (12, 17), + (4, 5), + (12, 17), )] cardinality: (i32, i32), #[values(0, 1, 2, 3, 4, 5)] case_expr: usize, diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index 6fb3aef5d5bfd..9598ed83aa580 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -33,6 +33,7 @@ use arrow_array::{ ArrayRef, Float64Array, Int32Array, IntervalDayTimeArray, RecordBatch, TimestampMillisecondArray, }; +use arrow_buffer::IntervalDayTime; use arrow_schema::{DataType, Schema}; use datafusion_common::{Result, ScalarValue}; use datafusion_execution::TaskContext; @@ -462,8 +463,11 @@ pub fn build_sides_record_batches( )); let interval_time: ArrayRef = Arc::new(IntervalDayTimeArray::from( initial_range - .map(|x| x as i64 * 100) // x * 100ms - .collect::>(), + .map(|x| IntervalDayTime { + days: 0, + milliseconds: x * 100, + }) // x * 100ms + .collect::>(), )); let float_asc = Arc::new(Float64Array::from_iter_values( diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index bd99012064046..d9ec7dbb51438 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -206,6 +206,11 @@ message ScalarDictionaryValue { ScalarValue value = 2; } +message IntervalDayTimeValue { + int32 days = 1; + int32 milliseconds = 2; +} + message IntervalMonthDayNanoValue { int32 months = 1; int32 days = 2; @@ -266,7 +271,6 @@ message ScalarValue{ int64 date_64_value = 21; int32 interval_yearmonth_value = 24; - int64 interval_daytime_value = 25; int64 duration_second_value = 35; int64 duration_millisecond_value = 36; @@ -278,6 +282,7 @@ message ScalarValue{ bytes binary_value = 28; bytes large_binary_value = 29; ScalarTime64Value time64_value = 30; + IntervalDayTimeValue interval_daytime_value = 25; IntervalMonthDayNanoValue interval_month_day_nano = 31; ScalarFixedSizeBinary fixed_size_binary_value = 34; UnionValue union_value = 42; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index aa2f22a365606..3ae70318fa159 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -25,8 +25,8 @@ use arrow::array::{ArrayRef, AsArray}; use arrow::buffer::Buffer; use arrow::csv::WriterBuilder; use arrow::datatypes::{ - i256, DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit, - UnionFields, UnionMode, + i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, + Schema, TimeUnit, UnionFields, UnionMode, }; use arrow::ipc::{reader::read_record_batch, root_as_message}; @@ -525,7 +525,6 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { } } Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)), - Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)), Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)), Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)), Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)), @@ -573,6 +572,9 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { } Value::BinaryValue(v) => Self::Binary(Some(v.clone())), Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())), + Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some( + IntervalDayTimeType::make_value(v.days, v.milliseconds), + )), Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some( IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos), )), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index a5f7ec298e87d..6b23724336849 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -3417,6 +3417,118 @@ impl<'de> serde::Deserialize<'de> for FixedSizeList { deserializer.deserialize_struct("datafusion_common.FixedSizeList", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for IntervalDayTimeValue { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.days != 0 { + len += 1; + } + if self.milliseconds != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.IntervalDayTimeValue", len)?; + if self.days != 0 { + struct_ser.serialize_field("days", &self.days)?; + } + if self.milliseconds != 0 { + struct_ser.serialize_field("milliseconds", &self.milliseconds)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for IntervalDayTimeValue { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "days", + "milliseconds", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Days, + Milliseconds, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "days" => Ok(GeneratedField::Days), + "milliseconds" => Ok(GeneratedField::Milliseconds), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = IntervalDayTimeValue; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.IntervalDayTimeValue") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut days__ = None; + let mut milliseconds__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Days => { + if days__.is_some() { + return Err(serde::de::Error::duplicate_field("days")); + } + days__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Milliseconds => { + if milliseconds__.is_some() { + return Err(serde::de::Error::duplicate_field("milliseconds")); + } + milliseconds__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(IntervalDayTimeValue { + days: days__.unwrap_or_default(), + milliseconds: milliseconds__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.IntervalDayTimeValue", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for IntervalMonthDayNanoValue { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -6186,10 +6298,6 @@ impl serde::Serialize for ScalarValue { scalar_value::Value::IntervalYearmonthValue(v) => { struct_ser.serialize_field("intervalYearmonthValue", v)?; } - scalar_value::Value::IntervalDaytimeValue(v) => { - #[allow(clippy::needless_borrow)] - struct_ser.serialize_field("intervalDaytimeValue", ToString::to_string(&v).as_str())?; - } scalar_value::Value::DurationSecondValue(v) => { #[allow(clippy::needless_borrow)] struct_ser.serialize_field("durationSecondValue", ToString::to_string(&v).as_str())?; @@ -6223,6 +6331,9 @@ impl serde::Serialize for ScalarValue { scalar_value::Value::Time64Value(v) => { struct_ser.serialize_field("time64Value", v)?; } + scalar_value::Value::IntervalDaytimeValue(v) => { + struct_ser.serialize_field("intervalDaytimeValue", v)?; + } scalar_value::Value::IntervalMonthDayNano(v) => { struct_ser.serialize_field("intervalMonthDayNano", v)?; } @@ -6292,8 +6403,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "date64Value", "interval_yearmonth_value", "intervalYearmonthValue", - "interval_daytime_value", - "intervalDaytimeValue", "duration_second_value", "durationSecondValue", "duration_millisecond_value", @@ -6312,6 +6421,8 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "largeBinaryValue", "time64_value", "time64Value", + "interval_daytime_value", + "intervalDaytimeValue", "interval_month_day_nano", "intervalMonthDayNano", "fixed_size_binary_value", @@ -6346,7 +6457,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { Decimal256Value, Date64Value, IntervalYearmonthValue, - IntervalDaytimeValue, DurationSecondValue, DurationMillisecondValue, DurationMicrosecondValue, @@ -6356,6 +6466,7 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { BinaryValue, LargeBinaryValue, Time64Value, + IntervalDaytimeValue, IntervalMonthDayNano, FixedSizeBinaryValue, UnionValue, @@ -6404,7 +6515,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "decimal256Value" | "decimal256_value" => Ok(GeneratedField::Decimal256Value), "date64Value" | "date_64_value" => Ok(GeneratedField::Date64Value), "intervalYearmonthValue" | "interval_yearmonth_value" => Ok(GeneratedField::IntervalYearmonthValue), - "intervalDaytimeValue" | "interval_daytime_value" => Ok(GeneratedField::IntervalDaytimeValue), "durationSecondValue" | "duration_second_value" => Ok(GeneratedField::DurationSecondValue), "durationMillisecondValue" | "duration_millisecond_value" => Ok(GeneratedField::DurationMillisecondValue), "durationMicrosecondValue" | "duration_microsecond_value" => Ok(GeneratedField::DurationMicrosecondValue), @@ -6414,6 +6524,7 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "binaryValue" | "binary_value" => Ok(GeneratedField::BinaryValue), "largeBinaryValue" | "large_binary_value" => Ok(GeneratedField::LargeBinaryValue), "time64Value" | "time64_value" => Ok(GeneratedField::Time64Value), + "intervalDaytimeValue" | "interval_daytime_value" => Ok(GeneratedField::IntervalDaytimeValue), "intervalMonthDayNano" | "interval_month_day_nano" => Ok(GeneratedField::IntervalMonthDayNano), "fixedSizeBinaryValue" | "fixed_size_binary_value" => Ok(GeneratedField::FixedSizeBinaryValue), "unionValue" | "union_value" => Ok(GeneratedField::UnionValue), @@ -6591,12 +6702,6 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { } value__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_value::Value::IntervalYearmonthValue(x.0)); } - GeneratedField::IntervalDaytimeValue => { - if value__.is_some() { - return Err(serde::de::Error::duplicate_field("intervalDaytimeValue")); - } - value__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_value::Value::IntervalDaytimeValue(x.0)); - } GeneratedField::DurationSecondValue => { if value__.is_some() { return Err(serde::de::Error::duplicate_field("durationSecondValue")); @@ -6652,6 +6757,13 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { return Err(serde::de::Error::duplicate_field("time64Value")); } value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::Time64Value) +; + } + GeneratedField::IntervalDaytimeValue => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("intervalDaytimeValue")); + } + value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::IntervalDaytimeValue) ; } GeneratedField::IntervalMonthDayNano => { diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index c8f277c8f37e1..48da143bc7ed7 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -275,6 +275,14 @@ pub struct ScalarDictionaryValue { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct IntervalDayTimeValue { + #[prost(int32, tag = "1")] + pub days: i32, + #[prost(int32, tag = "2")] + pub milliseconds: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct IntervalMonthDayNanoValue { #[prost(int32, tag = "1")] pub months: i32, @@ -318,7 +326,7 @@ pub struct ScalarFixedSizeBinary { pub struct ScalarValue { #[prost( oneof = "scalar_value::Value", - tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 25, 35, 36, 37, 38, 26, 27, 28, 29, 30, 31, 34, 42" + tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 30, 25, 31, 34, 42" )] pub value: ::core::option::Option, } @@ -378,8 +386,6 @@ pub mod scalar_value { Date64Value(i64), #[prost(int32, tag = "24")] IntervalYearmonthValue(i32), - #[prost(int64, tag = "25")] - IntervalDaytimeValue(i64), #[prost(int64, tag = "35")] DurationSecondValue(i64), #[prost(int64, tag = "36")] @@ -398,6 +404,8 @@ pub mod scalar_value { LargeBinaryValue(::prost::alloc::vec::Vec), #[prost(message, tag = "30")] Time64Value(super::ScalarTime64Value), + #[prost(message, tag = "25")] + IntervalDaytimeValue(super::IntervalDayTimeValue), #[prost(message, tag = "31")] IntervalMonthDayNano(super::IntervalMonthDayNanoValue), #[prost(message, tag = "34")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a92deaa88b1ca..28f6952aac44e 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -24,8 +24,8 @@ use crate::protobuf_common::{ use arrow::array::{ArrayRef, RecordBatch}; use arrow::csv::WriterBuilder; use arrow::datatypes::{ - DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit, - UnionMode, + DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema, + SchemaRef, TimeUnit, UnionMode, }; use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator}; use datafusion_common::{ @@ -452,11 +452,6 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { Value::IntervalYearmonthValue(*s) }) } - ScalarValue::IntervalDayTime(val) => { - create_proto_scalar(val.as_ref(), &data_type, |s| { - Value::IntervalDaytimeValue(*s) - }) - } ScalarValue::Null => Ok(protobuf::ScalarValue { value: Some(Value::NullValue((&data_type).try_into()?)), }), @@ -526,6 +521,20 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } + ScalarValue::IntervalDayTime(val) => { + let value = if let Some(v) = val { + let (days, milliseconds) = IntervalDayTimeType::to_parts(*v); + Value::IntervalDaytimeValue(protobuf::IntervalDayTimeValue { + days, + milliseconds, + }) + } else { + Value::NullValue((&data_type).try_into()?) + }; + + Ok(protobuf::ScalarValue { value: Some(value) }) + } + ScalarValue::IntervalMonthDayNano(v) => { let value = if let Some(v) = v { let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v); diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index c8f277c8f37e1..48da143bc7ed7 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -275,6 +275,14 @@ pub struct ScalarDictionaryValue { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct IntervalDayTimeValue { + #[prost(int32, tag = "1")] + pub days: i32, + #[prost(int32, tag = "2")] + pub milliseconds: i32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct IntervalMonthDayNanoValue { #[prost(int32, tag = "1")] pub months: i32, @@ -318,7 +326,7 @@ pub struct ScalarFixedSizeBinary { pub struct ScalarValue { #[prost( oneof = "scalar_value::Value", - tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 25, 35, 36, 37, 38, 26, 27, 28, 29, 30, 31, 34, 42" + tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 30, 25, 31, 34, 42" )] pub value: ::core::option::Option, } @@ -378,8 +386,6 @@ pub mod scalar_value { Date64Value(i64), #[prost(int32, tag = "24")] IntervalYearmonthValue(i32), - #[prost(int64, tag = "25")] - IntervalDaytimeValue(i64), #[prost(int64, tag = "35")] DurationSecondValue(i64), #[prost(int64, tag = "36")] @@ -398,6 +404,8 @@ pub mod scalar_value { LargeBinaryValue(::prost::alloc::vec::Vec), #[prost(message, tag = "30")] Time64Value(super::ScalarTime64Value), + #[prost(message, tag = "25")] + IntervalDaytimeValue(super::IntervalDayTimeValue), #[prost(message, tag = "31")] IntervalMonthDayNano(super::IntervalMonthDayNanoValue), #[prost(message, tag = "34")] diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 1f064ea0f5430..f7c4edbcc7ad2 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2139,7 +2139,7 @@ fn union_with_incompatible_data_type() { .expect_err("query should have failed") .strip_backtrace(); assert_eq!( - "Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with column IntervalMonthDayNano(\"950737950189618795196236955648\") (type: Interval(MonthDayNano))", + "Error during planning: UNION Column Int64(1) (type: Int64) is not compatible with column IntervalMonthDayNano(\"IntervalMonthDayNano { months: 12, days: 1, nanoseconds: 0 }\") (type: Interval(MonthDayNano))", err ); } @@ -2829,7 +2829,7 @@ fn join_with_aliases() { fn negative_interval_plus_interval_in_projection() { let sql = "select -interval '2 days' + interval '5 days';"; let expected = - "Projection: IntervalMonthDayNano(\"79228162477370849446124847104\") + IntervalMonthDayNano(\"92233720368547758080\")\n EmptyRelation"; + "Projection: IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: -2, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\")\n EmptyRelation"; quick_test(sql, expected); } @@ -2837,7 +2837,7 @@ fn negative_interval_plus_interval_in_projection() { fn complex_interval_expression_in_projection() { let sql = "select -interval '2 days' + interval '5 days'+ (-interval '3 days' + interval '5 days');"; let expected = - "Projection: IntervalMonthDayNano(\"79228162477370849446124847104\") + IntervalMonthDayNano(\"92233720368547758080\") + IntervalMonthDayNano(\"79228162458924105372415295488\") + IntervalMonthDayNano(\"92233720368547758080\")\n EmptyRelation"; + "Projection: IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: -2, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: -3, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\")\n EmptyRelation"; quick_test(sql, expected); } @@ -2845,7 +2845,7 @@ fn complex_interval_expression_in_projection() { fn negative_sum_intervals_in_projection() { let sql = "select -((interval '2 days' + interval '5 days') + -(interval '4 days' + interval '7 days'));"; let expected = - "Projection: (- IntervalMonthDayNano(\"36893488147419103232\") + IntervalMonthDayNano(\"92233720368547758080\") + (- IntervalMonthDayNano(\"73786976294838206464\") + IntervalMonthDayNano(\"129127208515966861312\")))\n EmptyRelation"; + "Projection: (- IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 2, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\") + (- IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 4, nanoseconds: 0 }\") + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 7, nanoseconds: 0 }\")))\n EmptyRelation"; quick_test(sql, expected); } @@ -2853,8 +2853,7 @@ fn negative_sum_intervals_in_projection() { fn date_plus_interval_in_projection() { let sql = "select t_date32 + interval '5 days' FROM test"; let expected = - "Projection: test.t_date32 + IntervalMonthDayNano(\"92233720368547758080\")\ - \n TableScan: test"; + "Projection: test.t_date32 + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }\")\n TableScan: test"; quick_test(sql, expected); } @@ -2866,7 +2865,7 @@ fn date_plus_interval_in_filter() { AND cast('1999-12-31' as date) + interval '30 days'"; let expected = "Projection: test.t_date64\ - \n Filter: test.t_date64 BETWEEN CAST(Utf8(\"1999-12-31\") AS Date32) AND CAST(Utf8(\"1999-12-31\") AS Date32) + IntervalMonthDayNano(\"553402322211286548480\")\ + \n Filter: test.t_date64 BETWEEN CAST(Utf8(\"1999-12-31\") AS Date32) AND CAST(Utf8(\"1999-12-31\") AS Date32) + IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 30, nanoseconds: 0 }\")\ \n TableScan: test"; quick_test(sql, expected); } diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 94cce61245e17..c928b96e03216 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -336,7 +336,7 @@ select arrow_cast(timestamp '2000-01-01T00:00:00Z', 'Timestamp(Nanosecond, Some( ---- 2000-01-01T00:00:00+08:00 -statement error DataFusion error: Arrow error: Parser error: Invalid timezone "\+25:00": '\+25:00' is not a valid timezone +statement error DataFusion error: Arrow error: Parser error: Invalid timezone "\+25:00": failed to parse timezone select arrow_cast(timestamp '2000-01-01T00:00:00', 'Timestamp(Nanosecond, Some( "+25:00" ))'); diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt index cb2bb9fad1b70..033ea2208f1aa 100644 --- a/datafusion/sqllogictest/test_files/expr.slt +++ b/datafusion/sqllogictest/test_files/expr.slt @@ -63,7 +63,7 @@ SELECT NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL # test_array_cast_invalid_timezone_will_panic -statement error Parser error: Invalid timezone "Foo": 'Foo' is not a valid timezone +statement error Parser error: Invalid timezone "Foo": failed to parse timezone SELECT arrow_cast('2021-01-02T03:04:00', 'Timestamp(Nanosecond, Some("Foo"))') # test_array_index diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index efbf0df3830ce..cbcfc93645654 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4226,7 +4226,7 @@ logical_plan 01)Limit: skip=0, fetch=5 02)--Sort: time_chunks DESC NULLS FIRST, fetch=5 03)----Projection: date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts) AS time_chunks -04)------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"), unbounded_csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)]], aggr=[[]] +04)------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), unbounded_csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)]], aggr=[[]] 05)--------TableScan: unbounded_csv_with_timestamps projection=[ts] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 @@ -4235,7 +4235,7 @@ physical_plan 04)------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted 05)--------CoalesceBatchesExec: target_batch_size=2 06)----------RepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0], 8), input_partitions=8, preserve_order=true, sort_exprs=date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)@0 DESC -07)------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +07)------------AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted 08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 09)----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] @@ -4328,12 +4328,12 @@ EXPLAIN SELECT name, date_bin('15 minutes', ts) as time_chunks logical_plan 01)Limit: skip=0, fetch=5 02)--Sort: unbounded_csv_with_timestamps2.name DESC NULLS FIRST, time_chunks DESC NULLS FIRST, fetch=5 -03)----Projection: unbounded_csv_with_timestamps2.name, date_bin(IntervalMonthDayNano("900000000000"), unbounded_csv_with_timestamps2.ts) AS time_chunks +03)----Projection: unbounded_csv_with_timestamps2.name, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), unbounded_csv_with_timestamps2.ts) AS time_chunks 04)------TableScan: unbounded_csv_with_timestamps2 projection=[name, ts] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 02)--SortPreservingMergeExec: [name@0 DESC,time_chunks@1 DESC], fetch=5 -03)----ProjectionExec: expr=[name@0 as name, date_bin(900000000000, ts@1) as time_chunks] +03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks] 04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC] diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 2678e8cbd1bab..51de40fb19723 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -465,11 +465,11 @@ ORDER BY db15; ---- logical_plan 01)Sort: db15 ASC NULLS LAST -02)--Projection: date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15 +02)--Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15 03)----TableScan: csv_with_timestamps projection=[ts] physical_plan 01)SortPreservingMergeExec: [db15@0 ASC NULLS LAST] -02)--ProjectionExec: expr=[date_bin(900000000000, ts@0, 1659537600000000000) as db15] +02)--ProjectionExec: expr=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0, 1659537600000000000) as db15] 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], has_header=false diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index fccd144a37fb6..6f19c9f4d42ff 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -216,19 +216,19 @@ set datafusion.catalog.information_schema = true statement ok SET TIME ZONE = '+08:00:00' -statement error Arrow error: Parser error: Invalid timezone "\+08:00:00": '\+08:00:00' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "\+08:00:00": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok SET TIME ZONE = '08:00' -statement error Arrow error: Parser error: Invalid timezone "08:00": '08:00' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "08:00": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok SET TIME ZONE = '08' -statement error Arrow error: Parser error: Invalid timezone "08": '08' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "08": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok @@ -242,5 +242,5 @@ SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ statement ok SET TIME ZONE = 'Asia/Taipei2' -statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": 'Asia/Taipei2' is not a valid timezone +statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": failed to parse timezone SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 5f75bca4f0fab..7d5d601bbfdd3 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -466,7 +466,7 @@ query error Cannot cast string '24:01:02' to value of Time64\(Nanosecond\) type SELECT TIME '24:01:02' as time; # invalid timezone -query error Arrow error: Parser error: Invalid timezone "ZZ": 'ZZ' is not a valid timezone +query error Arrow error: Parser error: Invalid timezone "ZZ": failed to parse timezone SELECT TIMESTAMP '2023-12-05T21:58:10.45ZZ'; statement ok diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index f614fd6b3fd05..9322412c0ddb4 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -32,6 +32,7 @@ rust-version = "1.73" workspace = true [dependencies] +arrow-buffer = { workspace = true } async-recursion = "1.0" chrono = { workspace = true } datafusion = { workspace = true, default-features = true } diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 597f34e89a02a..d68711e8609cb 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -23,11 +23,13 @@ use datafusion::common::{ not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, }; +use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::{ aggregate_function, expr::find_df_window_func, BinaryExpr, Case, EmptyRelation, Expr, LogicalPlan, Operator, ScalarUDF, Values, }; + use datafusion::logical_expr::{ expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning, Repartition, Subquery, WindowFrameBound, WindowFrameUnits, @@ -1563,7 +1565,13 @@ fn from_substrait_literal( "Failed to parse interval day time value" ) })?; - ScalarValue::IntervalDayTime(Some(i64::from_le_bytes(value_slice))) + let days = i32::from_le_bytes(value_slice[0..4].try_into().unwrap()); + let milliseconds = + i32::from_le_bytes(value_slice[4..8].try_into().unwrap()); + ScalarValue::IntervalDayTime(Some(IntervalDayTime { + days, + milliseconds, + })) } INTERVAL_MONTH_DAY_NANO_TYPE_REF => { let Some(Val::Value(raw_val)) = user_defined.val.as_ref() else { @@ -1575,9 +1583,16 @@ fn from_substrait_literal( "Failed to parse interval month day nano value" ) })?; - ScalarValue::IntervalMonthDayNano(Some(i128::from_le_bytes( - value_slice, - ))) + let months = + i32::from_le_bytes(value_slice[0..4].try_into().unwrap()); + let days = i32::from_le_bytes(value_slice[4..8].try_into().unwrap()); + let nanoseconds = + i64::from_le_bytes(value_slice[8..16].try_into().unwrap()); + ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano { + months, + days, + nanoseconds, + })) } _ => { return not_impl_err!( diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 0208b010c856b..6c8be4aa9b121 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; +use arrow_buffer::ToByteSlice; use datafusion::arrow::datatypes::IntervalUnit; use datafusion::logical_expr::{ CrossJoin, Distinct, Like, Partitioning, WindowFrameUnits, @@ -1949,7 +1950,7 @@ fn to_substrait_literal(value: &ScalarValue) -> Result { } ScalarValue::IntervalMonthDayNano(Some(i)) => { // treat `i128` as two contiguous `i64` - let bytes = i.to_le_bytes(); + let bytes = i.to_byte_slice(); let i64_param = Parameter { parameter: Some(parameter::Parameter::DataType(substrait::proto::Type { kind: Some(r#type::Kind::I64(r#type::I64 { @@ -1971,7 +1972,7 @@ fn to_substrait_literal(value: &ScalarValue) -> Result { ) } ScalarValue::IntervalDayTime(Some(i)) => { - let bytes = i.to_le_bytes(); + let bytes = i.to_byte_slice(); ( LiteralType::UserDefined(UserDefined { type_reference: INTERVAL_DAY_TIME_TYPE_REF,