diff --git a/Cargo.toml b/Cargo.toml index 26a0ae22e2..730d156cfa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,14 +42,14 @@ rust-version = "1.77.1" anyhow = "1.0.72" apache-avro = "0.17" array-init = "2" -arrow-arith = { version = "54.1.0" } -arrow-array = { version = "54.1.0" } -arrow-buffer = { version = "54.1.0" } -arrow-cast = { version = "54.1.0" } -arrow-ord = { version = "54.1.0" } -arrow-schema = { version = "54.1.0" } -arrow-select = { version = "54.1.0" } -arrow-string = { version = "54.1.0" } +arrow-arith = { version = "54.2.0" } +arrow-array = { version = "54.2.0" } +arrow-buffer = { version = "54.2.0" } +arrow-cast = { version = "54.2.0" } +arrow-ord = { version = "54.2.0" } +arrow-schema = { version = "54.2.0" } +arrow-select = { version = "54.2.0" } +arrow-string = { version = "54.2.0" } async-trait = "0.1.86" async-std = "1.12" aws-config = "1" @@ -76,7 +76,7 @@ num-bigint = "0.4.6" once_cell = "1.20" opendal = "0.51.2" ordered-float = "4" -parquet = "54.1.0" +parquet = "54.2.0" pilota = "0.11.2" pretty_assertions = "1.4" port_scanner = "0.1.5" diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index 1ee89963d6..631755620a 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -15,15 +15,21 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::sync::Arc; -use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; -use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int64Type, TimestampMicrosecondType}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{DataType, Field}; use futures::{stream, StreamExt}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use crate::arrow::{schema_to_arrow_schema, DEFAULT_MAP_FIELD_NAME}; use crate::scan::ArrowRecordBatchStream; +use crate::spec::{ + MapType, NestedField, PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, +}; use crate::table::Table; use crate::Result; @@ -38,51 +44,72 @@ impl<'a> SnapshotsTable<'a> { Self { table } } - /// Returns the schema of the snapshots table. - pub fn schema(&self) -> Schema { - Schema::new(vec![ - Field::new( + /// Returns the iceberg schema of the snapshots table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::required( + 1, "committed_at", - DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), - false, + Type::Primitive(PrimitiveType::Timestamptz), ), - Field::new("snapshot_id", DataType::Int64, false), - Field::new("parent_id", DataType::Int64, true), - Field::new("operation", DataType::Utf8, false), - Field::new("manifest_list", DataType::Utf8, false), - Field::new( + NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)), + NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)), + NestedField::optional(4, "operation", Type::Primitive(PrimitiveType::String)), + NestedField::optional(5, "manifest_list", Type::Primitive(PrimitiveType::String)), + NestedField::optional( + 6, "summary", - DataType::Map( - Arc::new(Field::new( - "entries", - DataType::Struct( - vec![ - Field::new("keys", DataType::Utf8, false), - Field::new("values", DataType::Utf8, true), - ] - .into(), - ), + Type::Map(MapType { + key_field: Arc::new(NestedField::map_key_element( + 7, + Type::Primitive(PrimitiveType::String), + )), + value_field: Arc::new(NestedField::map_value_element( + 8, + Type::Primitive(PrimitiveType::String), false, )), - false, - ), - false, + }), ), - ]) + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() } /// Scans the snapshots table. pub async fn scan(&self) -> Result { + let schema = schema_to_arrow_schema(&self.schema())?; + let mut committed_at = - PrimitiveBuilder::::new().with_timezone("+00:00"); + PrimitiveBuilder::::new().with_timezone("+00:00"); let mut snapshot_id = PrimitiveBuilder::::new(); let mut parent_id = PrimitiveBuilder::::new(); let mut operation = StringBuilder::new(); let mut manifest_list = StringBuilder::new(); - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - + let mut summary = MapBuilder::new( + Some(MapFieldNames { + entry: DEFAULT_MAP_FIELD_NAME.to_string(), + key: MAP_KEY_FIELD_NAME.to_string(), + value: MAP_VALUE_FIELD_NAME.to_string(), + }), + StringBuilder::new(), + StringBuilder::new(), + ) + .with_keys_field(Arc::new( + Field::new(MAP_KEY_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + )) + .with_values_field(Arc::new( + Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, true).with_metadata(HashMap::from([ + (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()), + ])), + )); for snapshot in self.table.metadata().snapshots() { - committed_at.append_value(snapshot.timestamp_ms()); + committed_at.append_value(snapshot.timestamp_ms() * 1000); snapshot_id.append_value(snapshot.snapshot_id()); parent_id.append_option(snapshot.parent_snapshot_id()); manifest_list.append_value(snapshot.manifest_list()); @@ -94,7 +121,7 @@ impl<'a> SnapshotsTable<'a> { summary.append(true)?; } - let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ + let batch = RecordBatch::try_new(Arc::new(schema), vec![ Arc::new(committed_at.finish()), Arc::new(snapshot_id.finish()), Arc::new(parent_id.finish()), @@ -123,14 +150,14 @@ mod tests { check_record_batches( batch_stream, expect![[r#" - Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + Field { name: "committed_at", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, + Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, + Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, + Field { name: "operation", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, + Field { name: "manifest_list", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} }, + Field { name: "summary", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }"#]], expect![[r#" - committed_at: PrimitiveArray + committed_at: PrimitiveArray [ 2018-01-04T21:22:35.770+00:00, 2019-04-12T20:29:15.770+00:00, @@ -158,11 +185,11 @@ mod tests { [ ] [ - -- child 0: "keys" (Utf8) + -- child 0: "key" (Utf8) StringArray [ ] - -- child 1: "values" (Utf8) + -- child 1: "value" (Utf8) StringArray [ ] @@ -172,11 +199,11 @@ mod tests { [ ] [ - -- child 0: "keys" (Utf8) + -- child 0: "key" (Utf8) StringArray [ ] - -- child 1: "values" (Utf8) + -- child 1: "value" (Utf8) StringArray [ ]