From 84d5402b640a19dd32efc6174da6f41d7e320be5 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Wed, 8 Apr 2020 12:18:29 -0700 Subject: [PATCH] Ensure that empty messages are not flushed to handler. --- .../JulHandlerPrintStreamAdapterFactory.java | 44 +++++++++---------- ...lHandlerPrintStreamAdapterFactoryTest.java | 21 +++++++++ 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java index 1eb935aa6daa..e4de8cb980f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java @@ -91,13 +91,16 @@ public void write(int i) throws IOException { @Override public void flush() { - publish(flushToString()); + publishIfNonEmpty(flushToString()); } private synchronized String flushToString() { if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') { buffer.setLength(buffer.length() - 1); } + if (buffer.length() == 0) { + return null; + } String result = buffer.toString(); buffer.setLength(0); return result; @@ -163,9 +166,7 @@ public void write(byte[] a, int offset, int length) { } } } - if (msg != null) { - publish(msg); - } + publishIfNonEmpty(msg); } @Override @@ -214,7 +215,7 @@ public void print(char[] a) { } msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -228,7 +229,7 @@ public void print(String s) { } msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -248,7 +249,7 @@ public void println(boolean b) { buffer.append(b); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -258,7 +259,7 @@ public void println(char c) { buffer.append(c); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -268,7 +269,7 @@ public void println(int i) { buffer.append(i); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -278,7 +279,7 @@ public void println(long l) { buffer.append(l); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -288,7 +289,7 @@ public void println(float f) { buffer.append(f); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -298,7 +299,7 @@ public void println(double d) { buffer.append(d); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -308,7 +309,7 @@ public void println(char[] a) { buffer.append(a); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -318,7 +319,7 @@ public void println(String s) { buffer.append(s); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -328,7 +329,7 @@ public void println(Object o) { buffer.append(o); msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); } @Override @@ -348,7 +349,7 @@ public PrintStream format(Locale locale, String format, Object... args) { } msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); return this; } @@ -370,15 +371,18 @@ public PrintStream append(CharSequence cs, int start, int limit) { } msg = flushToString(); } - publish(msg); + publishIfNonEmpty(msg); return this; } // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399. - private void publish(Level messageLevel, String message) { + private void publishIfNonEmpty(String message) { checkState( !Thread.holdsLock(this), "BEAM-9399: publish should not be called with the lock as it may cause deadlock"); + if (message == null || message.isEmpty()) { + return; + } if (logger.isLoggable(messageLevel)) { if (outputWarning.compareAndSet(false, true)) { LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER); @@ -390,10 +394,6 @@ private void publish(Level messageLevel, String message) { handler.publish(log); } } - - private void publish(String message) { - publish(messageLevel, message); - } } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java index a93c26b468a6..ae0396f02e59 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.logging; import static org.apache.beam.runners.dataflow.worker.LogRecordMatcher.hasLogItem; +import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -132,6 +133,26 @@ public void testLogRawBytes() { assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg)); } + @Test + public void testNoEmptyMessages() { + try (PrintStream printStream = createPrintStreamAdapter()) { + printStream.println("blah"); + printStream.print("\n"); + printStream.flush(); + printStream.println(""); + printStream.flush(); + printStream.print(""); + printStream.flush(); + byte[] bytes = "a".getBytes(Charset.defaultCharset()); + printStream.write(bytes, 0, 0); + printStream.flush(); + } + + for (LogRecord log : handler.getLogs()) { + assertThat(log.getMessage(), not(blankOrNullString())); + } + } + private PrintStream createPrintStreamAdapter() { return JulHandlerPrintStreamAdapterFactory.create(handler, LOGGER_NAME, Level.INFO); }