diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java index 0b35a2c7d5b1..f68dfec3e521 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java @@ -65,8 +65,6 @@ public BeamFnStatusClient( PipelineOptions options, Cache cache) { this.channel = channelFactory.apply(apiServiceDescriptor); - this.outboundObserver = - BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); this.processBundleCache = processBundleCache; this.memoryMonitor = MemoryMonitor.fromOptions(options); this.cache = cache; @@ -76,6 +74,11 @@ public BeamFnStatusClient( thread.setPriority(Thread.MIN_PRIORITY); thread.setName("MemoryMonitor"); thread.start(); + + // Start the rpc after all the initialization is complete as the InboundObserver + // may be called any time after this. + this.outboundObserver = + BeamFnWorkerStatusGrpc.newStub(channel).workerStatus(new InboundObserver()); } @Override