diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 0cd178f464..18d2f5e3e7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -500,40 +500,34 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { // In this case, we should use default tablespace. Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")); + List partitions = queryContext.hasPartition() ? query.getPartitions() : null; Path finalOutputDir = space.commitTable( - query.context.getQueryContext(), - lastStage.getId(), - lastStage.getMasterPlan().getLogicalPlan(), - lastStage.getSchema(), - tableDesc); + query.context.getQueryContext(), + lastStage.getId(), + lastStage.getMasterPlan().getLogicalPlan(), + lastStage.getSchema(), + tableDesc, + partitions); QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); // Add dynamic partitions to catalog for partition table. - if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { - List partitions = query.getPartitions(); - if (partitions != null) { - // Set contents length and file count to PartitionDescProto by listing final output directories. - List finalPartitions = getPartitionsWithContentsSummary(query.systemConf, - finalOutputDir, partitions); - - String databaseName, simpleTableName; - if (CatalogUtil.isFQTableName(tableDesc.getName())) { - String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); - databaseName = split[0]; - simpleTableName = split[1]; - } else { - databaseName = queryContext.getCurrentDatabase(); - simpleTableName = tableDesc.getName(); - } - - // Store partitions to CatalogStore using alter table statement. 
- catalog.addPartitions(databaseName, simpleTableName, finalPartitions, true); - LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); + if (!query.getPartitions().isEmpty()) { + String databaseName, simpleTableName; + + if (CatalogUtil.isFQTableName(tableDesc.getName())) { + String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); + databaseName = split[0]; + simpleTableName = split[1]; } else { - LOG.info("Can't find partitions for adding."); + databaseName = queryContext.getCurrentDatabase(); + simpleTableName = tableDesc.getName(); } + + // Store partitions to CatalogStore using alter table statement. + catalog.addPartitions(databaseName, simpleTableName, partitions, true); + LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); query.clearPartitions(); } } catch (Throwable e) { @@ -546,21 +540,6 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { return QueryState.QUERY_SUCCEEDED; } - private List getPartitionsWithContentsSummary(TajoConf conf, Path outputDir, - List partitions) throws IOException { - List finalPartitions = new ArrayList<>(); - - FileSystem fileSystem = outputDir.getFileSystem(conf); - for (PartitionDescProto partition : partitions) { - PartitionDescProto.Builder builder = partition.toBuilder(); - Path partitionPath = new Path(outputDir, partition.getPath()); - ContentSummary contentSummary = fileSystem.getContentSummary(partitionPath); - builder.setNumBytes(contentSummary.getLength()); - finalPartitions.add(builder.build()); - } - return finalPartitions; - } - private static interface QueryHook { boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, @@ -695,7 +674,8 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo 
tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod()); } - stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); + long totalVolume = getTableVolume(query.systemConf, finalOutputDir); + stats.setNumBytes(totalVolume); tableDescTobeCreated.setStats(stats); query.setResultDesc(tableDescTobeCreated); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 00e6d75a12..51e047112d 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -25,6 +25,7 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoRuntimeException; @@ -363,7 +364,7 @@ public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoEx public abstract Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException; + TableDesc tableDesc, List partitions) throws IOException; public abstract void rollbackTable(LogicalNode node) throws IOException, TajoException; diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 132ceff0ae..4260e8ecc6 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -40,6 +40,7 
@@ import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -911,7 +912,7 @@ public Pair getIndexablePredicateValue(ColumnMapping columnMapping @Override public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException { + TableDesc tableDesc, List partitions) throws IOException { if (tableDesc == null) { throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 3d12a409e6..4470d2c261 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -33,6 +33,7 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.UnsupportedException; @@ -50,6 +51,7 @@ import java.text.NumberFormat; import java.util.*; +import static java.lang.String.format; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; @@ -766,8 +768,8 @@ public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) { @Override public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, 
LogicalPlan plan, - Schema schema, TableDesc tableDesc) throws IOException { - return commitOutputData(queryContext, true); + Schema schema, TableDesc tableDesc, List partitions) throws IOException { + return commitOutputData(queryContext, true, partitions); } @Override @@ -787,177 +789,241 @@ public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc * @return Saved path * @throws java.io.IOException */ - protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { + protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq, + List partitions) throws IOException { + Path finalOutputDir = null; Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + OutputCommitHandle commitHandle = new OutputCommitHandle(); + if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI)); - try { - FileSystem fs = stagingResultDir.getFileSystem(conf); + boolean checkExistingPartition = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); + try { if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + if (partitions != null) { + commitInsertOverwriteOrCreateWithPartition(stagingResultDir, finalOutputDir, oldTableDir, partitions, + checkExistingPartition, commitHandle); + } else { + commitInsertOverwrite(stagingResultDir, finalOutputDir, oldTableDir); + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + Preconditions.checkNotNull(queryContext); + if (queryType.equals(NodeType.INSERT.name())) { // INSERT INTO + if (partitions != null) { + commitInsertWithPartition(stagingResultDir, finalOutputDir, partitions, commitHandle, changeFileSeq); 
+ } else { + commitInsert(stagingResultDir, finalOutputDir, changeFileSeq); + } + cleanupTemporaryDirectory(stagingResultDir); + } else if (queryType.equals(NodeType.CREATE_TABLE.name())){ // CREATE TABLE AS SELECT (CTAS) + if (partitions != null) { + commitInsertOverwriteOrCreateWithPartition(stagingResultDir, finalOutputDir, oldTableDir, partitions, + checkExistingPartition, commitHandle); + } else { + commitCreate(stagingResultDir, finalOutputDir); + } + } else { + throw new IOException("Cannot handle query type:" + queryType); + } + } - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - ContentSummary summary = fs.getContentSummary(stagingResultDir); - - // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not. - boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); - - // If existing data doesn't need to keep, check if there are some files. - if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) - && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map renameDirs = new HashMap<>(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. - Map recoveryDirs = new HashMap<>(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } + // remove the staging directory if the final output dir is given. 
+ Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + rollback(stagingResultDir, finalOutputDir, oldTableDir, commitHandle); + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + return finalOutputDir; + } - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } + private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, Path finalOutputDir, + Path oldTableDir, List partitions, boolean checkExistingPartition, + OutputCommitHandle commitHandle) throws IOException { + String stagingResultPath = stagingResultDir.toString(); + String finalOutputPath = finalOutputDir.toString(); + String oldTablePath = oldTableDir.toString(); - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } + partitions.stream().forEach(partition -> { + try { + Path targetPath = new Path(partition.getPath() + "/"); + Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); + Path backupPath = new Path(partition.getPath().replaceAll(finalOutputPath, oldTablePath)); + + // Move existing directory to 
backup directory. + if (checkExistingPartition && fs.exists(targetPath)) { + renameDirectory(targetPath, backupPath); + commitHandle.addBackupPath(backupPath); + } - // Recovery renamed dirs - for(Map.Entry entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } + // Move staging directory to target directory + commitTask(stagingPath, targetPath); + commitHandle.addTargetPath(targetPath); + + // Summarize the volume of partitions + long totalSize = calculateSize(targetPath); + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(totalSize); + PartitionDescProto partitionDescProto = builder.build(); + commitHandle.addPartition(partitionDescProto); + } catch (IOException e) { + throw new ConcurrentModificationException(); + } + }); + partitions.clear(); + partitions.addAll(commitHandle.getPartitions()); + } - throw new IOException(ioe.getMessage()); - } - } else { // no partition - try { + private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDir, + List partitions, OutputCommitHandle commitHandle, boolean changeFileSeq) throws IOException { + String stagingResultPath = stagingResultDir.toString(); + String finalOutputPath = finalOutputDir.toString(); - // if the final output dir exists, move all contents to the temporary table dir. - // Otherwise, just make the final output dir. As a result, the final output dir will be empty. 
- if (fs.exists(finalOutputDir)) { - fs.mkdirs(oldTableDir); + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.rename(status.getPath(), oldTableDir); - } + partitions.stream().forEach(partition -> { + try { + Path targetPath = new Path(partition.getPath() + "/"); + Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir); - } + if (!fs.exists(targetPath)) { + commitTask(stagingPath, targetPath); + } else { + moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, + changeFileSeq, commitHandle); + } - // Move the results to the final output dir. - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } + // Summarize the volume of partitions + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(calculateSize(targetPath)); + commitHandle.addPartition(builder.build()); + } catch (IOException e) { + throw new ConcurrentModificationException(); + } + }); + partitions.clear(); + partitions.addAll(commitHandle.getPartitions()); + } - // Check the final output dir - committed = fs.exists(finalOutputDir); + private void rollback(Path stagingResultDir, Path finalOutputDir, Path oldTableDir, + OutputCommitHandle commitHandle) throws IOException { + String finalOutputPath = finalOutputDir.toString(); + String oldTablePath = oldTableDir != null ? 
oldTableDir.toString() : null; - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { + // Delete data from the output directory + List targetPaths = commitHandle.getTargetPaths(); + for(Path targetPath: targetPaths) { + fs.delete(targetPath, true); + } - // if commit is failed, recover the old data - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.delete(status.getPath(), true); - } + // Move from backup directory to output directory + List backupPaths = commitHandle.getBackupPaths(); + for(Path backupPath: backupPaths) { + Path targetPath = new Path(backupPath.toString().replaceAll(oldTablePath, finalOutputPath)); + fs.delete(targetPath, true); + renameDirectory(backupPath, targetPath); + } - for (FileStatus status : fs.listStatus(oldTableDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } + // Delete staging directory + fs.delete(stagingResultDir, true); + } - throw new IOException(ioe.getMessage()); - } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + private void commitInsertOverwrite(Path stagingResultDir, Path finalOutputDir, Path oldTableDir) throws IOException { + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + try { + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. 
+ if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - if (fs.exists(finalOutputDir)) { - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } else { - fs.rename(stagingResultDir, finalOutputDir); - } - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } + + // Move the results to the final output dir. 
+ for (FileStatus status : fs.listStatus(stagingResultDir)) { + commitTask(status.getPath(), finalOutputDir); + } + + // Check the final output dir + committed = fs.exists(finalOutputDir); + + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.delete(status.getPath(), true); } - // remove the staging directory if the final output dir is given. - Path stagingDirRoot = stagingDir.getParent(); - fs.delete(stagingDirRoot, true); - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } + + throw new IOException(ioe.getMessage()); + } + } + + private void commitInsert(Path stagingResultDir, Path finalOutputDir, boolean changeFileSeq) throws IOException { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq, null); + } + } + + private void commitCreate(Path stagingResultDir, Path finalOutputDir) throws IOException { + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + commitTask(status.getPath(), finalOutputDir); } } else { - finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + commitTask(stagingResultDir, finalOutputDir); } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } - return finalOutputDir; + /** + * checking all file moved and remove empty dir + * @param stagingResultDir + * @throws IOException + 
*/ + private void cleanupTemporaryDirectory(Path stagingResultDir) throws IOException { + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } } /** @@ -972,9 +1038,8 @@ protected Path commitOutputData(OverridableConf queryContext, boolean changeFile * @throws java.io.IOException */ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq, boolean changeFileSeq) throws IOException { + FileStatus fileStatus, Path finalOutputPath, NumberFormat nf, + int fileSeq, boolean changeFileSeq, OutputCommitHandle commitHandle) throws IOException { if (fileStatus.isDirectory()) { String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); if (subPath != null) { @@ -987,7 +1052,8 @@ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, if (eachFile.getPath().getName().startsWith("_")) { continue; } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq, + commitHandle); } } else { throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); @@ -1005,13 +1071,16 @@ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, if (fs.exists(finalSubPath)) { throw new IOException("Already exists data file:" + finalSubPath); } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + boolean success = commitTask(fileStatus.getPath(), finalSubPath); if (success) { LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + "to final output[" + finalSubPath + "]"); + if (commitHandle != null) { + 
commitHandle.addTargetPath(finalSubPath); + } } else { LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); + "to final output[" + finalSubPath + "]"); } } } @@ -1083,64 +1152,42 @@ private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOExc return true; } - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. - * If there exists some data files, this delete it for duplicate data. - * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws java.io.IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. 
- String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } + protected void renameDirectory(Path sourcePath, Path targetPath) throws IOException { + try { + if (!fs.exists(targetPath.getParent())) { + createDirectory(targetPath.getParent()); + } + if (!rename(sourcePath, targetPath)) { + throw new IOException(format("Failed to rename %s to %s: rename returned false", sourcePath, targetPath)); } + } catch (IOException e) { + e.printStackTrace(); + throw new IOException(format("Failed to rename %s to %s", sourcePath, targetPath), e); } - } - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; + } - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; + protected void createDirectory(Path path) throws IOException { + try { + if (!fs.mkdirs(path)) { + throw new IOException(format("mkdirs %s returned false", path)); } + } catch (IOException e) { + throw new IOException("Failed to create directory:" + path, e); } - - return retValue; } + protected boolean rename(Path sourcePath, Path targetPath) throws IOException { + return fs.rename(sourcePath, targetPath); + } + protected boolean commitTask(Path sourcePath, Path targetPath) throws IOException { + try { + renameDirectory(sourcePath, 
targetPath); + return true; + } catch (IOException e) { + throw new IOException("Failed to commit Task - source:" + sourcePath + ", target:" + targetPath, e); + } + } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java new file mode 100644 index 0000000000..a71d2fb2bd --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class OutputCommitHandle { + + private List backupPaths; + private List targetPaths; + private Set partitions; + + public OutputCommitHandle() { + backupPaths = new ArrayList(); + targetPaths = new ArrayList(); + partitions = Collections.newSetFromMap(new ConcurrentHashMap<>()); + } + + public List getBackupPaths() { + return backupPaths; + } + + public void setBackupPaths(List backupPaths) { + this.backupPaths = backupPaths; + } + + public void addBackupPath(Path path) { + this.backupPaths.add(path); + } + + public List getTargetPaths() { + return targetPaths; + } + + public void setTargetPaths(List renamedPaths) { + this.targetPaths = renamedPaths; + } + + public void addTargetPath(Path path) { + this.targetPaths.add(path); + } + + public Set getPartitions() { + return partitions; + } + + public void setPartitions(Set partitions) { + this.partitions = partitions; + } + + public void addPartition(PartitionDescProto partition) { + this.partitions.add(partition); + } +} diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index fa6cf486e2..536e238c63 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -27,6 +27,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import 
org.apache.tajo.exception.NotImplementedException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.exception.TajoRuntimeException;
@@ -178,7 +179,7 @@ public void prepareTable(LogicalNode node) throws IOException {
 
   @Override
   public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema,
-                          TableDesc tableDesc) throws IOException {
+                          TableDesc tableDesc, List partitions) throws IOException {
     throw new TajoRuntimeException(new NotImplementedException());
   }
 
diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml
index a9a541aed1..681eea64cd 100644
--- a/tajo-storage/tajo-storage-s3/pom.xml
+++ b/tajo-storage/tajo-storage-s3/pom.xml
@@ -106,6 +106,11 @@
       <artifactId>tajo-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-storage-common</artifactId>
diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
index 4bcdb60a68..e27131b623 100644
--- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
+++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java
@@ -18,14 +18,93 @@
 
 package org.apache.tajo.storage.s3;
 
+import java.io.IOException;
 import java.net.URI;
 
-import org.apache.tajo.storage.FileTablespace;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryVars;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.*;
 
 import net.minidev.json.JSONObject;
 
 public class S3TableSpace extends FileTablespace {
+  private static final StorageProperty s3StorageProperties = new StorageProperty("TEXT", false, true, true, false);
+
   public S3TableSpace(String spaceName, URI uri, JSONObject config) {
     super(spaceName, uri, config);
   }
-}
+
+  /**
+   * Returns the storage property shared by all instances of this class.
+   */
+  @Override
+  public StorageProperty getProperty() {
+    return s3StorageProperties;
+  }
+
+  /**
+   * Resolves the staging URI for a query.
+   *
+   * <p>When {@link QueryVars#OUTPUT_TABLE_URI} is set to a non-empty value, the staging directory is placed on
+   * the local file system: the path component of {@code stagingRootPath} is re-rooted under {@code file://},
+   * then {@code TMP_STAGING_DIR_PREFIX} and the query id are appended. Otherwise the call is delegated to the
+   * superclass.
+   *
+   * @param context query context that is checked for {@link QueryVars#OUTPUT_TABLE_URI}
+   * @param queryId query id, used as the last component of the local staging directory
+   * @param meta table meta; only forwarded to the superclass
+   * @return the staging URI
+   * @throws IOException if resolving the local file system or the superclass call fails
+   */
+  @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    if (context.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) {
+      return super.getStagingUri(context, queryId, meta);
+    }
+
+    // Use only the path component of the staging root. A regex replacement of "scheme://host" breaks when
+    // URI#getHost() is null (authority that is not a valid host name) and treats characters such as '.' in
+    // the bucket name as regex metacharacters.
+    String outputPath = "file://" + stagingRootPath.toUri().getPath();
+
+    FileSystem localFileSystem = FileSystem.getLocal(conf);
+    Path stagingDir = localFileSystem.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX,
+        queryId));
+
+    return stagingDir.toUri();
+  }
+
+  /**
+   * Copies task output from the local file system to {@code this.fs}.
+   *
+   * <p>If {@code sourcePath} is a local directory, every entry listed directly under it is copied to
+   * {@code targetPath}; otherwise {@code sourcePath} itself is copied.
+   *
+   * @param sourcePath local path of the task output
+   * @param targetPath destination path passed to {@code copyFromLocalFile}
+   * @return always {@code true}; failures are reported through the exception
+   * @throws IOException if listing or copying fails; the original exception is attached as the cause
+   */
+  @Override
+  protected boolean commitTask(Path sourcePath, Path targetPath) throws IOException {
+    try {
+      FileSystem localFileSystem = FileSystem.getLocal(conf);
+      if (localFileSystem.isDirectory(sourcePath)) {
+        // NOTE(review): every entry is copied to the same targetPath; presumably targetPath is an existing
+        // directory at this point - confirm against the caller.
+        for (FileStatus status : localFileSystem.listStatus(sourcePath)) {
+          this.fs.copyFromLocalFile(status.getPath(), targetPath);
+        }
+      } else {
+        this.fs.copyFromLocalFile(sourcePath, targetPath);
+      }
+      return true;
+    } catch (IOException e) {
+      throw new IOException("Failed to commit Task - source:" + sourcePath + ", target:" + targetPath, e);
+    }
+  }
+}
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
index 2d0677885c..e6a3a18340 100644
--- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java
@@ -19,8 +19,17 @@
 package org.apache.tajo.storage.s3;
 
 import net.minidev.json.JSONObject;
+import
org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryVars;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TajoIdUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -29,6 +38,8 @@
 import java.net.URI;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class TestS3TableSpace {
@@ -59,4 +70,35 @@ public void testTablespaceHandler() throws Exception {
     assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace);
     assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString());
   }
+
+  /** With OUTPUT_TABLE_URI set, the staging URI must use the "file" scheme and start with file:///tmp/tajo. */
+  @Test
+  public void testGetStagingUriWithOutputTableUri() throws Exception {
+    OverridableConf conf = CommonTestingUtil.getSessionVarsForTest();
+    TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet());
+    QueryId queryId = QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0));
+    conf.put(QueryVars.OUTPUT_TABLE_URI, S3_URI + "tajo/warehouse/test1");
+
+    S3TableSpace tableSpace = TablespaceManager.get(S3_URI);
+    assertNotNull(tableSpace);
+
+    URI stagingUri = tableSpace.getStagingUri(conf, queryId.getId(), meta);
+    assertEquals("file", stagingUri.getScheme());
+    assertTrue(stagingUri.toString().startsWith("file:///tmp/tajo"));
+  }
+
+  /** Without OUTPUT_TABLE_URI, the staging URI must have no scheme and start with /tmp/tajo. */
+  @Test
+  public void testGetStagingUriWithoutOutputTableUri() throws Exception {
+    OverridableConf conf = CommonTestingUtil.getSessionVarsForTest();
+    TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet());
+    QueryId queryId = QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0));
+
+    S3TableSpace tableSpace = TablespaceManager.get(S3_URI);
+    assertNotNull(tableSpace);
+
+    URI stagingUri = tableSpace.getStagingUri(conf, queryId.getId(), meta);
+    assertNull(stagingUri.getScheme());
+    assertTrue(stagingUri.toString().startsWith("/tmp/tajo"));
+  }
 }