indexing-hadoop/pom.xml (15 changes: 0 additions & 15 deletions)

@@ -202,21 +202,6 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-junit4</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-easymock</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>

   <build>
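With the PowerMock artifacts gone from the test scope, the affected tests presumably rely on plain Mockito. As a hedged sketch (not code from this PR; requires Mockito 3.4+ with the inline mock maker, e.g. the mockito-inline artifact), a PowerMock-style static stub translates to:

import org.mockito.MockedStatic;
import org.mockito.Mockito;

// Hypothetical PowerMock-free static stub; JobHelper.runSingleJob is assumed
// as the stubbed method for illustration only.
try (MockedStatic<JobHelper> jobHelper = Mockito.mockStatic(JobHelper.class)) {
  jobHelper.when(() -> JobHelper.runSingleJob(Mockito.any(), Mockito.any()))
           .thenReturn(true);
  // ... exercise the code under test here; the stub reports success ...
}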

This file was deleted.

This file was deleted.

@@ -59,12 +59,7 @@ public boolean run()
     if (config.isDeterminingPartitions()) {
       job = createPartitionJob(config);
       config.setHadoopJobIdFileName(hadoopJobIdFile);
-      boolean jobSucceeded = JobHelper.runSingleJob(job);
-      JobHelper.maybeDeleteIntermediatePath(
-          jobSucceeded,
-          config.getSchema()
-      );
-      return jobSucceeded;
+      return JobHelper.runSingleJob(job, config);
     } else {
       final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
       final int shardsPerInterval;
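The deleted statements fold into a new two-argument overload. A minimal sketch of JobHelper.runSingleJob(Jobby, HadoopDruidIndexerConfig), reconstructed from this call site rather than taken from the committed source:

public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
{
  // Run the job, then clean up the intermediate path on behalf of every
  // caller, exactly as the inlined code above used to do.
  boolean jobSucceeded = runSingleJob(job);
  maybeDeleteIntermediatePath(jobSucceeded, config.getSchema());
  return jobSucceeded;
}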
HadoopDruidIndexerJob.java

@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -39,7 +40,7 @@ public class HadoopDruidIndexerJob implements Jobby
   @Nullable
   private IndexGeneratorJob indexJob;
   @Nullable
-  private volatile List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = null;
+  private volatile List<DataSegment> publishedSegments = null;
   @Nullable
   private String hadoopJobIdFile;
 
@@ -90,14 +90,14 @@ public boolean run()
           @Override
           public boolean run()
           {
-            publishedSegmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
+            publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
             return true;
           }
         }
     );
 
     config.setHadoopJobIdFileName(hadoopJobIdFile);
-    return JobHelper.runJobs(jobs);
+    return JobHelper.runJobs(jobs, config);
   }
 
   @Override
@@ -121,12 +122,12 @@ public String getErrorMessage()
     return indexJob.getErrorMessage();
   }
 
-  public List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths()
+  public List<DataSegment> getPublishedSegments()
   {
-    if (publishedSegmentAndIndexZipFilePaths == null) {
+    if (publishedSegments == null) {
       throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
     }
-    return publishedSegmentAndIndexZipFilePaths;
+    return publishedSegments;
   }
 
   public void setHadoopJobIdFile(String hadoopJobIdFile)
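A hedged caller-side sketch of the renamed accessor (object construction elided; the other names match the diff):

HadoopDruidIndexerJob indexerJob = /* ... constructed as before ... */;
if (indexerJob.run()) {
  for (DataSegment segment : indexerJob.getPublishedSegments()) {
    log.info("Published segment[%s]", segment.getId());
  }
}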
IndexGeneratorJob.java

@@ -102,24 +102,24 @@ public class IndexGeneratorJob implements Jobby
 {
   private static final Logger log = new Logger(IndexGeneratorJob.class);
 
-  public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config)
+  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
   {
     final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config);
     config.addJobProperties(conf);
 
     final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
 
-    ImmutableList.Builder<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder();
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
 
     final Path descriptorInfoDir = config.makeDescriptorInfoDir();
 
     try {
       FileSystem fs = descriptorInfoDir.getFileSystem(conf);
 
       for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
-        final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class);
-        publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath);
-        log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId());
+        final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class);
+        publishedSegmentsBuilder.add(segment);
+        log.info("Adding segment %s to the list of published segments", segment.getId());
       }
     }
     catch (FileNotFoundException e) {
@@ -133,9 +133,9 @@ public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config)
     catch (IOException e) {
       throw new RuntimeException(e);
     }
-    List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = publishedSegmentAndIndexZipFilePathsBuilder.build();
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
 
-    return publishedSegmentAndIndexZipFilePaths;
+    return publishedSegments;
   }
 
   private final HadoopDruidIndexerConfig config;
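Net effect of these two hunks: every file in the descriptor-info directory is now read as a bare DataSegment JSON document rather than the wrapper type that also carried the index zip path. Illustratively, with field names assumed rather than copied from the PR:

Before (wrapper): {"segment": {"dataSource": "...", "interval": "...", ...}, "indexZipFilePath": "..."}
After (bare segment): {"dataSource": "...", "interval": "...", ...}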
@@ -809,7 +809,7 @@ public void doRun()
             0
         );
 
-        final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
+        final DataSegment segment = JobHelper.serializeOutIndex(
             segmentTemplate,
             context.getConfiguration(),
             context,
@@ -831,7 +831,7 @@ public void doRun()
             HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
         );
 
-        Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
+        Path descriptorPath = config.makeDescriptorInfoPath(segment);
         descriptorPath = JobHelper.prependFSIfNullScheme(
             FileSystem.get(
                 descriptorPath.toUri(),
@@ -842,7 +842,7 @@ public void doRun()
         log.info("Writing descriptor to path[%s]", descriptorPath);
         JobHelper.writeSegmentDescriptor(
             config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
-            segmentAndIndexZipFilePath,
+            segment,
             descriptorPath,
             context
         );
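Together with the serializeOutIndex hunk above, this final call implies JobHelper.writeSegmentDescriptor now accepts the segment directly. A reconstruction from the call site (an assumption, not the committed signature):

static void writeSegmentDescriptor(
    FileSystem outputFS,        // filesystem holding the descriptor-info dir
    DataSegment segment,        // previously a DataSegmentAndIndexZipFilePath
    Path descriptorPath,
    Progressable progressable   // the reducer context serves as the reporter
);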