stop supervisor in case of metadata failures#3456
stop supervisor in case of metadata failures#3456b-slim merged 1 commit intoapache:masterfrom pjain1:kafka_indexing_fix
Conversation
There was a problem hiding this comment.
not sure why not doing is gracefully by having true instead of false?
There was a problem hiding this comment.
If we stop it gracefully that means tasks that may have been started by this supervisor are allowed to publish what they have read which might be really tiny data as the metadata insert happens just after the supervisor starts.
Ideally supervisor.start() should be called only if metadata insert succeeds but it is not possible here.
There was a problem hiding this comment.
I checked the code again and stop gracefully set to false means running tasks are not killed. So, false flag is meant to handle cases like leadership changes. Here, we would like to just kill the tasks so doing what @cheddar suggested would be correct.
There was a problem hiding this comment.
I notice this comment here. I wonder if the supervisor shouldn't have a .verify() call on it to make sure the spec is legit, then persist to DB then start()?
Or, maybe just store in the DB first, start() and if start() doesn't work, fail out and remove from the DB again?
|
updated the fix to start the supervisor first and then persist the spec if it succeeds otherwise write tombstone to prevent it being from picked up again. |
There was a problem hiding this comment.
wondering is this needs to have a retry for some kind of transitory exception or just fail the task. I think with metadata storage we have in place some retry mechanism.
There was a problem hiding this comment.
I think it is ok for now .. i don't think we have retry mechanism at least for this insert operation
|
fix seems legit, it will be nice if we can gracefully retry if the insert has a transient kind of issues. 👍 |
There was a problem hiding this comment.
does supervisor.start() failure guarantee that things are not left in a corrupted state and supervisor.stop(true) does not need to be called?
@dclim ?
There was a problem hiding this comment.
@himanshug to be safe, we should probably make a call to consumer.close() in KafkaSupervisor.start() if the Kafka consumer was instantiated but an exception was thrown afterwards; @pjain1 do you mind adding this?
|
@pjain1 yes that seems good to me, sorry for missing your comment; logic looks good to me, maybe just ensure the Kafka consumer gets cleaned up if start() fails which is something I should've done before, thanks! |
|
@dclim added please verify |
There was a problem hiding this comment.
@pjain1 Can you actually move the code below it:
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
into the try-catch as well? And also you should probably do a (if consumer != null) {consumer.close();} to avoid an NPE
|
@pjain1 sweet thanks, LGTM 👍 |
close kafka consumer in case supervisor start fails
|
rebased with master |
|
@dclim @b-slim @himanshug can we get this merged for 0.9.2-rc2 |
|
looks like we have 2 +1 |
|
@pjain1 backport plz |
close kafka consumer in case supervisor start fails
Possibly Fixes #3455