Allow client to configure batch ingestion task to wait to complete until segments are confirmed to be available by other #10676
Changes from all commits
```diff
@@ -481,6 +481,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(
         null,
         null,
         null,
+        null,
         null
     )
 )
```

```diff
@@ -235,6 +235,7 @@ public DetermineHashedPartitionsJobTest(
         null,
         null,
         null,
+        null,
         null
     )
 );
```

```diff
@@ -344,6 +344,7 @@ public DeterminePartitionsJobTest(
         null,
         null,
         null,
+        null,
         null
     )
 )
```

```diff
@@ -257,6 +257,7 @@ HadoopIngestionSpec build()
         null,
         null,
         null,
+        null,
         null
     );
```

```diff
@@ -547,6 +547,7 @@ public void setUp() throws Exception
         null,
         null,
         null,
+        null,
         null
     )
 )
```

```diff
@@ -187,6 +187,7 @@ public void setup() throws Exception
         null,
         null,
         null,
+        null,
         null
     )
 )
```

```diff
@@ -79,6 +79,7 @@ public class GranularityPathSpecTest
         null,
         null,
         null,
+        null,
         null
     );
```
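Every hunk above is the same mechanical change: the tuning config used by these Hadoop indexing tests gained one constructor parameter for this feature, so each call site passes one more `null` to keep its previous behavior. Below is a rough, self-contained sketch of that pattern; the class is illustrative, not Druid's real `HadoopTuningConfig`, and the parameter name `awaitSegmentAvailabilityTimeoutMillis` is assumed from the feature this PR adds.

```java
// Illustrative sketch only: a config class grows a nullable setting, and
// existing call sites pass null to opt out, preserving pre-PR behavior.
public class TuningConfigSketch
{
  private final Long awaitSegmentAvailabilityTimeoutMillis; // assumed name

  public TuningConfigSketch(Long awaitSegmentAvailabilityTimeoutMillis)
  {
    this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis;
  }

  public long getAwaitSegmentAvailabilityTimeoutMillis()
  {
    // Treat null as 0, i.e. "do not wait for segment availability".
    return awaitSegmentAvailabilityTimeoutMillis == null ? 0L : awaitSegmentAvailabilityTimeoutMillis;
  }

  public static void main(String[] args)
  {
    System.out.println(new TuningConfigSketch(null).getAwaitSegmentAvailabilityTimeoutMillis());     // 0
    System.out.println(new TuningConfigSketch(600_000L).getAwaitSegmentAvailabilityTimeoutMillis()); // 600000
  }
}
```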
```diff
@@ -41,17 +41,22 @@ public class IngestionStatsAndErrorsTaskReportData
   @Nullable
   private String errorMsg;
 
+  @JsonProperty
+  private boolean segmentAvailabilityConfirmed;
+
   public IngestionStatsAndErrorsTaskReportData(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
       @JsonProperty("rowStats") Map<String, Object> rowStats,
-      @JsonProperty("errorMsg") @Nullable String errorMsg
+      @JsonProperty("errorMsg") @Nullable String errorMsg,
+      @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed
```
(Inline review comment on the new constructor parameter:)

**Reviewer:** Naive question: should this be a nullable `Boolean` instead of a primitive `boolean`? Seeing the JSON properties automatically makes me think about version mismatches, but I don't know exactly how this is used, so I'm just asking in the hope you can save me some time from digging :)

**Author:** These will be created and written by individual indexing tasks, then stored wherever the cluster stores task logs. I believe the only way they are ever accessed by Druid is to be streamed from their location directly to an API caller, without ever being deserialized, so I don't think there is any possibility of issues during an upgrade here.
```diff
   )
   {
     this.ingestionState = ingestionState;
     this.unparseableEvents = unparseableEvents;
     this.rowStats = rowStats;
     this.errorMsg = errorMsg;
+    this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
   }
 
   @JsonProperty
```

```diff
@@ -79,6 +84,12 @@ public String getErrorMsg()
     return errorMsg;
   }
 
+  @JsonProperty
+  public boolean isSegmentAvailabilityConfirmed()
+  {
+    return segmentAvailabilityConfirmed;
+  }
+
   public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
       Map<String, TaskReport> taskReports
   )
```

```diff
@@ -100,13 +111,20 @@ public boolean equals(Object o)
     return getIngestionState() == that.getIngestionState() &&
            Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) &&
            Objects.equals(getRowStats(), that.getRowStats()) &&
-           Objects.equals(getErrorMsg(), that.getErrorMsg());
+           Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
+           Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed());
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(getIngestionState(), getUnparseableEvents(), getRowStats(), getErrorMsg());
+    return Objects.hash(
+        getIngestionState(),
+        getUnparseableEvents(),
+        getRowStats(),
+        getErrorMsg(),
+        isSegmentAvailabilityConfirmed()
+    );
   }
 
   @Override
```

```diff
@@ -117,6 +135,7 @@ public String toString()
            ", unparseableEvents=" + unparseableEvents +
            ", rowStats=" + rowStats +
            ", errorMsg='" + errorMsg + '\'' +
+           ", segmentAvailabilityConfirmed=" + segmentAvailabilityConfirmed +
            '}';
   }
 }
```
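Since these reports are written once by the task and streamed back to API callers as-is (per the inline discussion above), the new field simply rides along in the JSON payload. Here is a minimal sketch of what the serialized payload looks like, assuming Jackson and the constructor exactly as shown in this diff; imports for the Druid classes themselves are omitted.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;

public class ReportPayloadSketch
{
  public static void main(String[] args) throws Exception
  {
    // Build a payload for a completed task whose segments were confirmed available.
    IngestionStatsAndErrorsTaskReportData payload = new IngestionStatsAndErrorsTaskReportData(
        IngestionState.COMPLETED,
        Collections.emptyMap(), // unparseableEvents
        Collections.emptyMap(), // rowStats
        null,                   // errorMsg
        true                    // segmentAvailabilityConfirmed (new in this PR)
    );
    // Prints roughly (field order may vary):
    // {"ingestionState":"COMPLETED","unparseableEvents":{},"rowStats":{},
    //  "errorMsg":null,"segmentAvailabilityConfirmed":true}
    System.out.println(new ObjectMapper().writeValueAsString(payload));
  }
}
```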
**Reviewer:** Can you give some more details on how this will be used in your application? Do you want to track handoff failures for each task? I'm wondering if handoff time is also important.
**Author:** My company deploys a large multi-tenant cluster with a services layer for ingestion that our tenants use. These tenants don't just want to know when their task succeeds; they also want to know when the data from a batch ingest is available for querying. This solution allows us to keep the ingestion services layer and/or individual tenants from banging on Druid APIs trying to see whether their data is available after ingestion.
**Reviewer:** I understand this, but my question is more about what people expect when segment handoff fails. In streaming ingestion, a handoff failure causes task failure (this behavior seems arguable, but that's what it does now), and thus people's expectation is that they could see some data dropped after handoff failures, until new tasks read the same data and publish the same segments again. However, since there is no realtime querying in batch ingestion, I don't think tasks should fail on handoff failures (which is what this PR does! 🙂), but then what will people expect? Are they going to be OK with handoff failures and wait indefinitely until Historicals load the new segments (the current behavior)? Do they want to know why the handoff failed? Do they want to know how long it took before the handoff failed? These questions are not clear to me.
**Author:** Good question. For my specific case, the service that end users interact with really wanted to be able to answer one question for the end user: is the data from my ingestion job available for querying yet?

For us a simple yes/no will suffice. The cluster operators would have the goal of having 100% of jobs successfully hand off data before the timeout, but when that doesn't happen, our users simply want to know that they may need to wait longer. We are simply trying to be transparent and report the point-in-time status. The onus of finding out when the data is fully loaded, if the timeout expired before loading, would fall on a different solution (TBD).

You're right, we intentionally did not fail these tasks, because Historical nodes loading the segments is detached from whether or not the data was written to deep storage and the metadata store (if that failed, the task should, and likely would, fail through existing code paths). We don't want our end users thinking they need to re-run their jobs when this is much more likely to be an issue of the Coordinator not having assigned servers to load the segments by the time the timeout expired.

Why the handoff failed is something I, as an operator, am more interested in than a user would be (unless that user is also an operator). I think that would be very difficult to communicate in these reports, since the indexing task doesn't know much about what the rest of the cluster is doing.

How long the task waited before the timeout could be found in the spec, but I guess it could be useful to add that value to the report as well, if you think users would want a quick reference. Rather than that static value, though, it could be cool to report the dynamic time waited for handoff. It equals the static value when we hit the timeout, but as an operator I would enjoy seeing how long each successful job waited for handoff. What do you think about that? (A sketch of this idea follows below.)
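A hedged sketch of that dynamic-wait idea: measure how long the task actually waited before availability was confirmed or the timeout hit, and surface the duration alongside the boolean. The helper and variable names below are hypothetical, not code from this PR.

```java
// Hypothetical fragment; waitForSegmentAvailability(...) stands in for
// whatever mechanism the task uses to poll for segment availability.
long startMillis = System.currentTimeMillis();
boolean confirmed = waitForSegmentAvailability(publishedSegments, awaitTimeoutMillis);
long waitedMillis = System.currentTimeMillis() - startMillis;
// Reported together, the pair is more informative than the boolean alone:
// confirmed=false with waitedMillis ~= awaitTimeoutMillis means a genuine
// timeout, while confirmed=false with a tiny waitedMillis would hint that
// the wait was never really attempted (the "false alarm" case raised below).
```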
**Reviewer:** Cool, are you working on "the different solution"? That would be interesting too.

I agree. I think we need more visibility into Coordinator behavior.

The dynamic wait time seems useful to me too 👍

For the time to fail handoff: given the above lack of ability to know the cause of handoff failures, I was wondering whether the report can be a false alarm. For example, the report could say it failed to confirm the segment handoff when the handoff was in fact never even triggered for some reason. I don't think this can happen for now, but it is possible in the future if someone modifies this area for some good reason. `segmentAvailabilityConfirmationCompleted` plus the time to fail handoff could be an indicator of such unexpected failures. I would say this is not a blocker for this PR, but it seems useful to me.