From 145b8b209cee134ffdf548f2cf197cf97cf8c420 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 1 Feb 2016 10:34:01 +0900 Subject: [PATCH 01/13] TAJO-2063: Refactor FileTablespace::commitOutputData. --- .../org/apache/tajo/querymaster/Query.java | 70 ++- .../org/apache/tajo/storage/Tablespace.java | 3 +- .../tajo/storage/hbase/HBaseTablespace.java | 3 +- .../apache/tajo/storage/FileTablespace.java | 443 ++++++++++-------- .../tajo/storage/OutputCommitHandle.java | 74 +++ .../tajo/storage/jdbc/JdbcTablespace.java | 3 +- 6 files changed, 353 insertions(+), 243 deletions(-) create mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 0cd178f464..dae7058094 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -500,40 +500,34 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { // In this case, we should use default tablespace. Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")); + List partitions = queryContext.hasPartition() ? query.getPartitions() : null; Path finalOutputDir = space.commitTable( - query.context.getQueryContext(), - lastStage.getId(), - lastStage.getMasterPlan().getLogicalPlan(), - lastStage.getSchema(), - tableDesc); + query.context.getQueryContext(), + lastStage.getId(), + lastStage.getMasterPlan().getLogicalPlan(), + lastStage.getSchema(), + tableDesc, + partitions); QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); // Add dynamic partitions to catalog for partition table. - if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { - List partitions = query.getPartitions(); - if (partitions != null) { - // Set contents length and file count to PartitionDescProto by listing final output directories. - List finalPartitions = getPartitionsWithContentsSummary(query.systemConf, - finalOutputDir, partitions); - - String databaseName, simpleTableName; - if (CatalogUtil.isFQTableName(tableDesc.getName())) { - String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); - databaseName = split[0]; - simpleTableName = split[1]; - } else { - databaseName = queryContext.getCurrentDatabase(); - simpleTableName = tableDesc.getName(); - } - - // Store partitions to CatalogStore using alter table statement. - catalog.addPartitions(databaseName, simpleTableName, finalPartitions, true); - LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); + if (!query.getPartitions().isEmpty()) { + String databaseName, simpleTableName; + + if (CatalogUtil.isFQTableName(tableDesc.getName())) { + String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); + databaseName = split[0]; + simpleTableName = split[1]; } else { - LOG.info("Can't find partitions for adding."); + databaseName = queryContext.getCurrentDatabase(); + simpleTableName = tableDesc.getName(); } + + // Store partitions to CatalogStore using alter table statement. + catalog.addPartitions(databaseName, simpleTableName, partitions, true); + LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); query.clearPartitions(); } } catch (Throwable e) { @@ -546,21 +540,6 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { return QueryState.QUERY_SUCCEEDED; } - private List getPartitionsWithContentsSummary(TajoConf conf, Path outputDir, - List partitions) throws IOException { - List finalPartitions = new ArrayList<>(); - - FileSystem fileSystem = outputDir.getFileSystem(conf); - for (PartitionDescProto partition : partitions) { - PartitionDescProto.Builder builder = partition.toBuilder(); - Path partitionPath = new Path(outputDir, partition.getPath()); - ContentSummary contentSummary = fileSystem.getContentSummary(partitionPath); - builder.setNumBytes(contentSummary.getLength()); - finalPartitions.add(builder.build()); - } - return finalPartitions; - } - private static interface QueryHook { boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, @@ -695,7 +674,14 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod()); } - stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); + long totalVolume = 0L; + if (!query.getPartitions().isEmpty()) { + totalVolume = query.getPartitions().stream().mapToLong(partition -> partition.getNumBytes()).sum(); + } else { + totalVolume = getTableVolume(query.systemConf, finalOutputDir); + } + + stats.setNumBytes(totalVolume); tableDescTobeCreated.setStats(stats); query.setResultDesc(tableDescTobeCreated); diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 00e6d75a12..51e047112d 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -25,6 +25,7 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoRuntimeException; @@ -363,7 +364,7 @@ public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoEx public abstract Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException; + TableDesc tableDesc, List partitions) throws IOException; public abstract void rollbackTable(LogicalNode node) throws IOException, TajoException; diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 132ceff0ae..4260e8ecc6 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -40,6 +40,7 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -911,7 +912,7 @@ public Pair getIndexablePredicateValue(ColumnMapping columnMapping @Override public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException { + TableDesc tableDesc, List partitions) throws IOException { if (tableDesc == null) { throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 35504afaf3..495afba75f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -33,6 +33,7 @@ import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.UnsupportedException; @@ -50,6 +51,7 @@ import java.text.NumberFormat; import java.util.*; +import static java.lang.String.format; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; @@ -768,8 +770,8 @@ public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) { @Override public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, - Schema schema, TableDesc tableDesc) throws IOException { - return commitOutputData(queryContext, true); + Schema schema, TableDesc tableDesc, List partitions) throws IOException { + return commitOutputData(queryContext, true, partitions); } @Override @@ -789,177 +791,248 @@ public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc * @return Saved path * @throws java.io.IOException */ - protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { + protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq, + List partitions) throws IOException { + Path finalOutputDir = null; Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + OutputCommitHandle commitHandle = new OutputCommitHandle(); + if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI)); - try { - FileSystem fs = stagingResultDir.getFileSystem(conf); + boolean checkExistingPartition = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); + try { if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + if (partitions != null) { + commitInsertOverwriteOrCreateWithPartition(stagingResultDir, finalOutputDir, oldTableDir, partitions, + checkExistingPartition, commitHandle); + } else { + commitInsertOverwrite(stagingResultDir, finalOutputDir, oldTableDir); + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + Preconditions.checkNotNull(queryContext); + if (queryType.equals(NodeType.INSERT.name())) { // INSERT INTO + if (partitions != null) { + commitInsertWithPartition(stagingResultDir, finalOutputDir, partitions, commitHandle, changeFileSeq); + } else { + commitInsert(stagingResultDir, finalOutputDir, changeFileSeq); + } + cleanupTemporaryDirectory(stagingResultDir); + } else if (queryType.equals(NodeType.CREATE_TABLE.name())){ // CREATE TABLE AS SELECT (CTAS) + if (partitions != null) { + commitInsertOverwriteOrCreateWithPartition(stagingResultDir, finalOutputDir, oldTableDir, partitions, + checkExistingPartition, commitHandle); + } else { + commitCreate(stagingResultDir, finalOutputDir); + } + } else { + throw new IOException("Cannot handle query type:" + queryType); + } + } - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - ContentSummary summary = fs.getContentSummary(stagingResultDir); - - // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not. - boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); - - // If existing data doesn't need to keep, check if there are some files. - if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) - && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map renameDirs = new HashMap<>(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. - Map recoveryDirs = new HashMap<>(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } + // remove the staging directory if the final output dir is given. + Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + return finalOutputDir; + } - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } + private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, Path finalOutputDir, + Path oldTableDir, List partitions, boolean checkExistingPartition, + OutputCommitHandle commitHandle) throws IOException { + String stagingResultPath = stagingResultDir.toString(); + String finalOutputPath = finalOutputDir.toString(); + String oldTablePath = oldTableDir.toString(); - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } + try { + for(PartitionDescProto partition : partitions) { + Path targetPath = new Path(partition.getPath() + "/"); + Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath)+"/"); + Path backupPath = new Path(partition.getPath().replaceAll(finalOutputPath, oldTablePath)); + + // Move existing directory to backup directory. + if (checkExistingPartition && fs.exists(targetPath)) { + renameDirectory(targetPath, backupPath); + commitHandle.addBackupPath(backupPath); + } - // Recovery renamed dirs - for(Map.Entry entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } + // Move staging directory to target directory + renameDirectory(stagingPath, targetPath); + commitHandle.addTargetPath(targetPath); - throw new IOException(ioe.getMessage()); - } - } else { // no partition - try { + // Summarize the volume of partitions + // TODO : This will improved at TAJO-2069 + long totalSize = calculateSize(targetPath); + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(totalSize); + commitHandle.addPartition(builder.build()); + } + partitions.clear(); + partitions.addAll(commitHandle.getPartitions()); + } catch (Exception e) { + rollback(stagingResultDir, finalOutputDir, oldTableDir, commitHandle); + throw new IOException("Failed to create partition table:", e); + } + } - // if the final output dir exists, move all contents to the temporary table dir. - // Otherwise, just make the final output dir. As a result, the final output dir will be empty. - if (fs.exists(finalOutputDir)) { - fs.mkdirs(oldTableDir); + private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDir, + List partitions, OutputCommitHandle commitHandle, boolean changeFileSeq) throws IOException { + String stagingResultPath = stagingResultDir.toString(); + String finalOutputPath = finalOutputDir.toString(); - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.rename(status.getPath(), oldTableDir); - } + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir); - } + try { + for(PartitionDescProto partition : partitions) { + Path targetPath = new Path(partition.getPath() + "/"); + Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath)+"/"); - // Move the results to the final output dir. - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } + if (!fs.exists(targetPath)) { + renameDirectory(stagingPath, targetPath); + } else { + moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, + changeFileSeq, commitHandle); + } - // Check the final output dir - committed = fs.exists(finalOutputDir); + // Summarize the volume of partitions + PartitionDescProto.Builder builder = partition.toBuilder(); + // TODO: This will improved at TAJO-2069 + builder.setNumBytes(calculateSize(targetPath)); + commitHandle.addPartition(builder.build()); + } + partitions.clear(); + partitions.addAll(commitHandle.getPartitions()); + } catch (Exception e) { + rollback(stagingResultDir, finalOutputDir, commitHandle); + throw new IOException("Failed to create partition table:", e); + } + } - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { + private void rollback(Path stagingResultDir, Path finalOutputDir, + OutputCommitHandle commitHandle) throws IOException { + rollback(stagingResultDir, finalOutputDir, null, commitHandle); + } - // if commit is failed, recover the old data - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.delete(status.getPath(), true); - } + private void rollback(Path stagingResultDir, Path finalOutputDir, Path oldTableDir, + OutputCommitHandle commitHandle) throws IOException { + String finalOutputPath = finalOutputDir.toString(); + String oldTablePath = oldTableDir != null ? oldTableDir.toString() : null; - for (FileStatus status : fs.listStatus(oldTableDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } + // Delete data from the output directory + List targetPaths = commitHandle.getTargetPaths(); + for(Path targetPath: targetPaths) { + fs.delete(targetPath, true); + } - throw new IOException(ioe.getMessage()); - } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + // Move from backup directory to output directory + List backupPaths = commitHandle.getBackupPaths(); + for(Path backupPath: backupPaths) { + Path targetPath = new Path(backupPath.toString().replaceAll(oldTablePath, finalOutputPath)); + fs.delete(targetPath, true); + renameDirectory(backupPath, targetPath); + } - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + // Delete staging directory + fs.delete(stagingResultDir, true); + } - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); + private void commitInsertOverwrite(Path stagingResultDir, Path finalOutputDir, Path oldTableDir) throws IOException { + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - if (fs.exists(finalOutputDir)) { - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } else { - fs.rename(stagingResultDir, finalOutputDir); - } - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } + try { + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. + if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); + + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); } - // remove the staging directory if the final output dir is given. - Path stagingDirRoot = stagingDir.getParent(); - fs.delete(stagingDirRoot, true); - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } + + // Move the results to the final output dir. + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + + // Check the final output dir + committed = fs.exists(finalOutputDir); + + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } + + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } + + throw new IOException(ioe.getMessage()); + } + } + + private void commitInsert(Path stagingResultDir, Path finalOutputDir, boolean changeFileSeq) throws IOException { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq, null); + } + } + + private void commitCreate(Path stagingResultDir, Path finalOutputDir) throws IOException { + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); } } else { - finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.rename(stagingResultDir, finalOutputDir); } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } - return finalOutputDir; + /** + * checking all file moved and remove empty dir + * @param stagingResultDir + * @throws IOException + */ + private void cleanupTemporaryDirectory(Path stagingResultDir) throws IOException { + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } } /** @@ -974,9 +1047,8 @@ protected Path commitOutputData(OverridableConf queryContext, boolean changeFile * @throws java.io.IOException */ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq, boolean changeFileSeq) throws IOException { + FileStatus fileStatus, Path finalOutputPath, NumberFormat nf, + int fileSeq, boolean changeFileSeq, OutputCommitHandle commitHandle) throws IOException { if (fileStatus.isDirectory()) { String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); if (subPath != null) { @@ -989,7 +1061,8 @@ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, if (eachFile.getPath().getName().startsWith("_")) { continue; } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq, + commitHandle); } } else { throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); @@ -1011,9 +1084,12 @@ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, if (success) { LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + "to final output[" + finalSubPath + "]"); + if (commitHandle != null) { + commitHandle.addTargetPath(finalSubPath); + } } else { LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); + "to final output[" + finalSubPath + "]"); } } } @@ -1085,63 +1161,34 @@ private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOExc return true; } - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. - * If there exists some data files, this delete it for duplicate data. - * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws java.io.IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. - String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } + protected void renameDirectory(Path sourcePath, Path targetPath) throws IOException { + try { + if (!fs.exists(targetPath.getParent())) { + createDirectory(targetPath.getParent()); + } + if (!rename(sourcePath, targetPath)) { + throw new IOException(format("Failed to rename %s to %s: rename returned false", sourcePath, targetPath)); } + } catch (IOException e) { + e.printStackTrace(); + throw new IOException(format("Failed to rename %s to %s", sourcePath, targetPath), e); } - } - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; + } - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; + protected void createDirectory(Path path) throws IOException { + try { + if (!fs.mkdirs(path)) { + throw new IOException(format("mkdirs %s returned false", path)); } + } catch (IOException e) { + throw new IOException("Failed to create directory:" + path, e); } + } - return retValue; + protected boolean rename(Path sourcePath, Path targetPath) throws IOException { + return fs.rename(sourcePath, targetPath); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java new file mode 100644 index 0000000000..c6e977756b --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; + +import java.util.ArrayList; +import java.util.List; + +public class OutputCommitHandle { + + private List backupPaths; + private List targetPaths; + private List partitions; + + public OutputCommitHandle() { + backupPaths = new ArrayList(); + targetPaths = new ArrayList(); + partitions = new ArrayList(); + } + + public List getBackupPaths() { + return backupPaths; + } + + public void setBackupPaths(List backupPaths) { + this.backupPaths = backupPaths; + } + + public void addBackupPath(Path path) { + this.backupPaths.add(path); + } + + public List getTargetPaths() { + return targetPaths; + } + + public void setTargetPaths(List renamedPaths) { + this.targetPaths = renamedPaths; + } + + public void addTargetPath(Path path) { + this.targetPaths.add(path); + } + + public List getPartitions() { + return partitions; + } + + public void setPartitions(List partitions) { + this.partitions = partitions; + } + + public void addPartition(PartitionDescProto partition) { + this.partitions.add(partition); + } +} diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index fa6cf486e2..536e238c63 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -27,6 +27,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.exception.NotImplementedException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.TajoRuntimeException; @@ -178,7 +179,7 @@ public void prepareTable(LogicalNode node) throws IOException { @Override public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException { + TableDesc tableDesc, List partitions) throws IOException { throw new TajoRuntimeException(new NotImplementedException()); } From 21382dd7fa0e3ed98ce4020910b5c086ba12f84d Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 1 Feb 2016 15:24:30 +0900 Subject: [PATCH 02/13] Apply parallelStream --- .../apache/tajo/storage/FileTablespace.java | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 495afba75f..70b67106d8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -837,6 +837,7 @@ protected Path commitOutputData(OverridableConf queryContext, boolean changeFile Path stagingDirRoot = stagingDir.getParent(); fs.delete(stagingDirRoot, true); } catch (Throwable t) { + rollback(stagingResultDir, finalOutputDir, oldTableDir, commitHandle); LOG.error(t); throw new IOException(t); } @@ -853,10 +854,10 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P String finalOutputPath = finalOutputDir.toString(); String oldTablePath = oldTableDir.toString(); - try { - for(PartitionDescProto partition : partitions) { + partitions.parallelStream().forEach(partition -> { + try { Path targetPath = new Path(partition.getPath() + "/"); - Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath)+"/"); + Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); Path backupPath = new Path(partition.getPath().replaceAll(finalOutputPath, oldTablePath)); // Move existing directory to backup directory. @@ -870,18 +871,16 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P commitHandle.addTargetPath(targetPath); // Summarize the volume of partitions - // TODO : This will improved at TAJO-2069 long totalSize = calculateSize(targetPath); PartitionDescProto.Builder builder = partition.toBuilder(); builder.setNumBytes(totalSize); commitHandle.addPartition(builder.build()); + } catch (IOException e) { + throw new ConcurrentModificationException(); } - partitions.clear(); - partitions.addAll(commitHandle.getPartitions()); - } catch (Exception e) { - rollback(stagingResultDir, finalOutputDir, oldTableDir, commitHandle); - throw new IOException("Failed to create partition table:", e); - } + }); + partitions.clear(); + partitions.addAll(commitHandle.getPartitions()); } private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDir, @@ -893,10 +892,10 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi fmt.setGroupingUsed(false); fmt.setMinimumIntegerDigits(3); - try { - for(PartitionDescProto partition : partitions) { + partitions.parallelStream().forEach(partition -> { + try { Path targetPath = new Path(partition.getPath() + "/"); - Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath)+"/"); + Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); if (!fs.exists(targetPath)) { renameDirectory(stagingPath, targetPath); @@ -907,21 +906,14 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi // Summarize the volume of partitions PartitionDescProto.Builder builder = partition.toBuilder(); - // TODO: This will improved at TAJO-2069 builder.setNumBytes(calculateSize(targetPath)); commitHandle.addPartition(builder.build()); + } catch (IOException e) { + throw new ConcurrentModificationException(); } - partitions.clear(); - partitions.addAll(commitHandle.getPartitions()); - } catch (Exception e) { - rollback(stagingResultDir, finalOutputDir, commitHandle); - throw new IOException("Failed to create partition table:", e); - } - } - - private void rollback(Path stagingResultDir, Path finalOutputDir, - OutputCommitHandle commitHandle) throws IOException { - rollback(stagingResultDir, finalOutputDir, null, commitHandle); + }); + partitions.clear(); + partitions.addAll(commitHandle.getPartitions()); } private void rollback(Path stagingResultDir, Path finalOutputDir, Path oldTableDir, From faf383ebeb3d97c7b9f9976c105320e4d0cf91d5 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 1 Feb 2016 17:27:39 +0900 Subject: [PATCH 03/13] Trigger for CI build --- .../src/main/java/org/apache/tajo/storage/FileTablespace.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 70b67106d8..d4580c86cc 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -1182,6 +1182,4 @@ protected void createDirectory(Path path) throws IOException { protected boolean rename(Path sourcePath, Path targetPath) throws IOException { return fs.rename(sourcePath, targetPath); } - - } From fec2ff1ed7f8f475487e6c2418d18368be5e4a20 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 1 Feb 2016 20:27:29 +0900 Subject: [PATCH 04/13] Replace parallelStream to Stream --- .../main/java/org/apache/tajo/storage/FileTablespace.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index d4580c86cc..9fdd4c48c0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -854,7 +854,7 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P String finalOutputPath = finalOutputDir.toString(); String oldTablePath = oldTableDir.toString(); - partitions.parallelStream().forEach(partition -> { + partitions.stream().forEach(partition -> { try { Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); @@ -874,7 +874,8 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P long totalSize = calculateSize(targetPath); PartitionDescProto.Builder builder = partition.toBuilder(); builder.setNumBytes(totalSize); - commitHandle.addPartition(builder.build()); + PartitionDescProto partitionDescProto = builder.build(); + commitHandle.addPartition(partitionDescProto); } catch (IOException e) { throw new ConcurrentModificationException(); } @@ -892,7 +893,7 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi fmt.setGroupingUsed(false); fmt.setMinimumIntegerDigits(3); - partitions.parallelStream().forEach(partition -> { + partitions.stream().forEach(partition -> { try { Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); From dfa58f9eb084d54f1f7b621f3901ef7c1e7bf335 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 2 Feb 2016 10:39:27 +0900 Subject: [PATCH 05/13] Replace List type to Set type with ConcurrentHashMap --- .../org/apache/tajo/storage/OutputCommitHandle.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java index c6e977756b..a71d2fb2bd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/OutputCommitHandle.java @@ -22,18 +22,21 @@ import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class OutputCommitHandle { private List backupPaths; private List targetPaths; - private List partitions; + private Set partitions; public OutputCommitHandle() { backupPaths = new ArrayList(); targetPaths = new ArrayList(); - partitions = new ArrayList(); + partitions = Collections.newSetFromMap(new ConcurrentHashMap<>()); } public List getBackupPaths() { @@ -60,11 +63,11 @@ public void addTargetPath(Path path) { this.targetPaths.add(path); } - public List getPartitions() { + public Set getPartitions() { return partitions; } - public void setPartitions(List partitions) { + public void setPartitions(Set partitions) { this.partitions = partitions; } From b548b1bc4769e454b94bd978366566ed1aad6e47 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 2 Feb 2016 10:42:54 +0900 Subject: [PATCH 06/13] Use parallelStream instead of stream --- .../src/main/java/org/apache/tajo/storage/FileTablespace.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 9fdd4c48c0..9f4dc9acf3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -854,7 +854,7 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P String finalOutputPath = finalOutputDir.toString(); String oldTablePath = oldTableDir.toString(); - partitions.stream().forEach(partition -> { + partitions.parallelStream().forEach(partition -> { try { Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); @@ -893,7 +893,7 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi fmt.setGroupingUsed(false); fmt.setMinimumIntegerDigits(3); - partitions.stream().forEach(partition -> { + partitions.parallelStream().forEach(partition -> { try { Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); From 7e4b115653032eaa246db464f6a1cf4eb1b70763 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 2 Feb 2016 17:19:31 +0900 Subject: [PATCH 07/13] Check if staging directory exits --- .../apache/tajo/storage/FileTablespace.java | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 9f4dc9acf3..a1874ae947 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -860,22 +860,24 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); Path backupPath = new Path(partition.getPath().replaceAll(finalOutputPath, oldTablePath)); - // Move existing directory to backup directory. - if (checkExistingPartition && fs.exists(targetPath)) { - renameDirectory(targetPath, backupPath); - commitHandle.addBackupPath(backupPath); - } - - // Move staging directory to target directory - renameDirectory(stagingPath, targetPath); - commitHandle.addTargetPath(targetPath); + if (fs.exists(stagingPath)) { + // Move existing directory to backup directory. + if (checkExistingPartition && fs.exists(targetPath)) { + renameDirectory(targetPath, backupPath); + commitHandle.addBackupPath(backupPath); + } - // Summarize the volume of partitions - long totalSize = calculateSize(targetPath); - PartitionDescProto.Builder builder = partition.toBuilder(); - builder.setNumBytes(totalSize); - PartitionDescProto partitionDescProto = builder.build(); - commitHandle.addPartition(partitionDescProto); + // Move staging directory to target directory + renameDirectory(stagingPath, targetPath); + commitHandle.addTargetPath(targetPath); + + // Summarize the volume of partitions + long totalSize = calculateSize(targetPath); + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(totalSize); + PartitionDescProto partitionDescProto = builder.build(); + commitHandle.addPartition(partitionDescProto); + } } catch (IOException e) { throw new ConcurrentModificationException(); } @@ -898,17 +900,19 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); - if (!fs.exists(targetPath)) { - renameDirectory(stagingPath, targetPath); - } else { - moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, - changeFileSeq, commitHandle); - } + if (fs.exists(stagingPath)) { + if (!fs.exists(targetPath)) { + renameDirectory(stagingPath, targetPath); + } else { + moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, + changeFileSeq, commitHandle); + } - // Summarize the volume of partitions - PartitionDescProto.Builder builder = partition.toBuilder(); - builder.setNumBytes(calculateSize(targetPath)); - commitHandle.addPartition(builder.build()); + // Summarize the volume of partitions + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(calculateSize(targetPath)); + commitHandle.addPartition(builder.build()); + } } catch (IOException e) { throw new ConcurrentModificationException(); } From 156578855a3e85074dadc730b12919b2dbcc1a9d Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 2 Feb 2016 18:02:31 +0900 Subject: [PATCH 08/13] Replace parallelStream to stream --- .../apache/tajo/storage/FileTablespace.java | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index a1874ae947..9fdd4c48c0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -854,30 +854,28 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P String finalOutputPath = finalOutputDir.toString(); String oldTablePath = oldTableDir.toString(); - partitions.parallelStream().forEach(partition -> { + partitions.stream().forEach(partition -> { try { Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); Path backupPath = new Path(partition.getPath().replaceAll(finalOutputPath, oldTablePath)); - if (fs.exists(stagingPath)) { - // Move existing directory to backup directory. - if (checkExistingPartition && fs.exists(targetPath)) { - renameDirectory(targetPath, backupPath); - commitHandle.addBackupPath(backupPath); - } - - // Move staging directory to target directory - renameDirectory(stagingPath, targetPath); - commitHandle.addTargetPath(targetPath); - - // Summarize the volume of partitions - long totalSize = calculateSize(targetPath); - PartitionDescProto.Builder builder = partition.toBuilder(); - builder.setNumBytes(totalSize); - PartitionDescProto partitionDescProto = builder.build(); - commitHandle.addPartition(partitionDescProto); + // Move existing directory to backup directory. + if (checkExistingPartition && fs.exists(targetPath)) { + renameDirectory(targetPath, backupPath); + commitHandle.addBackupPath(backupPath); } + + // Move staging directory to target directory + renameDirectory(stagingPath, targetPath); + commitHandle.addTargetPath(targetPath); + + // Summarize the volume of partitions + long totalSize = calculateSize(targetPath); + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(totalSize); + PartitionDescProto partitionDescProto = builder.build(); + commitHandle.addPartition(partitionDescProto); } catch (IOException e) { throw new ConcurrentModificationException(); } @@ -895,24 +893,22 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi fmt.setGroupingUsed(false); fmt.setMinimumIntegerDigits(3); - partitions.parallelStream().forEach(partition -> { + partitions.stream().forEach(partition -> { try { Path targetPath = new Path(partition.getPath() + "/"); Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); - if (fs.exists(stagingPath)) { - if (!fs.exists(targetPath)) { - renameDirectory(stagingPath, targetPath); - } else { - moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, - changeFileSeq, commitHandle); - } - - // Summarize the volume of partitions - PartitionDescProto.Builder builder = partition.toBuilder(); - builder.setNumBytes(calculateSize(targetPath)); - commitHandle.addPartition(builder.build()); + if (!fs.exists(targetPath)) { + renameDirectory(stagingPath, targetPath); + } else { + moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, + changeFileSeq, commitHandle); } + + // Summarize the volume of partitions + PartitionDescProto.Builder builder = partition.toBuilder(); + builder.setNumBytes(calculateSize(targetPath)); + commitHandle.addPartition(builder.build()); } catch (IOException e) { throw new ConcurrentModificationException(); } From c6892d22fbffe1a609d53fa0eb2bd25b553bf087 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 5 Feb 2016 11:52:49 +0900 Subject: [PATCH 09/13] Remove unnecessary codes --- .../src/main/java/org/apache/tajo/querymaster/Query.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index dae7058094..18d2f5e3e7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -674,13 +674,7 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod()); } - long totalVolume = 0L; - if (!query.getPartitions().isEmpty()) { - totalVolume = query.getPartitions().stream().mapToLong(partition -> partition.getNumBytes()).sum(); - } else { - totalVolume = getTableVolume(query.systemConf, finalOutputDir); - } - + long totalVolume = getTableVolume(query.systemConf, finalOutputDir); stats.setNumBytes(totalVolume); tableDescTobeCreated.setStats(stats); query.setResultDesc(tableDescTobeCreated); From 659f3c1ac37bc01ac005fdd49afa8e2e62bb6e53 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 12 Feb 2016 01:36:56 +0900 Subject: [PATCH 10/13] Implement a method for committing each tasks in FileTablespace and S3TableSpace --- .../apache/tajo/storage/FileTablespace.java | 55 +++++++++++-------- tajo-storage/tajo-storage-s3/pom.xml | 5 ++ .../apache/tajo/storage/s3/S3TableSpace.java | 55 ++++++++++++++++++- .../tajo/storage/s3/TestS3TableSpace.java | 26 +++++++++ 4 files changed, 117 insertions(+), 24 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 9fdd4c48c0..42f45ac03f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -128,14 +128,15 @@ protected void storageInit() throws IOException { @Override public long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException { - Path path = new Path(table.getUri()); - ContentSummary summary; - try { - summary = fs.getContentSummary(path); - } catch (IOException e) { - throw new TajoInternalError(e); - } - return summary.getLength(); +// Path path = new Path(table.getUri()); +// ContentSummary summary; +// try { +// summary = fs.getContentSummary(path); +// } catch (IOException e) { +// throw new TajoInternalError(e); +// } +// return summary.getLength(); + return -1L; } @Override @@ -248,14 +249,15 @@ public static FileFragment[] splitNG(Configuration conf, String tableName, Table } public long calculateSize(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - long totalSize = 0; - - if (fs.exists(tablePath)) { - totalSize = fs.getContentSummary(tablePath).getLength(); - } - - return totalSize; +// FileSystem fs = tablePath.getFileSystem(conf); +// long totalSize = 0; +// +// if (fs.exists(tablePath)) { +// totalSize = fs.getContentSummary(tablePath).getLength(); +// } +// +// return totalSize; + return -1L; } ///////////////////////////////////////////////////////////////////////////// @@ -867,7 +869,7 @@ private void commitInsertOverwriteOrCreateWithPartition(Path stagingResultDir, P } // Move staging directory to target directory - renameDirectory(stagingPath, targetPath); + commitTask(stagingPath, targetPath); commitHandle.addTargetPath(targetPath); // Summarize the volume of partitions @@ -899,7 +901,7 @@ private void commitInsertWithPartition(Path stagingResultDir, Path finalOutputDi Path stagingPath = new Path(partition.getPath().replaceAll(finalOutputPath, stagingResultPath) + "/"); if (!fs.exists(targetPath)) { - renameDirectory(stagingPath, targetPath); + commitTask(stagingPath, targetPath); } else { moveResultFromStageToFinal(fs, stagingResultDir, fs.getFileStatus(stagingPath), finalOutputDir, fmt, -1, changeFileSeq, commitHandle); @@ -964,7 +966,7 @@ private void commitInsertOverwrite(Path stagingResultDir, Path finalOutputDir, P // Move the results to the final output dir. for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); + commitTask(status.getPath(), finalOutputDir); } // Check the final output dir @@ -1005,10 +1007,10 @@ private void commitInsert(Path stagingResultDir, Path finalOutputDir, boolean ch private void commitCreate(Path stagingResultDir, Path finalOutputDir) throws IOException { if (fs.exists(finalOutputDir)) { for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); + commitTask(status.getPath(), finalOutputDir); } } else { - fs.rename(stagingResultDir, finalOutputDir); + commitTask(stagingResultDir, finalOutputDir); } LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); } @@ -1073,7 +1075,7 @@ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, if (fs.exists(finalSubPath)) { throw new IOException("Already exists data file:" + finalSubPath); } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + boolean success = commitTask(fileStatus.getPath(), finalSubPath); if (success) { LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + "to final output[" + finalSubPath + "]"); @@ -1183,4 +1185,13 @@ protected void createDirectory(Path path) throws IOException { protected boolean rename(Path sourcePath, Path targetPath) throws IOException { return fs.rename(sourcePath, targetPath); } + + protected boolean commitTask(Path sourcePath, Path targetPath) throws IOException { + try { + renameDirectory(sourcePath, targetPath); + return true; + } catch (IOException e) { + throw new IOException("Failed to commit Task - source:" + sourcePath + ", target:" + targetPath, e); + } + } } diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml index a9a541aed1..681eea64cd 100644 --- a/tajo-storage/tajo-storage-s3/pom.xml +++ b/tajo-storage/tajo-storage-s3/pom.xml @@ -106,6 +106,11 @@ tajo-common provided + + org.apache.tajo + tajo-catalog-common + provided + org.apache.tajo tajo-storage-common diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java index 4bcdb60a68..87f6283887 100644 --- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java @@ -18,14 +18,65 @@ package org.apache.tajo.storage.s3; +import java.io.IOException; import java.net.URI; -import org.apache.tajo.storage.FileTablespace; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryVars; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.*; import net.minidev.json.JSONObject; public class S3TableSpace extends FileTablespace { + private static final StorageProperty s3StorageProperties = new StorageProperty("TEXT", false, true, true, false); + public S3TableSpace(String spaceName, URI uri, JSONObject config) { super(spaceName, uri, config); } -} + + @Override + public StorageProperty getProperty() { + return s3StorageProperties; + } + + @Override + public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { + if (!context.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { + String outputPath = stagingRootPath.toString(); + + URI stagingRootUri = stagingRootPath.toUri(); + + outputPath = outputPath.replaceAll(stagingRootUri.getScheme() + "://" + stagingRootUri.getHost(), "file://"); + + FileSystem localFileSystem = FileSystem.getLocal(conf); + Path stagingDir = localFileSystem.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, + queryId)); + + return stagingDir.toUri(); + } else { + return super.getStagingUri(context, queryId, meta); + } + } + + @Override + protected boolean commitTask(Path sourcePath, Path targetPath) throws IOException { + try { + FileSystem localFileSystem = FileSystem.getLocal(conf); + if (localFileSystem.isDirectory(sourcePath)) { + FileStatus[] statuses = localFileSystem.listStatus(sourcePath); + for(FileStatus status: statuses) { + this.fs.copyFromLocalFile(status.getPath(), targetPath); + } + } else { + this.fs.copyFromLocalFile(sourcePath, targetPath); + } + return true; + } catch (IOException e) { + throw new IOException("Failed to commit Task - source:" + sourcePath + ", target:" + targetPath, e); + } + } +} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java index 2d0677885c..ef1029b547 100644 --- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java @@ -19,8 +19,17 @@ package org.apache.tajo.storage.s3; import net.minidev.json.JSONObject; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.QueryVars; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TajoIdUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -29,6 +38,7 @@ import java.net.URI; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class TestS3TableSpace { @@ -59,4 +69,20 @@ public void testTablespaceHandler() throws Exception { assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace); assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString()); } + + @Test + public void testGetStagingUri() throws Exception { + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet()); + QueryId queryId = QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0)); + conf.put(QueryVars.OUTPUT_TABLE_URI, S3_URI + "tajo/warehouse/test1"); + + S3TableSpace tableSpace = TablespaceManager.get(S3_URI); + assertNotNull(tableSpace); + + URI stagingUri = tableSpace.getStagingUri(conf, queryId.getId(), meta); + assertEquals(stagingUri.getScheme(), "file"); + assertTrue(stagingUri.toString().startsWith("file:///tmp/tajo")); + } + } From 768bcea4cdd9bfa41359151b6b02a66a7181d31e Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 12 Feb 2016 02:09:43 +0900 Subject: [PATCH 11/13] Trigger for Travis CI build --- .../src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java index 87f6283887..e27131b623 100644 --- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java @@ -79,4 +79,5 @@ protected boolean commitTask(Path sourcePath, Path targetPath) throws IOExceptio throw new IOException("Failed to commit Task - source:" + sourcePath + ", target:" + targetPath, e); } } + } \ No newline at end of file From 5d728f411dab4b4ab5c5f733ca3db856b764988b Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 12 Feb 2016 10:48:42 +0900 Subject: [PATCH 12/13] Add more unit test cases --- .../apache/tajo/storage/s3/TestS3TableSpace.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java index ef1029b547..e6a3a18340 100644 --- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java @@ -71,7 +71,7 @@ public void testTablespaceHandler() throws Exception { } @Test - public void testGetStagingUri() throws Exception { + public void testGetStagingUriWithOutputTableUri() throws Exception { OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet()); QueryId queryId = QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0)); @@ -85,4 +85,17 @@ public void testGetStagingUri() throws Exception { assertTrue(stagingUri.toString().startsWith("file:///tmp/tajo")); } + @Test + public void testGetStagingUriWithoutOutputTableUri() throws Exception { + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet()); + QueryId queryId = QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0)); + + S3TableSpace tableSpace = TablespaceManager.get(S3_URI); + assertNotNull(tableSpace); + + URI stagingUri = tableSpace.getStagingUri(conf, queryId.getId(), meta); + assertEquals(stagingUri.getScheme(), null); + assertTrue(stagingUri.toString().startsWith("/tmp/tajo")); + } } From 054ba48ffb2f04656ba25e7178044f9e1cd4f4b6 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 12 Feb 2016 10:51:03 +0900 Subject: [PATCH 13/13] Remove unnecessary updates --- .../apache/tajo/storage/FileTablespace.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 42f45ac03f..9fbe1829c3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -128,15 +128,14 @@ protected void storageInit() throws IOException { @Override public long getTableVolume(TableDesc table, Optional filter) throws UnsupportedException { -// Path path = new Path(table.getUri()); -// ContentSummary summary; -// try { -// summary = fs.getContentSummary(path); -// } catch (IOException e) { -// throw new TajoInternalError(e); -// } -// return summary.getLength(); - return -1L; + Path path = new Path(table.getUri()); + ContentSummary summary; + try { + summary = fs.getContentSummary(path); + } catch (IOException e) { + throw new TajoInternalError(e); + } + return summary.getLength(); } @Override @@ -249,15 +248,14 @@ public static FileFragment[] splitNG(Configuration conf, String tableName, Table } public long calculateSize(Path tablePath) throws IOException { -// FileSystem fs = tablePath.getFileSystem(conf); -// long totalSize = 0; -// -// if (fs.exists(tablePath)) { -// totalSize = fs.getContentSummary(tablePath).getLength(); -// } -// -// return totalSize; - return -1L; + FileSystem fs = tablePath.getFileSystem(conf); + long totalSize = 0; + + if (fs.exists(tablePath)) { + totalSize = fs.getContentSummary(tablePath).getLength(); + } + + return totalSize; } /////////////////////////////////////////////////////////////////////////////