diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java index 56688470dc79..ebef8ea707cd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java @@ -29,6 +29,7 @@ import java.io.PrintStream; import java.util.List; import java.util.Map; +import java.util.logging.ErrorManager; import java.util.logging.Level; import java.util.logging.LogManager; import java.util.logging.Logger; @@ -98,6 +99,35 @@ public class DataflowWorkerLoggingInitializer { private static PrintStream originalStdErr = System.err; private static boolean initialized = false; + // This is the same as ErrorManager except that it uses the provided + // print stream. + public static class PrintStreamErrorManager extends ErrorManager { + public PrintStreamErrorManager(PrintStream stream) { + this.stream = stream; + } + + private PrintStream stream; + private boolean reported = false; + + @Override + public synchronized void error(String msg, Exception ex, int code) { + if (reported) { + // We only report the first error, to avoid clogging + // the screen. + return; + } + reported = true; + String text = "java.util.logging.ErrorManager: " + code; + if (msg != null) { + text = text + ": " + msg; + } + stream.println(text); + if (ex != null) { + ex.printStackTrace(stream); + } + } + }; + private static DataflowWorkerLoggingHandler makeLoggingHandler( String filepathProperty, String defaultFilePath) throws IOException { String filepath = System.getProperty(filepathProperty, defaultFilePath); @@ -105,6 +135,9 @@ private static DataflowWorkerLoggingHandler makeLoggingHandler( DataflowWorkerLoggingHandler handler = new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L); handler.setLevel(Level.ALL); + // To avoid potential deadlock between the handler and the System.err print stream, use the + // original stderr print stream for errors. See BEAM-9399. + handler.setErrorManager(new PrintStreamErrorManager(getOriginalStdErr())); return handler; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java index 08a995d8275a..79ef587a89c6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.logging; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; @@ -96,10 +94,10 @@ public void write(int i) throws IOException { @Override public void flush() { - publishIfNonEmpty(flushToString()); + publishIfNonEmpty(flushBufferToString()); } - private synchronized String flushToString() { + private synchronized String flushBufferToString() { if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') { buffer.setLength(buffer.length() - 1); } @@ -167,7 +165,7 @@ public void write(byte[] a, int offset, int length) { int startLength = buffer.length(); buffer.append(decoded); if (flush || buffer.indexOf("\n", startLength) >= 0) { - msg = flushToString(); + msg = flushBufferToString(); } } } @@ -218,7 +216,7 @@ public void print(char[] a) { if (!flush) { return; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -232,7 +230,7 @@ public void print(String s) { if (!flush) { return; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -252,7 +250,7 @@ public void println(boolean b) { String msg; synchronized (this) { buffer.append(b); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -262,7 +260,7 @@ public void println(char c) { String msg; synchronized (this) { buffer.append(c); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -272,7 +270,7 @@ public void println(int i) { String msg; synchronized (this) { buffer.append(i); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -282,7 +280,7 @@ public void println(long l) { String msg; synchronized (this) { buffer.append(l); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -292,7 +290,7 @@ public void println(float f) { String msg; synchronized (this) { buffer.append(f); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -302,7 +300,7 @@ public void println(double d) { String msg; synchronized (this) { buffer.append(d); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -312,7 +310,7 @@ public void println(char[] a) { String msg; synchronized (this) { buffer.append(a); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -322,7 +320,7 @@ public void println(String s) { String msg; synchronized (this) { buffer.append(s); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -332,7 +330,7 @@ public void println(Object o) { String msg; synchronized (this) { buffer.append(o); - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); } @@ -352,7 +350,7 @@ public PrintStream format(Locale locale, String format, Object... args) { if (buffer.indexOf("\n", startLength) < 0) { return this; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); return this; @@ -374,17 +372,13 @@ public PrintStream append(CharSequence cs, int start, int limit) { if (!flush) { return this; } - msg = flushToString(); + msg = flushBufferToString(); } publishIfNonEmpty(msg); return this; } - // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399. private void publishIfNonEmpty(String message) { - checkState( - !Thread.holdsLock(this), - "BEAM-9399: publish should not be called with the lock as it may cause deadlock"); if (message == null || message.isEmpty()) { return; } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java index ae0396f02e59..eff5f3494791 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java @@ -133,6 +133,14 @@ public void testLogRawBytes() { assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg)); } + @Test + public void testLogThrowable() { + PrintStream printStream = createPrintStreamAdapter(); + Throwable t = new RuntimeException("Test error"); + t.printStackTrace(printStream); + assertThat(handler.getLogs(), hasLogItem("testLogThrowable")); + } + @Test public void testNoEmptyMessages() { try (PrintStream printStream = createPrintStreamAdapter()) {