From 003bb3ad324a9d3ebd4ee86262beae0d5e09b8e1 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sat, 26 Mar 2022 13:12:00 -0700 Subject: [PATCH 1/7] [BEAM-14157] GrpcWindmillServer: Use stream specific boolean to do client closed check This is a follow up to #17162. An AbstractWindmillStream can have more than one grpc stream during its lifetime, new streams can be created after client closed for sending pending requests. So it is not correct to check `if(clientClosed)` in `send()`, this PR adds a new grpc stream level boolean to do the closed check in `send()`. --- .../dataflow/worker/windmill/GrpcWindmillServer.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 6631ffa13e8a..6d678f876e7c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -632,6 +632,8 @@ private abstract class AbstractWindmillStream implements Wi // The following should be protected by synchronizing on this, except for // the atomics which may be read atomically for status pages. private StreamObserver requestObserver; + // Indicates if the current stream in requestObserver is client closed + private final AtomicBoolean streamClosed = new AtomicBoolean(); private final AtomicLong startTimeMs = new AtomicLong(); private final AtomicLong lastSendTimeMs = new AtomicLong(); private final AtomicLong lastResponseTimeMs = new AtomicLong(); @@ -663,7 +665,7 @@ protected AbstractWindmillStream( protected final void send(RequestT request) { lastSendTimeMs.set(Instant.now().getMillis()); synchronized (this) { - if (clientClosed.get()) { + if (streamClosed.get()) { throw new IllegalStateException("Send called on a client closed stream."); } requestObserver.onNext(request); @@ -681,6 +683,7 @@ protected final void startStream() { startTimeMs.set(Instant.now().getMillis()); lastResponseTimeMs.set(0); requestObserver = streamObserverFactory.from(clientFactory, new ResponseObserver()); + streamClosed.set(false); onNewStream(); if (clientClosed.get()) { close(); @@ -742,10 +745,11 @@ public final void appendSummaryHtml(PrintWriter writer) { writer.format(", %dms backoff remaining", sleepLeft); } writer.format( - ", current stream is %dms old, last send %dms, last response %dms", + ", current stream is %dms old, last send %dms, last response %dms, closed: %s", debugDuration(nowMs, startTimeMs.get()), debugDuration(nowMs, lastSendTimeMs.get()), - debugDuration(nowMs, lastResponseTimeMs.get())); + debugDuration(nowMs, lastResponseTimeMs.get()), + streamClosed.get() ? "true" : "false"); } // Don't require synchronization on stream, see the appendSummaryHtml comment. @@ -838,6 +842,7 @@ public final synchronized void close() { // Synchronization of close and onCompleted necessary for correct retry logic in onNewStream. clientClosed.set(true); requestObserver.onCompleted(); + streamClosed.set(true); } @Override From 0d4e5d21732942f9e991da4231ac66bb10ee4fbe Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 3 Apr 2022 23:23:40 -0700 Subject: [PATCH 2/7] [BEAM-14157] Add unit test testing CommitWorkStream retries around stream closing --- .../windmill/GrpcWindmillServerTest.java | 230 ++++++++++++------ 1 file changed, 157 insertions(+), 73 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java index c5d7b0c0f323..bfa1af05cfa6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java @@ -28,11 +28,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; @@ -491,11 +493,96 @@ private WorkItemCommitRequest makeCommitRequest(int i, int size) { .build(); } + // This server receives WorkItemCommitRequests, and verifies they are equal to the above + // commitRequest. + private StreamObserver getTestCommitStreamObserver( + StreamObserver responseObserver, + Map commitRequests) { + return new StreamObserver() { + boolean sawHeader = false; + InputStream buffer = null; + long remainingBytes = 0; + ResponseErrorInjector injector = new ResponseErrorInjector(responseObserver); + + @Override + public void onNext(StreamingCommitWorkRequest request) { + maybeInjectError(responseObserver); + + if (!sawHeader) { + errorCollector.checkThat( + request.getHeader(), + Matchers.equalTo( + JobHeader.newBuilder() + .setJobId("job") + .setProjectId("project") + .setWorkerId("worker") + .build())); + sawHeader = true; + LOG.info("Received header"); + } else { + boolean first = true; + LOG.info("Received request with {} chunks", request.getCommitChunkCount()); + for (StreamingCommitRequestChunk chunk : request.getCommitChunkList()) { + assertTrue(chunk.getSerializedWorkItemCommit().size() <= STREAM_CHUNK_SIZE); + if (first || chunk.hasComputationId()) { + errorCollector.checkThat(chunk.getComputationId(), Matchers.equalTo("computation")); + } + + if (remainingBytes != 0) { + errorCollector.checkThat(buffer, Matchers.notNullValue()); + errorCollector.checkThat( + remainingBytes, + Matchers.is( + chunk.getSerializedWorkItemCommit().size() + + chunk.getRemainingBytesForWorkItem())); + buffer = + new SequenceInputStream(buffer, chunk.getSerializedWorkItemCommit().newInput()); + } else { + errorCollector.checkThat(buffer, Matchers.nullValue()); + buffer = chunk.getSerializedWorkItemCommit().newInput(); + } + remainingBytes = chunk.getRemainingBytesForWorkItem(); + if (remainingBytes == 0) { + try { + WorkItemCommitRequest received = WorkItemCommitRequest.parseFrom(buffer); + errorCollector.checkThat( + received, Matchers.equalTo(commitRequests.get(received.getWorkToken()))); + try { + responseObserver.onNext( + StreamingCommitResponse.newBuilder() + .addRequestId(chunk.getRequestId()) + .build()); + } catch (IllegalStateException e) { + // Stream is closed. + } + } catch (Exception e) { + errorCollector.addError(e); + } + buffer = null; + } else { + errorCollector.checkThat(first, Matchers.is(true)); + } + first = false; + } + } + } + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onCompleted() { + injector.cancel(); + responseObserver.onCompleted(); + } + }; + } + @Test public void testStreamingCommit() throws Exception { List commitRequestList = new ArrayList<>(); List latches = new ArrayList<>(); - Map commitRequests = new HashMap<>(); + Map commitRequests = new ConcurrentHashMap<>(); for (int i = 0; i < 500; ++i) { // Build some requests of varying size with a few big ones. WorkItemCommitRequest request = makeCommitRequest(i, i * (i < 480 ? 8 : 128)); @@ -505,92 +592,86 @@ public void testStreamingCommit() throws Exception { } Collections.shuffle(commitRequestList); - // This server receives WorkItemCommitRequests, and verifies they are equal to the above - // commitRequest. serviceRegistry.addService( new CloudWindmillServiceV1Alpha1ImplBase() { @Override public StreamObserver commitWorkStream( StreamObserver responseObserver) { - return new StreamObserver() { - boolean sawHeader = false; - InputStream buffer = null; - long remainingBytes = 0; - ResponseErrorInjector injector = new ResponseErrorInjector(responseObserver); + return getTestCommitStreamObserver(responseObserver, commitRequests); + } + }); - @Override - public void onNext(StreamingCommitWorkRequest request) { - maybeInjectError(responseObserver); + // Make the commit requests, waiting for each of them to be verified and acknowledged. + CommitWorkStream stream = client.commitWorkStream(); + for (int i = 0; i < commitRequestList.size(); ) { + final CountDownLatch latch = latches.get(i); + if (stream.commitWorkItem( + "computation", + commitRequestList.get(i), + (CommitStatus status) -> { + assertEquals(status, CommitStatus.OK); + latch.countDown(); + })) { + i++; + } else { + stream.flush(); + } + } + stream.flush(); + stream.close(); + for (CountDownLatch latch : latches) { + assertTrue(latch.await(1, TimeUnit.MINUTES)); + } + assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS)); + } - if (!sawHeader) { - errorCollector.checkThat( - request.getHeader(), - Matchers.equalTo( - JobHeader.newBuilder() - .setJobId("job") - .setProjectId("project") - .setWorkerId("worker") - .build())); - sawHeader = true; - LOG.info("Received header"); - } else { - boolean first = true; - LOG.info("Received request with {} chunks", request.getCommitChunkCount()); - for (StreamingCommitRequestChunk chunk : request.getCommitChunkList()) { - assertTrue(chunk.getSerializedWorkItemCommit().size() <= STREAM_CHUNK_SIZE); - if (first || chunk.hasComputationId()) { - errorCollector.checkThat( - chunk.getComputationId(), Matchers.equalTo("computation")); - } + @Test + // Tests stream retries on server errors before and after `close()` + public void testStreamingCommitClosedStream() throws Exception { + List commitRequestList = new ArrayList<>(); + List latches = new ArrayList<>(); + Map commitRequests = new ConcurrentHashMap<>(); + AtomicBoolean shouldServerReturnError = new AtomicBoolean(true); + for (int i = 0; i < 500; ++i) { + // Build some requests of varying size with a few big ones. + WorkItemCommitRequest request = makeCommitRequest(i, i * (i < 480 ? 8 : 128)); + commitRequestList.add(request); + commitRequests.put((long) i, request); + latches.add(new CountDownLatch(1)); + } + Collections.shuffle(commitRequestList); - if (remainingBytes != 0) { - errorCollector.checkThat(buffer, Matchers.notNullValue()); - errorCollector.checkThat( - remainingBytes, - Matchers.is( - chunk.getSerializedWorkItemCommit().size() - + chunk.getRemainingBytesForWorkItem())); - buffer = - new SequenceInputStream( - buffer, chunk.getSerializedWorkItemCommit().newInput()); - } else { - errorCollector.checkThat(buffer, Matchers.nullValue()); - buffer = chunk.getSerializedWorkItemCommit().newInput(); - } - remainingBytes = chunk.getRemainingBytesForWorkItem(); - if (remainingBytes == 0) { - try { - WorkItemCommitRequest received = WorkItemCommitRequest.parseFrom(buffer); - errorCollector.checkThat( - received, - Matchers.equalTo(commitRequests.get(received.getWorkToken()))); - try { - responseObserver.onNext( - StreamingCommitResponse.newBuilder() - .addRequestId(chunk.getRequestId()) - .build()); - } catch (IllegalStateException e) { - // Stream is closed. - } - } catch (Exception e) { - errorCollector.addError(e); - } - buffer = null; - } else { - errorCollector.checkThat(first, Matchers.is(true)); - } - first = false; + // This server returns errors if shouldServerReturnError is true, else returns valid responses. + serviceRegistry.addService( + new CloudWindmillServiceV1Alpha1ImplBase() { + @Override + public StreamObserver commitWorkStream( + StreamObserver responseObserver) { + StreamObserver testCommitStreamObserver = + getTestCommitStreamObserver(responseObserver, commitRequests); + return new StreamObserver() { + @Override + public void onNext(StreamingCommitWorkRequest request) { + if (shouldServerReturnError.get()) { + try { + responseObserver.onError( + new RuntimeException("shouldServerReturnError = true")); + } catch (IllegalStateException e) { + // The stream is already closed. } + } else { + testCommitStreamObserver.onNext(request); } } @Override - public void onError(Throwable throwable) {} + public void onError(Throwable throwable) { + testCommitStreamObserver.onError(throwable); + } @Override public void onCompleted() { - injector.cancel(); - responseObserver.onCompleted(); + testCommitStreamObserver.onCompleted(); } }; } @@ -613,11 +694,14 @@ public void onCompleted() { } } stream.flush(); + stream.close(); + + Thread.sleep(100); + shouldServerReturnError.set(false); + for (CountDownLatch latch : latches) { assertTrue(latch.await(1, TimeUnit.MINUTES)); } - - stream.close(); assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS)); } From dffd16f0e25c76a3c69ce9d2bbc84e4520af04d3 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 4 Apr 2022 00:10:09 -0700 Subject: [PATCH 3/7] [BEAM-14157] review comments --- .../runners/dataflow/worker/windmill/GrpcWindmillServer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 6d678f876e7c..e914ef160deb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -632,7 +632,7 @@ private abstract class AbstractWindmillStream implements Wi // The following should be protected by synchronizing on this, except for // the atomics which may be read atomically for status pages. private StreamObserver requestObserver; - // Indicates if the current stream in requestObserver is client closed + // Indicates if the current stream in requestObserver is closed by calling close() method private final AtomicBoolean streamClosed = new AtomicBoolean(); private final AtomicLong startTimeMs = new AtomicLong(); private final AtomicLong lastSendTimeMs = new AtomicLong(); @@ -749,7 +749,7 @@ public final void appendSummaryHtml(PrintWriter writer) { debugDuration(nowMs, startTimeMs.get()), debugDuration(nowMs, lastSendTimeMs.get()), debugDuration(nowMs, lastResponseTimeMs.get()), - streamClosed.get() ? "true" : "false"); + streamClosed.get()); } // Don't require synchronization on stream, see the appendSummaryHtml comment. From 6108688d8b4417925915411a1ea3bc83909c6eaf Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 4 Apr 2022 09:29:05 -0700 Subject: [PATCH 4/7] [BEAM-14157] review comments --- .../windmill/GrpcWindmillServerTest.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java index bfa1af05cfa6..b19f0ab85aa1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.InputStream; import java.io.SequenceInputStream; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; @@ -493,7 +495,7 @@ private WorkItemCommitRequest makeCommitRequest(int i, int size) { .build(); } - // This server receives WorkItemCommitRequests, and verifies they are equal to the above + // This server receives WorkItemCommitRequests, and verifies they are equal to the provided // commitRequest. private StreamObserver getTestCommitStreamObserver( StreamObserver responseObserver, @@ -632,6 +634,9 @@ public void testStreamingCommitClosedStream() throws Exception { List latches = new ArrayList<>(); Map commitRequests = new ConcurrentHashMap<>(); AtomicBoolean shouldServerReturnError = new AtomicBoolean(true); + AtomicBoolean isClientClosed = new AtomicBoolean(false); + AtomicInteger errorsBeforeClose = new AtomicInteger(); + AtomicInteger errorsAfterClose = new AtomicInteger(); for (int i = 0; i < 500; ++i) { // Build some requests of varying size with a few big ones. WorkItemCommitRequest request = makeCommitRequest(i, i * (i < 480 ? 8 : 128)); @@ -656,6 +661,11 @@ public void onNext(StreamingCommitWorkRequest request) { try { responseObserver.onError( new RuntimeException("shouldServerReturnError = true")); + if (isClientClosed.get()) { + errorsAfterClose.incrementAndGet(); + } else { + errorsBeforeClose.incrementAndGet(); + } } catch (IllegalStateException e) { // The stream is already closed. } @@ -695,8 +705,24 @@ public void onCompleted() { } stream.flush(); stream.close(); + isClientClosed.set(true); - Thread.sleep(100); + long deadline = System.currentTimeMillis() + 60_000; // 1 min + while (true) { + Thread.sleep(100); + int tmpErrorsAfterClose = errorsAfterClose.get(); + int tmpErrorsBeforeClose = errorsBeforeClose.get(); + // wait for at least 2 errors before and after + if (tmpErrorsAfterClose > 1 && tmpErrorsBeforeClose > 1) { + break; + } + if (System.currentTimeMillis() > deadline) { + fail( + String.format( + "Expected errors not sent by server errorsAfterClose: %s errorsBeforeClose: %s", + tmpErrorsAfterClose, tmpErrorsBeforeClose)); + } + } shouldServerReturnError.set(false); for (CountDownLatch latch : latches) { From 241aae50fe8cc94946ddc864c93431de804b55dc Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 4 Apr 2022 09:51:38 -0700 Subject: [PATCH 5/7] [BEAM-14157] review comments --- .../dataflow/worker/windmill/GrpcWindmillServerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java index b19f0ab85aa1..1ddf4656c76b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java @@ -717,9 +717,11 @@ public void onCompleted() { break; } if (System.currentTimeMillis() > deadline) { + // Control should not reach here if the test is working as expected fail( String.format( - "Expected errors not sent by server errorsAfterClose: %s errorsBeforeClose: %s", + "Expected errors not sent by server errorsAfterClose: %s errorsBeforeClose: %s" + + " \n Should not reach here if the test is working as expected.", tmpErrorsAfterClose, tmpErrorsBeforeClose)); } } From 82dfb1af5301c0ce6b34e7e76271c38591e950be Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 4 Apr 2022 13:12:23 -0700 Subject: [PATCH 6/7] [BEAM-14157] fix test --- .../dataflow/worker/windmill/GrpcWindmillServerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java index 1ddf4656c76b..64e6905284c1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java @@ -712,8 +712,8 @@ public void onCompleted() { Thread.sleep(100); int tmpErrorsAfterClose = errorsAfterClose.get(); int tmpErrorsBeforeClose = errorsBeforeClose.get(); - // wait for at least 2 errors before and after - if (tmpErrorsAfterClose > 1 && tmpErrorsBeforeClose > 1) { + // wait for at least 1 errors before and after + if (tmpErrorsAfterClose > 0 && tmpErrorsBeforeClose > 0) { break; } if (System.currentTimeMillis() > deadline) { From 306c32dce268adb9c86c650417ee030e587dd495 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 4 Apr 2022 13:18:13 -0700 Subject: [PATCH 7/7] [BEAM-14157] fix test --- .../windmill/GrpcWindmillServerTest.java | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java index 64e6905284c1..64a31f368315 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java @@ -704,29 +704,47 @@ public void onCompleted() { } } stream.flush(); + + long deadline = System.currentTimeMillis() + 60_000; // 1 min + while (true) { + Thread.sleep(100); + int tmpErrorsBeforeClose = errorsBeforeClose.get(); + // wait for at least 1 errors before close + if (tmpErrorsBeforeClose > 0) { + break; + } + if (System.currentTimeMillis() > deadline) { + // Control should not reach here if the test is working as expected + fail( + String.format( + "Expected errors not sent by server errorsBeforeClose: %s" + + " \n Should not reach here if the test is working as expected.", + tmpErrorsBeforeClose)); + } + } + stream.close(); isClientClosed.set(true); - long deadline = System.currentTimeMillis() + 60_000; // 1 min + deadline = System.currentTimeMillis() + 60_000; // 1 min while (true) { Thread.sleep(100); int tmpErrorsAfterClose = errorsAfterClose.get(); - int tmpErrorsBeforeClose = errorsBeforeClose.get(); - // wait for at least 1 errors before and after - if (tmpErrorsAfterClose > 0 && tmpErrorsBeforeClose > 0) { + // wait for at least 1 errors after close + if (tmpErrorsAfterClose > 0) { break; } if (System.currentTimeMillis() > deadline) { // Control should not reach here if the test is working as expected fail( String.format( - "Expected errors not sent by server errorsAfterClose: %s errorsBeforeClose: %s" + "Expected errors not sent by server errorsAfterClose: %s" + " \n Should not reach here if the test is working as expected.", - tmpErrorsAfterClose, tmpErrorsBeforeClose)); + tmpErrorsAfterClose)); } } - shouldServerReturnError.set(false); + shouldServerReturnError.set(false); for (CountDownLatch latch : latches) { assertTrue(latch.await(1, TimeUnit.MINUTES)); }