From 072ddc01bca5f97541957634355a2e3bc367d120 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Thu, 27 Feb 2020 17:28:29 -0800 Subject: [PATCH 1/2] [BEAM-9399] Change the redirection of System.err to be a custom PrintStream instead of a custom output stream wrapped by standard PrintStream. --- .../JulHandlerPrintStreamAdapterFactory.java | 401 ++++++++++++++---- .../dataflow/worker/LogRecordMatcher.java | 9 + ...lHandlerPrintStreamAdapterFactoryTest.java | 16 + 3 files changed, 354 insertions(+), 72 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java index 2c5136c15153..1eb935aa6daa 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java @@ -17,12 +17,20 @@ */ package org.apache.beam.runners.dataflow.worker.logging; -import java.io.ByteArrayOutputStream; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderMalfunctionError; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.Formatter; +import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Handler; import java.util.logging.Level; @@ -37,114 +45,363 @@ class JulHandlerPrintStreamAdapterFactory { private static final AtomicBoolean outputWarning = new AtomicBoolean(false); - /** - * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the - * specified {@code loggerName} and {@code level}. - */ - static PrintStream create(Handler handler, String loggerName, Level messageLevel) { - try { - return new PrintStream( - new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel), - false, - StandardCharsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - /** - * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL - * log handler. The log messages will be buffered until the system dependent new line separator is - * seen, at which point the buffered string will be output. - */ - private static class JulHandlerAdapterOutputStream extends OutputStream { + private static class JulHandlerPrintStream extends PrintStream { private static final String LOGGING_DISCLAIMER = String.format( "Please use a logger instead of System.out or System.err.%n" + "Please switch to using org.slf4j.Logger.%n" + "See: https://cloud.google.com/dataflow/pipelines/logging"); - // This limits the number of bytes which we buffer in case we don't see a newline character. - private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes - private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8); + // This limits the number of bytes which we buffer in case we don't have a flush. + private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars /** Hold reference of named logger to check configured {@link Level}. */ - private Logger logger; + private final Logger logger; - private Handler handler; - private String loggerName; - private ByteArrayOutputStream baos; - private Level messageLevel; - private int matched = 0; + private final Handler handler; + private final String loggerName; + private final StringBuilder buffer; + private final Level messageLevel; + private final CharsetDecoder decoder; + private final CharBuffer decoded; + private int carryOverBytes; + private byte[] carryOverByteArray; - private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) { + private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) { + super( + new OutputStream() { + @Override + public void write(int i) throws IOException { + throw new RuntimeException("All methods should be overwritten so this is unused"); + } + }); this.handler = handler; this.loggerName = loggerName; this.messageLevel = logLevel; this.logger = Logger.getLogger(loggerName); - this.baos = new ByteArrayOutputStream(BUFFER_LIMIT); + this.buffer = new StringBuilder(); + this.decoder = + Charset.defaultCharset() + .newDecoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + this.carryOverByteArray = new byte[6]; + this.carryOverBytes = 0; + this.decoded = CharBuffer.allocate(BUFFER_LIMIT); + } + + @Override + public void flush() { + publish(flushToString()); + } + + private synchronized String flushToString() { + if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') { + buffer.setLength(buffer.length() - 1); + } + String result = buffer.toString(); + buffer.setLength(0); + return result; + } + + @Override + public void close() { + flush(); + } + + @Override + public boolean checkError() { + return false; } @Override - public void write(int b) { - if (outputWarning.compareAndSet(false, true)) { - publish(Level.WARNING, LOGGING_DISCLAIMER); + public synchronized void write(int i) { + buffer.append(i); + } + + @Override + public void write(byte[] a, int offset, int length) { + ByteBuffer incoming = ByteBuffer.wrap(a, offset, length); + // Consume the added bytes, flushing on decoded newlines or if we hit + // the buffer limit. + String msg = null; + synchronized (decoder) { + decoded.clear(); + boolean flush = false; + try { + // Process any remaining bytes from last time by adding a byte at a time. + while (carryOverBytes > 0 && incoming.hasRemaining()) { + carryOverByteArray[carryOverBytes++] = incoming.get(); + ByteBuffer wrapped = ByteBuffer.wrap(carryOverByteArray, 0, carryOverBytes); + decoder.decode(wrapped, decoded, false); + if (!wrapped.hasRemaining()) { + carryOverBytes = 0; + } + } + carryOverBytes = 0; + if (incoming.hasRemaining()) { + CoderResult result = decoder.decode(incoming, decoded, false); + if (result.isOverflow()) { + flush = true; + } + // Keep the unread bytes. + assert (incoming.remaining() <= carryOverByteArray.length); + while (incoming.hasRemaining()) { + carryOverByteArray[carryOverBytes++] = incoming.get(); + } + } + } catch (CoderMalfunctionError error) { + decoder.reset(); + carryOverBytes = 0; + error.printStackTrace(); + } + decoded.flip(); + synchronized (this) { + int startLength = buffer.length(); + buffer.append(decoded); + if (flush || buffer.indexOf("\n", startLength) >= 0) { + msg = flushToString(); + } + } } + if (msg != null) { + publish(msg); + } + } + + @Override + public synchronized void print(boolean b) { + buffer.append(b ? "true" : "false"); + } + + @Override + public synchronized void print(char c) { + buffer.append(c); + } + + @Override + public synchronized void print(int i) { + buffer.append(i); + } + + @Override + public synchronized void print(long l) { + buffer.append(l); + } + + @Override + public synchronized void print(float f) { + buffer.append(f); + } + + @Override + public synchronized void print(double d) { + buffer.append(d); + } + + @Override + public void print(char[] a) { + boolean flush = false; + for (char c : a) { + if (c == '\n') { + flush = true; + } + } + String msg; + synchronized (this) { + buffer.append(a); + if (!flush) { + return; + } + msg = flushToString(); + } + publish(msg); + } - baos.write(b); - // Check to see if the next byte matches further into new line string. - if (NEW_LINE[matched] == b) { - matched += 1; - // If we have matched the entire new line, output the contents of the buffer. - if (matched == NEW_LINE.length) { - output(); + @Override + public void print(String s) { + boolean flush = s.indexOf('\n') >= 0; + String msg; + synchronized (this) { + buffer.append(s); + if (!flush) { + return; } - } else { - // Reset the match - matched = 0; + msg = flushToString(); + } + publish(msg); + } + + @Override + public void print(Object o) { + print(o.toString()); + } + + @Override + public void println() { + flush(); + } + + @Override + public void println(boolean b) { + String msg; + synchronized (this) { + buffer.append(b); + msg = flushToString(); + } + publish(msg); + } + + @Override + public void println(char c) { + String msg; + synchronized (this) { + buffer.append(c); + msg = flushToString(); } - if (baos.size() == BUFFER_LIMIT) { - output(); + publish(msg); + } + + @Override + public void println(int i) { + String msg; + synchronized (this) { + buffer.append(i); + msg = flushToString(); + } + publish(msg); + } + + @Override + public void println(long l) { + String msg; + synchronized (this) { + buffer.append(l); + msg = flushToString(); } + publish(msg); } @Override - public void flush() throws IOException { - output(); + public void println(float f) { + String msg; + synchronized (this) { + buffer.append(f); + msg = flushToString(); + } + publish(msg); } @Override - public void close() throws IOException { - output(); + public void println(double d) { + String msg; + synchronized (this) { + buffer.append(d); + msg = flushToString(); + } + publish(msg); } - private void output() { - // If nothing was output, do not log anything - if (baos.size() == 0) { - return; + @Override + public void println(char[] a) { + String msg; + synchronized (this) { + buffer.append(a); + msg = flushToString(); } - try { - String message = baos.toString(StandardCharsets.UTF_8.name()); - // Strip the new line if it exists - if (message.endsWith(System.lineSeparator())) { - message = message.substring(0, message.length() - System.lineSeparator().length()); + publish(msg); + } + + @Override + public void println(String s) { + String msg; + synchronized (this) { + buffer.append(s); + msg = flushToString(); + } + publish(msg); + } + + @Override + public void println(Object o) { + String msg; + synchronized (this) { + buffer.append(o); + msg = flushToString(); + } + publish(msg); + } + + @Override + public PrintStream format(String format, Object... args) { + return format(Locale.getDefault(), format, args); + } + + @Override + public PrintStream format(Locale locale, String format, Object... args) { + String msg; + synchronized (this) { + int startLength = buffer.length(); + Formatter formatter = new Formatter(buffer, locale); + formatter.format(format, args); + if (buffer.indexOf("\n", startLength) < 0) { + return this; } + msg = flushToString(); + } + publish(msg); + return this; + } - publish(messageLevel, message); - } catch (UnsupportedEncodingException e) { - publish( - Level.SEVERE, String.format("Unable to decode string output to stdout/stderr %s", e)); + @Override + public PrintStream append(CharSequence cs, int start, int limit) { + CharSequence subsequence = cs.subSequence(start, limit); + boolean flush = false; + for (int i = 0; i < subsequence.length(); ++i) { + if (subsequence.charAt(i) == '\n') { + flush = true; + break; + } + } + String msg; + synchronized (this) { + buffer.append(cs.subSequence(start, limit)); + if (!flush) { + return this; + } + msg = flushToString(); } - matched = 0; - baos.reset(); + publish(msg); + return this; } - private void publish(Level level, String message) { - if (logger.isLoggable(level)) { - LogRecord log = new LogRecord(level, message); + // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399. + private void publish(Level messageLevel, String message) { + checkState( + !Thread.holdsLock(this), + "BEAM-9399: publish should not be called with the lock as it may cause deadlock"); + if (logger.isLoggable(messageLevel)) { + if (outputWarning.compareAndSet(false, true)) { + LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER); + log.setLoggerName(loggerName); + handler.publish(log); + } + LogRecord log = new LogRecord(messageLevel, message); log.setLoggerName(loggerName); handler.publish(log); } } + + private void publish(String message) { + publish(messageLevel, message); + } + } + + /** + * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the + * specified {@code loggerName} and {@code level}. + */ + static PrintStream create(Handler handler, String loggerName, Level messageLevel) { + return new JulHandlerPrintStream(handler, loggerName, messageLevel); } @VisibleForTesting diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LogRecordMatcher.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LogRecordMatcher.java index 2145374606da..00b09aae7d7b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LogRecordMatcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/LogRecordMatcher.java @@ -74,6 +74,15 @@ public void describeTo(Description description) { description.appendText(String.format(" and message containing <%s>", substring)); } + @Override + public void describeMismatchSafely(LogRecord item, Description description) { + description + .appendText("was log with message \"") + .appendText(item.getMessage()) + .appendText("\" at severity ") + .appendValue(item.getLevel()); + } + @Override protected boolean matchesSafely(LogRecord item) { return levelMatcher.matches(item.getLevel()) && item.getMessage().contains(substring); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java index 65f5128f8bd4..0bdd72cb47e7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java @@ -115,6 +115,22 @@ public void testLogOnClose() { assertThat(handler.getLogs(), hasLogItem("blah")); } + @Test + public void testLogRawBytes() { + PrintStream printStream = createPrintStreamAdapter(); + String msg = "♠ ♡ ♢ ♣ ♤ ♥ ♦ ♧"; + byte[] bytes = msg.getBytes(Charset.defaultCharset()); + printStream.write(bytes, 0, 1); + printStream.write(bytes, 1, 4); + printStream.write(bytes, 5, 15); + printStream.write(bytes, 20, bytes.length - 20); + assertThat(handler.getLogs(), is(empty())); + String newlineMsg = "♠ ♡ \n♦ ♧"; + byte[] newlineMsgBytes = newlineMsg.getBytes(Charset.defaultCharset()); + printStream.write(newlineMsgBytes, 0, newlineMsgBytes.length); + assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg)); + } + private PrintStream createPrintStreamAdapter() { return JulHandlerPrintStreamAdapterFactory.create(handler, LOGGER_NAME, Level.INFO); } From e2e8b62ea57af30daa279cd1c42409eb437a3900 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 31 Mar 2020 10:02:24 -0700 Subject: [PATCH 2/2] Fix missing test import --- .../worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java index 0bdd72cb47e7..a93c26b468a6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat; import java.io.PrintStream; +import java.nio.charset.Charset; import java.util.logging.Level; import java.util.logging.LogRecord; import org.apache.beam.runners.dataflow.worker.LogSaver;