Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Stream<InputSplit> createSplits(InputFormat inputFormat, @Nullable SplitH
}

@Override
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
{
if (firehoseFactory.isSplittable()) {
return firehoseFactory.getNumSplits(splitHintSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* @see FiniteFirehoseFactory#getSplits(SplitHintSpec)
* @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec)
* @see SplittableInputSource#createSplits
* @see SplittableInputSource#getNumSplits
* @see SplittableInputSource#estimateNumSplits
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable S
}

@Override
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return uris.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable
}

@Override
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return Iterators.size(getFileIterator());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,27 @@ default boolean isSplittable()
/**
* Creates a {@link Stream} of {@link InputSplit}s. The returned stream is supposed to be evaluated lazily to avoid
* consuming too much memory.
* Note that this interface also has {@link #getNumSplits} which is related to this method. The implementations
* Note that this interface also has {@link #estimateNumSplits} which is related to this method. The implementations
* should be careful to <i>NOT</i> cache the created splits in memory.
*
* Implementations can consider {@link InputFormat#isSplittable()} and {@link SplitHintSpec} to create splits
* in the same way with {@link #getNumSplits}.
* in the same way as {@link #estimateNumSplits}.
*/
Stream<InputSplit<T>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;

/**
* Returns the total number of splits to be created via {@link #createSplits}.
* This method can be expensive since it needs to iterate all directories or whatever substructure
* to find all input objects.
* Returns an estimated total number of splits to be created via {@link #createSplits}. The estimated number of splits
* doesn't have to be accurate and can be different from the actual number of InputSplits returned from
* {@link #createSplits}. This will be used to estimate the progress of a phase in parallel indexing.
* See TaskMonitor for more details of the progress estimation.
*
* This method can be expensive if an implementation iterates all directories or whatever substructure
* to find all input entities.
*
* Implementations can consider {@link InputFormat#isSplittable()} and {@link SplitHintSpec} to find splits
* in the same way as {@link #createSplits}.
*/
int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;
int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;

/**
* Helper method for ParallelIndexSupervisorTask.
Expand Down
4 changes: 2 additions & 2 deletions docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Returns the name of the current phase if the task running in the parallel mode.

* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress`

Returns the current progress if the supervisor task is running in the parallel mode.
Returns the estimated progress of the current phase if the supervisor task is running in parallel mode.

An example of the result is

Expand All @@ -285,7 +285,7 @@ An example of the result is
"failed":0,
"complete":0,
"total":10,
"expectedSucceeded":10
"estimatedExpectedSucceeded":10
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable S
}

@Override
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return uris.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public Stream<InputSplit<Path>> createSplits(InputFormat inputFormat, @Nullable
}

@Override
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
{
cachePathsIfNeeded();
return cachedPaths.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void hasCorrectSplits() throws IOException
@Test
public void hasCorrectNumberOfSplits() throws IOException
{
int numSplits = target.getNumSplits(null, null);
int numSplits = target.estimateNumSplits(null, null);
Assert.assertEquals(NUM_FILE, numSplits);
}
}
Expand Down Expand Up @@ -295,7 +295,7 @@ public void hasCorrectSplits() throws IOException
@Test
public void hasCorrectNumberOfSplits() throws IOException
{
int numSplits = target.getNumSplits(null, null);
int numSplits = target.estimateNumSplits(null, null);
Assert.assertEquals(0, numSplits);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ Iterator<SubTaskSpec<T>> subTaskSpecIterator() throws IOException
}

@Override
final int getTotalNumSubTasks() throws IOException
final int estimateTotalNumSubTasks() throws IOException
{
return baseInputSource.getNumSplits(
return baseInputSource.estimateNumSplits(
ingestionSchema.getIOConfig().getInputFormat(),
getTuningConfig().getSplitHintSpec()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -112,23 +113,23 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
/**
* Returns the estimated total number of sub tasks required to execute this phase.
* This is only an estimate and may differ from the number of sub tasks actually run.
*/
abstract int getTotalNumSubTasks() throws IOException;
abstract int estimateTotalNumSubTasks() throws IOException;

@Override
public TaskState run() throws Exception
{
if (getTotalNumSubTasks() == 0) {
final CountingSubTaskSpecIterator subTaskSpecIterator = new CountingSubTaskSpecIterator(subTaskSpecIterator());
if (!subTaskSpecIterator.hasNext()) {
LOG.warn("There's no input split to process");
return TaskState.SUCCESS;
}

final Iterator<SubTaskSpec<SubTaskType>> subTaskSpecIterator = subTaskSpecIterator();
final long taskStatusCheckingPeriod = tuningConfig.getTaskStatusCheckPeriodMs();

taskMonitor = new TaskMonitor<>(
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"),
tuningConfig.getMaxRetry(),
getTotalNumSubTasks()
estimateTotalNumSubTasks()
);
TaskState state = TaskState.RUNNING;

Expand Down Expand Up @@ -165,15 +166,15 @@ public TaskState run() throws Exception
// We have no more subTasks to run
if (taskMonitor.getNumRunningTasks() == 0 && taskCompleteEvents.isEmpty()) {
subTaskScheduleAndMonitorStopped = true;
if (taskMonitor.isSucceeded()) {
if (subTaskSpecIterator.count == taskMonitor.getNumSucceededTasks()) {
// Succeeded
state = TaskState.SUCCESS;
} else {
// Failed
final SinglePhaseParallelIndexingProgress monitorStatus = taskMonitor.getProgress();
final ParallelIndexingPhaseProgress monitorStatus = taskMonitor.getProgress();
throw new ISE(
"Expected for [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
monitorStatus.getExpectedSucceeded(),
"Expected [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
subTaskSpecIterator.count,
monitorStatus.getSucceeded(),
monitorStatus.getFailed()
);
Expand Down Expand Up @@ -226,6 +227,33 @@ public TaskState run() throws Exception
return state;
}

/**
 * Iterator wrapper that forwards to an underlying {@link SubTaskSpec} iterator and keeps a running
 * tally of how many specs have been handed out through {@link #next()}.
 *
 * The tally is exposed to the enclosing class through the {@code count} field.
 */
private class CountingSubTaskSpecIterator implements Iterator<SubTaskSpec<SubTaskType>>
{
  private final Iterator<SubTaskSpec<SubTaskType>> specs;

  // Number of next() calls that found a remaining element in the wrapped iterator.
  private int count;

  private CountingSubTaskSpecIterator(Iterator<SubTaskSpec<SubTaskType>> source)
  {
    this.specs = source;
  }

  /**
   * Reports whether the wrapped iterator has more specs. Does not affect the tally.
   */
  @Override
  public boolean hasNext()
  {
    return specs.hasNext();
  }

  /**
   * Returns the next spec from the wrapped iterator, bumping the tally first.
   *
   * @throws NoSuchElementException if the wrapped iterator reports no remaining element
   */
  @Override
  public SubTaskSpec<SubTaskType> next()
  {
    if (specs.hasNext()) {
      count++;
      return specs.next();
    }
    throw new NoSuchElementException();
  }
}

private boolean isRunning()
{
return !subTaskScheduleAndMonitorStopped && !Thread.currentThread().isInterrupted();
Expand Down Expand Up @@ -321,9 +349,9 @@ public Map<String, SubTaskReportType> getReports()
}

@Override
public ParallelIndexingProgress getProgress()
public ParallelIndexingPhaseProgress getProgress()
{
return taskMonitor == null ? SinglePhaseParallelIndexingProgress.notRunning() : taskMonitor.getProgress();
return taskMonitor == null ? ParallelIndexingPhaseProgress.notRunning() : taskMonitor.getProgress();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public interface ParallelIndexTaskRunner<SubTaskType extends Task, SubTaskReport
Map<String, SubTaskReportType> getReports();

/**
* Returns the current {@link ParallelIndexingProgress}.
* Returns the current {@link ParallelIndexingPhaseProgress}.
*/
ParallelIndexingProgress getProgress();
ParallelIndexingPhaseProgress getProgress();

/**
* Returns the IDs of current running tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Represents the current progress of {@link SinglePhaseParallelIndexTaskRunner}.
* Represents the current progress of a single phase in parallel indexing.
* All subclasses of {@link ParallelIndexPhaseRunner} can use this class to indicate their progress.
*/
class SinglePhaseParallelIndexingProgress implements ParallelIndexingProgress
class ParallelIndexingPhaseProgress
{
/**
* Number of running tasks.
Expand Down Expand Up @@ -53,31 +54,32 @@ class SinglePhaseParallelIndexingProgress implements ParallelIndexingProgress
private final int total;

/**
* Number of succeeded tasks for {@link SinglePhaseParallelIndexTaskRunner} to succeed.
* Estimated number of tasks that must succeed for a {@link ParallelIndexPhaseRunner} phase to succeed.
* This can be different from the actual number of tasks to succeed.
*/
private final int expectedSucceeded;
private final int estimatedExpectedSucceeded;

static SinglePhaseParallelIndexingProgress notRunning()
static ParallelIndexingPhaseProgress notRunning()
{
return new SinglePhaseParallelIndexingProgress(0, 0, 0, 0, 0, -1);
return new ParallelIndexingPhaseProgress(0, 0, 0, 0, 0, -1);
}

@JsonCreator
SinglePhaseParallelIndexingProgress(
ParallelIndexingPhaseProgress(
@JsonProperty("running") int running,
@JsonProperty("succeeded") int succeeded,
@JsonProperty("failed") int failed,
@JsonProperty("complete") int complete,
@JsonProperty("total") int total,
@JsonProperty("expectedSucceeded") int expectedSucceeded
@JsonProperty("estimatedExpectedSucceeded") int estimatedExpectedSucceeded
)
{
this.running = running;
this.succeeded = succeeded;
this.failed = failed;
this.complete = complete;
this.total = total;
this.expectedSucceeded = expectedSucceeded;
this.estimatedExpectedSucceeded = estimatedExpectedSucceeded;
}

@JsonProperty
Expand Down Expand Up @@ -111,8 +113,8 @@ public int getTotal()
}

@JsonProperty
public int getExpectedSucceeded()
public int getEstimatedExpectedSucceeded()
{
return expectedSucceeded;
return estimatedExpectedSucceeded;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Iterator<SubTaskSpec<PartialHashSegmentMergeTask>> subTaskSpecIterator()
}

@Override
int getTotalNumSubTasks()
int estimateTotalNumSubTasks()
{
return mergeIOConfigs.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ Iterator<SubTaskSpec<SinglePhaseSubTask>> subTaskSpecIterator() throws IOExcepti
}

@Override
int getTotalNumSubTasks() throws IOException
int estimateTotalNumSubTasks() throws IOException
{
return baseInputSource.getNumSplits(
return baseInputSource.estimateNumSplits(
ingestionSchema.getIOConfig().getInputFormat(),
getTuningConfig().getSplitHintSpec()
);
Expand Down
Loading