/*
 * Druid - a distributed column store.
 * Copyright 2012 - 2015 Metamarkets Group Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sun.net.www.protocol.hdfs;

import com.metamx.common.IAE;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;

/**
 * URLStreamHandler that lets {@code hdfs://} URLs be opened through the
 * standard {@link java.net.URL} machinery. The class lives in the
 * {@code sun.net.www.protocol.hdfs} package because that is where the JDK's
 * default protocol-handler lookup searches for a class named {@code Handler}.
 */
public class Handler extends URLStreamHandler
{
  // Hadoop configuration used to resolve FileSystems. Starts as an
  // auto-loaded default; HdfsStorageDruidModule replaces it with the
  // Druid-managed Configuration via setConfiguration() at startup.
  // volatile: written once during module configuration, read later from
  // whichever threads resolve hdfs:// URLs.
  private static volatile Configuration conf = new Configuration(true);

  /**
   * Installs the Configuration subsequent connections should use.
   * Static because the JDK instantiates this handler reflectively,
   * outside of any dependency-injection scope.
   */
  public static void setConfiguration(Configuration conf)
  {
    Handler.conf = conf;
  }

  /**
   * Opens a read-only connection to the HDFS path named by {@code u}.
   *
   * @throws IAE         if the URL cannot be converted to a valid URI/Path
   * @throws IOException if the path does not exist or the FileSystem fails
   */
  @Override
  protected URLConnection openConnection(final URL u) throws IOException
  {
    final org.apache.hadoop.fs.Path path;
    try {
      path = new org.apache.hadoop.fs.Path(u.toURI());
    }
    catch (URISyntaxException e) {
      throw new IAE(e, "Malformed URL [%s]", u.toString());
    }

    final URLConnection connection = new URLConnection(u)
    {
      // BUG FIX: use the injected static configuration rather than building a
      // fresh default Configuration, which silently ignored setConfiguration()
      // and therefore any Druid-specific HDFS settings.
      private final org.apache.hadoop.fs.FileSystem fs = path.getFileSystem(conf);

      @Override
      public void connect() throws IOException
      {
        if (!fs.exists(path)) {
          throw new IOException("File does not exist");
        }
      }

      @Override
      public InputStream getInputStream() throws IOException
      {
        return fs.open(path);
      }
    };
    connection.connect();
    return connection;
  }
}
/*
 * Druid - a distributed column store.
 * Copyright 2012 - 2015 Metamarkets Group Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sun.net.www.protocol.hdfs;

import com.metamx.common.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;

/**
 * Exercises the hdfs:// URLStreamHandler against an in-process MiniDFSCluster.
 */
public class HandlerTest
{
  private static MiniDFSCluster miniCluster;
  private static File tmpDir;
  private static URI uriBase;
  private static Path filePath = new Path("/tmp/foo");
  private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
  private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);

  @BeforeClass
  public static void setupStatic() throws IOException, ClassNotFoundException
  {
    // createTempFile gives a unique name; delete the file so the path can be
    // reused as the MiniDFSCluster base directory.
    tmpDir = File.createTempFile("hdfsHandlerTest", "dir");
    tmpDir.deleteOnExit();
    if (!tmpDir.delete()) {
      throw new IOException(String.format("Unable to delete tmpDir [%s]", tmpDir.getAbsolutePath()));
    }
    Configuration conf = new Configuration(true);
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.getAbsolutePath());
    miniCluster = new MiniDFSCluster.Builder(conf).build();
    uriBase = miniCluster.getURI(0);

    // Write the fixture bytes straight into HDFS. The original routed the
    // bytes through a local temp file first, which added an extra copy and a
    // temp-file cleanup hazard without any benefit.
    try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) {
      stream.write(pathByteContents);
    }
  }

  @AfterClass
  public static void tearDownStatic()
  {
    if (miniCluster != null) {
      // BUG FIX: shutdown() releases the whole cluster (datanodes, ports,
      // storage directories); the original stopped only the namenode and
      // leaked everything else.
      miniCluster.shutdown();
    }
  }

  @Test
  public void testSimpleUrl() throws IOException
  {
    final URL url = new URL(uriBase.toString() + filePath.toString());
    final File file = File.createTempFile("hdfsHandlerTest", "data");
    file.delete();
    file.deleteOnExit();
    try {
      try (InputStream stream = url.openStream()) {
        Files.copy(stream, file.toPath());
      }
      Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(file.toPath()));
    }
    finally {
      file.delete();
    }
  }

  @Test(expected = java.io.IOException.class)
  public void testBadUrl() throws IOException
  {
    final URL url = new URL(uriBase.toString() + "/dsfjkafhdsajklfhdjlkshfjhasfhaslhfasjk");
    // openStream() triggers connect(), which must throw for a missing path.
    try (InputStream stream = url.openStream()) {
      // no body: the open itself is expected to fail
    }
  }
}
+ */ + +package io.druid.storage.s3; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.google.common.collect.ImmutableList; + +import java.util.Collection; + +/** + * + */ +public class AWSCredentialsGuiceBridge +{ + private static AWSCredentialsProviderChain chain = null; + public void setCredentials(AWSCredentialsProviderChain chain){ + AWSCredentialsGuiceBridge.chain = chain; + } + public AWSCredentialsProviderChain getCredentials(){ + return chain; + } +} diff --git a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java index c1e7dae641f2..188bc14fca47 100644 --- a/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java @@ -21,7 +21,10 @@ import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Provider; import com.google.inject.Provides; +import com.google.inject.Singleton; import io.druid.common.aws.AWSCredentialsConfig; import io.druid.common.aws.AWSCredentialsUtils; import io.druid.guice.Binders; @@ -59,6 +62,22 @@ public void configure(Binder binder) Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class); JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class); binder.bind(S3TaskLogs.class).in(LazySingleton.class); + binder.bind(AWSCredentialsGuiceBridge.class).toProvider(AWSCredentialsGuiceBridgeProvider.class).asEagerSingleton(); // Fix lack of DI in ServiceLoader + } + + private static class AWSCredentialsGuiceBridgeProvider implements Provider + { + private final AWSCredentialsConfig config; + @Inject + public AWSCredentialsGuiceBridgeProvider(final AWSCredentialsConfig config){ + 
this.config = config; + } + public AWSCredentialsGuiceBridge get() + { + AWSCredentialsGuiceBridge bridge = new AWSCredentialsGuiceBridge(); + bridge.setCredentials(AWSCredentialsUtils.defaultAWSCredentialsProviderChain(config)); + return bridge; + } } @Provides diff --git a/extensions/s3-extensions/src/main/java/sun/net/www/protocol/s3/Handler.java b/extensions/s3-extensions/src/main/java/sun/net/www/protocol/s3/Handler.java new file mode 100644 index 000000000000..3fba8788e5cf --- /dev/null +++ b/extensions/s3-extensions/src/main/java/sun/net/www/protocol/s3/Handler.java @@ -0,0 +1,138 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package sun.net.www.protocol.s3; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.auth.SystemPropertiesCredentialsProvider; +import com.amazonaws.auth.profile.ProfileCredentialsProvider; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.metamx.common.IAE; +import io.druid.storage.s3.AWSCredentialsGuiceBridge; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import java.util.ServiceLoader; + +/** + * + */ +public class Handler extends URLStreamHandler +{ + @Override + protected URLConnection openConnection(URL url) throws IOException + { + final String userInfo = url.getUserInfo(); + final AWSCredentialsProviderChain chain; + if (!Strings.isNullOrEmpty(userInfo)) { + final String user; + final String pass; + final String[] userPass = userInfo.split(":"); + if (userPass.length != 2) { + throw new IAE("Malformed key/secret"); + } + if (!Strings.isNullOrEmpty(userPass[0])) { + user = userPass[0]; + } else { + throw new IAE("Missing access key"); + } + if (!Strings.isNullOrEmpty(userPass[1])) { + pass = userPass[1]; + } else { + throw new IAE("Missing access secret"); + } + chain = new AWSCredentialsProviderChain( + new StaticCredentialsProvider( + new AWSCredentials() + { + @Override + public String getAWSAccessKeyId() + { + return user; + } + + @Override + public String getAWSSecretKey() + { + return pass; + } + } + ), new 
EnvironmentVariableCredentialsProvider(), + new SystemPropertiesCredentialsProvider(), + new ProfileCredentialsProvider(), + new InstanceProfileCredentialsProvider() + ); + } else { + final ServiceLoader bridge = ServiceLoader.load( + AWSCredentialsGuiceBridge.class, + getClass().getClassLoader() + ); + chain = bridge.iterator().next().getCredentials(); + } + + final String host = url.getHost(); + if (Strings.isNullOrEmpty(host)) { + throw new IAE("Must specify host/bucket"); + } + final String[] hostParts = host.split("\\."); + final String bucket = hostParts[0]; + + final String path = url.getPath(); + if (Strings.isNullOrEmpty(path)) { + throw new IAE("Must specify path/key"); + } + + if (path.length() < 2) { + throw new IAE("Cannot have zero length key. Found [%s]", path); + } + final String key = path.substring(1); // eliminate `/` + + + final URLConnection connection = new URLConnection(url) + { + AmazonS3Client amazonS3Client; + + @Override + public void connect() throws IOException + { + amazonS3Client = new AmazonS3Client(chain); + } + + @Override + public InputStream getInputStream() + { + final GetObjectRequest request = new GetObjectRequest(bucket, key); + final S3Object s3Object = amazonS3Client.getObject(request); + return s3Object.getObjectContent(); + } + }; + connection.connect(); + return connection; + } +} diff --git a/extensions/s3-extensions/src/main/java/sun/net/www/protocol/s3n/Handler.java b/extensions/s3-extensions/src/main/java/sun/net/www/protocol/s3n/Handler.java new file mode 100644 index 000000000000..a093b147cd61 --- /dev/null +++ b/extensions/s3-extensions/src/main/java/sun/net/www/protocol/s3n/Handler.java @@ -0,0 +1,26 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sun.net.www.protocol.s3n; + +/** + * + */ +public class Handler extends sun.net.www.protocol.s3.Handler +{ + // Exactly the same +} diff --git a/extensions/s3-extensions/src/main/resources/META-INF/services/io.druid.storage.s3.AWSCredentialsGuiceBridge b/extensions/s3-extensions/src/main/resources/META-INF/services/io.druid.storage.s3.AWSCredentialsGuiceBridge new file mode 100644 index 000000000000..9daf67844d69 --- /dev/null +++ b/extensions/s3-extensions/src/main/resources/META-INF/services/io.druid.storage.s3.AWSCredentialsGuiceBridge @@ -0,0 +1 @@ +io.druid.storage.s3.AWSCredentialsGuiceBridge \ No newline at end of file diff --git a/extensions/s3-extensions/src/test/java/sun/net/www/protocol/s3/HandlerTest.java b/extensions/s3-extensions/src/test/java/sun/net/www/protocol/s3/HandlerTest.java new file mode 100644 index 000000000000..a8fe61c13a6a --- /dev/null +++ b/extensions/s3-extensions/src/test/java/sun/net/www/protocol/s3/HandlerTest.java @@ -0,0 +1,180 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/*
 * Druid - a distributed column store.
 * Copyright 2012 - 2015 Metamarkets Group Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sun.net.www.protocol.s3;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.metamx.common.IAE;
import io.druid.storage.s3.AWSCredentialsGuiceBridge;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;

/**
 * Exercises the s3:// URLStreamHandler's URL parsing and credential plumbing.
 * Network-touching tests run with deliberately bogus credentials and assert
 * on the resulting AmazonS3Exception.
 */
public class HandlerTest
{
  /**
   * Installs a fixed bogus key/secret through the bridge, standing in for the
   * production provider in S3StorageDruidModule.
   */
  private static class AWSCredentialsGuiceBridgeProvider implements Provider<AWSCredentialsGuiceBridge>
  {
    @Override
    public AWSCredentialsGuiceBridge get()
    {
      AWSCredentialsGuiceBridge bridge = new AWSCredentialsGuiceBridge();
      bridge.setCredentials(
          new AWSCredentialsProviderChain(
              new AWSCredentialsProvider()
              {
                @Override
                public AWSCredentials getCredentials()
                {
                  return new AWSCredentials()
                  {
                    @Override
                    public String getAWSAccessKeyId()
                    {
                      return "abc";
                    }

                    @Override
                    public String getAWSSecretKey()
                    {
                      return "def";
                    }
                  };
                }

                @Override
                public void refresh()
                {
                  // NOOP
                }
              }
          )
      );
      return bridge;
    }
  }

  private static final String prefix = "s3";
  private static final String goodURL = prefix + "://elasticmapreduce/images/integrating_data_predicate_1.png";

  @BeforeClass
  public static void setupStatic()
  {
    // The injector itself is not needed afterwards; creating it triggers the
    // eager singleton, whose side effect installs the static credential chain.
    Guice.createInjector(
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            binder.bind(AWSCredentialsGuiceBridge.class)
                  .toProvider(AWSCredentialsGuiceBridgeProvider.class)
                  .asEagerSingleton();
          }
        }
    );
  }

  @Test(expected = AmazonS3Exception.class)
  // Test will fail due to bad credentials
  public void testGeneralURL() throws IOException
  {
    final URL url = new URL(goodURL);
    File file = File.createTempFile("handlerTest", ".png");
    file.deleteOnExit();
    try {
      if (!file.delete()) {
        throw new IOException(String.format("Unable to delete file [%s]", file.getAbsolutePath()));
      }
      try (InputStream stream = url.openStream()) {
        Files.copy(stream, file.toPath());
      }
      catch (AmazonS3Exception ex) {
        Assert.assertTrue(ex.getMessage().startsWith("The AWS Access Key Id you provided does not exist in our records"));
        throw ex;
      }
    }
    finally {
      file.delete();
    }
  }

  @Test(expected = IAE.class)
  public void testBadSecret() throws IOException
  {
    final URL url = new URL(prefix + "://user:@bucket/key");
    try (InputStream stream = url.openStream()) {
      // expected to throw before a stream is produced
    }
  }

  @Test(expected = IAE.class)
  public void testBadUser() throws IOException
  {
    final URL url = new URL(prefix + "://:secret@bucket/key");
    try (InputStream stream = url.openStream()) {
      // expected to throw before a stream is produced
    }
  }

  @Test(expected = IAE.class)
  public void testBadUserInfo() throws IOException
  {
    final URL url = new URL(prefix + "://whoops@bucket/key");
    try (InputStream stream = url.openStream()) {
      // expected to throw before a stream is produced
    }
  }

  @Test(expected = IAE.class)
  public void testBadBucket() throws IOException
  {
    final URL url = new URL(prefix + ":///key");
    try (InputStream stream = url.openStream()) {
      // expected to throw before a stream is produced
    }
  }

  @Test(expected = IAE.class)
  public void testBadKey() throws IOException
  {
    final URL url = new URL(prefix + "://bucket/");
    try (InputStream stream = url.openStream()) {
      // expected to throw before a stream is produced
    }
  }

  @Test(expected = IAE.class)
  public void testBadBucketKey() throws IOException
  {
    final URL url = new URL(prefix + "://bucket");
    try (InputStream stream = url.openStream()) {
      // expected to throw before a stream is produced
    }
  }
}