
Allow client to configure batch ingestion task to wait to complete until segments are confirmed to be available by other #10676

Merged
suneet-s merged 36 commits into apache:master from capistrant:batch-ingest-wait-for-handoff
Apr 9, 2021

Conversation

@capistrant (Contributor) commented Dec 14, 2020

update for release notes

Add the ability to tell a batch ingestion task to wait for newly indexed segments to become available for query on Historical services before completing. This is turned off by default. If an end user wants their task to wait a specified amount of time for their segments to become available before the task completes, they must add awaitSegmentAvailabilityTimeoutMillis to the tuningConfig in their ingestion spec. If an end user adds this to their spec, the job will query the coordinator periodically checking to see if the desired segments have become available for query. If not all segments become available by the time this timeout expires, the job will still succeed. However, in the IngestionStatsAndErrors report, segmentAvailabilityConfirmed will be false. This indicates that handoff was not successful and these newly indexed segments may not all be available for query. On the other hand, if all segments become available for query on the Historical services before the timeout expires, the value for that key in the report will be true.
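For illustration, setting this option in an ingestion spec might look like the following (a hedged sketch; the task type and other tuningConfig fields are hypothetical, but the key name comes from this PR):

```json
{
  "type": "index_parallel",
  "spec": {
    "tuningConfig": {
      "type": "index_parallel",
      "awaitSegmentAvailabilityTimeoutMillis": 600000
    }
  }
}
```

With this spec, the task would wait up to ten minutes after indexing finishes for the new segments to become queryable before completing.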

This tuningConfig value is not supported for compaction tasks at this time. If a user tries to specify a value for awaitSegmentAvailabilityTimeoutMillis for Compaction, the task will fail telling the user it is not supported.

end update

Description

High Level Description

Add configuration in tuningConfig for end user to tell Indexing Service to wait for segments to become available for query before completing the indexing task. The configuration is a timeout value in milliseconds to prevent waiting forever. If the timeout expires, the task still succeeds, but the task reports will indicate that Druid was not able to confirm that the segments became available for query.

This new configuration stems from my experience operating a production cluster with many tenants who often have the same complaint: "My indexing job is complete but the latest data is not available right when the job finishes". This addresses that by letting the client set a reasonable timeout. After the job completes, they can parse the ingestion report and see if their segments became available. More often than not, with a reasonable timeout, their segments will indeed be available right when the job completes.

Implementation

A lot of the code was already written for realtime handoffs. I extracted that code out of the realtime packages into a new package, org.apache.druid.segment.handoff in the druid-server module, so it is less confusing why non-realtime tasks are using it.

AbstractBatchIndexTask gets a new method, waitForSegmentAvailability(TaskToolbox toolbox, ExecutorService exec, List<DataSegment> segmentsToWaitFor, long waitTimeout) that handles the waiting. Batch Indexing implementations leverage this method at the end of their ingestion task code if the client's tuningConfig has a non-zero wait time for segment availability. Default is to not wait.
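The waiting logic can be sketched roughly as follows. This is an illustrative simplification, not the PR's exact code: class and parameter names are hypothetical, and a locally-run callback stands in for the real SegmentHandoffNotifier registration. It uses the CountDownLatch approach mentioned later in the review discussion, and skips the Coordinator entirely when the timeout is non-positive (the default):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AvailabilityWaitSketch {
  /**
   * Returns true if every segment's handoff callback fires before the timeout,
   * false if the timeout expires first. A non-positive timeout means "do not wait".
   */
  public static boolean waitForSegmentAvailability(List<String> segmentIds, long waitTimeoutMillis)
      throws InterruptedException {
    if (waitTimeoutMillis <= 0) {
      return false; // waiting disabled; availability was not confirmed
    }
    if (segmentIds.isEmpty()) {
      return true; // nothing to confirm
    }
    CountDownLatch doneSignal = new CountDownLatch(segmentIds.size());
    for (String segmentId : segmentIds) {
      // In the real task this callback is registered with the SegmentHandoffNotifier and
      // fires once the Coordinator reports the segment as queryable; here we simulate an
      // immediate handoff confirmation so the sketch is self-contained.
      Runnable onHandoff = () -> {
        System.out.println("Confirmed availability for [" + segmentId + "]");
        doneSignal.countDown();
      };
      onHandoff.run();
    }
    return doneSignal.await(waitTimeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```

The key property is that the latch counts down once per confirmed segment, so the task either unblocks when all segments are confirmed or gives up when the timeout elapses, in which case it still succeeds but reports the availability as unconfirmed.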

A new key:value pair, segmentAvailabilityConfirmed, is added to the IngestionStatsAndErrorsTaskReport. This boolean indicates whether the job was able to confirm query availability of the new segments before finishing. The parallel index task supervisor did not previously produce this report, so this PR adds the report with the needed availability key:value pair so that simple native, parallel native, and Hadoop ingestion can all implement this availability wait.
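Based on the report properties that appear in this PR's review (rowStats, errorMsg, segmentAvailabilityConfirmed), the relevant portion of the report payload would look something like this (illustrative only; other report fields elided):

```json
{
  "rowStats": {},
  "errorMsg": null,
  "segmentAvailabilityConfirmed": true
}
```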

Alternatives

https://github.com/apache/druid/releases#19-datasource-loadstatus became available in druid 0.20.0. However, I worry about giving ingestion clients the green light to hit this API endpoint due to the possible expense of the calls depending on the questions asked.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • AbstractBatchIndexTask
  • HadoopIndexTask
  • IndexTask
  • ParallelIndexSupervisorTask
  • HadoopTuningConfig
  • ParallelIndexTuningConfig
  • IngestionStatsAndErrorsTaskReportData
  • AbstractITBatchIndexTest
  • ITHadoopIndexTest
  • ITBestEffortRollupParallelIndexTest
  • ITIndexerTest

@lgtm-com (Bot) commented Dec 14, 2020

This pull request introduces 1 alert when merging 027938e into 64f97e7 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@lgtm-com (Bot) commented Dec 15, 2020

This pull request introduces 1 alert when merging 8b9d26d into 0ad27c0 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@jihoonson (Contributor):

@capistrant thanks for the PR. This idea seems useful. I will take a look soon.

@capistrant (Contributor, Author):

> @capistrant thanks for the PR. This idea seems useful. I will take a look soon.

Thanks @jihoonson. I'm a little worried about concurrency issues in AbstractBatchIndexTask#waitForSegmentAvailability so if your review can take extra care there that would be greatly appreciated

}
log.info("Waiting for segments to be loaded by the cluster...");

SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
@zhangyue19921010 (Contributor) commented Dec 16, 2020

nit: Every call to the waitForSegmentAvailability function communicates with the Coordinator regardless of the value of waitTimeout, which adds extra pressure on the Coordinator since there may be hundreds of thousands of batch tasks per day.
Maybe we can double-check here: when waitTimeout <= 0, skip waitForSegmentAvailability entirely, just in case the function is called without checking waitTimeout first.

Contributor Author

ya, that makes sense. Will add

toolbox,
availabilityExec,
segmentsToWaitFor,
ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
Contributor

nit: Could we extract ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() at class level if possible?

Contributor Author

I'm fine with that. addressed in latest commits

@zhangyue19921010 (Contributor):

@capistrant Nice idea. Looking forward to seeing it merged! 👍

s.getId()
);
synchronized (availabilityCondition) {
segmentsToWaitFor.remove(s);
Contributor

Not sure, but removing items from the collection while iterating over it may cause an error. Instead, we could combine a per-segment CompletableFuture for each callback into one future to wait on. Something like this:

CompletableFuture<Void> uberFuture = CompletableFuture.completedFuture(null);
for (DataSegment s : segmentsToWaitFor) {
  CompletableFuture<Void> future = new CompletableFuture<>();
  notifier.registerSegmentHandoffCallback(
      new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
      exec,
      () -> {
        log.info(
            "Confirmed availability for [%s]. Removing from list of segments to wait for",
            s.getId()
        );
        future.complete(null);
      }
  );
  uberFuture = uberFuture.thenCombine(future, (a, b) -> null);
}
uberFuture.get(waitTimeout, TimeUnit.MILLISECONDS);

Contributor Author

yes, I've been trying to come up with an alternative to avoid concurrent access risks. thanks so much for this idea

Contributor Author

I ended up trying a CountDownLatch with a timeout, since I am more familiar with those from past use than with your suggestion. Let me know what you think of it.

Contributor

Looks good to me @capistrant. Thanks.

for (DataSegment s : segmentsToWaitFor) {
notifier.registerSegmentHandoffCallback(
new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
exec,
Contributor

Nit: Since the runnable is a simple call without blocking or computation, we could just pass Execs.directExecutor here.

{
if (segmentsToWaitFor.isEmpty()) {
log.warn("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
return false;
Contributor

shouldn't this return true here? If there are no segments to confirm, the confirmation is done.

Contributor Author

hmm, I suppose you are right. I guess I initially thought that since I didn't confirm any segments were loaded, I report back that segments were not confirmed to be loaded. However, that false value would be more confusing to an end user of the report compared to returning true... so true seems like right way to go

@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") @Nullable String errorMsg
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed
Contributor

Can you give some more details on how this will be used in your application? Do you want to track handoff failures of each task? I'm wondering if handoff time is also important.

Contributor Author

my company deploys a large multi-tenant cluster with a services layer for ingestion that our tenants use. these tenants don't just want to know when their task succeeds, they also want to know when data from batch ingest is available for querying. This solution allows us to prevent the ingestion services layer and/or individual tenants from banging on Druid APIs trying to see if their data is available after ingestion.

Contributor

> my company deploys a large multi-tenant cluster with a services layer for ingestion that our tenants use. these tenants don't just want to know when their task succeeds, they also want to know when data from batch ingest is available for querying. This solution allows us to prevent the ingestion services layer and/or individual tenants from banging on Druid APIs trying to see if their data is available after ingestion.

I understand this, but my question is more like what people expect when segment handoff fails. In streaming ingestion, the handoff failure causes task failure (this behavior seems arguable, but that's what it does now) and thus people's expectation is that they could see some data dropped after handoff failures until new tasks read the same data and publishes the same segments again. However, since there is no realtime querying in batch ingestion, I don't think tasks should fail on handoff failures (which is what this PR does! 🙂), but then what will be people's expectation? Are they going to be just OK with handoff failures and wait indefinitely until historicals load new segments (the current behavior)? Do they want to know why the handoff failed? Do they want to know how long it took before the handoff failed? These questions are not clear to me.

Contributor Author

> my company deploys a large multi-tenant cluster with a services layer for ingestion that our tenants use. these tenants don't just want to know when their task succeeds, they also want to know when data from batch ingest is available for querying. This solution allows us to prevent the ingestion services layer and/or individual tenants from banging on Druid APIs trying to see if their data is available after ingestion.

> I understand this, but my question is more like what people expect when segment handoff fails. In streaming ingestion, the handoff failure causes task failure (this behavior seems arguable, but that's what it does now) and thus people's expectation is that they could see some data dropped after handoff failures until new tasks read the same data and publishes the same segments again. However, since there is no realtime querying in batch ingestion, I don't think tasks should fail on handoff failures (which is what this PR does! 🙂), but then what will be people's expectation? Are they going to be just OK with handoff failures and wait indefinitely until historicals load new segments (the current behavior)? Do they want to know why the handoff failed? Do they want to know how long it took before the handoff failed? These questions are not clear to me.

good question. for my specific case the service that end users interact with really wanted to be able to answer this question for the end user:

  • Is the data that I ingested in this job completely loaded for querying?

For us a simple yes/no will suffice. The cluster operators would have the goal of having 100% of jobs successfully handoff data before the timeout, but when that doesn't happen our users simply want to know that they may need to wait longer. We are simply trying to be transparent and report the point in time status. The onus of finding out when the data is fully loaded if this timeout expired before loading, would fall on a different solution (TBD).

You're right, we intentionally did not fail these tasks because Historical nodes loading the segments is detached from whether or not the data was written to deepstore/metastore (if that failed the task should and likely would fail due to existing code paths). We don't want our end users thinking they need to re-run their jobs when this is much more likely to be an issue of the coordinator not having assigned servers to load segments by the time the timeout expired.

Why the handoff failed would be something I as an operator am more interested compared to a user (unless that user is also an operator). I think that would be very difficult to communicate in these reports since the indexing task doesn't know much about what the rest of the cluster is doing.

Knowing how long it took before the time out could be found in the spec, but I guess it could be useful to add that value to the report as well if you think users would want to have quick reference. I think that rather than having that static value, it could be cool to have the dynamic time waited for handoff. Maybe it is the static value because we hit the timeout. but as an operator I would enjoy seeing how long each successful job waited for handoff. what do you think about that?

Contributor

> For us a simple yes/no will suffice. The cluster operators would have the goal of having 100% of jobs successfully handoff data before the timeout, but when that doesn't happen our users simply want to know that they may need to wait longer. We are simply trying to be transparent and report the point in time status. The onus of finding out when the data is fully loaded if this timeout expired before loading, would fall on a different solution (TBD).

Cool, are you working on "the different solution"? That would be interesting too.

> Why the handoff failed would be something I as an operator am more interested compared to a user (unless that user is also an operator). I think that would be very difficult to communicate in these reports since the indexing task doesn't know much about what the rest of the cluster is doing.

I agree. I think we need more visibility on the coordinator behavior.

> Knowing how long it took before the time out could be found in the spec, but I guess it could be useful to add that value to the report as well if you think users would want to have quick reference. I think that rather than having that static value, it could be cool to have the dynamic time waited for handoff. Maybe it is the static value because we hit the timeout. but as an operator I would enjoy seeing how long each successful job waited for handoff. what do you think about that?

That seems useful to me too 👍

For the time to fail handoff, due to the above issue of the lack of ability to know the cause of handoff failures, I guess I was wondering if the report can be a false alarm. For example, the report can say it failed to confirm the segments handed off, but maybe the handoff could be even not triggered at all for some reason. I don't think this can happen for now, but is possible in the future if someone else modifies this area for some good reason. segmentAvailabilityConfirmationCompleted + time to fail handoff can be an indicator of such unexpected failures. I would say this is not a blocker for this PR, but it seems useful to me.

* @param waitTimeout Millis to wait before giving up
* @return True if all segments became available, otherwise False.
*/
protected boolean waitForSegmentAvailability(
Contributor

Wondering if you can reuse StreamAppenderatorDriver.registerHandoff() or StreamAppenderatorDriver.publishAndRegisterHandoff() as they seem pretty similar to this new method. You would need to move that method out to BaseAppenderatorDriver.

Contributor Author

interesting suggestion. definitely open to it. I'll take a look as soon as I have time

Contributor Author

My initial thoughts:

  • I agree that the code is working to achieve the same goal. so re-use would be nice
  • IndexTask appears to be straightforward if we were to take this approach.
  • Parallel Indexing and Hadoop Indexing do not seem as straightforward, the way I understand things:
    • parallel: we'd like to do this handoff wait from the supervisor, IMO. However, there is no existing use of Appenderator or AppenderatorDriver at this level of parallel indexing.
    • hadoop: Hadoop indexing does not use Appenderator or AppenderatorDriver, so we would have to bolt on an Appenderator impl that is pretty much only there for show, since Hadoop indexing has no use for it other than this implementation (as far as I can tell).

I'm sure I could make it work eventually. But I wonder if the jumping through hoops to get the task code ready to be compatible for using the registerHandoff in BaseAppenderatorDriver is worth it both in terms of effort and code legibility.

Contributor

Good point. I agree. I think it would be better to not do such refactoring for parallel or hadoop task in this PR. But it would be still nice to reuse the same logic in both streaming and batch ingestion. Maybe we can extract this logic as a utility method?

Contributor Author

> Good point. I agree. I think it would be better to not do such refactoring for parallel or hadoop task in this PR. But it would be still nice to reuse the same logic in both streaming and batch ingestion. Maybe we can extract this logic as a utility method?

Hm, I guess I'm a little bit confused on this comment. which logic are you suggesting be shared? Since the code in StreamAppenderatorDriver is so closely coupled with the appenderator concept, I struggle to see what can be extracted. The callback function required for the appenderator is incompatible with batch ingestion as it stands today. Are you suggesting that we use the same method for both but use different callback implementations based on the ingestion type? I guess I don't understand the value there if that is the case. Otherwise I may be missing your point entirely and just need a nudge in the right direction

Contributor

Yeah, I think you are right.

getTaskCompletionRowStats(),
errorMsg
errorMsg,
false
Contributor

Why is this always false? Does it make more sense to be always true because realtime tasks will fail when handoff fails?

@capistrant (Contributor, Author) commented Jan 5, 2021

Well, I guess this should be true if the task succeeds and false if it does not, instead of hardcoded. I was tunnel-visioned on this part, thinking only about what my team needed and not what the actual behavior should be for all the different types of ingestion.

Contributor Author

I went with errorMsg == null and added a javadoc stating the errorMsg String should be null on success and non-null on failure. Current code follows that pattern

Contributor

Oh yeah, I thought IngestionStatsAndErrorsTaskReportData is available only when the task succeeded. I think you are correct.

),
"an error message"
"an error message",
false
Contributor

Testing with true would be better because missing booleans are defaulted to false by Jackson in Druid.

Contributor Author

makes sense, will change

@capistrant (Contributor, Author):

Thanks @maytasm and @jihoonson for the reviews! @suneet-s I have merged master back into my branch and pushed a small commit to fix compilation after that merge.

@suneet-s (Contributor) left a comment

Reviewed about 80 files. Posting my incomplete review. I don't think I've found anything major, just a couple of small things and a question about how the SegmentHandoffNotifier works

Comment thread server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java Outdated
Comment thread website/.spelling
JobHistory
a.example.com
assumeGrouped
awaitSegmentAvailabilityTimeoutMillis
Contributor

nit: did you mean to add this twice?

Suggested change
awaitSegmentAvailabilityTimeoutMillis

Contributor Author

The way I understood it, each file has its own dictionary. The first is in the Hadoop index file section and the second is in the native index file section.

@JsonProperty("rowStats") Map<String, Object> rowStats,
@JsonProperty("errorMsg") @Nullable String errorMsg
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed
Contributor

Naive question: Should this be

Suggested change
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed
@JsonProperty("segmentAvailabilityConfirmed") @Nullable Boolean segmentAvailabilityConfirmed

Seeing the json properties automatically make me think about version mismatches - but I don't exactly know how this is used - so I'm just asking in the hope you can save me some time from digging :)

Contributor Author

So these will be created/written by individual indexing tasks. then stored wherever the cluster stores task logs. And I believe the only way they are ever accessed by Druid is streamed from their location directly to an API caller without ever deserializing them. So I don't think there is any possibility for issues during an upgrade here.

null,
null,
null,
null
Contributor

Do we need a CompactionTaskTest where awaitSegmentAvailabilityTimeoutMillis is non-null?

@capistrant (Contributor, Author) commented Mar 25, 2021

I think we might. I don't think I was writing this with the intention of having compaction tasks support this. It really wouldn't add value since compaction isn't materially changing the underlying data. But I guess since compaction is spinning up a parallel indexing task, we kinda need to consider it so I will have to check on this.

Contributor

Interesting point. I think there are some things we should think about first.

  • It's true that currently compaction doesn't change the underlying data much, but it can make some changes such as filtering out some unnecessary dimensions or adding new metrics. You can also change the query granularity now. In the future, I can imagine that you can even transform your data using compaction with a new support for transformSpec.
  • The compaction task is a bit special and different from other batch tasks in how it publishes segments. All other batch tasks can push segments in the middle of indexing, but should publish all those segments at the end of indexing. However, the compaction task can process each time chunk at a time when there is no change in segment granularity. In this case, it can publish segments whenever it finishes processing individual time chunk. It can also go through all time chunks even when there are some time chunks that it fails to compact. The final task status will be FAILED when it succeeds to compact only some time chunks but fails for others.
  • Compacting datasources is usually not the single-shot type job. Rather, you would run multiple small compaction tasks over time as in auto compaction. In that case, you would want to know what time chunks are compacted and what are not, so that you can determine what result you can get when you query certain time chunks. For the compaction that is manually set up outside druid, tracking of individual compaction tasks could be useful for this purpose. However, for auto compaction, it won't provide much value since compaction tasks are submitted by the coordinator not users. So, we need another way such as adding a new coordinator API that returns such compaction status.

From these, we would probably want something similar but different for compaction from the one proposed here. I would suggest to do it in a different PR.

Contributor Author

@jihoonson when it comes to judging segment handoff, isn't a compaction task a parallel indexing task under the hood? I just tested a compaction task submit with tuning config and the new config for handoff set to 5 minutes. The task ran and logs indicate it confirmed handoff of newly created compacted segment.

Should we not allow the handoff wait to occur when it is a compact type task? I'm not sure how we would do that, since the TuningConfig is not modifiable as written, and if the task(s) run by the compact task are just parallel indexing tasks, I'm not sure how we would tell them not to honor the handoff wait config.

Contributor

> @jihoonson when it comes to judging segment handoff, isn't a compaction task a parallel indexing task under the hood? I just tested a compaction task submit with tuning config and the new config for handoff set to 5 minutes. The task ran and logs indicate it confirmed handoff of newly created compacted segment.

It is the parallel index task under the hood, but one compaction task can run multiple parallel indexing tasks. With the changes in this PR, each of those parallel indexing tasks will wait for segment handoff independently. The handoff wait timeout will also apply separately for each parallel task. I think this is OK as long as it's documented, but what I meant is, maybe we could do this differently, such as waiting for segments to be handed off all together after running all parallel tasks.

> Should not allow the handoff to occur when it is a compact type task? I'm not sure how we would do that, since the TuningConfig is not modifiable as written and if the task(s) ran by the compact task are just parallel indexing tasks, I'm not sure how we would tell them not to honor the handoff wait config

It seems reasonable to me not to support this until we figure out what we want to do. An easy way to do so is making a copy of the tuningConfig in the compaction task, but with awaitSegmentAvailabilityTimeoutMillis always set to 0. If you go this route, we should document that compaction tasks don't wait for handoff.

Contributor

Given this discussion, can we add a guardrail so that if someone issues a compaction task with awaitSegmentAvailabilityTimeoutMillis non-null and != 0, the task fails because this is not supported? I think that's the easiest way forward to get the PR merged, and it's nice UX because it tells users that what they are doing is untested. Similar comment for auto-compaction (and other ingestion tasks that I may have missed).

Contributor Author

> Given this discussion, can we add a guardrail that says if someone tries to issue a compaction task when awaitSegmentAvailabilityTimeoutMillis is non null and != 0 that this is not supported. I think that's the easiest way forward to get the PR merged and it's a nice UX because it tells users that what they are doing is untested. Similar comment for auto-compaction (and other ingestion tasks that I may have missed)

Are you expecting a job submit failure then?

Contributor

Yes, that's what I was imagining. That being said, I'm open to other options if you had something in mind.

I'm basing this on the principle that if I (the user) do something that Druid doesn't expect, Druid tells me I'm doing something unexpected, and failing fast is better than failing in some obscure way later on in the process.

A clear error message on the failed job like "awaitSegmentAvailabilityTimeoutMillis is not supported for compaction. Please remove this from the spec and re-submit the compaction job." would tell users exactly how to fix the failed job - hence, the nice UX.

A future PR for this support with auto-compaction / compaction would be super cool though.
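The fail-fast check being proposed could be as small as a precondition at construction time. A self-contained sketch, with a hypothetical minimal TuningConfig stand-in (this is not Druid's actual class, and the validation method name is made up):

```java
// Sketch only: the guardrail discussed above, using a hypothetical
// TuningConfig stand-in rather than Druid's real classes.
public class CompactionGuardrail {
    static class TuningConfig {
        final Long awaitSegmentAvailabilityTimeoutMillis;

        TuningConfig(Long awaitSegmentAvailabilityTimeoutMillis) {
            this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis;
        }
    }

    // Fail fast at submit time if the unsupported config is set for compaction.
    static void validateForCompaction(TuningConfig config) {
        Long timeout = config.awaitSegmentAvailabilityTimeoutMillis;
        if (timeout != null && timeout != 0L) {
            throw new IllegalArgumentException(
                "awaitSegmentAvailabilityTimeoutMillis is not supported for compaction. "
                + "Please remove this from the spec and re-submit the compaction job.");
        }
    }

    public static void main(String[] args) {
        validateForCompaction(new TuningConfig(null)); // accepted: config absent
        validateForCompaction(new TuningConfig(0L));   // accepted: explicit 0
        try {
            validateForCompaction(new TuningConfig(300_000L));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Rejecting at validation time, with the exact remediation in the message, gives the fail-fast UX described above rather than an obscure failure later in the job.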

Contributor Author

I like this idea of it failing. I will work on changes as soon as I can, hopefully early this week.

```java
10,
100,
null,
null
```
Contributor

Do we need an IndexTaskSerdeTest where awaitSegmentAvailabilityTimeoutMillis is not null?

Contributor Author

There was not any new explicit test written; the other tests in that file already use -1L and 1L as test values. I just updated another of the existing tests to use 0L as well.

```java
taskSpec = StringUtils.replace(
    taskSpec,
    "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
    jsonMapper.writeValueAsString("0")
);
```
Contributor

Is it worth testing what happens when this is non-zero?

Do we also want to check that the IngestionStatsAndErrorsTaskReportData now reports segmentAvailabilityConfirmed as false?

Contributor Author

Yeah, this relates back to your comment on compaction above. The smart thing to do seems to be adding tests, since this is supported by default even though I did not intentionally support it.

@capistrant
Contributor Author

> Reviewed about 80 files. Posting my incomplete review. I don't think I've found anything major, just a couple of small things and a question about how the SegmentHandoffNotifier works

I appreciate you starting to dig through all of this. I pushed up some of the more trivial changes from comments and addressed the others as well. You have made a good point about compaction: I never wrote this intending for compaction to use the handoff wait, but since it seems I inadvertently allowed its support, I should have some kind of testing. I'm looking into that now.

```java
    );
  }
}
```

Contributor

Do we want to remove the retry loop here https://github.com/apache/druid/pull/10676/files#diff-6dbfd938d89c6c209d36efc181277afa535629677dfe6b5a1710190c2f9d8ee3L308-L311

if the IngestionStatsAndErrorsTaskReport says that the segment availability was confirmed? If it isn't true on the first attempt, that means the task report was wrong; am I understanding this correctly?

Contributor Author

I'm trying to track down exactly what retry loop you are referring to. When I click that link my browser just loads at the top of the page, so I'm not sure exactly where you want me to look.

Contributor

Maybe this link is better https://github.com/apache/druid/blob/master/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java#L308-L312

```java
if (waitForSegmentsToLoad) {
  ITRetryUtil.retryUntilTrue(
      () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
  );
}
```

Contributor Author

Hmm, my gut says no, we don't want to remove that without some (potentially large) surgery to the indexing IT suite. I added my own logic for checking the ingestion report for handoff status, but that is separate from what other tests use this loop for. I'd bet the two ideas could be merged more gracefully by replacing waitForSegmentsToLoad with the new handoff config: wait for the task to complete, then check the report and fail if it doesn't indicate the segments were loaded. I'm not sure that is in scope for this PR, though, especially after we revoked support for handoff in compaction tasks (unless there are no compaction ITs that use waitForSegmentsToLoad=true; I'd have to check).

```java
return StringUtils.replace(
    spec,
    "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
    jsonMapper.writeValueAsString("0")
);
```
Contributor

Some comments on this pattern, since I've seen it across a few tests:

  • Does this change mean we lose coverage for a missing SEGMENT_AVAIL_TIMEOUT_MILLIS? Since that is the default mode, it would be good to keep coverage for the missing case.
  • Do we have / want tests where this is set to a high enough number that we don't need a retry loop in the integration tests while waiting for the segments to be available?

Contributor Author

> Some comments on this pattern since I've seen it across a few tests
>
>   • Does this change mean we lose coverage on a missing SEGMENT_AVAIL_TIMEOUT_MILLIS? Since this is the default mode, it would be good to have coverage for that missing change

There are IT tests out there that indirectly cover this. By indirectly, I mean tests I did not add whose specs do not have the config in the tuningConfig. For instance, https://github.com/capistrant/incubator-druid/blob/batch-ingest-wait-for-handoff/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java#L146 is an index job that doesn't add the transform, and the underlying spec file does not have the config in it: https://github.com/capistrant/incubator-druid/blob/batch-ingest-wait-for-handoff/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json#L49

>   • Do we have / want tests where this is set to a high enough number that we don't need a re-try loop in the integration tests while waiting for the segments to be available

Technically, that retry loop is redundant when the test uses true,true for the Pair object that determines whether we check the ingestion report for true as the handoff value. This is an example of that: https://github.com/capistrant/incubator-druid/blob/batch-ingest-wait-for-handoff/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java#L205

Contributor

@suneet-s suneet-s left a comment

Finished looking through all the files, overall it's great. Nothing too concerning from me. Let me know how you think we should proceed on some of my comments

@capistrant
Contributor Author

capistrant commented Mar 31, 2021

@suneet-s I took a stab at a CompactionTuningConfig that basically fronts the ParallelIndexTuningConfig it had been using. This lets us enforce that awaitSegmentAvailabilityTimeoutMillis is null or 0, and fails the job if it is set to anything else in the provided tuningConfig. Since this is all the new TuningConfig does, I suggest we leave it undocumented and instead add documentation stating that awaitSegmentAvailabilityTimeoutMillis is not supported for compaction tasks at this time.

I still have to test this new code locally, but I wanted to get it up for review and CI

```diff
 private final ClientCompactionTaskGranularitySpec granularitySpec;
 @Nullable
-private final ParallelIndexTuningConfig tuningConfig;
+private final CompactionTuningConfig tuningConfig;
```
Contributor

I think this change means that compaction jobs submitted with a ParallelIndexTuningConfig (using type = index_parallel) will start failing after this change; is this correct?

If so, instead of introducing a breaking change, we can just add the Precondition check that you have in the CompactionTuningConfig to the constructor of the compaction task.

If this isn't a breaking change, I like that there's a separate tuningConfig for compaction tasks, so that in the future this config can be more easily optimized.

cc @maytasm Since I've seen you make some improvements around compaction recently

Contributor

I take this back. getTuningConfig deals with the different types of tuningConfigs. 🤦

Contributor

@suneet-s suneet-s left a comment

LGTM after CI

@capistrant
Contributor Author

I was able to play around with the latest code in my sandbox environment yesterday. It behaves as I would expect with the latest compaction changes.

