Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

/**
 * Pushes segment files to deep storage and builds the "load spec" map that
 * tells other Druid nodes how to retrieve the pushed segment later.
 */
public interface DataSegmentPusher
{
// Deprecated in favor of the no-argument overload below; presumably the
// dataSource argument is no longer needed — confirm with callers.
@Deprecated
String getPathForHadoop(String dataSource);
/** Base deep-storage path handed to Hadoop-based ingestion tasks. */
String getPathForHadoop();
/** Uploads the segment file and returns the segment updated with its final load spec and size. */
DataSegment push(File file, DataSegment segment) throws IOException;
//use map instead of LoadSpec class to avoid dependency pollution.
/** Builds the load spec for a segment whose index zip lives at the given deep-storage URI. */
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -174,4 +175,17 @@ public DataSegment call() throws Exception
}
}
}

@Override
public Map<String, Object> makeLoadSpec(URI uri)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please either use this in uploadDataSegment or else have them use a common helper method.

{
return ImmutableMap.<String, Object>of(
"type",
AzureStorageDruidModule.SCHEME,
"containerName",
config.getContainer(),
"blobPath",
uri.toString()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.astyanax.recipes.storage.ChunkedStorage;

import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
Expand All @@ -36,6 +37,8 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

/**
* Cassandra Segment Pusher
Expand All @@ -46,7 +49,7 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
{
private static final Logger log = new Logger(CassandraDataSegmentPusher.class);
private static final int CONCURRENCY = 10;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final ObjectMapper jsonMapper;

@Inject
Expand Down Expand Up @@ -96,7 +99,7 @@ public DataSegment push(final File indexFilesDir, DataSegment segment) throws IO
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
} catch (Exception e)
Expand All @@ -114,4 +117,10 @@ ImmutableMap.<String, Object> of("type", "c*", "key", key)
compressedIndexFile.delete();
return segment;
}

@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new IAE("not supported");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use spaces instead of tabs for indenting. Also this should technically be an UnsupportedOperationException.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;

public class CloudFilesDataSegmentPusher implements DataSegmentPusher
Expand Down Expand Up @@ -146,4 +148,19 @@ public DataSegment call() throws Exception
}
}
}

@Override
public Map<String, Object> makeLoadSpec(URI uri)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit : for each pusher the makeLoadSpec logic seems duplicated with the push method, this can be extracted to a common method.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @nishantmonu51, to avoid duplicated code please use this in push, or else have them both call a common helper method.

{
return ImmutableMap.<String, Object>of(
"type",
CloudFilesStorageDruidModule.SCHEME,
"region",
objectApi.getRegion(),
"container",
objectApi.getContainer(),
"path",
uri.toString()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

public class GoogleDataSegmentPusher implements DataSegmentPusher
{
Expand Down Expand Up @@ -82,7 +84,8 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen
return descriptorFile;
}

public void insert(final File file, final String contentType, final String path) throws IOException {
public void insert(final File file, final String contentType, final String path) throws IOException
{
LOG.info("Inserting [%s] to [%s]", file, path);

FileInputStream fileSteam = new FileInputStream(file);
Expand Down Expand Up @@ -117,7 +120,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr
"bucket", config.getBucket(),
"path", indexPath
)
)
)
.withBinaryVersion(version);

descriptorFile = createDescriptorFile(jsonMapper, outSegment);
Expand All @@ -129,7 +132,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr
}
catch (Exception e) {
throw Throwables.propagate(e);
} finally {
}
finally {
if (indexFile != null) {
LOG.info("Deleting file [%s]", indexFile);
indexFile.delete();
Expand All @@ -142,6 +146,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr
}
}

@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use this in push too, or else have them both call a common helper method.

{
return ImmutableMap.<String, Object>of(
"type",
GoogleStorageDruidModule.SCHEME,
"bucket",
config.getBucket(),
"path",
finalIndexZipFilePath.getPath().substring(1) // remove the leading "/"
);
}

public String buildPath(final String path)
{
if (config.getPrefix() != "") {
Expand Down
7 changes: 6 additions & 1 deletion extensions-core/hdfs-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,17 @@
<artifactId>emitter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.compile.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Map;

/**
*/
Expand Down Expand Up @@ -114,7 +116,7 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
));
final Path outDir = outFile.getParent();
dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
segment.withLoadSpec(makeLoadSpec(outFile.toUri()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
tmpFile.getParent(),
Expand Down Expand Up @@ -153,6 +155,12 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
return dataSegment;
}

@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
  // HDFS load specs carry the full URI of the segment's index zip.
  return ImmutableMap.<String, Object>builder()
      .put("type", "hdfs")
      .put("path", finalIndexZipFilePath.toString())
      .build();
}

private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
{
final Path descriptorFile = new Path(outDir, "descriptor.json");
Expand All @@ -163,11 +171,6 @@ private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final
return segment;
}

private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
{
return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toUri().toString());
}

private static class HdfsOutputStreamSupplier extends ByteSink
{
private final FileSystem fs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;

public class S3DataSegmentPusher implements DataSegmentPusher
Expand Down Expand Up @@ -149,4 +151,14 @@ public DataSegment call() throws Exception
throw Throwables.propagate(e);
}
}

@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to other pushers about code duplication.

{
return ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", finalIndexZipFilePath.getHost(),
"key", finalIndexZipFilePath.getPath().substring(1) // remove the leading "/"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
Expand Down Expand Up @@ -92,9 +93,11 @@ public class HadoopDruidIndexerConfig
public static final IndexMerger INDEX_MERGER;
public static final IndexMergerV9 INDEX_MERGER_V9;
public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;

public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";



static {
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
Expand All @@ -118,6 +121,7 @@ public void configure(Binder binder)
INDEX_MERGER = injector.getInstance(IndexMerger.class);
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
HADOOP_KERBEROS_CONFIG = injector.getInstance(HadoopKerberosConfig.class);
DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
}

public static enum IndexJobCounters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,8 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate
)
),
config.DATA_SEGMENT_PUSHER
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a static, so this would be more clear as HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER.

Copy link
Copy Markdown
Contributor

@gianm gianm Mar 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3a shouldn't really be here… probably just like determining the loadSpec, the directory should be based on the kind of deep storage configured and not on the scheme. I think adding a getStorageDir to DataSegmentPusher, and getting rid of DataSegmentPusherUtil, would solve that. It could use java 8 default methods to prevent any pusher other than HDFS from having to override it.

Oops, this comment was in the wrong spot. Moved to https://github.com/druid-io/druid/pull/3940/files#r107766956

);

Path descriptorPath = config.makeDescriptorInfoPath(segment);
Expand Down
46 changes: 7 additions & 39 deletions indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
Expand All @@ -36,6 +35,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -379,7 +379,8 @@ public static DataSegment serializeOutIndex(
final Progressable progressable,
final TaskAttemptID taskAttemptID,
final File mergedBase,
final Path segmentBasePath
final Path segmentBasePath,
DataSegmentPusher dataSegmentPusher
)
throws IOException
{
Expand Down Expand Up @@ -415,43 +416,8 @@ public long push() throws IOException

final Path finalIndexZipFilePath = new Path(segmentBasePath, "index.zip");
final URI indexOutURI = finalIndexZipFilePath.toUri();
final ImmutableMap<String, Object> loadSpec;
// TODO: Make this a part of Pushers or Pullers
switch (outputFS.getScheme()) {
case "hdfs":
case "viewfs":
case "maprfs":
loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", indexOutURI.toString()
);
break;
case "gs":
loadSpec = ImmutableMap.<String, Object>of(
"type", "google",
"bucket", indexOutURI.getHost(),
"path", indexOutURI.getPath().substring(1) // remove the leading "/"
);
break;
case "s3":
case "s3n":
loadSpec = ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", indexOutURI.getHost(),
"key", indexOutURI.getPath().substring(1) // remove the leading "/"
);
break;
case "file":
loadSpec = ImmutableMap.<String, Object>of(
"type", "local",
"path", indexOutURI.getPath()
);
break;
default:
throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
}
final DataSegment finalSegment = segmentTemplate
.withLoadSpec(loadSpec)
.withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI))
.withSize(size.get())
.withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));

Expand Down Expand Up @@ -583,7 +549,9 @@ public static Path makeSegmentOutputPath(
DataSegment segment
)
{
String segmentDir = "hdfs".equals(fileSystem.getScheme()) || "viewfs".equals(fileSystem.getScheme())
String segmentDir = "hdfs".equals(fileSystem.getScheme())
|| "viewfs".equals(fileSystem.getScheme())
|| "s3a".equals(fileSystem.getScheme())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm how about the issue with s3a: it is still not supported by Hadoop, though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3a shouldn't really be here… probably just like determining the loadSpec, the directory should be based on the kind of deep storage configured and not on the scheme. I think adding a getStorageDir to DataSegmentPusher, and getting rid of DataSegmentPusherUtil, would solve that. It could use java 8 default methods to prevent any pusher other than HDFS from having to override it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hum this method is used everywhere... anyway will change that.

? DataSegmentPusherUtil.getHdfsStorageDir(segment)
: DataSegmentPusherUtil.getStorageDir(segment);
return new Path(prependFSIfNullScheme(fileSystem, basePath), String.format("./%s", segmentDir));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,8 @@ protected void map(
baseOutputPath,
outputFS,
finalSegmentTemplate
)
),
config.DATA_SEGMENT_PUSHER
);
context.progress();
context.setStatus("Finished PUSH");
Expand Down
Loading