OperationalLimits.java

@@ -27,18 +27,11 @@ public class OperationalLimits {
   public final long maxOutputKeyBytes;
   // Maximum size of a single output element's serialized value.
   public final long maxOutputValueBytes;
-  // Whether to throw an exception when processing output that violates any of the given limits.
-  public final boolean throwExceptionOnLargeOutput;

-  OperationalLimits(
-      long maxWorkItemCommitBytes,
-      long maxOutputKeyBytes,
-      long maxOutputValueBytes,
-      boolean throwExceptionOnLargeOutput) {
+  OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) {
     this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
     this.maxOutputKeyBytes = maxOutputKeyBytes;
     this.maxOutputValueBytes = maxOutputValueBytes;
-    this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
   }

   @AutoBuilder(ofClass = OperationalLimits.class)
@@ -49,16 +42,13 @@ public interface Builder {

     Builder setMaxOutputValueBytes(long bytes);

-    Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);
-
     OperationalLimits build();
   }

   public static Builder builder() {
     return new AutoBuilder_OperationalLimits_Builder()
         .setMaxWorkItemCommitBytes(Long.MAX_VALUE)
         .setMaxOutputKeyBytes(Long.MAX_VALUE)
-        .setMaxOutputValueBytes(Long.MAX_VALUE)
-        .setThrowExceptionOnLargeOutput(false);
+        .setMaxOutputValueBytes(Long.MAX_VALUE);
   }
 }
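
With setThrowExceptionOnLargeOutput gone, the builder now configures only the three byte limits, each defaulting to Long.MAX_VALUE (effectively unlimited). A minimal usage sketch; the concrete values below are illustrative, not defaults from the codebase:

    // Illustrative limits; anything left unset stays at Long.MAX_VALUE.
    OperationalLimits limits =
        OperationalLimits.builder()
            .setMaxWorkItemCommitBytes(180L << 20) // ~180 MiB per commit
            .setMaxOutputKeyBytes(1L << 20) // 1 MiB serialized key
            .setMaxOutputValueBytes(16L << 20) // 16 MiB serialized value
            .build();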
StreamingDataflowWorker.java
@@ -445,7 +445,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
             config ->
                 onPipelineConfig(
                     config,
-                    options,
                     dispatcherClient::consumeWindmillDispatcherEndpoints,
                     operationalLimits::set));
     computationStateCache = computationStateCacheFactory.apply(configFetcher);
@@ -515,7 +514,6 @@ static StreamingDataflowWorker forTesting(
                     config ->
                         onPipelineConfig(
                             config,
-                            options,
                             windmillServer::setWindmillServiceEndpoints,
                             operationalLimits::set))
             : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
@@ -598,7 +596,6 @@ static StreamingDataflowWorker forTesting(

   private static void onPipelineConfig(
       StreamingEnginePipelineConfig config,
-      DataflowWorkerHarnessOptions options,
       Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
       Consumer<OperationalLimits> operationalLimits) {

@@ -607,8 +604,6 @@ private static void onPipelineConfig(
         .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
         .setMaxOutputKeyBytes(config.maxOutputKeyBytes())
         .setMaxOutputValueBytes(config.maxOutputValueBytes())
-        .setThrowExceptionOnLargeOutput(
-            DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"))
         .build());

     if (!config.windmillServiceEndpoints().isEmpty()) {
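
Pieced together from the three hunks above, onPipelineConfig is now options-free: limits derive solely from the dynamic StreamingEnginePipelineConfig, and the experiment lookup that used to live here moves into ComputationWorkExecutorFactory (below). A sketch of the resulting method; the operationalLimits.accept(...) wrapper is collapsed context in this view, so its exact spelling is an assumption:

    private static void onPipelineConfig(
        StreamingEnginePipelineConfig config,
        Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
        Consumer<OperationalLimits> operationalLimits) {
      // Byte limits come only from the refreshed pipeline config.
      operationalLimits.accept(
          OperationalLimits.builder()
              .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
              .setMaxOutputKeyBytes(config.maxOutputKeyBytes())
              .setMaxOutputValueBytes(config.maxOutputValueBytes())
              .build());
      if (!config.windmillServiceEndpoints().isEmpty()) {
        consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
      }
    }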
StreamingModeExecutionContext.java
@@ -107,6 +107,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
   private final ImmutableMap<String, String> stateNameMap;
   private final WindmillStateCache.ForComputation stateCache;
   private final ReaderCache readerCache;
+  private final boolean throwExceptionOnLargeOutput;
   private volatile long backlogBytes;

   /**
@@ -152,7 +153,8 @@ public StreamingModeExecutionContext(
       MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
       DataflowExecutionStateTracker executionStateTracker,
       StreamingModeExecutionStateRegistry executionStateRegistry,
-      long sinkByteLimit) {
+      long sinkByteLimit,
+      boolean throwExceptionOnLargeOutput) {
     super(
         counterFactory,
         metricsContainerRegistry,
@@ -165,6 +167,7 @@ public StreamingModeExecutionContext(
     this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
     this.stateCache = stateCache;
     this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
+    this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
   }

   @VisibleForTesting
@@ -181,7 +184,7 @@ public long getMaxOutputValueBytes() {
   }

   public boolean throwExceptionsForLargeOutput() {
-    return operationalLimits.throwExceptionOnLargeOutput;
+    return throwExceptionOnLargeOutput;
   }

   public boolean workIsFailed() {
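
Storing the flag in a final field decouples throwExceptionsForLargeOutput() from the mutable OperationalLimits snapshot: the flag is fixed for the worker's lifetime while the byte limits can still change on config refresh. A hypothetical caller on the output path would branch on it as below; checkOutputKeySize is illustrative, and a getMaxOutputKeyBytes() getter symmetric to the getMaxOutputValueBytes() shown above is assumed:

    // Hypothetical enforcement sketch, not the worker's actual output path.
    static void checkOutputKeySize(StreamingModeExecutionContext context, long keyBytes) {
      if (keyBytes <= context.getMaxOutputKeyBytes()) {
        return; // within the configured limit
      }
      String message = "Serialized output key is " + keyBytes + " bytes, over the limit";
      if (context.throwExceptionsForLargeOutput()) {
        // Experiment enabled: surface the oversized output as a hard failure.
        throw new IllegalArgumentException(message);
      }
      // Default: record the violation without failing the work item.
      LoggerFactory.getLogger(StreamingModeExecutionContext.class).error(message);
    }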
ComputationWorkExecutorFactory.java
@@ -68,6 +68,10 @@ final class ComputationWorkExecutorFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ComputationWorkExecutorFactory.class);
   private static final String DISABLE_SINK_BYTE_LIMIT_EXPERIMENT =
       "disable_limiting_bundle_sink_bytes";
+  // Whether to throw an exception when processing output that violates any of the operational
+  // limits.
+  private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
+      "throw_exceptions_on_large_output";

   private final DataflowWorkerHarnessOptions options;
   private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
@@ -90,6 +94,7 @@ final class ComputationWorkExecutorFactory {

   private final long maxSinkBytes;
   private final IdGenerator idGenerator;
+  private final boolean throwExceptionOnLargeOutput;

   ComputationWorkExecutorFactory(
       DataflowWorkerHarnessOptions options,
@@ -113,6 +118,8 @@ final class ComputationWorkExecutorFactory {
         hasExperiment(options, DISABLE_SINK_BYTE_LIMIT_EXPERIMENT)
             ? Long.MAX_VALUE
             : StreamingDataflowWorker.MAX_SINK_BYTES;
+    this.throwExceptionOnLargeOutput =
+        hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
   }

   private static Nodes.ParallelInstructionNode extractReadNode(
@@ -255,7 +262,8 @@ private StreamingModeExecutionContext createExecutionContext(
         stageInfo.metricsContainerRegistry(),
         executionStateTracker,
         stageInfo.executionStateRegistry(),
-        maxSinkBytes);
+        maxSinkBytes,
+        throwExceptionOnLargeOutput);
   }

   private DataflowMapTaskExecutor createMapTaskExecutor(
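
The factory resolves the experiment once in its constructor and pins the result, so per-bundle code never re-parses the experiments list. For reference, hasExperiment (statically imported from DataflowRunner) is roughly this null-tolerant membership test:

    // Approximate shape of DataflowRunner.hasExperiment.
    public static boolean hasExperiment(DataflowPipelineDebugOptions options, String experiment) {
      List<String> experiments = options.getExperiments();
      return experiments != null && experiments.contains(experiment);
    }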
StreamingDataflowWorkerTest.java
@@ -1280,13 +1280,9 @@ public void testOutputKeyTooLargeException() throws Exception {

     StreamingDataflowWorker worker =
         makeWorker(
-            defaultWorkerParams()
+            defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
                 .setInstructions(instructions)
-                .setOperationalLimits(
-                    OperationalLimits.builder()
-                        .setMaxOutputKeyBytes(15)
-                        .setThrowExceptionOnLargeOutput(true)
-                        .build())
+                .setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
                 .build());
     worker.start();

@@ -1317,13 +1313,10 @@ public void testOutputValueTooLargeException() throws Exception {

     StreamingDataflowWorker worker =
         makeWorker(
-            defaultWorkerParams()
+            defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
                 .setInstructions(instructions)
                 .setOperationalLimits(
-                    OperationalLimits.builder()
-                        .setMaxOutputValueBytes(15)
-                        .setThrowExceptionOnLargeOutput(true)
-                        .build())
+                    OperationalLimits.builder().setMaxOutputValueBytes(15).build())
                 .build());
     worker.start();

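
On the test side the toggle now rides in on worker options rather than on OperationalLimits. Assuming defaultWorkerParams forwards its varargs to standard options parsing, the flag resolves like this:

    // Standard Beam options parsing turns the flag into the experiments list.
    DataflowWorkerHarnessOptions options =
        PipelineOptionsFactory.fromArgs("--experiments=throw_exceptions_on_large_output")
            .as(DataflowWorkerHarnessOptions.class);
    // DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output") now returns true.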
StreamingModeExecutionContextTest.java
@@ -127,7 +127,8 @@ public void setUp() {
                 PipelineOptionsFactory.create(),
                 "test-work-item-id"),
             executionStateRegistry,
-            Long.MAX_VALUE);
+            Long.MAX_VALUE,
+            /*throwExceptionOnLargeOutput=*/ false);
   }

   private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) {
WorkerCustomSourcesTest.java
@@ -610,7 +610,8 @@ public void testReadUnboundedReader() throws Exception {
                 PipelineOptionsFactory.create(),
                 "test-work-item-id"),
             executionStateRegistry,
-            Long.MAX_VALUE);
+            Long.MAX_VALUE,
+            /*throwExceptionOnLargeOutput=*/ false);

     options.setNumWorkers(5);
     int maxElements = 10;
@@ -978,7 +979,8 @@ public void testFailedWorkItemsAbort() throws Exception {
                 PipelineOptionsFactory.create(),
                 "test-work-item-id"),
             executionStateRegistry,
-            Long.MAX_VALUE);
+            Long.MAX_VALUE,
+            /*throwExceptionOnLargeOutput=*/ false);

     options.setNumWorkers(5);
     int maxElements = 100;