46 changes: 46 additions & 0 deletions indexing-hadoop/pom.xml
@@ -104,6 +104,52 @@
      <artifactId>hamcrest-all</artifactId>
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-hdfs</artifactId>
+     <version>${hadoop.compile.version}</version>
+     <type>test-jar</type>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+     <version>${hadoop.compile.version}</version>
+     <type>test-jar</type>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-minicluster</artifactId>
+     <version>${hadoop.compile.version}</version>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-yarn-server-tests</artifactId>
+     <version>${hadoop.compile.version}</version>
+     <type>test-jar</type>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>io.druid</groupId>
+     <artifactId>druid-server</artifactId>
+     <version>${project.parent.version}</version>
+     <type>test-jar</type>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>io.druid</groupId>
+     <artifactId>druid-processing</artifactId>
+     <version>${project.parent.version}</version>
+     <type>test-jar</type>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.derby</groupId>
+     <artifactId>derbyclient</artifactId>
+     <scope>test</scope>
+   </dependency>
  </dependencies>

<build>
@@ -102,7 +102,7 @@ public boolean run()
} else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
}
-      JobHelper.setupClasspath(config, groupByJob);
+      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
Member:
do we need those classpath changes, now that we abstracted the hadoop task stuff?

Contributor Author:
Here, yes. The abstractions were not perfect, because some functions (like this JobHelper) still took specific configs instead of specific distributed classpaths.
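
For reference, the decoupled call pattern reads like this (a minimal sketch, assuming a HadoopDruidIndexerConfig named config and a Job named job in scope, as in the jobs above):

// Before this patch, the helper derived the classpath location from the whole
// indexer config, tying it to HadoopDruidIndexerConfig:
//   JobHelper.setupClasspath(config, job);

// After this patch, the caller resolves the distributed classpath explicitly,
// so the helper only needs a Path:
JobHelper.setupClasspath(
    JobHelper.distributedClassPath(config.getWorkingPath()),
    job
);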


config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
@@ -135,7 +135,7 @@ public boolean run()
groupByJob.setOutputKeyClass(BytesWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-      JobHelper.setupClasspath(config, groupByJob);
+      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);

config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
@@ -186,7 +186,7 @@ public boolean run()
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
-      JobHelper.setupClasspath(config, dimSelectionJob);
+      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);

config.addJobProperties(dimSelectionJob);
config.intoConfiguration(dimSelectionJob);
@@ -176,7 +176,7 @@ public boolean run()

config.intoConfiguration(job);

-      JobHelper.setupClasspath(config, job);
+      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job);

job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
137 changes: 128 additions & 9 deletions indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -26,6 +26,7 @@
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
@@ -39,15 +40,18 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
@@ -58,6 +62,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/**
@@ -68,12 +73,21 @@ public class JobHelper

private static final Set<Path> existing = Sets.newHashSet();

-  private static final int NUM_RETRIES = 6;
-  private static final int SECONDS_BETWEEN_RETRIES = 10;
+  private static final int NUM_RETRIES = 8;
+  private static final int SECONDS_BETWEEN_RETRIES = 2;

+  public static Path distributedClassPath(String path)
+  {
+    return distributedClassPath(new Path(path));
+  }
+
+  public static Path distributedClassPath(Path base)
+  {
+    return new Path(base, "classpath");
+  }

public static void setupClasspath(
-      HadoopDruidIndexerConfig config,
+      Path distributedClassPath,
Job job
)
throws IOException
@@ -86,7 +100,6 @@ public static void setupClasspath(
String[] jarFiles = classpathProperty.split(File.pathSeparator);

final Configuration conf = job.getConfiguration();
-    final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final FileSystem fs = distributedClassPath.getFileSystem(conf);

if (fs instanceof LocalFileSystem) {
@@ -216,7 +229,7 @@ public static DataSegment serializeOutIndex(
DataPusher.class, new DataPusher()
{
@Override
-          public void push() throws IOException
+          public long push() throws IOException
{
try (OutputStream outputStream = fileContext.create(
tmpPath,
@@ -231,9 +244,10 @@ public void push() throws IOException
log.error(exception, "Exception in retry loop");
throw exception;
}
+            return -1;
}
},
-        RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+        RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
zipPusher.push();
log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());
@@ -294,7 +308,7 @@ public static void writeSegmentDescriptor(
DataPusher.class, new DataPusher()
{
@Override
-          public void push() throws IOException
+          public long push() throws IOException
{
try {
progressable.progress();
@@ -317,9 +331,10 @@ public void push() throws IOException
log.info(ex, "Error in retry loop");
throw ex;
}
+            return -1;
}
},
-        RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+        RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
descriptorPusher.push();
}
@@ -329,7 +344,7 @@ public void push() throws IOException
*/
public interface DataPusher
{
-    void push() throws IOException;
+    long push() throws IOException;
}

public static long zipAndCopyDir(
@@ -425,4 +440,108 @@ public static Path prependFSIfNullScheme(FileSystem fs, Path path)
}
return path;
}

// TODO: Replace this whenever hadoop gets their act together and stops breaking with more recent versions of Guava
public static long unzipNoGuava(
Contributor:
lol
+1 on this

Contributor:
how much of this method is copypasta from guava?

Contributor Author:
Zero. It is actually because com.metamx.common.CompressionUtils uses ByteSource and ByteSink in the "useful" methods, and com.google.common.io.ByteStreams#copy(java.io.InputStream, java.io.OutputStream) is used quite a bit in there.

final Path zip,
final Configuration configuration,
final File outDir,
final Progressable progressable
) throws IOException
{
final DataPusher zipPusher = (DataPusher) RetryProxy.create(
DataPusher.class, new DataPusher()
{
@Override
public long push() throws IOException
{
final FileContext context = FileContext.getFileContext(zip.toUri(), configuration);
long size = 0L;
final byte[] buffer = new byte[1 << 13];
progressable.progress();
try (ZipInputStream in = new ZipInputStream(context.open(zip, 1 << 13))) {
for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
final String fileName = entry.getName();
try (final OutputStream out = new BufferedOutputStream(
new FileOutputStream(
outDir.getAbsolutePath()
+ File.separator
+ fileName
), 1 << 13
)) {
for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
progressable.progress();
if (len == 0) {
continue;
}
size += len;
out.write(buffer, 0, len);
}
out.flush();
}
}
}
catch (IOException | RuntimeException exception) {
log.error(exception, "Exception in retry loop");
throw exception;
}
progressable.progress();
return size;
}
},
RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
return zipPusher.push();
}
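
On the Guava point: a single ByteStreams.copy call would also hide the per-chunk progressable.progress() calls that keep a long unzip from being marked dead. Here is a usage sketch for the new helper; the paths and the no-op Progressable are hypothetical:

import io.druid.indexer.JobHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Progressable;

import java.io.File;
import java.io.IOException;

public class UnzipSketch
{
  public static void main(String[] args) throws IOException
  {
    final Configuration conf = new Configuration();
    // Hypothetical locations; in the indexer these come from the job spec.
    final Path zippedSegment = new Path("hdfs:///tmp/druid/segment.zip");
    final File outDir = new File("/tmp/druid/unzipped");
    outDir.mkdirs();

    final long bytes = JobHelper.unzipNoGuava(
        zippedSegment,
        conf,
        outDir,
        new Progressable()
        {
          @Override
          public void progress()
          {
            // A real task would delegate to context.progress(); a no-op
            // suffices for a local smoke test.
          }
        }
    );
    System.out.println("Unzipped " + bytes + " bytes to " + outDir);
  }
}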

public static ProgressIndicator progressIndicatorForContext(
final TaskAttemptContext context
)
{
return new ProgressIndicator()
{

@Override
public void progress()
{
context.progress();
}

@Override
public void start()
{
context.progress();
context.setStatus("STARTED");
}

@Override
public void stop()
{
context.progress();
context.setStatus("STOPPED");
}

@Override
public void startSection(String section)
{
context.progress();
context.setStatus(String.format("STARTED [%s]", section));
}

@Override
public void progressSection(String section, String message)
{
log.info("Progress message for section [%s] : [%s]", section, message);
context.progress();
context.setStatus(String.format("PROGRESS [%s]", section));
}

@Override
public void stopSection(String section)
{
context.progress();
context.setStatus(String.format("STOPPED [%s]", section));
}
};
}
}
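
Finally, a sketch of how the bridge might be consumed from a reducer so that long-running segment work keeps heartbeating; the reducer class and the mergeAndPersistSegment helper are hypothetical, not part of this patch:

import io.druid.indexer.JobHelper;
import io.druid.segment.ProgressIndicator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class IndexingReducerSketch
    extends Reducer<BytesWritable, BytesWritable, BytesWritable, NullWritable>
{
  @Override
  protected void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
      throws IOException, InterruptedException
  {
    // Reducer.Context is a TaskAttemptContext, so it can be wrapped directly.
    final ProgressIndicator progress = JobHelper.progressIndicatorForContext(context);
    progress.start();
    try {
      progress.startSection("merge");
      mergeAndPersistSegment(values, progress); // hypothetical long-running work
      progress.stopSection("merge");
    }
    finally {
      progress.stop();
    }
  }

  private void mergeAndPersistSegment(Iterable<BytesWritable> values, ProgressIndicator progress)
  {
    // Real work would call progress.progressSection("merge", ...) periodically
    // so the task tracker sees liveness and a human-readable status.
  }
}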