From f34ecc3c7d8a197e3539f2ee6dcc22d8792be6fc Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 11 Sep 2020 04:20:49 -0700 Subject: [PATCH] [BEAM-9399] Change DataflowWorkerLoggingHandler to report errors to the original System.err Currently such errors are logged to System.err which is a PrintStream that publishes to the handler. This is perhaps unlikely to work if earlier publishing failed and additionally removes a potential deadlock between the PrintStream object sychronization and the Handler object synchronization. This was attempted to be fixed earlier by dissallowing the PrintStream object to be synchronized when calling into the handler. However this is possible to be triggered by external synchronization on the PrintStream, such as that performed by Throwable.printStackTrace. Changing the PrintStream to use separate synchronization for buffering works in most cases but not for cases where the stream is externally synchronized. --- .../DataflowWorkerLoggingInitializer.java | 33 ++++++++++++++++ .../JulHandlerPrintStreamAdapterFactory.java | 38 ++++++++----------- ...lHandlerPrintStreamAdapterFactoryTest.java | 8 ++++ 3 files changed, 57 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java index 56688470dc79..ebef8ea707cd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java @@ -29,6 +29,7 @@ import java.io.PrintStream; import java.util.List; import java.util.Map; +import java.util.logging.ErrorManager; import java.util.logging.Level; import java.util.logging.LogManager; import java.util.logging.Logger; @@ -98,6 +99,35 @@ public class DataflowWorkerLoggingInitializer { private static PrintStream originalStdErr = System.err; private static boolean initialized = false; + // This is the same as ErrorManager except that it uses the provided + // print stream. + public static class PrintStreamErrorManager extends ErrorManager { + public PrintStreamErrorManager(PrintStream stream) { + this.stream = stream; + } + + private PrintStream stream; + private boolean reported = false; + + @Override + public synchronized void error(String msg, Exception ex, int code) { + if (reported) { + // We only report the first error, to avoid clogging + // the screen. + return; + } + reported = true; + String text = "java.util.logging.ErrorManager: " + code; + if (msg != null) { + text = text + ": " + msg; + } + stream.println(text); + if (ex != null) { + ex.printStackTrace(stream); + } + } + }; + private static DataflowWorkerLoggingHandler makeLoggingHandler( String filepathProperty, String defaultFilePath) throws IOException { String filepath = System.getProperty(filepathProperty, defaultFilePath); @@ -105,6 +135,9 @@ private static DataflowWorkerLoggingHandler makeLoggingHandler( DataflowWorkerLoggingHandler handler = new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L); handler.setLevel(Level.ALL); + // To avoid potential deadlock between the handler and the System.err print stream, use the + // original stderr print stream for errors. See BEAM-9399. + handler.setErrorManager(new PrintStreamErrorManager(getOriginalStdErr())); return handler; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java index 08a995d8275a..79ef587a89c6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.logging; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; @@ -96,10 +94,10 @@ public void write(int i) throws IOException { @Override public void flush() { - publishIfNonEmpty(flushToString()); + publishIfNonEmpty(flushBufferToString()); } - private synchronized String flushToString() { + private synchronized String flushBufferToString() { if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') { buffer.setLength(buffer.length() - 1); } @@ -167,7 +165,7 @@ public void write(byte[] a, int offset, int length) { int startLength = buffer.length(); buffer.append(decoded); if (flush || buffer.indexOf("\n", startLength) >= 0) { - msg = flushToString(); + msg = flushBufferToString(); } } } @@ -218,7 +216,7 @@ public void print(char[] a) { if (!flush) { return; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -232,7 +230,7 @@ public void print(String s) { if (!flush) { return; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -252,7 +250,7 @@ public void println(boolean b) { String msg; synchronized (this) { buffer.append(b); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -262,7 +260,7 @@ public void println(char c) { String msg; synchronized (this) { buffer.append(c); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -272,7 +270,7 @@ public void println(int i) { String msg; synchronized (this) { buffer.append(i); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -282,7 +280,7 @@ public void println(long l) { String msg; synchronized (this) { buffer.append(l); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -292,7 +290,7 @@ public void println(float f) { String msg; synchronized (this) { buffer.append(f); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -302,7 +300,7 @@ public void println(double d) { String msg; synchronized (this) { buffer.append(d); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -312,7 +310,7 @@ public void println(char[] a) { String msg; synchronized (this) { buffer.append(a); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -322,7 +320,7 @@ public void println(String s) { String msg; synchronized (this) { buffer.append(s); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -332,7 +330,7 @@ public void println(Object o) { String msg; synchronized (this) { buffer.append(o); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -352,7 +350,7 @@ public PrintStream format(Locale locale, String format, Object... args) { if (buffer.indexOf("\n", startLength) < 0) { return this; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); return this; @@ -374,17 +372,13 @@ public PrintStream append(CharSequence cs, int start, int limit) { if (!flush) { return this; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); return this; } - // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399. private void publishIfNonEmpty(String message) { - checkState( - !Thread.holdsLock(this), - "BEAM-9399: publish should not be called with the lock as it may cause deadlock"); if (message == null || message.isEmpty()) { return; } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java index ae0396f02e59..eff5f3494791 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java @@ -133,6 +133,14 @@ public void testLogRawBytes() { assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg)); } + @Test + public void testLogThrowable() { + PrintStream printStream = createPrintStreamAdapter(); + Throwable t = new RuntimeException("Test error"); + t.printStackTrace(printStream); + assertThat(handler.getLogs(), hasLogItem("testLogThrowable")); + } + @Test public void testNoEmptyMessages() { try (PrintStream printStream = createPrintStreamAdapter()) {