Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package io.druid.storage.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
Expand Down Expand Up @@ -53,7 +55,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher

private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper;
private final String fullyQualifiedStorageDirectory;

// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/druid-io/druid/pull/5684
private final Supplier<String> fullyQualifiedStorageDirectory;

@Inject
public HdfsDataSegmentPusher(
Expand All @@ -65,10 +70,19 @@ public HdfsDataSegmentPusher(
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig)
.makeQualified(storageDir)
.toUri()
.toString();
this.fullyQualifiedStorageDirectory = Suppliers.memoize(
() -> {
try {
return FileSystem.newInstance(storageDir.toUri(), hadoopConfig)
.makeQualified(storageDir)
.toUri()
.toString();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
);

log.info("Configured HDFS as deep storage");
}
Expand All @@ -83,7 +97,7 @@ public String getPathForHadoop(String dataSource)
// Returns the fully-qualified HDFS deep-storage root used by Hadoop indexing tasks.
// NOTE(review): this span is a unified diff, not compilable Java — the first
// `return` line is the removed (pre-change) statement and the second is its
// replacement. The change resolves the Suppliers.memoize-wrapped directory so
// that FileSystem.newInstance() is deferred until first use (namenode-HA fix
// per druid-io/druid#5684); the method's external contract (String return) is
// unchanged.
@Override
public String getPathForHadoop()
{
return fullyQualifiedStorageDirectory;
return fullyQualifiedStorageDirectory.get();
}

@Override
Expand All @@ -94,13 +108,13 @@ public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting
log.info(
"Copying segment[%s] to HDFS at location[%s/%s]",
segment.getIdentifier(),
fullyQualifiedStorageDirectory,
fullyQualifiedStorageDirectory.get(),
storageDir
);

Path tmpIndexFile = new Path(StringUtils.format(
"%s/%s/%s/%s_index.zip",
fullyQualifiedStorageDirectory,
fullyQualifiedStorageDirectory.get(),
segment.getDataSource(),
UUIDUtils.generateUuid(),
segment.getShardSpec().getPartitionNum()
Expand All @@ -116,13 +130,13 @@ public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting
size = CompressionUtils.zip(inDir, out);
final Path outIndexFile = new Path(StringUtils.format(
"%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory,
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum()
));
final Path outDescriptorFile = new Path(StringUtils.format(
"%s/%s/%d_descriptor.json",
fullyQualifiedStorageDirectory,
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum()
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testPushWithScheme() throws Exception
@Test
public void testPushWithBadScheme() throws Exception
{
expectedException.expect(IOException.class);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("No FileSystem for scheme");
testUsingScheme("xyzzy");

Expand Down