
Change early publishing to early pushing in indexTask & refactor AppenderatorDriver #5297

Merged
jihoonson merged 10 commits into apache:master from jihoonson:fix-index-task
Feb 14, 2018

Conversation

@jihoonson
Contributor

@jihoonson commented Jan 25, 2018

This PR is to fix a bug in indexTask. In #4238, indexTask was improved to support incremental publishing, i.e., segments are published one by one during batch ingestion. This makes sense for normal ingestion. However, when it comes to reindexing, it can cause a problem where an early-published segment overshadows all old segments even before reindexing completes.

So, in this PR, the indexTask is changed to push segments early instead. All pushed segments are published together at the end of the indexTask.

To do so, I also refactored AppenderatorDriver and added two child classes, FiniteAppenderatorDriver and InfiniteAppenderatorDriver, which are specialized for batch and realtime indexing, respectively. This is needed because the segment lifecycle differs between batch and realtime indexing: in batch indexing it is APPENDING -> PUSHED -> DROPPED -> PUBLISHED, while in realtime indexing it is APPENDING -> PUSHED -> PUBLISHED -> DROPPED. To reduce complexity, only the fundamental methods remain in AppenderatorDriver; all specialized methods are moved to either FiniteAppenderatorDriver or InfiniteAppenderatorDriver.
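To make the intended ordering concrete, here is a minimal sketch of the batch flow. The method names pushAndClear() and publishAll()/publishSegments() are hypothetical stand-ins rather than the actual driver API; the point is only that pushing happens incrementally while publishing happens once, at the end.

// Sketch only -- pushAndClear()/publishSegments() are illustrative names, not the real
// FiniteAppenderatorDriver API. Segments are pushed (and dropped locally) as soon as
// they are full, but nothing is published until every segment has been pushed, so a
// partially reindexed interval never overshadows the old data.
final List<DataSegment> pushedSegments = new ArrayList<>();
while (firehose.hasMore()) {
  driver.add(firehose.nextRow(), sequenceName);                // APPENDING
  if (segmentIsFull()) {
    pushedSegments.addAll(driver.pushAndClear(sequenceName));  // PUSHED -> DROPPED
  }
}
pushedSegments.addAll(driver.pushAndClear(sequenceName));
publisher.publishSegments(pushedSegments);                     // PUBLISHED, all at once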



@jihoonson
Contributor Author

I think this bug is critical and this PR should be included in 0.12.0.

@gianm added this to the 0.12.0 milestone Jan 26, 2018
@himanshug
Contributor

@jihoonson will the new segments have a NumberedShardSpec? If so, then even if some of them are published, they wouldn't overshadow older segments, because the Broker would notice that not all partitions in the set are available yet.
What am I missing?

@jihoonson
Contributor Author

@himanshug, let me suppose a use case of the Kafka indexing service + compaction. The segments created by kafkaIndexTask will have a NumberedShardSpec with 0 core partitions (partitions in NumberedShardSpec). This means the broker can't tell whether all segments have been compacted or not.

This can happen even for batch ingestion. For example, if more data is appended to an existing dataSource, the partition id (partitionNum in NumberedShardSpec) can increase beyond the number of core partitions (partitions in NumberedShardSpec).

@himanshug
Contributor

My understanding is that if compaction produces a brand new partition set with a new version AND a NumberedShardSpec (with partitions > 0 and partitionNum < partitions for all partitions), then the Broker shouldn't consider the old data overshadowed unless all segments in the partition set produced by compaction are loaded.
I thought that compaction would fulfill the above conditions.

Otherwise, yes... the partition set would be assumed complete with any number of segments available.

However, IndexTask is more general than compaction, so I think the PR still makes sense.

@jihoonson
Contributor Author

Ah, I got your point. The new segments created by a compactionTask will have a NumberedShardSpec with partitions > 0 only when the task runs in perfect rollup mode (forceGuaranteedRollup must be set in TuningConfig). This is possible since compactionTask allows users to specify any options in the tuningConfig. In this case, the new segments won't overshadow old segments until all segments in the partition set are published.

However, if forceGuaranteedRollup is not set, the compactionTask also generates segments with a NumberedShardSpec with 0 core partitions.
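To illustrate the distinction, a NumberedShardSpec carries both a partition id and a core partition count; the two cases discussed above boil down to something like the following (example values only, not code from the patch):

// partitions == 0: the partition set is considered complete no matter how many segments
// have been loaded, so a single early-published segment can overshadow old data.
ShardSpec appendStyle = new NumberedShardSpec(3, 0);   // partitionNum = 3, core partitions = 0

// partitions == 4 (perfect rollup, i.e. forceGuaranteedRollup): the broker waits until
// all 4 core partitions are available before the new version overshadows the old one.
ShardSpec perfectRollup = new NumberedShardSpec(1, 4); // partitionNum = 1, core partitions = 4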

* commit metadata for this persist
*/
ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, Committer committer);
ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer);
Contributor

Why would persist be called with a null committer when it would be a noop? Shouldn't the persist call throw an IAE when the committer is null?

Contributor Author

Its behavior is different. With the noop committer, Appenderator commits intermediate state, which involves writing data to local disk. If the committer is null, Appenderator skips committing intermediate state and writes nothing. Since IndexTask is not restorable, committing intermediate state is unnecessary.

Contributor

I mean IndexTask should never be calling persist and allowIncrementalPersists should be set to false on all add(..) calls (which then shouldn't/wouldn't call persist(..)) from IndexTask

Contributor Author

IndexTask should be able to persist intermediate data ingested so far to avoid an out-of-disk problem. I guess you were confused because of the wrong javadoc. See #5297 (comment).
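A rough sketch of the difference (Committers.nil() stands for a no-op committer; treat the call sites as illustrative, and see the clarified javadoc discussion later in this conversation):

// No-op committer: rows are persisted AND an (empty) commit is written to local disk.
appenderator.persist(identifiers, Committers.nil());

// Null committer: rows ingested so far are persisted to free heap, but committing
// intermediate state is skipped entirely -- fine for IndexTask, which is not restorable.
appenderator.persist(identifiers, null);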

* dropped from local storage</li>
* </ul>
*/
public class InfiniteAppenderatorDriver extends AppenderatorDriver
Contributor

How about calling this StreamAppenderatorDriver instead?

Contributor Author

Changed.

* <li>PUBLISHED: Segment's metadata is published to metastore.</li>
* </ul>
*/
public class FiniteAppenderatorDriver extends AppenderatorDriver
Contributor

How about calling this BatchAppenderatorDriver instead?

Contributor Author

That was the one I considered at first. :) Changed.

* you pass in. It's wrapped in some extra metadata needed by the driver.
*/
public class AppenderatorDriver implements Closeable
public abstract class AppenderatorDriver implements Closeable
Contributor

BaseAppenderatorDriver?

Contributor Author

Done.


private final SegmentIdentifier segmentIdentifier;
private SegmentState state;
@Nullable private DataSegment dataSegment;
Contributor

Could you include some comments about what these are used for?

It looks like you added dataSegment in this patch, and it's used to remember what DataSegment object was created after a push. I think it's worth a comment.

Contributor Author

Added comments for this variable and pushAndDrop() method.

protected final Appenderator appenderator;
// sequenceName -> segmentsForSequence
// This map should be locked with itself before accessing it.
// Note: FiniteAppenderatorDriver currently doesn't need to lock this map because it doens't do anything concurrently.
Contributor

"doesn't" (spelling)

Contributor Author

Thanks, fixed.

// sequenceName -> segmentsForSequence
// This map should be locked with itself before accessing it.
// Note: FiniteAppenderatorDriver currently doesn't need to lock this map because it doens't do anything concurrently.
// However, it's desried to do some operations like indexing and pushing at the same time. Lockig this map is also
Contributor

desired (spelling)

Contributor

Locking (spelling)

Contributor Author

Thanks, fixed.

identifier.getInterval().getStartMillis(),
k -> new LinkedList<>()
);
if (segmentWithState.getState() == SegmentState.APPENDING) {
Contributor

Please keep the original comment from this moved code.

            // always keep APPENDING segments for an interval start millis in the front

Contributor Author

Added.

*/
ListenableFuture<SegmentsAndMetadata> dropInBackground(SegmentsAndMetadata segmentsAndMetadata)
{
log.info("dropping segments[%s]", segmentsAndMetadata.getSegments());
Contributor

Please format the log message a bit nicer (capitalization)

Contributor Author

Done.

@jihoonson
Contributor Author

#5261 will cause conflicts with this PR, and I don't want to block #5261. I'll update this PR once #5261 is merged.

@jihoonson
Contributor Author

Merged recent changes from master.

{
final Object metadata = appenderator.startJob();
if (metadata != null) {
throw new ISE("Metadata should be null because batch ingestion doesn't support committing intermediate states");
Contributor

can we rephrase to, "Metadata should be null because FiniteAppenderatorDriver never persists it."

Contributor Author

Changed.

.map(segmentIdentifier -> SegmentWithState.newSegment(
segmentIdentifier,
AppenderatorDriver.SegmentState.INACTIVE
SegmentState.APPEND_FINISHED
Contributor

Are changes to SegmentState backward compatible?

It appears that the possible values in SegmentState have changed with this patch. How will this work against the metadata persisted by previous versions of the code?
E.g. someone upgrades Druid, which stops all peons on a middle manager and restarts them with new code that will see older values of SegmentState like ACTIVE, INACTIVE, etc.?

Contributor Author

Thanks. Fixed.
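If the fix takes the shape of accepting the old enum names during deserialization, it might look roughly like the sketch below. This is purely illustrative; the actual change in the patch may differ.

import com.fasterxml.jackson.annotation.JsonCreator;

public enum SegmentState
{
  APPENDING,        // previously serialized as ACTIVE
  APPEND_FINISHED;  // previously serialized as INACTIVE

  // Accept both the old and the new names when restoring state persisted by a
  // previous version of the code.
  @JsonCreator
  public static SegmentState fromString(String name)
  {
    switch (name) {
      case "ACTIVE":
        return APPENDING;
      case "INACTIVE":
        return APPEND_FINISHED;
      default:
        return valueOf(name);
    }
  }
}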

* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
* If committer is not provided, any data are NOT persisted. If it's provided, the add, clear, persist, persistAll,
Contributor

So, allowIncrementalPersists is ignored if the committer is null?
Shouldn't it be the other way around: if allowIncrementalPersists is set to false then the committer is ignored, but if allowIncrementalPersists is true then a non-null committer must be provided?

Contributor

also "no data is persisted" might be clearer than "any data are NOT persisted"

Contributor Author

Sorry, the Javadoc was wrong. It should be "no metadata is persisted". Fixed it now.

Committer is about persisting intermediate metadata, while allowIncrementalPersists is about persisting the data ingested so far. So, as you said, if allowIncrementalPersists is set to false, then the committer is ignored. But if allowIncrementalPersists is true, the committer can still be null to avoid persisting metadata.

Contributor

Ah, that makes sense now.
So, a null committer simply means commit metadata is not persisted.
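A hedged illustration of the combinations; the add() signature is abbreviated here and the parameter names come from this discussion, not from the patch itself:

// allowIncrementalPersists = false: never does an incremental persist; the committer is ignored.
driver.add(row, sequenceName, committerSupplier, /* allowIncrementalPersists */ false);

// allowIncrementalPersists = true, committer supplier yields null: the rows ingested so far
// may be persisted, but no commit metadata is written.
driver.add(row, sequenceName, () -> null, /* allowIncrementalPersists */ true);

// allowIncrementalPersists = true with a real committer: both the rows and the commit
// metadata supplied by the committer may be persisted.
driver.add(row, sequenceName, committerSupplier, /* allowIncrementalPersists */ true);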

if (persistExecutor != null) {
persistExecutor.shutdownNow();
Preconditions.checkState(
persistExecutor.awaitTermination(365, TimeUnit.DAYS),
Contributor

We have two methods, close() and closeNow(), calling it. In closeNow() we want to finish asap and shouldn't wait, so you might want to add a flag to shutdownExecutors() indicating whether to wait, for the close() flow.

Also, should we call xxx.shutdownNow() on all executors and then wait, so that all of them are stopping in parallel?

Also, the timeout here is too large. If something were wrong, some thread could be stuck here potentially indefinitely. I would wait for maybe 5 minutes, then print an error and move on.

Contributor Author

This awaitTermination existed before. I changed nothing; I just moved it into shutdownExecutors() because it is called in both close() and closeNow().

Also, should we call xxx.shutdownNow() on all executors and then wait, so that all of them are stopping in parallel?

Maybe it's useful if it usually takes a long time. I didn't measure how long it takes.

Also, the timeout here is too large. If something were wrong, some thread could be stuck here potentially indefinitely. I would wait for maybe 5 minutes, then print an error and move on.

It looks like this code has been there from the beginning. @gianm any thoughts?

Contributor

Also, the timeout here is too large. If something were wrong, some thread could be stuck here potentially indefinitely. I would wait for maybe 5 minutes, then print an error and move on.

It looks like this code has been there from the beginning. @gianm any thoughts?

With regard to the huge timeout, when I wrote that I was expecting that some other system would kill tasks that are taking too long to shut down. But I guess it would be fine to print an error after a few minutes and give up.

Also, should we call xxx.shutdownNow() on all executors and then wait, so that all of them are stopping in parallel?

Maybe it's useful if it usually takes a long time. I didn't measure how long it takes.

Sure, why not call shutdownNow first and then wait. I don't think it should matter too much.

Contributor Author

Changed.

Contributor

@jihoonson from the patch it looks like xxx.awaitTermination(..) was added here (also I can't see it in current master: https://github.com/druid-io/druid/blob/0105cdbc19828009d21de57a30ba55794a518d30/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java#L840).
What am I missing?

It is called from AppenderatorImpl.closeNow(), which should finish asap... hence the suggestion to add a flag on whether to await or not, so that closeNow() wouldn't await while close() would.

Contributor Author

It is called outside of shutdownExecutors(), but inside of close() (https://github.com/druid-io/druid/blob/0105cdbc19828009d21de57a30ba55794a518d30/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java#L698) and closeNow() (https://github.com/druid-io/druid/blob/0105cdbc19828009d21de57a30ba55794a518d30/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java#L749).

BTW, at first I moved this awaitTermination code into shutdownExecutors() because closeNow() doesn't wait for the pushExecutor to terminate in the original code. But that looks intentional, judging from the javadoc:

Do not unlock base persist dir as we are not waiting for push executor to shut down relying on current JVM to shutdown to not cause any locking problem if the task is restored.

I reverted this change and added a comment to avoid confusion.
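In other words, the shutdown paths end up looking roughly like this simplified sketch (not the actual AppenderatorImpl code):

private void shutdownExecutors()
{
  // shutdownNow() on every executor first, so they all stop in parallel.
  if (persistExecutor != null) {
    persistExecutor.shutdownNow();
  }
  if (pushExecutor != null) {
    pushExecutor.shutdownNow();
  }
}

public void close() throws InterruptedException
{
  shutdownExecutors();
  persistExecutor.awaitTermination(365, TimeUnit.DAYS);
  pushExecutor.awaitTermination(365, TimeUnit.DAYS);
  // Both executors are done, so the base persist dir can be unlocked safely.
}

public void closeNow() throws InterruptedException
{
  shutdownExecutors();
  persistExecutor.awaitTermination(365, TimeUnit.DAYS);
  // Intentionally do not wait for pushExecutor and do not unlock the base persist dir,
  // relying on JVM shutdown, to avoid locking problems if the task is restored.
}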

Contributor Author

@pjain1 any thoughts?

if (commitFile.exists()) {
final Committed oldCommitted = objectMapper.readValue(commitFile, Committed.class);
objectMapper.writeValue(commitFile, oldCommitted.without(identifier.getIdentifierAsString()));
final Committed oldCommit = readCommit();
Contributor

Did anything change here?
Nit: FWIW, I find the older code sufficiently readable... also, computeCommitFile() was called only once (not that I care about that so much).

Contributor Author

At first, I added readCommit() and writeCommit() to use in other places as well, but during refactoring I realized those methods weren't needed elsewhere. Do you want me to revert this change? I think they are not so bad anyway.

Contributor

No strong opinion here... I just wasn't sure why the change was made.
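For reference, the helper-based version under discussion would look roughly like this; the bodies are reconstructed from the surrounding context, not copied from the patch:

@Nullable
private Committed readCommit() throws IOException
{
  final File commitFile = computeCommitFile();
  return commitFile.exists() ? objectMapper.readValue(commitFile, Committed.class) : null;
}

private void writeCommit(Committed newCommit) throws IOException
{
  objectMapper.writeValue(computeCommitFile(), newCommit);
}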

{
return () -> wrapCommitter(committerSupplier.get());
}
}
Contributor

Not sure what changed here (is this a simple copy/paste from AppenderatorDriver?)... so I haven't gone through it. Please let me know if there are specific parts here that changed and should be reviewed.

Contributor Author

Nothing was changed. I guess git reports this as changed because the lines shifted.

Contributor

alright, I'm not going through this class then.

/**
* Move a set of identifiers out from "active", making way for newer segments.
*/
public void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
Contributor

Can you add a comment here saying this method only exists to support KafkaIndexTask.runLegacy(..) and should be removed along with it?

Contributor Author

Thanks. Added.

@himanshug
Contributor

@jihoonson where/how have you tested the changes? Has this patch been verified on some cluster running the Kafka Indexing Service while killing/restarting a few Middle Managers?

@jihoonson
Contributor Author

@himanshug I've tested this PR on our cluster, which is running multiple Kafka ingestions and some batch ingestions. It has been working well so far. I haven't tested killing/restarting MMs yet. I'll keep you updated once that test is done.

@jihoonson
Contributor Author

@himanshug I tested the backward compatibility by stopping/restarting MMs and checking that the running tasks were restored properly. It looks to be working well.

Contributor

@gianm left a comment


LGTM, thanks @jihoonson. Please fix the conflict.

@jihoonson
Contributor Author

@gianm thanks, fixed.

@drcrallen
Contributor

Restarted failed Travis component.
