-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[BEAM-22] Schedule roots less aggressively #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9423fa6
550953f
3a649bd
35e823a
caa142d
d354ee9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -304,8 +304,8 @@ public void run() { | |
| visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); | ||
| } | ||
| } | ||
| fireTimers(); | ||
| mightNeedMoreWork(); | ||
| boolean timersFired = fireTimers(); | ||
| addWorkIfNecessary(timersFired); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| LOG.error("Monitor died due to being interrupted"); | ||
|
|
@@ -326,8 +326,12 @@ public void run() { | |
| } | ||
| } | ||
|
|
||
| private void fireTimers() throws Exception { | ||
| /** | ||
| * Fires any available timers. Returns true if at least one timer was fired. | ||
| */ | ||
| private boolean fireTimers() throws Exception { | ||
| try { | ||
| boolean firedTimers = false; | ||
| for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers : | ||
| evaluationContext.extractFiredTimers().entrySet()) { | ||
| AppliedPTransform<?, ?, ?> transform = transformTimers.getKey(); | ||
|
|
@@ -346,9 +350,11 @@ private void fireTimers() throws Exception { | |
| .add(WindowedValue.valueInEmptyWindows(work)) | ||
| .commit(Instant.now()); | ||
| scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); | ||
| firedTimers = true; | ||
| } | ||
| } | ||
| } | ||
| return firedTimers; | ||
| } catch (Exception e) { | ||
| LOG.error("Internal Error while delivering timers", e); | ||
| throw e; | ||
|
|
@@ -367,26 +373,58 @@ private boolean shouldShutdown() { | |
| return false; | ||
| } | ||
|
|
||
| private void mightNeedMoreWork() { | ||
| synchronized (scheduledExecutors) { | ||
| for (TransformExecutor<?> executor : scheduledExecutors.keySet()) { | ||
| Thread thread = executor.getThread(); | ||
| if (thread != null) { | ||
| switch (thread.getState()) { | ||
| case BLOCKED: | ||
| case WAITING: | ||
| case TERMINATED: | ||
| case TIMED_WAITING: | ||
| break; | ||
| default: | ||
| return; | ||
| } | ||
| } | ||
| /** | ||
| * If all active {@link TransformExecutor TransformExecutors} are in a blocked state, | ||
| * add more work from root nodes that may have additional work. This ensures that if a pipeline | ||
| * has elements available from the root nodes it will add those elements when necessary. | ||
| */ | ||
| private void addWorkIfNecessary(boolean firedTimers) { | ||
| // If any timers have fired, they will add more work; We don't need to add more | ||
| if (firedTimers) { | ||
| return; | ||
| } | ||
| for (TransformExecutor<?> executor : scheduledExecutors.keySet()) { | ||
| if (!isExecutorBlocked(executor)) { | ||
| // We have at least one executor that can proceed without adding additional work | ||
| return; | ||
| } | ||
| } | ||
| // All current TransformExecutors are blocked; add more work from the roots. | ||
| for (AppliedPTransform<?, ?, ?> root : rootNodes) { | ||
| scheduleConsumption(root, null, defaultCompletionCallback); | ||
| if (!evaluationContext.isDone(root)) { | ||
| scheduleConsumption(root, null, defaultCompletionCallback); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Return true if the provided executor might make more progress if no action is taken. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just making sure: The way I read this, we want any incorrectness to be towards returning false (eg., if there is a chance we'll make progress). This is safe, because if we think we might make progress there will be a future in which either (1) we've made progress or (2) we check again and decide there is no longer a chance to make progress and then this will return true. Correct?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
| * | ||
| * <p>May return false even if all executor threads are currently blocked or cleaning up, as | ||
| * these can cause more work to be scheduled. If this does not occur, after these calls | ||
| * terminate, future calls will return true if all executors are waiting. | ||
| */ | ||
| private boolean isExecutorBlocked(TransformExecutor<?> executor) { | ||
| Thread thread = executor.getThread(); | ||
| if (thread == null) { | ||
| return false; | ||
| } | ||
| switch (thread.getState()) { | ||
| case TERMINATED: | ||
| throw new IllegalStateException(String.format( | ||
| "Unexpectedly encountered a Terminated TransformExecutor %s", executor)); | ||
| case WAITING: | ||
| case TIMED_WAITING: | ||
| // The thread is waiting for some external input. Adding more work may cause the thread | ||
| // to stop waiting (e.g. the thread is waiting on an unbounded side input) | ||
| return true; | ||
| case BLOCKED: | ||
| // The executor is blocked on acquisition of a java monitor. This usually means it is | ||
| // making a call to the EvaluationContext, but not a model-blocking call - and will | ||
| // eventually complete, at which point we may reevaluate. | ||
| default: | ||
| // NEW and RUNNABLE threads can make progress | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -357,27 +357,49 @@ public CounterSet getCounters() { | |
| } | ||
|
|
||
| /** | ||
| * Returns true if all steps are done. | ||
| * Returns true if the step will not produce additional output. | ||
| * | ||
| * <p>If the provided transform produces only {@link IsBounded#BOUNDED} | ||
| * {@link PCollection PCollections}, returns true if the watermark is at | ||
| * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}. | ||
| * | ||
| * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED} | ||
| * {@link PCollection PCollections}, returns the value of | ||
| * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}. | ||
| */ | ||
| public boolean isDone() { | ||
| if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) { | ||
| public boolean isDone(AppliedPTransform<?, ?, ?> transform) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadoc on what it means for a transform to be done according to this?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| // if the PTransform's watermark isn't at the max value, it isn't done | ||
| if (watermarkManager | ||
| .getWatermarks(transform) | ||
| .getOutputWatermark() | ||
| .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { | ||
| return false; | ||
| } | ||
| if (!watermarkManager.allWatermarksAtPositiveInfinity()) { | ||
| return false; | ||
| // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down, | ||
| // the PTransform may produce additional output. It is not done. | ||
| for (PValue output : transform.getOutput().expand()) { | ||
| if (output instanceof PCollection) { | ||
| IsBounded bounded = ((PCollection<?>) output).isBounded(); | ||
| if (bounded.equals(IsBounded.UNBOUNDED) | ||
| && !options.isShutdownUnboundedProducersWithMaxWatermark()) { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
| // The PTransform's watermark was at positive infinity and all of its outputs are known to be | ||
| // done. It is done. | ||
| return true; | ||
| } | ||
|
|
||
| private boolean containsUnboundedPCollection() { | ||
| /** | ||
| * Returns true if all steps are done. | ||
| */ | ||
| public boolean isDone() { | ||
| for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) { | ||
| for (PValue value : transform.getInput().expand()) { | ||
| if (value instanceof PCollection | ||
| && ((PCollection<?>) value).isBounded().equals(IsBounded.UNBOUNDED)) { | ||
| return true; | ||
| } | ||
| if (!isDone(transform)) { | ||
| return false; | ||
| } | ||
| } | ||
| return false; | ||
| return true; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc on what the return value means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.