Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,12 @@ private void serializeOutIndex(Context context, Bucket bucket, File mergedBase,

int attemptNumber = context.getTaskAttemptID().getId();

FileSystem fileSystem = FileSystem.get(context.getConfiguration());
Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
final FileSystem intermediateFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem(
context.getConfiguration()
);
final Path indexBasePath = config.makeSegmentOutputPath(outputFS, bucket);
final Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));

outputFS.mkdirs(indexBasePath);

Expand Down Expand Up @@ -502,7 +503,7 @@ private void serializeOutIndex(Context context, Bucket bucket, File mergedBase,
// retry 1 minute
boolean success = false;
for (int i = 0; i < 6; i++) {
if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
if (renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
success = true;
break;
Expand Down Expand Up @@ -532,7 +533,7 @@ private void serializeOutIndex(Context context, Bucket bucket, File mergedBase,
outputFS.delete(indexZipFilePath, true);
} else {
outputFS.delete(finalIndexZipFilePath, true);
if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
if (!renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
throw new ISE(
"Files [%s] and [%s] are different, but still cannot rename after retry loop",
indexZipFilePath.toUri().getPath(),
Expand Down
8 changes: 4 additions & 4 deletions indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class JobHelper

public static void setupClasspath(
HadoopDruidIndexerConfig config,
Job groupByJob
Job job
)
throws IOException
{
Expand All @@ -59,9 +59,9 @@ public static void setupClasspath(

String[] jarFiles = classpathProperty.split(File.pathSeparator);

final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final Configuration conf = job.getConfiguration();
final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final FileSystem fs = distributedClassPath.getFileSystem(conf);

if (fs instanceof LocalFileSystem) {
return;
Expand Down