Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240323-2.0.0", // [bomupgrader] sets version
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240624-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240319-2.0.0", // [bomupgrader] sets version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,38 @@
*/
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoBuilder;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.annotations.Internal;

/** Keep track of any operational limits required by the backend. */
public class OperationalLimits {
@AutoValue
@Internal
public abstract class OperationalLimits {

private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20;

// Maximum size of a commit from a single work item.
public final long maxWorkItemCommitBytes;
public abstract long getMaxWorkItemCommitBytes();
// Maximum size of a single output element's serialized key.
public final long maxOutputKeyBytes;
public abstract long getMaxOutputKeyBytes();
// Maximum size of a single output element's serialized value.
public final long maxOutputValueBytes;
public abstract long getMaxOutputValueBytes();

OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) {
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
this.maxOutputKeyBytes = maxOutputKeyBytes;
this.maxOutputValueBytes = maxOutputValueBytes;
}
@AutoValue.Builder
public abstract static class Builder {

@AutoBuilder(ofClass = OperationalLimits.class)
public interface Builder {
Builder setMaxWorkItemCommitBytes(long bytes);
public abstract Builder setMaxWorkItemCommitBytes(long bytes);

Builder setMaxOutputKeyBytes(long bytes);
public abstract Builder setMaxOutputKeyBytes(long bytes);

Builder setMaxOutputValueBytes(long bytes);
public abstract Builder setMaxOutputValueBytes(long bytes);

OperationalLimits build();
public abstract OperationalLimits build();
}

public static Builder builder() {
return new AutoBuilder_OperationalLimits_Builder()
.setMaxWorkItemCommitBytes(Long.MAX_VALUE)
public static OperationalLimits.Builder builder() {
return new AutoValue_OperationalLimits.Builder()
.setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
.setMaxOutputKeyBytes(Long.MAX_VALUE)
.setMaxOutputValueBytes(Long.MAX_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.runners.core.metrics.MetricsLogger;
Expand All @@ -49,9 +47,11 @@
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
Expand Down Expand Up @@ -103,9 +103,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -181,7 +179,6 @@ private StreamingDataflowWorker(
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
Expand Down Expand Up @@ -237,8 +234,8 @@ private StreamingDataflowWorker(
streamingCounters,
hotKeyLogger,
sampler,
operationalLimits,
ID_GENERATOR,
configFetcher.getGlobalConfigHandle(),
stageInfoMap);

ThrottlingGetDataMetricTracker getDataMetricTracker =
Expand Down Expand Up @@ -298,6 +295,7 @@ private StreamingDataflowWorker(
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
.setGetDataStatusProvider(getDataClient::printHtml)
.setWorkUnitExecutor(workUnitExecutor)
.setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
.build();

Windmill.GetWorkRequest request =
Expand Down Expand Up @@ -335,8 +333,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
AtomicReference<OperationalLimits> operationalLimits =
new AtomicReference<>(OperationalLimits.builder().build());
WindmillStateCache windmillStateCache =
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
Expand All @@ -354,7 +350,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
operationalLimits,
windmillStreamFactoryBuilder,
configFetcher ->
ComputationStateCache.create(
Expand Down Expand Up @@ -412,7 +407,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
workFailureProcessor,
streamingCounters,
memoryMonitor,
operationalLimits,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
Expand All @@ -428,7 +422,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
Expand All @@ -440,13 +433,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
GrpcDispatcherClient.create(createStubFactory(options));
configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(),
dataflowServiceClient,
config ->
onPipelineConfig(
config,
dispatcherClient::consumeWindmillDispatcherEndpoints,
operationalLimits::set));
options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient);
configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig);
computationStateCache = computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
Expand Down Expand Up @@ -474,7 +462,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport());
}

configFetcher = new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
configFetcher =
new StreamingApplianceComputationConfigFetcher(
windmillServer::getConfig,
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
computationStateCache = computationStateCacheFactory.apply(configFetcher);
}

Expand All @@ -494,10 +485,9 @@ static StreamingDataflowWorker forTesting(
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
int localRetryTimeoutMs,
OperationalLimits limits) {
StreamingGlobalConfigHandleImpl globalConfigHandle,
int localRetryTimeoutMs) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
AtomicReference<OperationalLimits> operationalLimits = new AtomicReference<>(limits);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
WindmillStateCache stateCache =
WindmillStateCache.builder()
Expand All @@ -510,13 +500,20 @@ static StreamingDataflowWorker forTesting(
/* hasReceivedGlobalConfig= */ true,
options.getGlobalConfigRefreshPeriod().getMillis(),
workUnitClient,
executorSupplier,
config ->
onPipelineConfig(
config,
windmillServer::setWindmillServiceEndpoints,
operationalLimits::set))
: new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
globalConfigHandle,
executorSupplier)
: new StreamingApplianceComputationConfigFetcher(
windmillServer::getConfig, globalConfigHandle);
configFetcher
.getGlobalConfigHandle()
.registerConfigObserver(
config -> {
if (config.windmillServiceEndpoints().isEmpty()) {
LOG.warn("Received empty windmill service endpoints");
return;
}
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
});
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
ComputationStateCache computationStateCache =
Expand Down Expand Up @@ -583,7 +580,6 @@ static StreamingDataflowWorker forTesting(
workFailureProcessor,
streamingCounters,
memoryMonitor,
operationalLimits,
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
Expand All @@ -594,23 +590,6 @@ static StreamingDataflowWorker forTesting(
stageInfo);
}

private static void onPipelineConfig(
StreamingEnginePipelineConfig config,
Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
Consumer<OperationalLimits> operationalLimits) {

operationalLimits.accept(
OperationalLimits.builder()
.setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
.setMaxOutputKeyBytes(config.maxOutputKeyBytes())
.setMaxOutputValueBytes(config.maxOutputValueBytes())
.build());

if (!config.windmillServiceEndpoints().isEmpty()) {
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
}
}

private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder(
DataflowWorkerHarnessOptions options, long clientId) {
Duration maxBackoff =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
Expand Down Expand Up @@ -107,6 +108,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private final ImmutableMap<String, String> stateNameMap;
private final WindmillStateCache.ForComputation stateCache;
private final ReaderCache readerCache;
private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
private volatile long backlogBytes;

Expand Down Expand Up @@ -153,6 +155,7 @@ public StreamingModeExecutionContext(
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
super(
Expand All @@ -163,6 +166,7 @@ public StreamingModeExecutionContext(
sinkByteLimit);
this.computationId = computationId;
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
Expand All @@ -176,11 +180,11 @@ public final long getBacklogBytes() {
}

public long getMaxOutputKeyBytes() {
return operationalLimits.maxOutputKeyBytes;
return operationalLimits.getMaxOutputKeyBytes();
}

public long getMaxOutputValueBytes() {
return operationalLimits.maxOutputValueBytes;
return operationalLimits.getMaxOutputValueBytes();
}

public boolean throwExceptionsForLargeOutput() {
Expand All @@ -196,13 +200,13 @@ public void start(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
this.key = key;
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
this.operationalLimits = operationalLimits;
// Snapshot the limits for entire bundle processing.
this.operationalLimits = globalConfigHandle.getConfig().operationalLimits();
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
Expand Down Expand Up @@ -73,11 +72,9 @@ public final void executeWork(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
context()
.start(key, work, stateReader, sideInputStateFetcher, operationalLimits, outputBuilder);
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
workExecutor().execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ public static ComputationConfig create(
public abstract ImmutableMap<String, String> stateNameMap();

/** Interface to fetch configurations for a specific computation. */
@FunctionalInterface
public interface Fetcher {
default void start() {}

default void stop() {}

Optional<ComputationConfig> fetchConfig(String computationId);

StreamingGlobalConfigHandle getGlobalConfigHandle();
}
}
Loading