From 54bd6b1d2666ab6f6eab6922d418ec04c9add6b6 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Wed, 9 Sep 2015 18:42:59 -0700 Subject: [PATCH] Set context classloader to the one used to load the hdfs extension in HdfsDataSegmentPusher * Fixes https://github.com/druid-io/druid/issues/1714 --- .../storage/hdfs/HdfsDataSegmentPusher.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index aa821bb60932..a8d9921fa51d 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -70,33 +70,41 @@ public String getPathForHadoop(String dataSource) @Override public DataSegment push(File inDir, DataSegment segment) throws IOException { - final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); - - log.info( - "Copying segment[%s] to HDFS at location[%s/%s]", - segment.getIdentifier(), - config.getStorageDirectory(), - storageDir - ); - - Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir)); - FileSystem fs = outFile.getFileSystem(hadoopConfig); - - fs.mkdirs(outFile.getParent()); - log.info("Compressing files from[%s] to [%s]", inDir, outFile); - - final long size; - try (FSDataOutputStream out = fs.create(outFile)) { - size = CompressionUtils.zip(inDir, out); + final ClassLoader priorLoader = Thread.currentThread().getContextClassLoader(); + try { + // See https://github.com/druid-io/druid/issues/1714 + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); + + log.info( + "Copying segment[%s] to HDFS at location[%s/%s]", + segment.getIdentifier(), + config.getStorageDirectory(), + storageDir + ); + + Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir)); + FileSystem fs = outFile.getFileSystem(hadoopConfig); + + fs.mkdirs(outFile.getParent()); + log.info("Compressing files from[%s] to [%s]", inDir, outFile); + + final long size; + try (FSDataOutputStream out = fs.create(outFile)) { + size = CompressionUtils.zip(inDir, out); + } + + return createDescriptorFile( + segment.withLoadSpec(makeLoadSpec(outFile)) + .withSize(size) + .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)), + outFile.getParent(), + fs + ); + } + finally { + Thread.currentThread().setContextClassLoader(priorLoader); } - - return createDescriptorFile( - segment.withLoadSpec(makeLoadSpec(outFile)) - .withSize(size) - .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)), - outFile.getParent(), - fs - ); } private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException