diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index c49865aa7511..a683ff1e748d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -848,7 +848,11 @@ public void run() { new TimerTask() { @Override public void run() { - refreshActiveWork(); + try { + refreshActiveWork(); + } catch (RuntimeException e) { + LOG.warn("Failed to refresh active work: ", e); + } } }, options.getActiveWorkRefreshPeriodMillis(), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 781ee596708a..6631ffa13e8a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -663,6 +663,9 @@ protected AbstractWindmillStream( protected final void send(RequestT request) { lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { + if (clientClosed.get()) { + throw new IllegalStateException("Send called on a client closed stream."); + } requestObserver.onNext(request); } }