Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 22 additions & 42 deletions tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,40 +500,34 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
// In this case, we should use default tablespace.
Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, ""));

List<PartitionDescProto> partitions = queryContext.hasPartition() ? query.getPartitions() : null;
Path finalOutputDir = space.commitTable(
query.context.getQueryContext(),
lastStage.getId(),
lastStage.getMasterPlan().getLogicalPlan(),
lastStage.getSchema(),
tableDesc);
query.context.getQueryContext(),
lastStage.getId(),
lastStage.getMasterPlan().getLogicalPlan(),
lastStage.getSchema(),
tableDesc,
partitions);

QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);

// Add dynamic partitions to catalog for partition table.
if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) {
List<PartitionDescProto> partitions = query.getPartitions();
if (partitions != null) {
// Set contents length and file count to PartitionDescProto by listing final output directories.
List<PartitionDescProto> finalPartitions = getPartitionsWithContentsSummary(query.systemConf,
finalOutputDir, partitions);

String databaseName, simpleTableName;
if (CatalogUtil.isFQTableName(tableDesc.getName())) {
String[] split = CatalogUtil.splitFQTableName(tableDesc.getName());
databaseName = split[0];
simpleTableName = split[1];
} else {
databaseName = queryContext.getCurrentDatabase();
simpleTableName = tableDesc.getName();
}

// Store partitions to CatalogStore using alter table statement.
catalog.addPartitions(databaseName, simpleTableName, finalPartitions, true);
LOG.info("Added partitions to catalog (total=" + partitions.size() + ")");
if (!query.getPartitions().isEmpty()) {
String databaseName, simpleTableName;

if (CatalogUtil.isFQTableName(tableDesc.getName())) {
String[] split = CatalogUtil.splitFQTableName(tableDesc.getName());
databaseName = split[0];
simpleTableName = split[1];
} else {
LOG.info("Can't find partitions for adding.");
databaseName = queryContext.getCurrentDatabase();
simpleTableName = tableDesc.getName();
}

// Store partitions to CatalogStore using alter table statement.
catalog.addPartitions(databaseName, simpleTableName, partitions, true);
LOG.info("Added partitions to catalog (total=" + partitions.size() + ")");
query.clearPartitions();
}
} catch (Throwable e) {
Expand All @@ -546,21 +540,6 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
return QueryState.QUERY_SUCCEEDED;
}

private List<PartitionDescProto> getPartitionsWithContentsSummary(TajoConf conf, Path outputDir,
List<PartitionDescProto> partitions) throws IOException {
List<PartitionDescProto> finalPartitions = new ArrayList<>();

FileSystem fileSystem = outputDir.getFileSystem(conf);
for (PartitionDescProto partition : partitions) {
PartitionDescProto.Builder builder = partition.toBuilder();
Path partitionPath = new Path(outputDir, partition.getPath());
ContentSummary contentSummary = fileSystem.getContentSummary(partitionPath);
builder.setNumBytes(contentSummary.getLength());
finalPartitions.add(builder.build());
}
return finalPartitions;
}

private static interface QueryHook {
boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
Expand Down Expand Up @@ -695,7 +674,8 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo
tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
}

stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
long totalVolume = getTableVolume(query.systemConf, finalOutputDir);
stats.setNumBytes(totalVolume);
tableDescTobeCreated.setStats(stats);
query.setResultDesc(tableDescTobeCreated);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoRuntimeException;
Expand Down Expand Up @@ -363,7 +364,7 @@ public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoEx
public abstract Path commitTable(OverridableConf queryContext,
ExecutionBlockId finalEbId,
LogicalPlan plan, Schema schema,
TableDesc tableDesc) throws IOException;
TableDesc tableDesc, List<PartitionDescProto> partitions) throws IOException;

public abstract void rollbackTable(LogicalNode node) throws IOException, TajoException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
Expand Down Expand Up @@ -911,7 +912,7 @@ public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping
@Override
public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId,
LogicalPlan plan, Schema schema,
TableDesc tableDesc) throws IOException {
TableDesc tableDesc, List<PartitionDescProto> partitions) throws IOException {
if (tableDesc == null) {
throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
}
Expand Down
Loading