Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,28 +311,30 @@ public void run()
}
}
);
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);

started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
catch (Exception e) {
if (consumer != null) {
consumer.close();
}
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
}

firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);

started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
Expand Down Expand Up @@ -140,7 +141,7 @@ public Optional<SupervisorReport> getSupervisorStatus(String id)

/**
* Stops a supervisor with a given id and then removes it from the list.
* <p>
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
*
Expand All @@ -164,7 +165,7 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write

/**
* Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id.
* <p>
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
*
Expand All @@ -177,13 +178,23 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
return false;
}

Supervisor supervisor = spec.createSupervisor();
supervisor.start(); // try starting the supervisor first so we don't persist a bad spec

if (persistSpec) {
metadataSupervisorManager.insert(id, spec);
}

Supervisor supervisor = null;
try {
supervisor = spec.createSupervisor();
supervisor.start();
}
catch (Exception e) {
// Supervisor creation or start failed write tombstone only when trying to start a new supervisor
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does a supervisor.start() failure guarantee that things are not left in a corrupted state, so that supervisor.stop(true) does not need to be called?

@dclim ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@himanshug to be safe, we should probably make a call to consumer.close() in KafkaSupervisor.start() if the Kafka consumer was instantiated but an exception was thrown afterwards; @pjain1 do you mind adding this?

if (persistSpec) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec());
}
Throwables.propagate(e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this needs a retry for some kind of transient exception, or whether it should just fail the task. I think we already have some retry mechanism in place for metadata storage.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is OK for now. I don't think we have a retry mechanism, at least not for this insert operation.

}

supervisors.put(id, Pair.of(supervisor, spec));
return true;
}
Expand Down