Add errors and state to stream supervisor status API endpoint#7428
> |UNHEALTHY_SUPERVISOR|The supervisor has encountered non-transient errors on the past `druid.supervisor.stream.unhealthinessThreshold` iterations|1|
> |UNHEALTHY_TASKS|The last `druid.supervisor.stream.taskUnhealthinessThreshold` tasks have all failed|2|
> |UNABLE_TO_CONNECT_TO_STREAM|The supervisor is encountering connectivity issues with Kafka and has not successfully connected in the past|3|
> |LOST_CONTACT_WITH_STREAM|The supervisor is encountering transient connectivity issues with Kafka but has successfully connected in the past|4|
nit: For LOST_CONTACT_WITH_STREAM, are the connectivity issues necessarily transient? Maybe sometimes the connectivity never comes back without operator intervention.
> Notes about states:
>
> - Since it's possible that 2+ states can apply to a supervisor at the same time, each state is given a priority. The
>   active state with the highest priority will be returned in the status report.
is priority 1 considered higher than priority 5, or the other way around?
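The table above numbers UNHEALTHY_SUPERVISOR as priority 1 and the later discussion treats it as outranking UNHEALTHY_TASKS, which suggests that a lower number means higher priority. A minimal sketch of how enum declaration order could encode this (hypothetical names, not the PR's actual code):

```java
// Hypothetical sketch, not the PR's implementation: error states are declared
// from highest priority (1) to lowest, so a smaller ordinal wins.
public class SupervisorStatePriority
{
  public enum State
  {
    UNHEALTHY_SUPERVISOR,        // priority 1 (highest)
    UNHEALTHY_TASKS,             // priority 2
    UNABLE_TO_CONNECT_TO_STREAM, // priority 3
    LOST_CONTACT_WITH_STREAM,    // priority 4
    RUNNING                      // healthy, lowest priority
  }

  // Returns whichever state has the higher priority (the lower ordinal).
  public static State getHigherPriorityState(State a, State b)
  {
    return a.ordinal() <= b.ordinal() ? a : b;
  }
}
```

Under this convention, combining UNHEALTHY_TASKS with UNHEALTHY_SUPERVISOR yields UNHEALTHY_SUPERVISOR, matching the priority column in the table.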
> }
>
> private State supervisorState;
> // Remove all throwableEvents that aren't in this set at the end of each run (transient)
This comment seems misplaced
> tasksHealthy = false;
>   }
> }
> if (tasksHealthy && currentRunState == State.UNHEALTHY_TASKS) {
Can currentRunState ever be UNHEALTHY_TASKS here? It starts as RUNNING but I don't see anything above this setting it to UNHEALTHY_TASKS
You're right. That should have been supervisorState but that's an irrelevant check because it's already checked on line 173.
> import org.junit.BeforeClass;
> import org.junit.Test;
>
> public class CircularBufferTest
There's also a ChangeRequestHistoryTest test suite that has a test for CircularBuffer, can you move that test here now that you've added a separate test suite?
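For context on what such a test suite covers, here is a minimal fixed-capacity ring buffer sketch illustrating the wrap-around behavior a CircularBufferTest would exercise (hypothetical code, not Druid's actual CircularBuffer):

```java
// Minimal sketch of a fixed-capacity circular buffer: once full, each add()
// overwrites the oldest element. Hypothetical, for illustration only.
public class SimpleCircularBuffer<E>
{
  private final Object[] buffer;
  private int start = 0; // index of the oldest element
  private int size = 0;

  public SimpleCircularBuffer(int capacity)
  {
    buffer = new Object[capacity];
  }

  public void add(E item)
  {
    if (size < buffer.length) {
      buffer[(start + size) % buffer.length] = item;
      size++;
    } else {
      // full: overwrite the oldest element and advance the start pointer
      buffer[start] = item;
      start = (start + 1) % buffer.length;
    }
  }

  @SuppressWarnings("unchecked")
  public E get(int i) // 0 = oldest surviving element
  {
    return (E) buffer[(start + i) % buffer.length];
  }

  public int size()
  {
    return size;
  }
}
```

A test for this behavior would add more elements than the capacity and assert that only the most recent `capacity` elements survive, in order.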
> throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException
> {
>   possiblyRegisterListener();
>   stateManager.setStateIfNoSuccessfulRunYet(SeekableStreamSupervisorStateManager.State.CONNECTING_TO_STREAM);
Hm, I feel like it would be cleaner if the state manager handled the decision of whether to transition to a particular state based on whether a successful run has occurred or not (I don't think the caller should have to know that it needs to call either setStateIfNoSuccessfulRunYet or setState depending on the state)
> for (Map.Entry<Class, Queue<ExceptionEvent>> events : eventStore.getNonTransientRecentEvents().entrySet()) {
>   if (events.getValue().size() >= unhealthinessThreshold) {
>     if (events.getKey().equals(NonTransientStreamException.class)) {
Is it possible that you once connected successfully to the stream, but later get a lot of NonTransientStreamException?
Yes. Although it shouldn't happen in normal operation, I could imagine that there's some edge cases where that might occur (e.g. someone changing a Kafka cluster's permissions causing an already successfully running supervisor to start throwing auth exceptions?). Do you think there's value in subdividing the LOST_CONTACT_WITH_STREAM state into LOST_CONTACT_WITH_STREAM_NON_TRANSIENT and LOST_CONTACT_WITH_STREAM_TRANSIENT states?
Hm, I'm not sure that tying transience to the type of exception is the right approach (I think transience is more about the number of consecutive failures regardless of the exception type).
Since the error states are trying to convey whether the supervisor is having stream connectivity issues (UNABLE_TO_CONNECT_TO_STREAM, LOST_CONTACT_WITH_STREAM) or if it's some other kind of issue (UNHEALTHY_SUPERVISOR), I'm thinking it would be better to separate the exceptions into two categories:
- Stream connection problems
- problems unrelated to stream connectivity
UNABLE_TO_CONNECT_TO_STREAM would be the state when you have stream connection exceptions over the configured threshold, and you've not successfully connected before
LOST_CONTACT_WITH_STREAM would occur when you have stream connection exceptions over the configured threshold, and you have successfully connected before
UNHEALTHY_SUPERVISOR would occur when you have non-stream connection exceptions over the threshold
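The proposed categorization above could be sketched roughly like this (hypothetical names and exception hierarchy, not the PR's actual code):

```java
// Hypothetical sketch of the reviewer's proposal: classify exceptions into
// stream-connectivity problems vs everything else, then pick the error state
// from the category plus whether the supervisor has ever connected successfully.
public class StreamHealthClassifier
{
  public enum State
  {
    UNABLE_TO_CONNECT_TO_STREAM,
    LOST_CONTACT_WITH_STREAM,
    UNHEALTHY_SUPERVISOR
  }

  // Assumed marker type for stream-connectivity problems.
  public static class StreamException extends RuntimeException {}

  public static State classify(Throwable t, boolean atLeastOneSuccessfulRun)
  {
    if (t instanceof StreamException) {
      // stream connection exceptions over the threshold
      return atLeastOneSuccessfulRun
             ? State.LOST_CONTACT_WITH_STREAM
             : State.UNABLE_TO_CONNECT_TO_STREAM;
    }
    // non-stream-connection exceptions over the threshold
    return State.UNHEALTHY_SUPERVISOR;
  }
}
```

This matches the later summary in the thread: LOST_CONTACT_WITH_STREAM and UNABLE_TO_CONNECT_TO_STREAM are distinguished only by whether a successful run has ever occurred, and anything not stream-related falls into UNHEALTHY_SUPERVISOR.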
Although I agree that your implementation would result in a cleaner API design, I still think the transience of an error is important to convey since it indicates severity. For example, a Kinesis supervisor that encounters LimitExceededExceptions (a temporary API limit exception) over the first threshold runs shouldn't be treated equally in severity to a supervisor that throws some sort of auth exception over the first threshold runs since only the former could possibly recover without user intervention. If we were to eliminate that concept, there'd be no difference from the API caller's perspective.
In the initial discussion on this change's proposal, @dclim said "If it transitions from RUNNING (healthy) to UNHEALTHY, assume that someone has hooked up a monitoring system to it and is going to get paged at 4am in the morning, so it better actually be UNHEALTHY, and not some transient error that is going to resolve in the next minute". If we're set on working under the assumption that someone's hooking up this endpoint to a pager, I think the transience of the error is very relevant.
> If we're set on working under the assumption that someone's hooking up this endpoint to a pager, I think the transience of the error is very relevant.
If the exceptions have different severities, then I think you could be more lenient on the failure thresholds for low severity errors. UNABLE_TO_CONNECT and LOST_CONTACT should accurately reflect the real state/history though, I don't think fudging that for low severity errors is the right approach.
Another consideration might be to have the status report contain some error severity suggestion; if you only had low severity errors, the report could indicate that somehow, and a user that didn't care too much about low severity errors could choose not to alert in such situations. Also, is the classification of low sev vs high sev errors very clear?
Splitting the states into transient vs non transient could be useful, but maybe better to indicate transience or severity in a separate field?
I can add a field to ExceptionEvent that indicates transience then have the states set regardless of error transience. Is that approach cool?
> } else {
>   currentRunState = State.UNHEALTHY_TASKS;
> }
> } else {
Since UNHEALTHY_SUPERVISOR is higher priority than UNHEALTHY_TASKS, I think you could skip this else block when currentRunState has been set to UNHEALTHY_SUPERVISOR
> currentRunState = getHigherPriorityState(currentRunState, State.UNABLE_TO_CONNECT_TO_STREAM);
> } else if (events.getKey().equals(TransientStreamException.class) ||
>            events.getKey().equals(PossiblyTransientStreamException.class)) {
>   currentRunState = getHigherPriorityState(currentRunState, State.LOST_CONTACT_WITH_STREAM);
Similarly, is it possible that you've never successfully connected to the stream, but you've only gotten PossiblyTransient or Transient exceptions?
Yes. I was debating whether to give the supervisor the higher severity UNABLE_TO_CONNECT_TO_STREAM state in that case. I opted against doing so for the reason that I believe that it's better to incorrectly label a non-transient issue as LOST_CONTACT than to incorrectly label a transient issue as UNABLE_TO_CONNECT (assuming someone's pager is hooked up to this endpoint). Do you think there's value in subdividing the UNABLE_TO_CONNECT_TO_STREAM state into UNABLE_TO_CONNECT_TO_STREAM_NON_TRANSIENT and UNABLE_TO_CONNECT_TO_STREAM_TRANSIENT states?
> this.stateHistory = new CircularBuffer<>(Math.max(healthinessThreshold, unhealthinessThreshold));
> }
>
> public Optional<State> setStateAndCheckIfFirstRun(State newState)
I think it's more readable if this is just called setState, the logic around some states only occurring on the first iteration of the supervisor could be documented more fully in a method javadoc or somewhere else state-related
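The suggestion of pushing the first-run decision into the state manager itself could look something like this sketch (hypothetical state names beyond those quoted in the diff, not the PR's actual code):

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of the suggestion above: a single setState() that
// internally ignores "first iteration only" states once a successful run has
// occurred, so callers never choose between two setter variants.
public class StateManagerSketch
{
  public enum State { WAITING_TO_RUN, CONNECTING_TO_STREAM, CREATING_TASKS, RUNNING }

  // Assumed set of states that only apply before the first successful run.
  private static final Set<State> FIRST_RUN_ONLY =
      EnumSet.of(State.CONNECTING_TO_STREAM, State.CREATING_TASKS);

  private boolean atLeastOneSuccessfulRun = false;
  private State state = State.WAITING_TO_RUN;

  public void setState(State newState)
  {
    // The "has a successful run happened yet" decision lives here,
    // instead of at every call site.
    if (FIRST_RUN_ONLY.contains(newState) && atLeastOneSuccessfulRun) {
      return;
    }
    state = newState;
  }

  public void markRunSuccessful()
  {
    atLeastOneSuccessfulRun = true;
  }

  public State getState()
  {
    return state;
  }
}
```

With this shape, the supervisor just calls `setState(CONNECTING_TO_STREAM)` unconditionally on every iteration and the manager drops the call once it no longer applies.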
> // The number of runs failed before the supervisor flips from a RUNNING to an UNHEALTHY state
> @JsonProperty
> @Min(3)
Is there any particular reason to limit the minimum to 3 in any of these?
> @Min(3)
> private int unhealthinessThreshold = 3;
>
> // The number of successful runs before the supervisor flips from an UNHEALTHY to a RUNNING state
> * @return {@link AppenderatorDriverAddResult}
> *
> * @throws IOException if there is an I/O error while allocating or writing to a segment
> private DateTime timestamp;
> private StreamErrorTransience streamErrorTransience;
>
> public ExceptionEvent()
Jackson's deserializer complained that there's no default constructor but I can replace this with a JsonCreator constructor
> private final DateTime offsetsLastUpdated;
> private final boolean suspended;
> private final SeekableStreamSupervisorStateManager.State state;
> private final Queue<SeekableStreamSupervisorStateManager.ExceptionEvent> recentErrors;
minor: I think typically we use List rather than Queue if there's no particular reason for it to be a queue. ExceptionEventStore.getRecentEvents() should probably return something more common like List or Collection. Also, while we're at it, it should probably copy the data into an immutable data type instead of passing the underlying ConcurrentLinkedQueue around, to avoid accidental modification.
> * checks if there's been at least one successful iteration if needed and sets supervisor state to an appropriate
> * new state.
> */
> public Optional<State> setState(State newState)
Doesn't seem like anyone reads the return value
> public void storeThrowableEvent(Throwable t)
> {
>   if (t instanceof PossiblyTransientStreamException && atLeastOneSuccessfulRun) {
>     t = new TransientStreamException(t);
How about wrap the underlying exception instead of wrapping PossiblyTransientStreamException in a TransientStreamException?
> public void markRunFinishedAndEvaluateHealth()
> {
>   if (currentRunSuccessful) {
More elegant as `atLeastOneSuccessfulRun |= currentRunSuccessful;`
> }
> }
>
> supervisorStateHistory.add(currentRunState);
Shouldn't this come after the UNHEALTHY_SUPERVISOR state check?
> State currentRunState = State.RUNNING;
>
> for (Map.Entry<Class, Queue<ExceptionEvent>> events : eventStore.getNonTransientRecentEvents().entrySet()) {
I don't think this is clearing error conditions properly. Let's say that you have maxStoredExceptionEvents set to something very high, such that ExceptionEventStore.storeThrowable() never removes any items from recentEventsQueue or recentEventsMap. Then let's say you have unhealthinessThreshold set to 3 and you accumulate 3 TransientStreamException events - It'll drop into a LOST_CONTACT_WITH_STREAM state. Then next run, say there were no errors encountered on the run - it'll go back into a RUNNING state. But the next run if you get another TransientStreamException, it'll immediately drop back into a LOST_CONTACT_WITH_STREAM state. Again assuming maxStoredExceptionEvents is very high and is never hit, you could have a stream running successfully for hours, and then on the next TransientStreamException it goes into an unhealthy state without respecting unhealthinessThreshold. Does this behavior sound like what would happen?
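One way to avoid the scenario described above is to evaluate health over a window of the last `unhealthinessThreshold` runs rather than over a raw count of stored events, resetting the streak whenever a run finishes cleanly. A hypothetical sketch of that idea (not the PR's code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: only the error counts of the last
// `unhealthinessThreshold` runs are considered, so a single new error after a
// long healthy stretch cannot immediately flip the supervisor to unhealthy.
public class ErrorWindow
{
  private final int unhealthinessThreshold;
  private final Deque<Integer> errorsPerRun = new ArrayDeque<>();

  public ErrorWindow(int unhealthinessThreshold)
  {
    this.unhealthinessThreshold = unhealthinessThreshold;
  }

  public void recordRun(int errorCount)
  {
    errorsPerRun.addLast(errorCount);
    // keep only the last `unhealthinessThreshold` runs
    while (errorsPerRun.size() > unhealthinessThreshold) {
      errorsPerRun.removeFirst();
    }
  }

  // Unhealthy only if every one of the last `unhealthinessThreshold` runs had errors.
  public boolean isUnhealthy()
  {
    return errorsPerRun.size() == unhealthinessThreshold
           && errorsPerRun.stream().allMatch(c -> c > 0);
  }
}
```

Under this scheme, a healthy run sitting anywhere inside the window breaks the failure streak, so the threshold is respected regardless of how many events `maxStoredExceptionEvents` retains.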
Force-pushed from 552cab1 to f686d2a.

clintropolis left a comment:

Had first pass, will do another soon 👍
> |CREATING_TASKS (first iteration only)|The supervisor is creating tasks and discovering state|
> |RUNNING|The supervisor has started tasks and is waiting for taskDuration to elapse|
> |SUSPENDED|The supervisor has been suspended|
> |SHUTTING_DOWN|Shutdown has been called but the supervisor hasn't fully shut down yet|
I think this state should be STOPPING instead of SHUTTING_DOWN since it is tied to the supervisor 'stop' method and importantly to avoid confusion with the deprecated supervisor 'shutdown' API call which is now called 'terminate' which tombstones the supervisor.
> {
>   public enum State
>   {
>     // Error states - ordered from high to low priority
Is there still a priority in this PR? It looks to me like UNABLE_TO_CONNECT_TO_STREAM or LOST_CONTACT_WITH_STREAM would come first depending on if a successful run had happened, then UNHEALTHY_SUPERVISOR, then UNHEALTHY_TASKS?
Yeah the priority is more implicit now - there is no overlap between LOST_CONTACT_WITH_STREAM, UNABLE_TO_CONNECT_TO_STREAM, and UNHEALTHY_SUPERVISOR - it reports one of the first two if the last exception thrown was wrapped in a StreamException (which happens for exceptions from calls made to the Kafka/Kinesis client library), otherwise it's UNHEALTHY_SUPERVISOR. Any of those 3 take priority over UNHEALTHY_TASKS.
> |Property|Description|Default|
> |--------|-----------|-------|
> |druid.supervisor.stream.healthinessThreshold|The number of successful iterations before the supervisor flips from an UNHEALTHY to a RUNNING state|3|
inconsistent terminology, this should be UNHEALTHY_SUPERVISOR?
Not necessarily - it could also have been in a LOST_CONTACT_WITH_STREAM or UNABLE_TO_CONNECT_TO_STREAM which are also the 'unhealthy' states. Maybe it shouldn't be capitalized to avoid confusion?
Similarly with the rest of the comments about UNHEALTHY -> UNHEALTHY_SUPERVISOR.
> |Property|Description|Default|
> |--------|-----------|-------|
> |druid.supervisor.stream.healthinessThreshold|The number of successful iterations before the supervisor flips from an UNHEALTHY to a RUNNING state|3|
> |druid.supervisor.stream.unhealthinessThreshold|The number of iterations failed before the supervisor flips from a RUNNING to an UNHEALTHY state|3|
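For reference, these would be set in the overlord's runtime properties along these lines (a sketch showing the defaults from the table above, not taken verbatim from the PR; note that later hunks in this thread quote the properties without the `.stream` segment):

```properties
druid.supervisor.stream.healthinessThreshold=3
druid.supervisor.stream.unhealthinessThreshold=3
```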
> currentRunSuccessful = false;
> }
>
> public void storeCompletedTaskState(TaskState state)
Depends on your definition of storing. Maybe something like trackCompletedTask() would be less confusing?
> @JsonProperty
> private boolean storingStackTraces = false;
>
> // The number of runs failed before the supervisor flips from a RUNNING to an UNHEALTHY state
UNHEALTHY -> UNHEALTHY_SUPERVISOR?
> @JsonProperty
> private int unhealthinessThreshold = 3;
>
> // The number of successful runs before the supervisor flips from an UNHEALTHY to a RUNNING state
UNHEALTHY -> UNHEALTHY_SUPERVISOR?
> consumer lag per partition may be reported as negative values if the supervisor has not received a recent latest offset
> response from Kafka. The aggregate lag value will always be >= 0.
>
> The status report also contains the supervisor's state and a list of recently thrown exceptions (whose max size can be
Since afaict UNHEALTHY_SUPERVISOR, UNHEALTHY_TASKS, UNABLE_TO_CONNECT_TO_STREAM, and LOST_CONTACT_WITH_STREAM don't all seem mutually exclusive, what exactly is the value in distinguishing them at all instead of just a single UNHEALTHY state? It seems like the end result is going to be hitting the API to get the errors so you can find out the 'why' so you can determine how to resolve the situation, no matter which unhealthy state, no? Am I missing something?
Well, everything but UNHEALTHY_TASKS is mutually exclusive.
I think the value is in providing a bit more information to help in debugging, mainly in distinguishing between the 'unable to connect' and 'lost contact' cases. If you didn't distinguish this and just had a list of recent exceptions, how would you be able to tell if this supervisor ever worked and is possibly suffering a 'transient' issue, other than by looking through logs? The 'unhealthy supervisor' case is then a necessary third option to handle exceptions that don't fall into either of the first two categories because they're not stream-related. 'Unhealthy tasks' is more of a nice to have - that way monitoring systems don't have to additionally parse the response of the task API endpoints to figure out that a bunch of tasks are failing.
> return partitions.stream().map(PartitionInfo::partition).collect(Collectors.toSet());
> return wrapExceptions(() -> {
>   // use consumer.listTopics() instead of partitionsFor() to force a remote call so we can detect stream issues
>   Map<String, List<PartitionInfo>> topics = consumer.listTopics();
I think this was explicitly changed to partitionsFor because of the overhead of listTopics if you have a ton of topics. See #6455
Hah nice, thanks for pointing that PR out. In that case, I can go back to using partitionsFor
jon-wei left a comment:

Latest code changes LGTM, had some comments on the docs
> |State|Description|
> |-----|-----------|
> |UNHEALTHY_SUPERVISOR|The supervisor has encountered errors on the past `druid.supervisor.unhealthinessThreshold` iterations|
Suggest adding some info about the difference between the basic states and the detailed implementation-specific states, and how the detailed implementation states here map to basic states
> |SUSPENDED|The supervisor has been suspended|
> |STOPPING|The supervisor is stopping|
>
> States marked with "first iteration only" only occur on the supervisor's first iteration at startup or after suspension.
Suggest adding a short high-level summary of the Kafka/Kinesis supervisor's runInternal() loop. The info is kind of there implicitly in the ordering of the states above, but I think a more explicit description of the per-iteration sequence would be useful
> |State|Description|
> |-----|-----------|
> |UNHEALTHY_SUPERVISOR|The supervisor has encountered errors on the past `druid.supervisor.unhealthinessThreshold` iterations|
Should these be broken down by how implementation state maps to universal state, like Kafka's?
…#7428)

* Add state and error tracking for seekable stream supervisors
* Fixed nits in docs
* Made inner class static and updated spec test with jackson inject
* Review changes
* Remove redundant config param in supervisor
* Style
* Applied some of Jon's recommendations
* Add transience field
* write test
* implement code review changes except for reconsidering logic of markRunFinishedAndEvaluateHealth()
* remove transience reporting and fix SeekableStreamSupervisorStateManager impl
* move call to stateManager.markRunFinished() from RunNotice to runInternal() for tests
* remove stateHistory because it wasn't adding much value, some fixes, and add more tests
* fix tests
* code review changes and add HTTP health check status
* fix test failure
* refactor to split into a generic SupervisorStateManager and a specific SeekableStreamSupervisorStateManager
* fixup after merge
* code review changes - add additional docs
* cleanup KafkaIndexTaskTest
* add additional documentation for Kinesis indexing
* remove unused throws class
Closes #7217
Based on the proposal and discussion in #7217.
The PR adds the following config values (which weren't discussed in the proposal):
These are surfaced through the supervisor `/status` endpoint. The number of stored exception events accepts values in the range [`max(healthinessThreshold, unhealthinessThreshold)`, 2147483647], defaulting to `max(healthinessThreshold, unhealthinessThreshold)`.