diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 8eacc7e2220a..9557ab5b759b 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -202,21 +202,6 @@ mockito-core test - - org.powermock - powermock-core - test - - - org.powermock - powermock-module-junit4 - test - - - org.powermock - powermock-api-easymock - test - diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java deleted file mode 100644 index e12f7fbf5dc6..000000000000 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.timeline.DataSegment; - -import java.util.List; -import java.util.Objects; - -/** - * holds a {@link DataSegment} with the temporary file path where the corresponding index zip file is currently stored - * and the final path where the index zip file should eventually be moved to. 
- * see {@link JobHelper#renameIndexFilesForSegments(HadoopIngestionSpec, List)} - */ -public class DataSegmentAndIndexZipFilePath -{ - private final DataSegment segment; - private final String tmpIndexZipFilePath; - private final String finalIndexZipFilePath; - - @JsonCreator - public DataSegmentAndIndexZipFilePath( - @JsonProperty("segment") DataSegment segment, - @JsonProperty("tmpIndexZipFilePath") String tmpIndexZipFilePath, - @JsonProperty("finalIndexZipFilePath") String finalIndexZipFilePath - ) - { - this.segment = segment; - this.tmpIndexZipFilePath = tmpIndexZipFilePath; - this.finalIndexZipFilePath = finalIndexZipFilePath; - } - - @JsonProperty - public DataSegment getSegment() - { - return segment; - } - - @JsonProperty - public String getTmpIndexZipFilePath() - { - return tmpIndexZipFilePath; - } - - @JsonProperty - public String getFinalIndexZipFilePath() - { - return finalIndexZipFilePath; - } - - @Override - public boolean equals(Object o) - { - if (o instanceof DataSegmentAndIndexZipFilePath) { - DataSegmentAndIndexZipFilePath that = (DataSegmentAndIndexZipFilePath) o; - return segment.equals(((DataSegmentAndIndexZipFilePath) o).getSegment()) - && tmpIndexZipFilePath.equals(that.getTmpIndexZipFilePath()) - && finalIndexZipFilePath.equals(that.getFinalIndexZipFilePath()); - } - return false; - } - - @Override - public int hashCode() - { - return Objects.hash(segment.getId(), tmpIndexZipFilePath); - } - - @Override - public String toString() - { - return "DataSegmentAndIndexZipFilePath{" + - "segment=" + segment + - ", tmpIndexZipFilePath=" + tmpIndexZipFilePath + - ", finalIndexZipFilePath=" + finalIndexZipFilePath + - '}'; - } -} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java deleted file mode 100644 index 96fde6b8ecee..000000000000 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; - -import java.io.IOException; -import java.net.URI; - -/** - * This class exists for testing purposes, see {@link JobHelperPowerMockTest}. Using the - * raw {@link FileSystem} class resulted in errors with java assist. 
- */ -public class FileSystemHelper -{ - public static FileSystem get(URI uri, Configuration conf) throws IOException - { - return FileSystem.get(uri, conf); - } -} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java index ea37db1a10ec..8b5b4b6b0bba 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java @@ -59,12 +59,7 @@ public boolean run() if (config.isDeterminingPartitions()) { job = createPartitionJob(config); config.setHadoopJobIdFileName(hadoopJobIdFile); - boolean jobSucceeded = JobHelper.runSingleJob(job); - JobHelper.maybeDeleteIntermediatePath( - jobSucceeded, - config.getSchema() - ); - return jobSucceeded; + return JobHelper.runSingleJob(job, config); } else { final PartitionsSpec partitionsSpec = config.getPartitionsSpec(); final int shardsPerInterval; diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java index 58977ad48401..25683f32bd8f 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.inject.Inject; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.util.ArrayList; @@ -39,7 +40,7 @@ public class HadoopDruidIndexerJob implements Jobby @Nullable private IndexGeneratorJob indexJob; @Nullable - private volatile List publishedSegmentAndIndexZipFilePaths = null; + private volatile List publishedSegments = null; @Nullable private String hadoopJobIdFile; @@ -90,14 +91,14 @@ public boolean run() @Override public boolean run() { - publishedSegmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); + publishedSegments = IndexGeneratorJob.getPublishedSegments(config); return true; } } ); config.setHadoopJobIdFileName(hadoopJobIdFile); - return JobHelper.runJobs(jobs); + return JobHelper.runJobs(jobs, config); } @Override @@ -121,12 +122,12 @@ public String getErrorMessage() return indexJob.getErrorMessage(); } - public List getPublishedSegmentAndIndexZipFilePaths() + public List getPublishedSegments() { - if (publishedSegmentAndIndexZipFilePaths == null) { + if (publishedSegments == null) { throw new IllegalStateException("Job hasn't run yet. 
No segments have been published yet."); } - return publishedSegmentAndIndexZipFilePaths; + return publishedSegments; } public void setHadoopJobIdFile(String hadoopJobIdFile) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 9124b9b4c1d8..a12e76571dba 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -102,14 +102,14 @@ public class IndexGeneratorJob implements Jobby { private static final Logger log = new Logger(IndexGeneratorJob.class); - public static List getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config) + public static List getPublishedSegments(HadoopDruidIndexerConfig config) { final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config); config.addJobProperties(conf); final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER; - ImmutableList.Builder publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder(); + ImmutableList.Builder publishedSegmentsBuilder = ImmutableList.builder(); final Path descriptorInfoDir = config.makeDescriptorInfoDir(); @@ -117,9 +117,9 @@ public static List getPublishedSegmentAndIndexZi FileSystem fs = descriptorInfoDir.getFileSystem(conf); for (FileStatus status : fs.listStatus(descriptorInfoDir)) { - final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class); - publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath); - log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId()); + final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class); + publishedSegmentsBuilder.add(segment); + log.info("Adding segment %s to the list of published segments", segment.getId()); } } catch (FileNotFoundException e) { @@ -133,9 +133,9 @@ public static List getPublishedSegmentAndIndexZi catch (IOException e) { throw new RuntimeException(e); } - List publishedSegmentAndIndexZipFilePaths = publishedSegmentAndIndexZipFilePathsBuilder.build(); + List publishedSegments = publishedSegmentsBuilder.build(); - return publishedSegmentAndIndexZipFilePaths; + return publishedSegments; } private final HadoopDruidIndexerConfig config; @@ -809,7 +809,7 @@ public void doRun() 0 ); - final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex( + final DataSegment segment = JobHelper.serializeOutIndex( segmentTemplate, context.getConfiguration(), context, @@ -831,7 +831,7 @@ public void doRun() HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER ); - Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment()); + Path descriptorPath = config.makeDescriptorInfoPath(segment); descriptorPath = JobHelper.prependFSIfNullScheme( FileSystem.get( descriptorPath.toUri(), @@ -842,7 +842,7 @@ public void doRun() log.info("Writing descriptor to path[%s]", descriptorPath); JobHelper.writeSegmentDescriptor( config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()), - segmentAndIndexZipFilePath, + segment, descriptorPath, context ); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java 
index b8c29b82f7a7..7d99d03ad565 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java @@ -386,13 +386,29 @@ public static void writeJobIdToFile(String hadoopJobIdFileName, String hadoopJob } } - public static boolean runSingleJob(Jobby job) + public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config) { boolean succeeded = job.run(); + + if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) { + if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { + Path workingPath = config.makeIntermediatePath(); + log.info("Deleting path[%s]", workingPath); + try { + Configuration conf = injectSystemProperties(new Configuration(), config); + config.addJobProperties(conf); + workingPath.getFileSystem(conf).delete(workingPath, true); + } + catch (IOException e) { + log.error(e, "Failed to cleanup path[%s]", workingPath); + } + } + } + return succeeded; } - public static boolean runJobs(List jobs) + public static boolean runJobs(List jobs, HadoopDruidIndexerConfig config) { boolean succeeded = true; for (Jobby job : jobs) { @@ -402,33 +418,25 @@ public static boolean runJobs(List jobs) } } - return succeeded; - } - - public static void maybeDeleteIntermediatePath( - boolean jobSucceeded, - HadoopIngestionSpec indexerSchema) - { - HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema); - final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config); - config.addJobProperties(configuration); - JobHelper.injectDruidProperties(configuration, config); if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) { - if (jobSucceeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { + if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { Path workingPath = config.makeIntermediatePath(); log.info("Deleting path[%s]", workingPath); try { - config.addJobProperties(configuration); - workingPath.getFileSystem(configuration).delete(workingPath, true); + Configuration conf = injectSystemProperties(new Configuration(), config); + config.addJobProperties(conf); + workingPath.getFileSystem(conf).delete(workingPath, true); } catch (IOException e) { log.error(e, "Failed to cleanup path[%s]", workingPath); } } } + + return succeeded; } - public static DataSegmentAndIndexZipFilePath serializeOutIndex( + public static DataSegment serializeOutIndex( final DataSegment segmentTemplate, final Configuration configuration, final Progressable progressable, @@ -474,16 +482,20 @@ public long push() throws IOException .withSize(size.get()) .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase)); - return new DataSegmentAndIndexZipFilePath( - finalSegment, - tmpPath.toUri().getPath(), - finalIndexZipFilePath.toUri().getPath() - ); + if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) { + throw new IOE( + "Unable to rename [%s] to [%s]", + tmpPath.toUri().toString(), + finalIndexZipFilePath.toUri().toString() + ); + } + + return finalSegment; } public static void writeSegmentDescriptor( final FileSystem outputFS, - final DataSegmentAndIndexZipFilePath segmentAndPath, + final DataSegment segment, final Path descriptorPath, final Progressable progressable ) @@ -499,12 +511,9 @@ public long push() throws IOException try { progressable.progress(); if (outputFS.exists(descriptorPath)) { - // If the descriptor path already exists, don't overwrite, and risk 
clobbering it. - // If it already exists, it means that the segment data is already written to the - // tmp path, and the existing descriptor written should give us the information we - // need to rename the segment index to final path and publish it in the top level task. - log.info("descriptor path [%s] already exists, not overwriting", descriptorPath); - return -1; + if (!outputFS.delete(descriptorPath, false)) { + throw new IOE("Failed to delete descriptor at [%s]", descriptorPath); + } } try (final OutputStream descriptorOut = outputFS.create( descriptorPath, @@ -512,7 +521,7 @@ public long push() throws IOException DEFAULT_FS_BUFFER_SIZE, progressable )) { - HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segmentAndPath); + HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segment); } } catch (RuntimeException | IOException ex) { @@ -623,39 +632,7 @@ public static Path makeTmpPath( } /** - * Renames the index files for the segments. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename - * which will not overwrite. Note: segments should be renamed in the index task, not in a hadoop job, as race - * conditions between job retries can cause the final segment index file path to get clobbered. - * - * @param indexerSchema the hadoop ingestion spec - * @param segmentAndIndexZipFilePaths the list of segments with their currently stored tmp path and the final path - * that they should be renamed to. - */ - public static void renameIndexFilesForSegments( - HadoopIngestionSpec indexerSchema, - List segmentAndIndexZipFilePaths - ) throws IOException - { - HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema); - final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config); - config.addJobProperties(configuration); - JobHelper.injectDruidProperties(configuration, config); - for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : segmentAndIndexZipFilePaths) { - Path tmpPath = new Path(segmentAndIndexZipFilePath.getTmpIndexZipFilePath()); - Path finalIndexZipFilePath = new Path(segmentAndIndexZipFilePath.getFinalIndexZipFilePath()); - final FileSystem outputFS = FileSystemHelper.get(finalIndexZipFilePath.toUri(), configuration); - if (!renameIndexFile(outputFS, tmpPath, finalIndexZipFilePath)) { - throw new IOE( - "Unable to rename [%s] to [%s]", - tmpPath.toUri().toString(), - finalIndexZipFilePath.toUri().toString() - ); - } - } - } - - /** - * Rename the file. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename + * Rename the files. 
This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename * which will not overwrite * * @param outputFS The output fs @@ -664,7 +641,7 @@ public static void renameIndexFilesForSegments( * * @return False if a rename failed, true otherwise (rename success or no rename needed) */ - private static boolean renameIndexFile( + private static boolean renameIndexFiles( final FileSystem outputFS, final Path indexZipFilePath, final Path finalIndexZipFilePath diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java index bdadc95010e3..b7eb60bf0931 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java @@ -22,7 +22,6 @@ import org.apache.druid.timeline.DataSegment; import java.util.List; -import java.util.stream.Collectors; /** */ @@ -43,8 +42,7 @@ public MetadataStorageUpdaterJob( @Override public boolean run() { - final List segmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); - final List segments = segmentAndIndexZipFilePaths.stream().map(s -> s.getSegment()).collect(Collectors.toList()); + final List segments = IndexGeneratorJob.getPublishedSegments(config); final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable(); handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.JSON_MAPPER); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 913c6481e577..404d5ed67b8f 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -372,15 +372,7 @@ private void testIngestion( ) throws Exception { IndexGeneratorJob job = new IndexGeneratorJob(config); - Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job))); - - List dataSegmentAndIndexZipFilePaths = - IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); - JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths); - - JobHelper.maybeDeleteIntermediatePath(true, config.getSchema()); - File workingPath = new File(config.makeIntermediatePath().toUri().getPath()); - Assert.assertFalse(workingPath.exists()); + Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); File segmentFolder = new File( StringUtils.format( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java deleted file mode 100644 index 3dcd2033e2ff..000000000000 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer; - -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.partition.NumberedShardSpec; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; - -public class DataSegmentAndIndexZipFilePathTest -{ - private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source", 1); - private static final SegmentId OTHER_SEGMENT_ID = SegmentId.dummy("data-source2", 1); - private static final DataSegment SEGMENT = new DataSegment( - SEGMENT_ID, - null, - null, - null, - new NumberedShardSpec(1, 10), - null, - 0, - 0 - ); - private static final DataSegment OTHER_SEGMENT = new DataSegment( - OTHER_SEGMENT_ID, - null, - null, - null, - new NumberedShardSpec(1, 10), - null, - 0, - 0 - ); - - private DataSegmentAndIndexZipFilePath target; - - @Test - public void test_equals_otherNull_notEqual() - { - String tmpPath = "tmpPath"; - String finalPath = "finalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - Assert.assertNotEquals(target, null); - } - - @Test - public void test_equals_differentSegmentId_notEqual() - { - String tmpPath = "tmpPath"; - String finalPath = "finalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - - DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( - OTHER_SEGMENT, - tmpPath, - finalPath - ); - Assert.assertNotEquals(target, other); - } - - @Test - public void test_equals_differentTmpPath_notEqual() - { - String tmpPath = "tmpPath"; - String otherTmpPath = "otherTmpPath"; - String finalPath = "finalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - - DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( - SEGMENT, - otherTmpPath, - finalPath - ); - Assert.assertNotEquals(target, other); - } - - @Test - public void test_equals_differentFinalPath_notEqual() - { - String tmpPath = "tmpPath"; - String finalPath = "finalPath"; - String otherFinalPath = "otherFinalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - - DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - otherFinalPath - ); - Assert.assertNotEquals(target, other); - } - - @Test - public void test_equals_allFieldsEqualValue_equal() - { - String tmpPath = "tmpPath"; - String finalPath = "finalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - - DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - Assert.assertEquals(target, other); - } - - @Test - public void test_equals_sameObject_equal() - { - String tmpPath = "tmpPath"; - String finalPath = "finalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - 
- Assert.assertEquals(target, target); - } - - @Test - public void test_serde() throws IOException - { - String tmpPath = "tmpPath"; - String finalPath = "finalPath"; - target = new DataSegmentAndIndexZipFilePath( - SEGMENT, - tmpPath, - finalPath - ); - - final InjectableValues.Std injectableValues = new InjectableValues.Std(); - injectableValues.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT); - final ObjectMapper mapper = new DefaultObjectMapper(); - mapper.setInjectableValues(injectableValues); - final String json = mapper.writeValueAsString(target); - final DataSegmentAndIndexZipFilePath fromJson = - mapper.readValue(json, DataSegmentAndIndexZipFilePath.class); - Assert.assertEquals(target, fromJson); - } -} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java deleted file mode 100644 index 8231b7ce4aae..000000000000 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.indexer; - -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.List; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ - JobHelper.class, - IndexGeneratorJob.class -}) -@PowerMockIgnore({"javax.net.ssl.*"}) -public class HadoopDruidIndexerJobTest -{ - private HadoopDruidIndexerConfig config; - private MetadataStorageUpdaterJobHandler handler; - private HadoopDruidIndexerJob target; - - @Test - public void test_run() - { - config = PowerMock.createMock(HadoopDruidIndexerConfig.class); - handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class); - PowerMock.mockStaticNice(JobHelper.class); - PowerMock.mockStaticNice(IndexGeneratorJob.class); - config.verify(); - EasyMock.expectLastCall(); - EasyMock.expect(config.isUpdaterJobSpecSet()).andReturn(false).anyTimes(); - config.setHadoopJobIdFileName(EasyMock.anyString()); - EasyMock.expectLastCall(); - JobHelper.ensurePaths(config); - EasyMock.expectLastCall(); - Capture> capturedJobs = Capture.newInstance(); - EasyMock.expect(JobHelper.runJobs(EasyMock.capture(capturedJobs))).andReturn(true); - EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(EasyMock.anyObject())).andReturn(null); - - - PowerMock.replayAll(); - - target = new HadoopDruidIndexerJob(config, handler); - target.run(); - - List jobs = capturedJobs.getValue(); - Assert.assertEquals(2, jobs.size()); - jobs.stream().filter(job -> !(job instanceof IndexGeneratorJob)).forEach(job -> Assert.assertTrue(job.run())); - - PowerMock.verifyAll(); - } -} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 24ff1227ee58..97967bdb3b7e 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -621,21 +621,13 @@ public void testIndexGeneratorJob() throws IOException private void verifyJob(IndexGeneratorJob job) throws IOException { - Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job))); + Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); final Map> intervalToSegments = new HashMap<>(); IndexGeneratorJob - .getPublishedSegmentAndIndexZipFilePaths(config) - .forEach(segmentAndIndexZipFilePath -> intervalToSegments.computeIfAbsent(segmentAndIndexZipFilePath.getSegment().getInterval(), k -> new ArrayList<>()) - .add(segmentAndIndexZipFilePath.getSegment())); - - List dataSegmentAndIndexZipFilePaths = - IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); - JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths); - - JobHelper.maybeDeleteIntermediatePath(true, config.getSchema()); - File workingPath = new File(config.makeIntermediatePath().toUri().getPath()); - Assert.assertTrue(workingPath.exists()); + .getPublishedSegments(config) + .forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()) + .add(segment)); final Map> intervalToIndexFiles = new HashMap<>(); int segmentNum = 0; diff --git 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java deleted file mode 100644 index 48f653a8c3b9..000000000000 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexer; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.java.util.common.IOE; -import org.apache.druid.timeline.DataSegment; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.easymock.EasyMock; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.io.IOException; -import java.net.URI; -import java.util.List; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ - FileSystemHelper.class, - HadoopDruidIndexerConfig.class -}) -@PowerMockIgnore({"javax.net.ssl.*"}) -public class JobHelperPowerMockTest -{ - private static final String TMP_PATH = "/tmp/index.zip.0"; - private static final String FINAL_PATH = "/final/index.zip.0"; - - private HadoopDruidIndexerConfig indexerConfig; - - @Test - public void test_renameIndexFilesForSegments_emptySegments() throws IOException - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - List segmentAndIndexZipFilePaths = ImmutableList.of(); - - PowerMock.replayAll(); - - JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths); - - PowerMock.verifyAll(); - } - - @Test - public void test_renameIndexFilesForSegments_segmentIndexFileRenamedSuccessfully() - throws IOException - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - mockFileSystem(true); - DataSegment segment = PowerMock.createMock(DataSegment.class); - - List segmentAndIndexZipFilePaths = ImmutableList.of( - new DataSegmentAndIndexZipFilePath( - segment, - TMP_PATH, - FINAL_PATH - ) - ); - PowerMock.replayAll(); - - JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths); - - PowerMock.verifyAll(); - } - - @Test (expected = IOE.class) - public void test_renameIndexFilesForSegments_segmentIndexFileRenamedFailed_throwsException() - throws IOException - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - mockFileSystem(false); - DataSegment segment = PowerMock.createMock(DataSegment.class); - List segmentAndIndexZipFilePaths = 
ImmutableList.of( - new DataSegmentAndIndexZipFilePath( - segment, - TMP_PATH, - FINAL_PATH - ) - ); - - PowerMock.replayAll(); - - JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths); - - PowerMock.verifyAll(); - } - - @Test - public void test_maybeDeleteIntermediatePath_leaveIntermediate_doesNotDeleteIntermediatePath() - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); - EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(true); - EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig); - - PowerMock.replayAll(); - - JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec); - - PowerMock.verifyAll(); - } - - @Test - public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexerJobSucceeded_deleteIntermediatePath() - throws IOException - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); - Path workingPath = PowerMock.createMock(Path.class); - FileSystem workingPathFs = PowerMock.createMock(FileSystem.class); - EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false); - EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig); - EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true); - EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs); - EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath); - - PowerMock.replayAll(); - - JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec); - - PowerMock.verifyAll(); - } - - @Test - public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexJobFailedAndCleanupOnFailure_deleteIntermediatePath() - throws IOException - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); - Path workingPath = PowerMock.createMock(Path.class); - FileSystem workingPathFs = PowerMock.createMock(FileSystem.class); - EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false); - EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true); - EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes(); - EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true); - EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs); - EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath); - - PowerMock.replayAll(); - - JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec); - - PowerMock.verifyAll(); - } - - @Test - public void test_maybeDeleteIntermediatePath_deleteThrowsException_noExceptionPropogated() - throws IOException - { - HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); - HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); - Path workingPath = PowerMock.createMock(Path.class); - FileSystem workingPathFs = PowerMock.createMock(FileSystem.class); - EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false); - EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true); - EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes(); - EasyMock.expect(workingPathFs.delete(workingPath, true)).andThrow(new IOException("Delete Exception")); - 
EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs); - EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath); - - PowerMock.replayAll(); - - JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec); - - PowerMock.verifyAll(); - } - - private HadoopIngestionSpec mockIngestionSpec() - { - indexerConfig = PowerMock.createMock(HadoopDruidIndexerConfig.class); - HadoopIngestionSpec ingestionSpec = PowerMock.createMock(HadoopIngestionSpec.class); - PowerMock.mockStaticNice(HadoopDruidIndexerConfig.class); - EasyMock.expect(indexerConfig.getAllowedProperties()).andReturn(ImmutableMap.of()).anyTimes(); - indexerConfig.addJobProperties(EasyMock.anyObject(Configuration.class)); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(HadoopDruidIndexerConfig.fromSpec(ingestionSpec)).andReturn(indexerConfig); - EasyMock.expect(indexerConfig.getSchema()).andReturn(ingestionSpec).anyTimes(); - return ingestionSpec; - } - - private void mockFileSystem(boolean renameSuccess) throws IOException - { - PowerMock.mockStaticNice(FileSystemHelper.class); - FileSystem fileSystem = PowerMock.createMock(FileSystem.class); - EasyMock.expect(FileSystemHelper.get( - EasyMock.anyObject(URI.class), - EasyMock.anyObject(Configuration.class) - )).andReturn(fileSystem); - EasyMock.expect(fileSystem.exists(EasyMock.anyObject(Path.class))).andReturn(false); - EasyMock.expect(fileSystem.rename(EasyMock.anyObject(Path.class), EasyMock.anyObject(Path.class))) - .andReturn(renameSuccess); - } -} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java deleted file mode 100644 index 0b867630cd10..000000000000 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.indexer; - -import com.google.common.collect.ImmutableList; -import org.apache.druid.indexer.updater.MetadataStorageUpdaterJobSpec; -import org.easymock.EasyMock; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.List; -import java.util.stream.Collectors; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ - IndexGeneratorJob.class -}) -@PowerMockIgnore({"javax.net.ssl.*"}) -public class MetadataStorageUpdaterJobTest -{ - private static final List DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS = ImmutableList.of( - new DataSegmentAndIndexZipFilePath(null, null, null) - ); - private static final String SEGMENT_TABLE = "segments"; - private HadoopIngestionSpec spec; - private HadoopIOConfig ioConfig; - private MetadataStorageUpdaterJobSpec metadataUpdateSpec; - private HadoopDruidIndexerConfig config; - private MetadataStorageUpdaterJobHandler handler; - private MetadataStorageUpdaterJob target; - - @Test - public void test_run() - { - metadataUpdateSpec = PowerMock.createMock(MetadataStorageUpdaterJobSpec.class); - ioConfig = PowerMock.createMock(HadoopIOConfig.class); - spec = PowerMock.createMock(HadoopIngestionSpec.class); - config = PowerMock.createMock(HadoopDruidIndexerConfig.class); - handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class); - PowerMock.mockStaticNice(IndexGeneratorJob.class); - - EasyMock.expect(metadataUpdateSpec.getSegmentTable()).andReturn(SEGMENT_TABLE); - EasyMock.expect(ioConfig.getMetadataUpdateSpec()).andReturn(metadataUpdateSpec); - EasyMock.expect(spec.getIOConfig()).andReturn(ioConfig); - EasyMock.expect(config.getSchema()).andReturn(spec); - EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config)) - .andReturn(DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS); - handler.publishSegments( - SEGMENT_TABLE, - DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS.stream().map(s -> s.getSegment()).collect( - Collectors.toList()), HadoopDruidIndexerConfig.JSON_MAPPER); - EasyMock.expectLastCall(); - target = new MetadataStorageUpdaterJob(config, handler); - - PowerMock.replayAll(); - - target.run(); - - PowerMock.verifyAll(); - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 37ffb4cd7905..b66ae471136c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -31,8 +30,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import org.apache.commons.lang.BooleanUtils; -import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath; import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob; import 
org.apache.druid.indexer.HadoopDruidIndexerConfig; import org.apache.druid.indexer.HadoopDruidIndexerJob; @@ -86,7 +83,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class HadoopIndexTask extends HadoopTask implements ChatHandler { @@ -311,197 +307,170 @@ public TaskStatus runTask(TaskToolbox toolbox) @SuppressWarnings("unchecked") private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { - boolean indexGeneratorJobAttempted = false; - boolean indexGeneratorJobSuccess = false; - HadoopIngestionSpec indexerSchema = null; - try { - registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); - String hadoopJobIdFile = getHadoopJobIdFileName(); - final ClassLoader loader = buildClassLoader(toolbox); - boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); - - HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( - spec, - jsonMapper, - new OverlordActionBasedUsedSegmentsRetriever(toolbox) - ); + registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); + String hadoopJobIdFile = getHadoopJobIdFileName(); + final ClassLoader loader = buildClassLoader(toolbox); + boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); + + HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( + spec, + jsonMapper, + new OverlordActionBasedUsedSegmentsRetriever(toolbox) + ); - Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner", - loader - ); - determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner); + Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject( + "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner", + loader + ); + determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner); - String[] determinePartitionsInput = new String[]{ - toolbox.getJsonMapper().writeValueAsString(spec), - toolbox.getConfig().getHadoopWorkingPath(), - toolbox.getSegmentPusher().getPathForHadoop(), - hadoopJobIdFile - }; + String[] determinePartitionsInput = new String[]{ + toolbox.getJsonMapper().writeValueAsString(spec), + toolbox.getConfig().getHadoopWorkingPath(), + toolbox.getSegmentPusher().getPathForHadoop(), + hadoopJobIdFile + }; - final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); - Class determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass(); - Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod( - "runTask", - determinePartitionsInput.getClass() - ); - try { - Thread.currentThread().setContextClassLoader(loader); + HadoopIngestionSpec indexerSchema; + final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); + Class determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass(); + Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod( + "runTask", + determinePartitionsInput.getClass() + ); + try { + Thread.currentThread().setContextClassLoader(loader); - ingestionState = IngestionState.DETERMINE_PARTITIONS; + ingestionState = IngestionState.DETERMINE_PARTITIONS; - final String determineConfigStatusString = (String) 
determinePartitionsInnerProcessingRunTask.invoke( - determinePartitionsInnerProcessingRunner, - new Object[]{determinePartitionsInput} - ); + final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke( + determinePartitionsInnerProcessingRunner, + new Object[]{determinePartitionsInput} + ); - determineConfigStatus = toolbox - .getJsonMapper() - .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class); + determineConfigStatus = toolbox + .getJsonMapper() + .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class); - indexerSchema = determineConfigStatus.getSchema(); - if (indexerSchema == null) { - errorMsg = determineConfigStatus.getErrorMsg(); - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); - return TaskStatus.failure( - getId(), - errorMsg - ); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - Thread.currentThread().setContextClassLoader(oldLoader); - } - - // We should have a lock from before we started running only if interval was specified - String version; - if (determineIntervals) { - Interval interval = JodaUtils.umbrellaInterval( - JodaUtils.condenseIntervals( - indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals() - ) - ); - final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); - // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. - final TaskLock lock = Preconditions.checkNotNull( - toolbox.getTaskActionClient().submit( - new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs) - ), - "Cannot acquire a lock for interval[%s]", interval + indexerSchema = determineConfigStatus.getSchema(); + if (indexerSchema == null) { + errorMsg = determineConfigStatus.getErrorMsg(); + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.failure( + getId(), + errorMsg ); - version = lock.getVersion(); - } else { - Iterable locks = getTaskLocks(toolbox.getTaskActionClient()); - final TaskLock myLock = Iterables.getOnlyElement(locks); - version = myLock.getVersion(); } + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + Thread.currentThread().setContextClassLoader(oldLoader); + } - final String specVersion = indexerSchema.getTuningConfig().getVersion(); - if (indexerSchema.getTuningConfig().isUseExplicitVersion()) { - if (specVersion.compareTo(version) < 0) { - version = specVersion; - } else { - log.error( - "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].", - specVersion, - version - ); - toolbox.getTaskReportFileWriter().write(getId(), null); - return TaskStatus.failure(getId()); - } + // We should have a lock from before we started running only if interval was specified + String version; + if (determineIntervals) { + Interval interval = JodaUtils.umbrellaInterval( + JodaUtils.condenseIntervals( + indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals() + ) + ); + final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. 
+ final TaskLock lock = Preconditions.checkNotNull( + toolbox.getTaskActionClient().submit( + new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs) + ), + "Cannot acquire a lock for interval[%s]", interval + ); + version = lock.getVersion(); + } else { + Iterable locks = getTaskLocks(toolbox.getTaskActionClient()); + final TaskLock myLock = Iterables.getOnlyElement(locks); + version = myLock.getVersion(); + } + + final String specVersion = indexerSchema.getTuningConfig().getVersion(); + if (indexerSchema.getTuningConfig().isUseExplicitVersion()) { + if (specVersion.compareTo(version) < 0) { + version = specVersion; + } else { + log.error( + "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].", + specVersion, + version + ); + toolbox.getTaskReportFileWriter().write(getId(), null); + return TaskStatus.failure(getId()); } + } - log.info("Setting version to: %s", version); + log.info("Setting version to: %s", version); - Object innerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", - loader - ); - buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner); + Object innerProcessingRunner = getForeignClassloaderObject( + "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", + loader + ); + buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner); - String[] buildSegmentsInput = new String[]{ - toolbox.getJsonMapper().writeValueAsString(indexerSchema), - version, - hadoopJobIdFile - }; + String[] buildSegmentsInput = new String[]{ + toolbox.getJsonMapper().writeValueAsString(indexerSchema), + version, + hadoopJobIdFile + }; - Class buildSegmentsRunnerClass = innerProcessingRunner.getClass(); - Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass()); + Class buildSegmentsRunnerClass = innerProcessingRunner.getClass(); + Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass()); - try { - Thread.currentThread().setContextClassLoader(loader); + try { + Thread.currentThread().setContextClassLoader(loader); - ingestionState = IngestionState.BUILD_SEGMENTS; - indexGeneratorJobAttempted = true; - final String jobStatusString = (String) innerProcessingRunTask.invoke( - innerProcessingRunner, - new Object[]{buildSegmentsInput} - ); + ingestionState = IngestionState.BUILD_SEGMENTS; + final String jobStatusString = (String) innerProcessingRunTask.invoke( + innerProcessingRunner, + new Object[]{buildSegmentsInput} + ); - buildSegmentsStatus = toolbox.getJsonMapper().readValue( - jobStatusString, - HadoopIndexGeneratorInnerProcessingStatus.class - ); + buildSegmentsStatus = toolbox.getJsonMapper().readValue( + jobStatusString, + HadoopIndexGeneratorInnerProcessingStatus.class + ); - List dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths(); - if (dataSegmentAndIndexZipFilePaths != null) { - indexGeneratorJobSuccess = true; - try { - Thread.currentThread().setContextClassLoader(oldLoader); - renameSegmentIndexFilesJob( - toolbox.getJsonMapper().writeValueAsString(indexerSchema), - toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths) - ); - } - finally { - Thread.currentThread().setContextClassLoader(loader); - } - ArrayList segments = new 
ArrayList<>(dataSegmentAndIndexZipFilePaths.stream() - .map( - DataSegmentAndIndexZipFilePath::getSegment) - .collect(Collectors.toList())); - toolbox.publishSegments(segments); - - // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value - // for awaitSegmentAvailabilityTimeoutMillis - if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) { - ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( - toolbox, - segments, - spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() - ); - } - - ingestionState = IngestionState.COMPLETED; - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); - return TaskStatus.success(getId()); - } else { - errorMsg = buildSegmentsStatus.getErrorMsg(); - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); - return TaskStatus.failure( - getId(), - errorMsg + if (buildSegmentsStatus.getDataSegments() != null) { + toolbox.publishSegments(buildSegmentsStatus.getDataSegments()); + + // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value + // for awaitSegmentAvailabilityTimeoutMillis + if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) { + ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; + ArrayList segmentsToWaitFor = new ArrayList<>(buildSegmentsStatus.getDataSegments()); + segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + toolbox, + segmentsToWaitFor, + spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() ); } + + ingestionState = IngestionState.COMPLETED; + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.success(getId()); + } else { + errorMsg = buildSegmentsStatus.getErrorMsg(); + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.failure( + getId(), + errorMsg + ); } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - Thread.currentThread().setContextClassLoader(oldLoader); - } + } + catch (Exception e) { + throw new RuntimeException(e); } finally { - indexerGeneratorCleanupJob( - indexGeneratorJobAttempted, - indexGeneratorJobSuccess, - indexerSchema == null ? 
@@ -545,96 +514,6 @@ private void killHadoopJob()
     }
   }
 
-  private void renameSegmentIndexFilesJob(
-      String hadoopIngestionSpecStr,
-      String dataSegmentAndIndexZipFilePathListStr
-  )
-  {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      ClassLoader loader = HadoopTask.buildClassLoader(
-          getHadoopDependencyCoordinates(),
-          taskConfig.getDefaultHadoopCoordinates()
-      );
-
-      Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
-          loader
-      );
-
-      String[] renameSegmentIndexFilesJobInput = new String[]{
-          hadoopIngestionSpecStr,
-          dataSegmentAndIndexZipFilePathListStr
-      };
-
-      Class<?> buildRenameSegmentIndexFilesJobRunnerClass = renameSegmentIndexFilesRunner.getClass();
-      Method renameSegmentIndexFiles = buildRenameSegmentIndexFilesJobRunnerClass.getMethod(
-          "runTask",
-          renameSegmentIndexFilesJobInput.getClass()
-      );
-
-      Thread.currentThread().setContextClassLoader(loader);
-      renameSegmentIndexFiles.invoke(
-          renameSegmentIndexFilesRunner,
-          new Object[]{renameSegmentIndexFilesJobInput}
-      );
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
-  }
-
-  private void indexerGeneratorCleanupJob(
-      boolean indexGeneratorJobAttempted,
-      boolean indexGeneratorJobSuccess,
-      String hadoopIngestionSpecStr
-  )
-  {
-    if (!indexGeneratorJobAttempted) {
-      log.info("No need for cleanup as index generator job did not even run");
-      return;
-    }
-
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      ClassLoader loader = HadoopTask.buildClassLoader(
-          getHadoopDependencyCoordinates(),
-          taskConfig.getDefaultHadoopCoordinates()
-      );
-
-      Object indexerGeneratorCleanupRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner",
-          loader
-      );
-
-      String[] indexerGeneratorCleanupJobInput = new String[]{
-          indexGeneratorJobSuccess ? "true" : "false",
-          hadoopIngestionSpecStr,
-      };
-
-      Class<?> buildIndexerGeneratorCleanupRunnerClass = indexerGeneratorCleanupRunner.getClass();
-      Method indexerGeneratorCleanup = buildIndexerGeneratorCleanupRunnerClass.getMethod(
-          "runTask",
-          indexerGeneratorCleanupJobInput.getClass()
-      );
-
-      Thread.currentThread().setContextClassLoader(loader);
-      indexerGeneratorCleanup.invoke(
-          indexerGeneratorCleanupRunner,
-          new Object[]{indexerGeneratorCleanupJobInput}
-      );
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to cleanup after index generator job");
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
-  }
-
   @GET
   @Path("/rowStats")
   @Produces(MediaType.APPLICATION_JSON)
@@ -843,7 +722,7 @@ public String runTask(String[] args) throws Exception
       if (job.run()) {
         return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
             new HadoopIndexGeneratorInnerProcessingStatus(
-                job.getPublishedSegmentAndIndexZipFilePaths(),
+                job.getPublishedSegments(),
                 job.getStats(),
                 null
             )
@@ -911,111 +790,28 @@ public String[] runTask(String[] args) throws Exception
     }
   }
 
-  @SuppressWarnings("unused")
-  public static class HadoopRenameSegmentIndexFilesRunner
-  {
-    TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
-        new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
-        {
-        };
-
-    public void runTask(String[] args) throws Exception
-    {
-      if (args.length != 2) {
-        log.warn("HadoopRenameSegmentIndexFilesRunner called with improper number of arguments");
-      }
-      String hadoopIngestionSpecStr = args[0];
-      String dataSegmentAndIndexZipFilePathListStr = args[1];
-
-      HadoopIngestionSpec indexerSchema;
-      List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
-      try {
-        indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
-            hadoopIngestionSpecStr,
-            HadoopIngestionSpec.class
-        );
-        dataSegmentAndIndexZipFilePaths = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
-            dataSegmentAndIndexZipFilePathListStr,
-            LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH
-        );
-      }
-      catch (Exception e) {
-        log.warn(
-            e,
-            "HadoopRenameSegmentIndexFilesRunner: Error occurred while trying to read input parameters into data objects"
-        );
-        throw e;
-      }
-      JobHelper.renameIndexFilesForSegments(
-          indexerSchema,
-          dataSegmentAndIndexZipFilePaths
-      );
-    }
-  }
-
-  @SuppressWarnings("unused")
-  public static class HadoopIndexerGeneratorCleanupRunner
-  {
-    TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
-        new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
-        {
-        };
-
-    public void runTask(String[] args) throws Exception
-    {
-      if (args.length != 2) {
-        log.warn("HadoopIndexerGeneratorCleanupRunner called with improper number of arguments");
-      }
-
-      String indexGeneratorJobSucceededStr = args[0];
-      String hadoopIngestionSpecStr = args[1];
-
-      HadoopIngestionSpec indexerSchema;
-      boolean indexGeneratorJobSucceeded;
-      List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
-      try {
-        indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
-            hadoopIngestionSpecStr,
-            HadoopIngestionSpec.class
-        );
-        indexGeneratorJobSucceeded = BooleanUtils.toBoolean(indexGeneratorJobSucceededStr);
-      }
-      catch (Exception e) {
-        log.warn(
-            e,
-            "HadoopIndexerGeneratorCleanupRunner: Error occurred while trying to read input parameters into data objects"
-        );
-        throw e;
-      }
-      JobHelper.maybeDeleteIntermediatePath(
-          indexGeneratorJobSucceeded,
-          indexerSchema
-      );
-    }
-  }
-
   public static class HadoopIndexGeneratorInnerProcessingStatus
   {
-    private final List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
+    private final List<DataSegment> dataSegments;
     private final Map<String, Object> metrics;
     private final String errorMsg;
 
     @JsonCreator
     public HadoopIndexGeneratorInnerProcessingStatus(
-        @JsonProperty("dataSegmentAndIndexZipFilePaths") List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths,
+        @JsonProperty("dataSegments") List<DataSegment> dataSegments,
         @JsonProperty("metrics") Map<String, Object> metrics,
         @JsonProperty("errorMsg") String errorMsg
     )
     {
-      this.dataSegmentAndIndexZipFilePaths = dataSegmentAndIndexZipFilePaths;
+      this.dataSegments = dataSegments;
       this.metrics = metrics;
       this.errorMsg = errorMsg;
     }
 
     @JsonProperty
-    public List<DataSegmentAndIndexZipFilePath> getDataSegmentAndIndexZipFilePaths()
+    public List<DataSegment> getDataSegments()
     {
-      return dataSegmentAndIndexZipFilePaths;
+      return dataSegments;
     }
 
     @JsonProperty
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 8ad9cec8ffac..2fb9adc239e9 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -102,10 +102,6 @@
       <groupId>javax.servlet</groupId>
       <artifactId>servlet-api</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.squareup.okhttp</groupId>
-      <artifactId>okhttp</artifactId>
-    </dependency>
diff --git a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
index b227ababb449..4235abbf884c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
@@ -117,12 +117,9 @@ public void run()
       );
 
       List<Jobby> jobs = new ArrayList<>();
-      HadoopDruidIndexerJob indexerJob = new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class));
       jobs.add(new HadoopDruidDetermineConfigurationJob(config));
-      jobs.add(indexerJob);
-      boolean jobsSucceeded = JobHelper.runJobs(jobs);
-      JobHelper.renameIndexFilesForSegments(config.getSchema(), indexerJob.getPublishedSegmentAndIndexZipFilePaths());
-      JobHelper.maybeDeleteIntermediatePath(jobsSucceeded, config.getSchema());
+      jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));
+      JobHelper.runJobs(jobs, config);
     }
     catch (Exception e) {