Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -1061,20 +1061,22 @@ The catalog object supports `local` and `hive` catalog types.

The following table lists the properties of a `local` catalog:

|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `local`.|yes|
|warehousePath|The location of the warehouse associated with the catalog|yes|
|catalogProperties|Map of any additional properties that needs to be attached to the catalog|no|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set this value to `local`.|None|yes|
|warehousePath|The location of the warehouse associated with the catalog.|None|yes|
|catalogProperties|Map of any additional properties that need to be attached to the catalog.|None|no|
|caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no|

The following table lists the properties of a `hive` catalog:

|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `hive`.|yes|
|warehousePath|The location of the warehouse associated with the catalog|yes|
|catalogUri|The URI associated with the hive catalog|yes|
|catalogProperties|Map of any additional properties that needs to be attached to the catalog|no|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set this value to `hive`.|None|yes|
|warehousePath|The location of the warehouse associated with the catalog.|None|yes|
|catalogUri|The URI associated with the Hive catalog.|None|yes|
|catalogProperties|Map of any additional properties that need to be attached to the catalog.|None|no|
|caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no|

### Iceberg filter object

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class HiveIcebergCatalog extends IcebergCatalog
@JsonProperty
private Map<String, String> catalogProperties;

@JsonProperty
private final Boolean caseSensitive;

private final Configuration configuration;

private BaseMetastoreCatalog hiveCatalog;
Expand All @@ -69,13 +72,15 @@ public HiveIcebergCatalog(
@JsonProperty("catalogUri") String catalogUri,
@JsonProperty("catalogProperties") @Nullable
Map<String, Object> catalogProperties,
@JsonProperty("caseSensitive") Boolean caseSensitive,
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject @HiveConf Configuration configuration
)
{
this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null");
this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null");
this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(catalogProperties, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, mapper);
this.caseSensitive = caseSensitive == null ? true : caseSensitive;
this.configuration = configuration;
this.catalogProperties
.forEach(this.configuration::set);
Expand Down Expand Up @@ -137,4 +142,10 @@ public Map<String, String> getCatalogProperties()
{
return catalogProperties;
}

/**
 * Whether Iceberg table scans through this catalog resolve column names case-sensitively.
 * The constructor defaults this to {@code true} when the spec omits the property, so the
 * backing field is never null here.
 */
@Override
public boolean isCaseSensitive()
{
  return this.caseSensitive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public abstract class IcebergCatalog

public abstract BaseMetastoreCatalog retrieveCatalog();

/**
 * Whether Iceberg table scans should treat column names as case sensitive.
 * The base implementation defaults to case-sensitive scans; subclasses
 * (e.g. the local and hive catalogs) override this to honor a user-supplied setting.
 */
public boolean isCaseSensitive()
{
return true;
}

/**
* Extract the iceberg data files upto the latest snapshot associated with the table
*
Expand Down Expand Up @@ -92,6 +97,8 @@ public List<String> extractSnapshotDataFiles(
if (snapshotTime != null) {
tableScan = tableScan.asOfTime(snapshotTime.getMillis());
}

tableScan = tableScan.caseSensitive(isCaseSensitive());
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
CloseableIterable.transform(tasks, FileScanTask::file)
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ public class LocalCatalog extends IcebergCatalog
@JsonProperty
private final Map<String, String> catalogProperties;

@JsonProperty
private final Boolean caseSensitive;

private BaseMetastoreCatalog catalog;

/**
 * Jackson-deserializable catalog backed by a local warehouse path.
 *
 * @param warehousePath     location of the warehouse; must not be null
 * @param catalogProperties optional extra catalog properties (declared @Nullable;
 *                          presumably handled downstream — confirm in retrieveCatalog())
 * @param caseSensitive     whether table scans resolve column names case-sensitively;
 *                          defaults to true when omitted from the spec
 */
@JsonCreator
public LocalCatalog(
    @JsonProperty("warehousePath") String warehousePath,
    @JsonProperty("catalogProperties") @Nullable Map<String, String> catalogProperties,
    @JsonProperty("caseSensitive") Boolean caseSensitive
)
{
  // checkNotNull returns its argument, so validate and assign in one step.
  this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath is null");
  this.catalogProperties = catalogProperties;
  // Null (property absent) means case-sensitive, matching the documented default.
  this.caseSensitive = caseSensitive == null ? true : caseSensitive;
  // NOTE(review): retrieveCatalog() is overridable and invoked from the constructor —
  // confirm no subclass relies on its own state during this call.
  this.catalog = retrieveCatalog();
}
Expand All @@ -71,6 +76,12 @@ public Map<String, String> getCatalogProperties()
return catalogProperties;
}

/**
 * Whether Iceberg table scans through this catalog resolve column names case-sensitively.
 * The constructor defaults this to {@code true} when the spec omits the property, so the
 * backing field is never null here.
 */
@Override
public boolean isCaseSensitive()
{
  return this.caseSensitive;
}

@Override
public BaseMetastoreCatalog retrieveCatalog()
{
Expand Down Expand Up @@ -100,12 +111,13 @@ public boolean equals(Object o)
}
LocalCatalog that = (LocalCatalog) o;
return warehousePath.equals(that.warehousePath)
&& Objects.equals(catalogProperties, that.catalogProperties);
&& Objects.equals(catalogProperties, that.catalogProperties)
&& Objects.equals(caseSensitive, that.caseSensitive);
}

@Override
public int hashCode()
{
return Objects.hash(warehousePath, catalogProperties);
return Objects.hash(warehousePath, catalogProperties, caseSensitive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ public void testCatalogCreate()
warehouseDir.getPath(),
"hdfs://testuri",
new HashMap<>(),
true,
mapper,
new Configuration()
);
HiveIcebergCatalog hiveCatalogNullProps = new HiveIcebergCatalog(
warehouseDir.getPath(),
"hdfs://testuri",
null,
null,
mapper,
new Configuration()
);
Expand All @@ -68,6 +70,7 @@ public void testAuthenticate()
warehouseDir.getPath(),
"hdfs://testuri",
catalogMap,
null,
mapper,
new Configuration()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class IcebergInputSourceTest

private IcebergCatalog testCatalog;
private TableIdentifier tableIdentifier;
private File warehouseDir;

private Schema tableSchema = new Schema(
Types.NestedField.required(1, "id", Types.StringType.get()),
Expand All @@ -80,8 +81,8 @@ public class IcebergInputSourceTest
@Before
public void setup() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);
Expand Down Expand Up @@ -187,6 +188,33 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException
Assert.assertEquals(1, splits.count());
}

/**
 * Verifies that a catalog configured with caseSensitive=false matches a filter written
 * in the original lower-case column name even after the column is renamed to a
 * different case in the Iceberg schema.
 */
@Test
public void testCaseInsensitiveFiltering() throws IOException
{
  // Catalog that performs case-insensitive column resolution during table scans.
  LocalCatalog insensitiveCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), false);

  // Rename the column so its case no longer matches the filter below.
  Table icebergTable = testCatalog.retrieveCatalog().loadTable(tableIdentifier);
  icebergTable.updateSchema().renameColumn("name", "Name").commit();

  IcebergInputSource inputSource = new IcebergInputSource(
      TABLENAME,
      NAMESPACE,
      new IcebergEqualsFilter("name", "Foo"),
      insensitiveCatalog,
      new LocalInputSourceFactory(),
      null
  );

  Stream<InputSplit<List<String>>> splitStream = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
  List<File> matchedFiles = splitStream
      .map(inputSource::withSplit)
      .map(split -> (LocalInputSource) split)
      .map(LocalInputSource::getFiles)
      .flatMap(List::stream)
      .collect(Collectors.toList());

  // The single data file written in setup() should survive the filter despite the case mismatch.
  Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null)));
  Assert.assertEquals(1, matchedFiles.size());
}

@After
public void tearDown()
{
Expand All @@ -197,7 +225,6 @@ private void createAndLoadTable(TableIdentifier tableIdentifier) throws IOExcept
{
//Setup iceberg table and schema
Table icebergTableFromSchema = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema);

//Generate an iceberg record and write it to a file
GenericRecord record = GenericRecord.create(tableSchema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testCatalogSerDe() throws JsonProcessingException
{
final File warehouseDir = FileUtils.createTempDir();
DefaultObjectMapper mapper = new DefaultObjectMapper();
LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
LocalCatalog after = mapper.readValue(
mapper.writeValueAsString(before), LocalCatalog.class);
Assert.assertEquals(before, after);
Expand Down