From fcee74d70f05fef37652045ae12f08115e254db4 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 23 May 2024 12:32:50 -0700 Subject: [PATCH 1/3] Toggle case sensitivity while reading columns from iceberg --- docs/ingestion/input-sources.md | 24 +++++++------- .../iceberg/input/HiveIcebergCatalog.java | 11 +++++++ .../druid/iceberg/input/IcebergCatalog.java | 9 +++++ .../druid/iceberg/input/LocalCatalog.java | 13 +++++++- .../iceberg/input/HiveIcebergCatalogTest.java | 3 ++ .../iceberg/input/IcebergInputSourceTest.java | 33 +++++++++++++++++-- .../druid/iceberg/input/LocalCatalogTest.java | 2 +- 7 files changed, 79 insertions(+), 16 deletions(-) diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md index f01bb26096cd..559aeda75b0b 100644 --- a/docs/ingestion/input-sources.md +++ b/docs/ingestion/input-sources.md @@ -1065,20 +1065,22 @@ The catalog object supports `local` and `hive` catalog types. The following table lists the properties of a `local` catalog: -|Property|Description|Required| -|--------|-----------|---------| -|type|Set this value to `local`.|yes| -|warehousePath|The location of the warehouse associated with the catalog|yes| -|catalogProperties|Map of any additional properties that needs to be attached to the catalog|no| +|Property|Description|Default|Required| +|--------|-----------|-------|---------| +|type|Set this value to `local`.|None|yes| +|warehousePath|The location of the warehouse associated with the catalog.|None|yes| +|catalogProperties|Map of any additional properties that needs to be attached to the catalog.|None|no| +|caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no| The following table lists the properties of a `hive` catalog: -|Property|Description|Required| -|--------|-----------|---------| -|type|Set this value to `hive`.|yes| -|warehousePath|The location of the warehouse associated with the catalog|yes| -|catalogUri|The URI associated with the hive catalog|yes| -|catalogProperties|Map of any additional properties that needs to be attached to the catalog|no| +|Property|Description|Default|Required| +|--------|-----------|-------|---------| +|type|Set this value to `hive`.|None|yes| +|warehousePath|The location of the warehouse associated with the catalog.|None|yes| +|catalogUri|The URI associated with the hive catalog.|None|yes| +|catalogProperties|Map of any additional properties that needs to be attached to the catalog.|None|no| +|caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no| ### Iceberg filter object diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java index a22f886cb31b..7b659d031296 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java @@ -57,6 +57,9 @@ public class HiveIcebergCatalog extends IcebergCatalog @JsonProperty private Map catalogProperties; + @JsonProperty + private final Boolean caseSensitive; + private final Configuration configuration; private BaseMetastoreCatalog hiveCatalog; @@ -69,6 +72,7 @@ public HiveIcebergCatalog( @JsonProperty("catalogUri") String catalogUri, @JsonProperty("catalogProperties") @Nullable Map catalogProperties, + @JsonProperty("caseSensitive") Boolean caseSensitive, @JacksonInject @Json ObjectMapper mapper, @JacksonInject @HiveConf Configuration configuration ) @@ -76,6 +80,7 @@ public HiveIcebergCatalog( this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null"); this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null"); this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(catalogProperties, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, mapper); + this.caseSensitive = caseSensitive == null ? true : caseSensitive; this.configuration = configuration; this.catalogProperties .forEach(this.configuration::set); @@ -137,4 +142,10 @@ public Map getCatalogProperties() { return catalogProperties; } + + @Override + public boolean isCaseSensitive() + { + return caseSensitive; + } } diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index 07b41b6e10b1..5be8e5b31e46 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -50,6 +50,11 @@ public abstract class IcebergCatalog public abstract BaseMetastoreCatalog retrieveCatalog(); + public boolean isCaseSensitive() + { + return true; + } + /** * Extract the iceberg data files upto the latest snapshot associated with the table * @@ -92,6 +97,10 @@ public List extractSnapshotDataFiles( if (snapshotTime != null) { tableScan = tableScan.asOfTime(snapshotTime.getMillis()); } + //Default case sensitivity is true for Iceberg TableScanContext + if (!isCaseSensitive()) { + tableScan = tableScan.caseSensitive(isCaseSensitive()); + } CloseableIterable tasks = tableScan.planFiles(); CloseableIterable.transform(tasks, FileScanTask::file) .forEach(dataFile -> dataFilePaths.add(dataFile.path().toString())); diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java index d4961bb09678..ede56a495521 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java @@ -43,18 +43,23 @@ public class LocalCatalog extends IcebergCatalog @JsonProperty private final Map catalogProperties; + @JsonProperty + private final Boolean caseSensitive; + private BaseMetastoreCatalog catalog; @JsonCreator public LocalCatalog( @JsonProperty("warehousePath") String warehousePath, @JsonProperty("catalogProperties") @Nullable - Map catalogProperties + Map catalogProperties, + @JsonProperty("caseSensitive") Boolean caseSensitive ) { Preconditions.checkNotNull(warehousePath, "warehousePath is null"); this.warehousePath = warehousePath; this.catalogProperties = catalogProperties; + this.caseSensitive = caseSensitive == null ? true : caseSensitive; this.catalog = retrieveCatalog(); } @@ -71,6 +76,12 @@ public Map getCatalogProperties() return catalogProperties; } + @Override + public boolean isCaseSensitive() + { + return caseSensitive; + } + @Override public BaseMetastoreCatalog retrieveCatalog() { diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java index def778ee9063..d7e181e2c1cb 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java @@ -42,6 +42,7 @@ public void testCatalogCreate() warehouseDir.getPath(), "hdfs://testuri", new HashMap<>(), + true, mapper, new Configuration() ); @@ -49,6 +50,7 @@ public void testCatalogCreate() warehouseDir.getPath(), "hdfs://testuri", null, + null, mapper, new Configuration() ); @@ -68,6 +70,7 @@ public void testAuthenticate() warehouseDir.getPath(), "hdfs://testuri", catalogMap, + null, mapper, new Configuration() ); diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index 668a34b352b1..5a2429d6c7cf 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -67,6 +67,7 @@ public class IcebergInputSourceTest private IcebergCatalog testCatalog; private TableIdentifier tableIdentifier; + private File warehouseDir; private Schema tableSchema = new Schema( Types.NestedField.required(1, "id", Types.StringType.get()), @@ -80,8 +81,8 @@ public class IcebergInputSourceTest @Before public void setup() throws IOException { - final File warehouseDir = FileUtils.createTempDir(); - testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>()); + warehouseDir = FileUtils.createTempDir(); + testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true); tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME); createAndLoadTable(tableIdentifier); @@ -187,6 +188,33 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException Assert.assertEquals(1, splits.count()); } + @Test + public void testCaseInsensitiveFiltering() throws IOException + { + LocalCatalog caseInsensitiveCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), false); + Table icebergTableFromSchema = testCatalog.retrieveCatalog().loadTable(tableIdentifier); + + icebergTableFromSchema.updateSchema().renameColumn("name", "Name").commit(); + IcebergInputSource inputSource = new IcebergInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("name", "Foo"), + caseInsensitiveCatalog, + new LocalInputSourceFactory(), + null + ); + + Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); + List localInputSourceList = splits.map(inputSource::withSplit) + .map(inpSource -> (LocalInputSource) inpSource) + .map(LocalInputSource::getFiles) + .flatMap(List::stream) + .collect(Collectors.toList()); + + Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null))); + Assert.assertEquals(1, localInputSourceList.size()); + } + @After public void tearDown() { @@ -197,7 +225,6 @@ private void createAndLoadTable(TableIdentifier tableIdentifier) throws IOExcept { //Setup iceberg table and schema Table icebergTableFromSchema = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema); - //Generate an iceberg record and write it to a file GenericRecord record = GenericRecord.create(tableSchema); ImmutableList.Builder builder = ImmutableList.builder(); diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java index b0a7b5528a17..69b1df595158 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java @@ -36,7 +36,7 @@ public void testCatalogSerDe() throws JsonProcessingException { final File warehouseDir = FileUtils.createTempDir(); DefaultObjectMapper mapper = new DefaultObjectMapper(); - LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>()); + LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true); LocalCatalog after = mapper.readValue( mapper.writeValueAsString(before), LocalCatalog.class); Assert.assertEquals(before, after); From a51324d9dc34deee7f7e8679b3415d2cde1765f3 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 23 May 2024 16:29:41 -0700 Subject: [PATCH 2/3] Fix tests --- .../java/org/apache/druid/iceberg/input/LocalCatalog.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java index ede56a495521..4539a582670f 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java @@ -111,12 +111,13 @@ public boolean equals(Object o) } LocalCatalog that = (LocalCatalog) o; return warehousePath.equals(that.warehousePath) - && Objects.equals(catalogProperties, that.catalogProperties); + && Objects.equals(catalogProperties, that.catalogProperties) + && Objects.equals(caseSensitive, that.caseSensitive); } @Override public int hashCode() { - return Objects.hash(warehousePath, catalogProperties); + return Objects.hash(warehousePath, catalogProperties, caseSensitive); } } From 34ba4108f2db8508685cbe2b26a9e9e37943c2e6 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Thu, 30 May 2024 14:59:43 -0700 Subject: [PATCH 3/3] Drop case check and set unconditionally --- .../java/org/apache/druid/iceberg/input/IcebergCatalog.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index 5be8e5b31e46..fe08dedef4d5 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -97,10 +97,8 @@ public List extractSnapshotDataFiles( if (snapshotTime != null) { tableScan = tableScan.asOfTime(snapshotTime.getMillis()); } - //Default case sensitivity is true for Iceberg TableScanContext - if (!isCaseSensitive()) { - tableScan = tableScan.caseSensitive(isCaseSensitive()); - } + + tableScan = tableScan.caseSensitive(isCaseSensitive()); CloseableIterable tasks = tableScan.planFiles(); CloseableIterable.transform(tasks, FileScanTask::file) .forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));