Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions extensions/hdfs-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
<artifactId>hadoop-client</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
Expand All @@ -69,6 +74,32 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.17.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.hadoop.conf.Configuration;
import sun.net.www.protocol.hdfs.Handler;

import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -66,6 +67,9 @@ public void configure(Binder binder)
}
}

// Handler is discovered by java.net.URL via package-name convention, outside of Guice,
// so the Hadoop configuration has to be pushed in through this static setter.
Handler.setConfiguration(conf);

binder.bind(Configuration.class).toInstance(conf);
JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sun.net.www.protocol.hdfs;

import com.metamx.common.IAE;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;

/**
*
*/
/**
 * A {@link URLStreamHandler} that resolves {@code hdfs://} URLs through a Hadoop
 * {@code FileSystem}. It lives in the {@code sun.net.www.protocol.hdfs} package so the
 * JDK's default URLStreamHandlerFactory can discover it by package-name convention.
 */
public class Handler extends URLStreamHandler
{
  // Hadoop configuration used to resolve the FileSystem for each connection.
  // Defaults to loading core-site.xml etc.; replaceable via setConfiguration().
  private static Configuration conf = new Configuration(true);

  /**
   * Replaces the Hadoop configuration used by subsequently opened connections.
   * Bridges externally managed (e.g. Guice-injected) configuration into this
   * ServiceLoader-style handler, which cannot receive constructor injection.
   *
   * @param conf the configuration to use from now on
   */
  public static void setConfiguration(Configuration conf)
  {
    Handler.conf = conf;
  }

  /**
   * Opens a read-only connection to the HDFS path named by {@code u}.
   *
   * @param u the hdfs URL to open
   * @return a connected URLConnection whose input stream reads the file
   * @throws IOException if the path does not exist or the FileSystem cannot be reached
   */
  @Override
  protected URLConnection openConnection(final URL u) throws IOException
  {
    final org.apache.hadoop.fs.Path path;
    try {
      path = new org.apache.hadoop.fs.Path(u.toURI());
    }
    catch (URISyntaxException e) {
      throw new IAE(e, "Malformed URL [%s]", u);
    }

    final URLConnection connection = new URLConnection(u)
    {
      // Use the statically injected configuration rather than building a fresh default
      // one here — otherwise setConfiguration() would have no effect on connections.
      private final org.apache.hadoop.fs.FileSystem fs = path.getFileSystem(conf);

      @Override
      public void connect() throws IOException
      {
        if (!fs.exists(path)) {
          throw new IOException(String.format("File [%s] does not exist", path));
        }
      }

      @Override
      public InputStream getInputStream() throws IOException
      {
        return fs.open(path);
      }
    };
    connection.connect();
    return connection;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sun.net.www.protocol.hdfs;

import com.metamx.common.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;

/**
*
*/
/**
 * Tests for {@link Handler}: spins up an in-process MiniDFSCluster, writes a file of
 * known contents, and verifies that plain {@link java.net.URL} access round-trips the
 * bytes through the hdfs protocol handler.
 */
public class HandlerTest
{
  private static MiniDFSCluster miniCluster;
  private static File tmpDir;
  private static URI uriBase;
  private static Path filePath = new Path("/tmp/foo");
  private static String pathContents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum";
  private static byte[] pathByteContents = StringUtils.toUtf8(pathContents);

  @BeforeClass
  public static void setupStatic() throws IOException, ClassNotFoundException
  {
    // createTempFile gives us a unique name; delete the file so the path can serve
    // as the MiniDFSCluster base directory instead.
    tmpDir = File.createTempFile("hdfsHandlerTest", "dir");
    tmpDir.deleteOnExit();
    if (!tmpDir.delete()) {
      throw new IOException(String.format("Unable to delete tmpDir [%s]", tmpDir.getAbsolutePath()));
    }
    Configuration conf = new Configuration(true);
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.getAbsolutePath());
    miniCluster = new MiniDFSCluster.Builder(conf).build();
    uriBase = miniCluster.getURI(0);

    // Write the fixture bytes straight into HDFS; no local temp-file round trip needed.
    try (OutputStream stream = miniCluster.getFileSystem().create(filePath)) {
      stream.write(pathByteContents);
    }
  }

  @AfterClass
  public static void tearDownStatic()
  {
    if (miniCluster != null) {
      // shutdown() stops NameNode AND DataNodes and releases cluster resources;
      // stopping only the NameNode would leak DataNode threads and storage dirs.
      miniCluster.shutdown();
    }
  }

  @Test
  public void testSimpleUrl() throws IOException
  {
    final URL url = new URL(uriBase.toString() + filePath.toString());
    final File file = File.createTempFile("hdfsHandlerTest", "data");
    file.delete();
    file.deleteOnExit();
    try {
      try (InputStream stream = url.openStream()) {
        Files.copy(stream, file.toPath());
      }
      Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(file.toPath()));
    }
    finally {
      file.delete();
    }
  }

  @Test(expected = java.io.IOException.class)
  public void testBadUrl() throws IOException
  {
    final URL url = new URL(uriBase.toString() + "/dsfjkafhdsajklfhdjlkshfjhasfhaslhfasjk");
    // openStream() triggers connect(), which must throw for a nonexistent path.
    try (InputStream ignored = url.openStream()) {
      // no-op: the open itself is expected to fail
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.druid.storage.s3;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.google.common.collect.ImmutableList;

import java.util.Collection;

/**
*
*/
/**
 * Bridges Guice-provided AWS credentials into code instantiated outside of Guice
 * (e.g. via ServiceLoader) by stashing the provider chain in a static holder.
 * The setter is an instance method so the class can be bound as a Guice singleton.
 */
public class AWSCredentialsGuiceBridge
{
  // volatile: written once by the eager-singleton provider at injector startup,
  // potentially read from arbitrary threads afterwards.
  private static volatile AWSCredentialsProviderChain chain = null;

  /**
   * Stores the chain for later retrieval via {@link #getCredentials()}.
   *
   * @param chain the credentials provider chain to publish
   */
  public void setCredentials(AWSCredentialsProviderChain chain)
  {
    AWSCredentialsGuiceBridge.chain = chain;
  }

  /**
   * @return the most recently set chain, or null if none has been configured yet
   */
  public AWSCredentialsProviderChain getCredentials()
  {
    return chain;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.druid.common.aws.AWSCredentialsConfig;
import io.druid.common.aws.AWSCredentialsUtils;
import io.druid.guice.Binders;
Expand Down Expand Up @@ -59,6 +62,22 @@ public void configure(Binder binder)
Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
binder.bind(S3TaskLogs.class).in(LazySingleton.class);
binder.bind(AWSCredentialsGuiceBridge.class).toProvider(AWSCredentialsGuiceBridgeProvider.class).asEagerSingleton(); // Fix lack of DI in ServiceLoader
}

private static class AWSCredentialsGuiceBridgeProvider implements Provider<AWSCredentialsGuiceBridge>
{
private final AWSCredentialsConfig config;
@Inject
public AWSCredentialsGuiceBridgeProvider(final AWSCredentialsConfig config){
this.config = config;
}
public AWSCredentialsGuiceBridge get()
{
AWSCredentialsGuiceBridge bridge = new AWSCredentialsGuiceBridge();
bridge.setCredentials(AWSCredentialsUtils.defaultAWSCredentialsProviderChain(config));
return bridge;
}
}

@Provides
Expand Down
Loading