From 4411f01a74ea00f76cd6f79f37b20b7f7e8e9a86 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 22 Mar 2022 16:24:54 -0700 Subject: [PATCH] [BEAM-14157] Don't call requestObserver.onNext on a closed windmill stream requestObserver.onNext(), which is called by send(), should not be called after requestObserver.onCompleted() has been called. requestObserver.onCompleted() is called when the stream is closed. I am investigating stalled windmill streams with logs like `Output channel stalled for 31s, outbound thread`. I think sending messages on a closed stream has something to do with it. In test runs with this change, I have yet to see a stalled GetWorkStream. --- .../runners/dataflow/worker/StreamingDataflowWorker.java | 6 +++++- .../dataflow/worker/windmill/GrpcWindmillServer.java | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index c49865aa7511..a683ff1e748d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -848,7 +848,11 @@ public void run() { new TimerTask() { @Override public void run() { - refreshActiveWork(); + try { + refreshActiveWork(); + } catch (RuntimeException e) { + LOG.warn("Failed to refresh active work: ", e); + } } }, options.getActiveWorkRefreshPeriodMillis(), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 781ee596708a..6631ffa13e8a 100644 
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -663,6 +663,9 @@ protected AbstractWindmillStream( protected final void send(RequestT request) { lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (clientClosed.get()) { + throw new IllegalStateException("Send called on a client closed stream."); + } requestObserver.onNext(request); } }