diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml
index fb5ca5912ba6..053f6556573c 100644
--- a/extensions-contrib/druid-deltalake-extensions/pom.xml
+++ b/extensions-contrib/druid-deltalake-extensions/pom.xml
@@ -35,7 +35,7 @@
4.0.0
- 3.1.0
+ 3.2.0
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
index 56ccd2a41ae9..7d126caef349 100644
--- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
@@ -28,12 +28,12 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
-import io.delta.kernel.TableNotFoundException;
-import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
@@ -120,7 +120,7 @@ public boolean needsFormat()
/**
* Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied,
- * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is
+ * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, the Delta engine is
* instantiated with the supplied configuration to read the table.
*
* @param inputRowSchema schema for {@link org.apache.druid.data.input.InputRow}
@@ -134,40 +134,40 @@ public InputSourceReader reader(
File temporaryDirectory
)
{
- final TableClient tableClient = createTableClient();
+ final Engine engine = createDeltaEngine();
try {
final List> scanFileDataIters = new ArrayList<>();
if (deltaSplit != null) {
- final Row scanState = deserialize(tableClient, deltaSplit.getStateRow());
+ final Row scanState = deserialize(engine, deltaSplit.getStateRow());
final StructType physicalReadSchema =
- ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+ ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
for (String file : deltaSplit.getFiles()) {
- final Row scanFile = deserialize(tableClient, file);
+ final Row scanFile = deserialize(engine, file);
scanFileDataIters.add(
- getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, Optional.empty())
+ getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, Optional.empty())
);
}
} else {
- final Table table = Table.forPath(tableClient, tablePath);
- final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
- final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
+ final Table table = Table.forPath(engine, tablePath);
+ final Snapshot latestSnapshot = table.getLatestSnapshot(engine);
+ final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
final StructType prunedSchema = pruneSchema(
fullSnapshotSchema,
inputRowSchema.getColumnsFilter()
);
- final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
+ final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
if (filter != null) {
- scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
+ scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
- final Scan scan = scanBuilder.withReadSchema(tableClient, prunedSchema).build();
- final CloseableIterator scanFilesIter = scan.getScanFiles(tableClient);
- final Row scanState = scan.getScanState(tableClient);
+ final Scan scan = scanBuilder.withReadSchema(engine, prunedSchema).build();
+ final CloseableIterator scanFilesIter = scan.getScanFiles(engine);
+ final Row scanState = scan.getScanState(engine);
final StructType physicalReadSchema =
- ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+ ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
while (scanFilesIter.hasNext()) {
final FilteredColumnarBatch scanFileBatch = scanFilesIter.next();
@@ -176,7 +176,7 @@ public InputSourceReader reader(
while (scanFileRows.hasNext()) {
final Row scanFile = scanFileRows.next();
scanFileDataIters.add(
- getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
+ getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
);
}
}
@@ -203,26 +203,26 @@ public Stream> createSplits(InputFormat inputFormat, @Nul
return Stream.of(new InputSplit<>(deltaSplit));
}
- final TableClient tableClient = createTableClient();
+ final Engine engine = createDeltaEngine();
final Snapshot latestSnapshot;
+ final Table table = Table.forPath(engine, tablePath);
try {
- final Table table = Table.forPath(tableClient, tablePath);
- latestSnapshot = table.getLatestSnapshot(tableClient);
+ latestSnapshot = table.getLatestSnapshot(engine);
}
catch (TableNotFoundException e) {
throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
}
- final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
+ final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
- final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
+ final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
if (filter != null) {
- scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
+ scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
- final Scan scan = scanBuilder.withReadSchema(tableClient, fullSnapshotSchema).build();
+ final Scan scan = scanBuilder.withReadSchema(engine, fullSnapshotSchema).build();
// scan files iterator for the current snapshot
- final CloseableIterator scanFilesIterator = scan.getScanFiles(tableClient);
+ final CloseableIterator scanFilesIterator = scan.getScanFiles(engine);
- final Row scanState = scan.getScanState(tableClient);
+ final Row scanState = scan.getScanState(engine);
final String scanStateStr = RowSerde.serializeRowToJson(scanState);
Iterator deltaSplitIterator = Iterators.transform(
@@ -256,9 +256,9 @@ public InputSource withSplit(InputSplit split)
);
}
- private Row deserialize(TableClient tableClient, String row)
+ private Row deserialize(Engine engine, String row)
{
- return RowSerde.deserializeRowFromJson(tableClient, row);
+ return RowSerde.deserializeRowFromJson(engine, row);
}
/**
@@ -285,17 +285,17 @@ private StructType pruneSchema(final StructType baseSchema, final ColumnsFilter
}
/**
- * @return a table client where the client is initialized with {@link Configuration} class that uses the class's
+ * @return a Delta engine initialized with a {@link Configuration} that uses this class's
* class loader instead of the context classloader. The latter by default doesn't know about the extension classes,
- * so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}.
+ * so the Delta engine cannot load runtime classes, resulting in {@link ClassNotFoundException}.
*/
- private TableClient createTableClient()
+ private Engine createDeltaEngine()
{
final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Configuration conf = new Configuration();
- return DefaultTableClient.create(conf);
+ return DefaultEngine.create(conf);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxClassloader);
@@ -308,7 +308,7 @@ private TableClient createTableClient()
* SingleThreadedTableReader.java.
*/
private CloseableIterator getTransformedDataIterator(
- final TableClient tableClient,
+ final Engine engine,
final Row scanState,
final Row scanFile,
final StructType physicalReadSchema,
@@ -317,14 +317,14 @@ private CloseableIterator getTransformedDataIterator(
{
final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
- final CloseableIterator physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
+ final CloseableIterator physicalDataIter = engine.getParquetHandler().readParquetFiles(
Utils.singletonCloseableIterator(fileStatus),
physicalReadSchema,
optionalPredicate
);
return Scan.transformPhysicalData(
- tableClient,
+ engine,
scanState,
scanFile,
physicalDataIter
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
index e37c5d503314..bad6191496d7 100644
--- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
@@ -23,9 +23,9 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BooleanType;
@@ -84,13 +84,12 @@ public static String serializeRowToJson(Row row)
/**
* Utility method to deserialize a {@link Row} object from the JSON form.
*/
- public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema)
+ public static Row deserializeRowFromJson(Engine engine, String jsonRowWithSchema)
{
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
JsonNode schemaNode = jsonNode.get("schema");
- StructType schema =
- tableClient.getJsonHandler().deserializeStructType(schemaNode.asText());
+ StructType schema = engine.getJsonHandler().deserializeStructType(schemaNode.asText());
return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
}
catch (JsonProcessingException e) {
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
index 9e597894d05c..4e1c2566f02e 100644
--- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
@@ -20,12 +20,12 @@
package org.apache.druid.delta.input;
import io.delta.kernel.Scan;
-import io.delta.kernel.TableNotFoundException;
-import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.Utils;
@@ -68,13 +68,13 @@ public void testDeltaInputRow(
final List