From b3854f4d4940ac2d2290977b75d91581aef560d3 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Thu, 12 Jun 2025 10:42:20 +0200 Subject: [PATCH 1/2] [Java FnApi] Fix race in BeamFnStatusClient by initializing all fields before starting rpc. --- .../apache/beam/fn/harness/status/BeamFnStatusClient.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java index 0b35a2c7d5b1..e32f3491bf8b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java @@ -65,8 +65,6 @@ public BeamFnStatusClient( PipelineOptions options, Cache cache) { this.channel = channelFactory.apply(apiServiceDescriptor); - this.outboundObserver = - BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); this.processBundleCache = processBundleCache; this.memoryMonitor = MemoryMonitor.fromOptions(options); this.cache = cache; @@ -76,6 +74,11 @@ public BeamFnStatusClient( thread.setPriority(Thread.MIN_PRIORITY); thread.setName("MemoryMonitor"); thread.start(); + + // Start the rpc after all the initialization is complete as the InboundObserver + // may be called any time after this. + this.outboundObserver = + BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); } @Override From 71629c694921f39c794ba01bc2e95b8038831161 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Thu, 12 Jun 2025 11:01:06 +0200 Subject: [PATCH 2/2] spotless --- .../org/apache/beam/fn/harness/status/BeamFnStatusClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java index e32f3491bf8b..f68dfec3e521 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java @@ -78,7 +78,7 @@ public BeamFnStatusClient( // Start the rpc after all the initialization is complete as the InboundObserver // may be called any time after this. this.outboundObserver = - BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); + BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); } @Override