Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
import java.util.logging.ErrorManager;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
Expand Down Expand Up @@ -98,13 +99,45 @@ public class DataflowWorkerLoggingInitializer {
private static PrintStream originalStdErr = System.err;
private static boolean initialized = false;

// This is the same as ErrorManager except that it uses the provided
// print stream.
public static class PrintStreamErrorManager extends ErrorManager {
public PrintStreamErrorManager(PrintStream stream) {
this.stream = stream;
}

private PrintStream stream;
private boolean reported = false;

@Override
public synchronized void error(String msg, Exception ex, int code) {
if (reported) {
// We only report the first error, to avoid clogging
// the screen.
return;
}
reported = true;
String text = "java.util.logging.ErrorManager: " + code;
if (msg != null) {
text = text + ": " + msg;
}
stream.println(text);
if (ex != null) {
ex.printStackTrace(stream);
}
}
};

private static DataflowWorkerLoggingHandler makeLoggingHandler(
String filepathProperty, String defaultFilePath) throws IOException {
String filepath = System.getProperty(filepathProperty, defaultFilePath);
int filesizeMb = Integer.parseInt(System.getProperty(FILESIZE_MB_PROPERTY, "1024"));
DataflowWorkerLoggingHandler handler =
new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L);
handler.setLevel(Level.ALL);
// To avoid potential deadlock between the handler and the System.err print stream, use the
// original stderr print stream for errors. See BEAM-9399.
handler.setErrorManager(new PrintStreamErrorManager(getOriginalStdErr()));
return handler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker.logging;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
Expand Down Expand Up @@ -96,10 +94,10 @@ public void write(int i) throws IOException {

@Override
public void flush() {
publishIfNonEmpty(flushToString());
publishIfNonEmpty(flushBufferToString());
}

private synchronized String flushToString() {
private synchronized String flushBufferToString() {
if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
buffer.setLength(buffer.length() - 1);
}
Expand Down Expand Up @@ -167,7 +165,7 @@ public void write(byte[] a, int offset, int length) {
int startLength = buffer.length();
buffer.append(decoded);
if (flush || buffer.indexOf("\n", startLength) >= 0) {
msg = flushToString();
msg = flushBufferToString();
}
}
}
Expand Down Expand Up @@ -218,7 +216,7 @@ public void print(char[] a) {
if (!flush) {
return;
}
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -232,7 +230,7 @@ public void print(String s) {
if (!flush) {
return;
}
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -252,7 +250,7 @@ public void println(boolean b) {
String msg;
synchronized (this) {
buffer.append(b);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -262,7 +260,7 @@ public void println(char c) {
String msg;
synchronized (this) {
buffer.append(c);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -272,7 +270,7 @@ public void println(int i) {
String msg;
synchronized (this) {
buffer.append(i);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -282,7 +280,7 @@ public void println(long l) {
String msg;
synchronized (this) {
buffer.append(l);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -292,7 +290,7 @@ public void println(float f) {
String msg;
synchronized (this) {
buffer.append(f);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -302,7 +300,7 @@ public void println(double d) {
String msg;
synchronized (this) {
buffer.append(d);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -312,7 +310,7 @@ public void println(char[] a) {
String msg;
synchronized (this) {
buffer.append(a);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -322,7 +320,7 @@ public void println(String s) {
String msg;
synchronized (this) {
buffer.append(s);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -332,7 +330,7 @@ public void println(Object o) {
String msg;
synchronized (this) {
buffer.append(o);
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
}
Expand All @@ -352,7 +350,7 @@ public PrintStream format(Locale locale, String format, Object... args) {
if (buffer.indexOf("\n", startLength) < 0) {
return this;
}
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
return this;
Expand All @@ -374,17 +372,13 @@ public PrintStream append(CharSequence cs, int start, int limit) {
if (!flush) {
return this;
}
msg = flushToString();
msg = flushBufferToString();
}
publishIfNonEmpty(msg);
return this;
}

// Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
private void publishIfNonEmpty(String message) {
checkState(
!Thread.holdsLock(this),
"BEAM-9399: publish should not be called with the lock as it may cause deadlock");
if (message == null || message.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public void testLogRawBytes() {
assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg));
}

@Test
public void testLogThrowable() {
PrintStream printStream = createPrintStreamAdapter();
Throwable t = new RuntimeException("Test error");
t.printStackTrace(printStream);
assertThat(handler.getLogs(), hasLogItem("testLogThrowable"));
}

@Test
public void testNoEmptyMessages() {
try (PrintStream printStream = createPrintStreamAdapter()) {
Expand Down