@@ -82,9 +82,9 @@
 import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.RealtimeMetricsMonitor;
 import io.druid.segment.realtime.appenderator.Appenderator;
-import io.druid.segment.realtime.appenderator.AppenderatorDriver;
 import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
@@ -136,7 +136,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -187,7 +186,7 @@ public enum Status
 private TaskToolbox toolbox;

 private volatile Appenderator appenderator = null;
-private volatile AppenderatorDriver driver = null;
+private volatile StreamAppenderatorDriver driver = null;
 private volatile FireDepartmentMetrics fireDepartmentMetrics = null;
 private volatile DateTime startTime;
 private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
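
The volatile fields above pair with the single-writer note on status: only the task runner thread assigns these fields, and volatile guarantees that readers on other threads observe the latest write without locking. A minimal sketch of that pattern (class and method names here are illustrative, not Druid's):

    class StatusHolder
    {
      enum Status { NOT_STARTED, READING, PUBLISHING }

      // Written by the runner thread only; volatile makes each write
      // immediately visible to threads that poll getStatus().
      private volatile Status status = Status.NOT_STARTED;

      void run() { status = Status.READING; }   // runner thread
      Status getStatus() { return status; }     // safe from any thread
    }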
@@ -372,7 +371,7 @@ private void createAndStartPublishExecutor()
 Joiner.on(", ").join(
 result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())
 ),
-result.getCommitMetadata()
+Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata")
 );
 }

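This hunk replaces a bare result.getCommitMetadata() with Guava's Preconditions.checkNotNull, so a missing commit object fails fast with a named message rather than flowing into the log call. A minimal sketch of the behavior, assuming only Guava (getCommitMetadata is a hypothetical stand-in):

    import com.google.common.base.Preconditions;

    class CommitMetadataCheckDemo
    {
      // Hypothetical stand-in for result.getCommitMetadata(); a hand-off
      // that produced no commit metadata would return null here.
      static Object getCommitMetadata()
      {
        return null;
      }

      public static void main(String[] args)
      {
        try {
          Preconditions.checkNotNull(getCommitMetadata(), "commitMetadata");
        }
        catch (NullPointerException e) {
          // Fails fast at the call site with a named message, instead of
          // letting the null flow into the log/publish path and surface later.
          System.out.println(e.getMessage()); // commitMetadata
        }
      }
    }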
@@ -423,9 +422,11 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
 if (getContext() != null && getContext().get("checkpoints") != null) {
 log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints"));
 final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
-(String) getContext().get("checkpoints"), new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+(String) getContext().get("checkpoints"),
+new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
 {
-});
+}
+);

 Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
 Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
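
The reformatted readValue call keeps Jackson's anonymous TypeReference subclass, which is what preserves the generic TreeMap<Integer, Map<Integer, Long>> shape through type erasure; passing TreeMap.class instead would yield untyped nested maps. A self-contained sketch with an illustrative checkpoints payload (sequence id -> partition -> offset):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;
    import java.util.TreeMap;

    class CheckpointsParseDemo
    {
      public static void main(String[] args) throws Exception
      {
        // Example checkpoints JSON: sequence id -> (partition -> offset).
        String json = "{\"0\": {\"0\": 100, \"1\": 200}}";
        TreeMap<Integer, Map<Integer, Long>> checkpoints = new ObjectMapper().readValue(
            json,
            new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {}
        );
        System.out.println(checkpoints.firstEntry()); // 0={0=100, 1=200}
      }
    }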
@@ -709,7 +710,7 @@ public void run()

 if (addResult.isOk()) {
 // If the number of rows in the segment exceeds the threshold after adding a row,
-// move the segment out from the active segments of AppenderatorDriver to make a new segment.
+// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
 if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
 if (!sequenceToUse.isCheckpointed()) {
 sequenceToCheckpoint = sequenceToUse;
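
Per the updated comment, once an appended row pushes the active segment past maxRowsPerSegment, the driver retires that segment so subsequent rows start a fresh one. A toy, self-contained illustration of that threshold rule (the constant stands in for tuningConfig.getMaxRowsPerSegment()):

    import java.util.ArrayList;
    import java.util.List;

    class MaxRowsPerSegmentDemo
    {
      static final int MAX_ROWS_PER_SEGMENT = 3; // stand-in for the tuning config value

      public static void main(String[] args)
      {
        List<List<Integer>> segments = new ArrayList<>();
        segments.add(new ArrayList<>());
        for (int row = 0; row < 8; row++) {
          List<Integer> active = segments.get(segments.size() - 1);
          active.add(row);
          // Mirrors the task's check: once a row pushes the active segment
          // past the threshold, "move it out" and open a new segment.
          if (active.size() > MAX_ROWS_PER_SEGMENT) {
            segments.add(new ArrayList<>());
          }
        }
        System.out.println(segments); // [[0, 1, 2, 3], [4, 5, 6, 7], []]
      }
    }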
@@ -747,7 +748,6 @@ public void onFailure(Throwable t)
 }
 }
 );
-
 }
 }
 catch (ParseException e) {
@@ -850,7 +850,7 @@ public void onFailure(Throwable t)
 Joiner.on(", ").join(
 handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())
 ),
-handedOff.getCommitMetadata()
+Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
 );
 }
 }
@@ -940,9 +940,9 @@ private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception
 );

 try (
-final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
-final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
-final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
+final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
+final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
+final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
 ) {
 toolbox.getDataSegmentServerAnnouncer().announce();
 toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -1104,7 +1104,7 @@ public void run()

 if (addResult.isOk()) {
 // If the number of rows in the segment exceeds the threshold after adding a row,
-// move the segment out from the active segments of AppenderatorDriver to make a new segment.
+// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
 if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
 segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>())
 .add(addResult.getSegmentIdentifier());
@@ -1170,7 +1170,7 @@ public void run()

 final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
 final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
-((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS),
+((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS),
 KafkaPartitions.class
 );

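The publisher lambda now guards commitMetadata before the raw Map cast and the convertValue call. A runnable sketch of the same guard-then-extract pattern, assuming only Guava and Jackson; Partitions and the metadata key are local stand-ins for Druid's KafkaPartitions and METADATA_NEXT_PARTITIONS:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.base.Preconditions;
    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    class PublishMetadataDemo
    {
      // Local stand-in for Druid's KafkaPartitions.
      public static class Partitions
      {
        public String topic;
        public Map<Integer, Long> partitionOffsetMap;
      }

      static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; // illustrative key

      public static void main(String[] args)
      {
        Object commitMetadata = ImmutableMap.of(
            METADATA_NEXT_PARTITIONS,
            ImmutableMap.of("topic", "events", "partitionOffsetMap", ImmutableMap.of(0, 100L))
        );
        // Same guard-then-extract pattern as the publisher lambda: fail fast
        // on a null commit object before casting it to Map and converting.
        Partitions next = new ObjectMapper().convertValue(
            ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS),
            Partitions.class
        );
        System.out.println(next.topic + " -> " + next.partitionOffsetMap); // events -> {0=100}
      }
    }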
@@ -1230,7 +1230,7 @@ public String apply(DataSegment input)
 }
 )
 ),
-handedOff.getCommitMetadata()
+Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
 );
 }
 }
@@ -1269,7 +1269,7 @@ private void checkAndMaybeThrowException()
 }

 private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
-throws ExecutionException, InterruptedException
+throws InterruptedException
 {
 for (SequenceMetadata sequenceMetadata : sequences) {
 sequenceMetadata.updateAssignments(nextOffsets);
@@ -1793,13 +1793,13 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox
 );
 }

-private AppenderatorDriver newDriver(
+private StreamAppenderatorDriver newDriver(
 final Appenderator appenderator,
 final TaskToolbox toolbox,
 final FireDepartmentMetrics metrics
 )
 {
-return new AppenderatorDriver(
+return new StreamAppenderatorDriver(
 appenderator,
 new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
 toolbox.getSegmentHandoffNotifierFactory(),
@@ -2222,7 +2222,7 @@ public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean u
 {
 return (segments, commitMetadata) -> {
 final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
-((Map) commitMetadata).get(METADATA_PUBLISH_PARTITIONS),
+((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_PUBLISH_PARTITIONS),
 KafkaPartitions.class
 );
