From 9cc9a81b3d0c89f721732071b4cbe7fc97feb6e0 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 23 Apr 2018 15:48:57 -0700 Subject: [PATCH 1/4] Lazy init of fullyQualifiedStorageDirectory in HDFS pusher --- .../storage/hdfs/HdfsDataSegmentPusher.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 96db0b28210d..9a1f6740710c 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -53,7 +53,8 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final Configuration hadoopConfig; private final ObjectMapper jsonMapper; - private final String fullyQualifiedStorageDirectory; + private final Path storageDir; + private String fullyQualifiedStorageDirectory; @Inject public HdfsDataSegmentPusher( @@ -64,11 +65,7 @@ public HdfsDataSegmentPusher( { this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; - Path storageDir = new Path(config.getStorageDirectory()); - this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) - .makeQualified(storageDir) - .toUri() - .toString(); + this.storageDir = new Path(config.getStorageDirectory()); log.info("Configured HDFS as deep storage"); } @@ -83,12 +80,16 @@ public String getPathForHadoop(String dataSource) @Override public String getPathForHadoop() { + initFullyQualifiedStorageDirectory(); + return fullyQualifiedStorageDirectory; } @Override public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting) throws IOException { + initFullyQualifiedStorageDirectory(); + final String storageDir = this.getStorageDir(segment); log.info( @@ -230,4 +231,19 @@ public String makeIndexPathName(DataSegment dataSegment, String indexName) indexName ); } + + private void initFullyQualifiedStorageDirectory() + { + try { + if (fullyQualifiedStorageDirectory == null) { + fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) + .makeQualified(storageDir) + .toUri() + .toString(); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } } From ba4443cf6cf196a5538e90b1cc4010e883701c9a Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 23 Apr 2018 20:42:35 -0700 Subject: [PATCH 2/4] Comment --- .../main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 9a1f6740710c..d65617102505 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -232,6 +232,9 @@ public String makeIndexPathName(DataSegment dataSegment, String indexName) ); } + + // We lazily initialiize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. + // Please see https://github.com/druid-io/druid/pull/5684 private void initFullyQualifiedStorageDirectory() { try { From 31c644738c93cc79f5069a536086af95403b0645 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 23 Apr 2018 21:55:31 -0700 Subject: [PATCH 3/4] Fix test --- .../java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index e53d1ea3e0a4..bd69c323318c 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -107,7 +107,7 @@ public void testPushWithScheme() throws Exception @Test public void testPushWithBadScheme() throws Exception { - expectedException.expect(IOException.class); + expectedException.expect(RuntimeException.class); expectedException.expectMessage("No FileSystem for scheme"); testUsingScheme("xyzzy"); From 52867bd18c6a5c4bfc36de47288870bd4d28a0d5 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 24 Apr 2018 16:50:48 -0700 Subject: [PATCH 4/4] PR comments --- .../storage/hdfs/HdfsDataSegmentPusher.java | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index d65617102505..a7fc9ff40a6b 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -20,6 +20,8 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; @@ -53,8 +55,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final Configuration hadoopConfig; private final ObjectMapper jsonMapper; - private final Path storageDir; - private String fullyQualifiedStorageDirectory; + + // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. + // Please see https://github.com/druid-io/druid/pull/5684 + private final Supplier fullyQualifiedStorageDirectory; @Inject public HdfsDataSegmentPusher( @@ -65,7 +69,20 @@ public HdfsDataSegmentPusher( { this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; - this.storageDir = new Path(config.getStorageDirectory()); + Path storageDir = new Path(config.getStorageDirectory()); + this.fullyQualifiedStorageDirectory = Suppliers.memoize( + () -> { + try { + return FileSystem.newInstance(storageDir.toUri(), hadoopConfig) + .makeQualified(storageDir) + .toUri() + .toString(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + ); log.info("Configured HDFS as deep storage"); } @@ -80,28 +97,24 @@ public String getPathForHadoop(String dataSource) @Override public String getPathForHadoop() { - initFullyQualifiedStorageDirectory(); - - return fullyQualifiedStorageDirectory; + return fullyQualifiedStorageDirectory.get(); } @Override public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting) throws IOException { - initFullyQualifiedStorageDirectory(); - final String storageDir = this.getStorageDir(segment); log.info( "Copying segment[%s] to HDFS at location[%s/%s]", segment.getIdentifier(), - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), storageDir ); Path tmpIndexFile = new Path(StringUtils.format( "%s/%s/%s/%s_index.zip", - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), segment.getDataSource(), UUIDUtils.generateUuid(), segment.getShardSpec().getPartitionNum() @@ -117,13 +130,13 @@ public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting size = CompressionUtils.zip(inDir, out); final Path outIndexFile = new Path(StringUtils.format( "%s/%s/%d_index.zip", - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), storageDir, segment.getShardSpec().getPartitionNum() )); final Path outDescriptorFile = new Path(StringUtils.format( "%s/%s/%d_descriptor.json", - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), storageDir, segment.getShardSpec().getPartitionNum() )); @@ -231,22 +244,4 @@ public String makeIndexPathName(DataSegment dataSegment, String indexName) indexName ); } - - - // We lazily initialiize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. - // Please see https://github.com/druid-io/druid/pull/5684 - private void initFullyQualifiedStorageDirectory() - { - try { - if (fullyQualifiedStorageDirectory == null) { - fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) - .makeQualified(storageDir) - .toUri() - .toString(); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - } }