From f2d3c73a7e75788153a30c7f7a9b532bd50c68c9 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 28 Mar 2017 17:46:17 +1100 Subject: [PATCH 1/6] WIP: logging: make flush wait for writes Updates 1795. This PR still isn't completely correct, since it does not force any RPC to immediately be issued. However, flush should now correctly wait for RPCs representing prior calls to publish to complete and any failures to be reported to ErrorManager before returning. This PR is WIP due to lack of tests. They are forth-coming, but all existing tests pass. Fun fact: I wrote this PR three times, using `synchronized`, `ReentrantLock`, then `Monitor` mirroring the Monitor class's documentation: http://google.github.io/guava/releases/21.0/api/docs/com/google/common/util/concurrent/Monitor.html --- .../google/cloud/logging/LoggingHandler.java | 78 +++++++++++++++---- 1 file changed, 62 insertions(+), 16 deletions(-) diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java index d82b68db74f3..d03b70b4e76c 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java @@ -18,15 +18,15 @@ import static com.google.common.base.MoreObjects.firstNonNull; +import com.google.api.gax.core.ApiFutureCallback; +import com.google.api.gax.core.ApiFutures; import com.google.cloud.MonitoredResource; import com.google.cloud.logging.Logging.WriteOption; -import com.google.api.gax.core.ApiFutures; -import com.google.api.gax.core.ApiFutureCallback; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Monitor; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.logging.ErrorManager; import java.util.logging.Filter; @@ -120,6 +120,46 @@ public class LoggingHandler extends Handler { // https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 . private final Level baseLevel; + private final Monitor flushMonitor = new Monitor(true); + private int numPendingWrites = 0; + private int numFlushers = 0; + private final Monitor.Guard noFlushers = + new Monitor.Guard(flushMonitor) { + @Override + public boolean isSatisfied() { + return numFlushers == 0; + } + }; + private final Monitor.Guard writesCompleted = + new Monitor.Guard(flushMonitor) { + @Override + public boolean isSatisfied() { + return numPendingWrites == 0; + } + }; + private final ApiFutureCallback writeCallback = + new ApiFutureCallback() { + @Override + public void onSuccess(Void v) { + flushMonitor.enter(); + try { + numPendingWrites--; + } finally { + flushMonitor.leave(); + } + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof Exception) { + reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); + } else { + reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); + } + onSuccess(null); + } + }; + /** * Creates an handler that publishes messages to Stackdriver Logging. */ @@ -376,6 +416,9 @@ public void publish(LogRecord record) { if (entry != null) { write(entry, writeOptions); } + if (record.getLevel().intValue() >= flushLevel.intValue()) { + flush(); + } } finally { inPublishCall.remove(); } @@ -457,21 +500,16 @@ void write(LogEntry entry, WriteOption... options) { reportError(null, ex, ErrorManager.FLUSH_FAILURE); } break; + case ASYNC: default: - ApiFutures.addCallback(getLogging().writeAsync(entryList, options), new ApiFutureCallback() { - @Override - public void onSuccess(Void v) {} - - @Override - public void onFailure(Throwable t) { - if (t instanceof Exception) { - reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); - } else { - reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); - } - } - }); + flushMonitor.enterWhenUninterruptibly(noFlushers); + try { + numPendingWrites++; + ApiFutures.addCallback(getLogging().writeAsync(entryList, options), writeCallback); + } finally { + flushMonitor.leave(); + } break; } } @@ -479,6 +517,14 @@ public void onFailure(Throwable t) { @Override public void flush() { // BUG(1795): flush is broken, need support from batching implementation. + flushMonitor.enter(); + try { + numFlushers++; + flushMonitor.waitForUninterruptibly(writesCompleted); + numFlushers--; + } finally { + flushMonitor.leave(); + } } /** From af0cbaed132110fb514acc9caa3baf257608381d Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 29 Mar 2017 17:56:34 +1100 Subject: [PATCH 2/6] allow concurrent writes and flushes --- .../google/cloud/logging/LoggingHandler.java | 100 ++++++++---------- 1 file changed, 46 insertions(+), 54 deletions(-) diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java index d03b70b4e76c..8bb410f8fc24 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java @@ -18,16 +18,19 @@ import static com.google.common.base.MoreObjects.firstNonNull; +import com.google.api.gax.core.ApiFuture; import com.google.api.gax.core.ApiFutureCallback; import com.google.api.gax.core.ApiFutures; import com.google.cloud.MonitoredResource; import com.google.cloud.logging.Logging.WriteOption; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Monitor; +import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentMap; import java.util.logging.ErrorManager; import java.util.logging.Filter; import java.util.logging.Formatter; @@ -120,45 +123,12 @@ public class LoggingHandler extends Handler { // https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 . private final Level baseLevel; - private final Monitor flushMonitor = new Monitor(true); - private int numPendingWrites = 0; - private int numFlushers = 0; - private final Monitor.Guard noFlushers = - new Monitor.Guard(flushMonitor) { - @Override - public boolean isSatisfied() { - return numFlushers == 0; - } - }; - private final Monitor.Guard writesCompleted = - new Monitor.Guard(flushMonitor) { - @Override - public boolean isSatisfied() { - return numPendingWrites == 0; - } - }; - private final ApiFutureCallback writeCallback = - new ApiFutureCallback() { - @Override - public void onSuccess(Void v) { - flushMonitor.enter(); - try { - numPendingWrites--; - } finally { - flushMonitor.leave(); - } - } - - @Override - public void onFailure(Throwable t) { - if (t instanceof Exception) { - reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); - } else { - reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); - } - onSuccess(null); - } - }; + // A map whose keys are pending write operations. The values of the map are meaningless, but the type is Boolean + // and not Void since the map implementation does not allow null values. + // Since the map has weak keys and we do not hold on to completed futures, + // completed futures are automatically GCed and removed from the map. + private final ConcurrentMap, Boolean> pendingWrites = + new MapMaker().weakKeys().makeMap(); /** * Creates an handler that publishes messages to Stackdriver Logging. @@ -503,13 +473,25 @@ void write(LogEntry entry, WriteOption... options) { case ASYNC: default: - flushMonitor.enterWhenUninterruptibly(noFlushers); - try { - numPendingWrites++; - ApiFutures.addCallback(getLogging().writeAsync(entryList, options), writeCallback); - } finally { - flushMonitor.leave(); - } + ApiFuture writeFuture = getLogging().writeAsync(entryList, options); + pendingWrites.put(writeFuture, Boolean.TRUE); + ApiFutures.addCallback( + writeFuture, + new ApiFutureCallback() { + @Override + public void onSuccess(Void v) { + // Nothing to do. + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof Exception) { + reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); + } else { + reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); + } + } + }); break; } } @@ -517,13 +499,23 @@ void write(LogEntry entry, WriteOption... options) { @Override public void flush() { // BUG(1795): flush is broken, need support from batching implementation. - flushMonitor.enter(); - try { - numFlushers++; - flushMonitor.waitForUninterruptibly(writesCompleted); - numFlushers--; - } finally { - flushMonitor.leave(); + + // Make a copy of currently-pending writes. + // As new writes are made, they might be reflected in the keySet iterator. + // If we naively iterate through keySet, waiting for each future, + // we might never finish. + ArrayList> writes = new ArrayList<>(pendingWrites.size()); + for (ApiFuture write : pendingWrites.keySet()) { + writes.add(write); + } + for (int i = 0; i < writes.size(); i++) { + ApiFuture write = writes.get(i); + try { + Uninterruptibles.getUninterruptibly(write); + } catch (Exception e) { + // Ignore exceptions, they are propagated to the error manager. + } + writes.set(i, null); } } From baf2d43e123cfc56b3633b616a5bf032f729dd33 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 29 Mar 2017 18:40:21 +1100 Subject: [PATCH 3/6] manual set remove --- .../google/cloud/logging/LoggingHandler.java | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java index 8bb410f8fc24..c3fb338b0c68 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java @@ -25,12 +25,14 @@ import com.google.cloud.logging.Logging.WriteOption; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.MapMaker; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; -import java.util.concurrent.ConcurrentMap; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.ErrorManager; import java.util.logging.Filter; import java.util.logging.Formatter; @@ -123,12 +125,9 @@ public class LoggingHandler extends Handler { // https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 . private final Level baseLevel; - // A map whose keys are pending write operations. The values of the map are meaningless, but the type is Boolean - // and not Void since the map implementation does not allow null values. - // Since the map has weak keys and we do not hold on to completed futures, - // completed futures are automatically GCed and removed from the map. - private final ConcurrentMap, Boolean> pendingWrites = - new MapMaker().weakKeys().makeMap(); + private final Lock writeLock = new ReentrantLock(); + private final Set> pendingWrites = + Collections.newSetFromMap(new IdentityHashMap, Boolean>()); /** * Creates an handler that publishes messages to Stackdriver Logging. @@ -473,22 +472,36 @@ void write(LogEntry entry, WriteOption... options) { case ASYNC: default: - ApiFuture writeFuture = getLogging().writeAsync(entryList, options); - pendingWrites.put(writeFuture, Boolean.TRUE); + final ApiFuture writeFuture = getLogging().writeAsync(entryList, options); + writeLock.lock(); + try { + pendingWrites.add(writeFuture); + } finally { + writeLock.unlock(); + } ApiFutures.addCallback( writeFuture, new ApiFutureCallback() { @Override public void onSuccess(Void v) { - // Nothing to do. + writeLock.lock(); + try { + pendingWrites.remove(writeFuture); + } finally { + writeLock.unlock(); + } } @Override public void onFailure(Throwable t) { - if (t instanceof Exception) { - reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); - } else { - reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); + try { + if (t instanceof Exception) { + reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); + } else { + reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); + } + } finally { + onSuccess(null); } } }); @@ -500,22 +513,19 @@ public void onFailure(Throwable t) { public void flush() { // BUG(1795): flush is broken, need support from batching implementation. - // Make a copy of currently-pending writes. - // As new writes are made, they might be reflected in the keySet iterator. - // If we naively iterate through keySet, waiting for each future, - // we might never finish. - ArrayList> writes = new ArrayList<>(pendingWrites.size()); - for (ApiFuture write : pendingWrites.keySet()) { - writes.add(write); + ArrayList> writes = new ArrayList<>(); + writeLock.lock(); + try { + writes.addAll(pendingWrites); + } finally { + writeLock.unlock(); } - for (int i = 0; i < writes.size(); i++) { - ApiFuture write = writes.get(i); + for (ApiFuture write : writes) { try { Uninterruptibles.getUninterruptibly(write); } catch (Exception e) { // Ignore exceptions, they are propagated to the error manager. } - writes.set(i, null); } } From 5378b396cf1e3620e6c2dc2c6a18a11a79d40545 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 30 Mar 2017 14:18:26 +1100 Subject: [PATCH 4/6] pr comment --- .../google/cloud/logging/LoggingHandler.java | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java index c3fb338b0c68..f2ef7d0841dd 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java @@ -125,7 +125,7 @@ public class LoggingHandler extends Handler { // https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 . private final Level baseLevel; - private final Lock writeLock = new ReentrantLock(); + private final Object writeLock = new Object(); private final Set> pendingWrites = Collections.newSetFromMap(new IdentityHashMap, Boolean>()); @@ -473,25 +473,23 @@ void write(LogEntry entry, WriteOption... options) { case ASYNC: default: final ApiFuture writeFuture = getLogging().writeAsync(entryList, options); - writeLock.lock(); - try { + synchronized(writeLock) { pendingWrites.add(writeFuture); - } finally { - writeLock.unlock(); } ApiFutures.addCallback( writeFuture, new ApiFutureCallback() { - @Override - public void onSuccess(Void v) { - writeLock.lock(); - try { + private void removeFromPending() { + synchronized(writeLock) { pendingWrites.remove(writeFuture); - } finally { - writeLock.unlock(); } } + @Override + public void onSuccess(Void v) { + removeFromPending(); + } + @Override public void onFailure(Throwable t) { try { @@ -501,7 +499,7 @@ public void onFailure(Throwable t) { reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); } } finally { - onSuccess(null); + removeFromPending(); } } }); @@ -511,16 +509,14 @@ public void onFailure(Throwable t) { @Override public void flush() { - // BUG(1795): flush is broken, need support from batching implementation. + // BUG(1795): We should force batcher to issue RPC call for buffered messages, + // so the code below doesn't wait uselessly. - ArrayList> writes = new ArrayList<>(); - writeLock.lock(); - try { - writes.addAll(pendingWrites); - } finally { - writeLock.unlock(); + ArrayList> writesToFlush = new ArrayList<>(); + synchronized(writeLock) { + writesToFlush.addAll(pendingWrites); } - for (ApiFuture write : writes) { + for (ApiFuture write : writesToFlush) { try { Uninterruptibles.getUninterruptibly(write); } catch (Exception e) { From 2b3a7c27b1f5350e4edfcb9411839dfb4d9041a3 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 30 Mar 2017 15:54:50 +1100 Subject: [PATCH 5/6] add test --- .../google/cloud/logging/LoggingHandler.java | 2 -- .../cloud/logging/LoggingHandlerTest.java | 36 +++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java index f2ef7d0841dd..1474640b5bba 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java @@ -31,8 +31,6 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.logging.ErrorManager; import java.util.logging.Filter; import java.util.logging.Formatter; diff --git a/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java b/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java index 7ac494013234..2d02627f86db 100644 --- a/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java +++ b/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java @@ -18,8 +18,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.api.gax.core.ApiFutures; +import com.google.api.gax.core.SettableApiFuture; import com.google.cloud.MonitoredResource; import com.google.cloud.logging.LogEntry.Builder; import com.google.cloud.logging.Logging.WriteOption; @@ -380,6 +382,40 @@ public void testFlushLevel() { handler.publish(newLogRecord(Level.WARNING, MESSAGE)); } + @Test + public void testFlush() throws InterruptedException { + final SettableApiFuture mockRpc = SettableApiFuture.create(); + + EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(options.getService()).andReturn(logging); + logging.writeAsync(ImmutableList.of(INFO_ENTRY), DEFAULT_OPTIONS); + EasyMock.expectLastCall().andReturn(mockRpc); + EasyMock.replay(options, logging); + final LoggingHandler handler = new LoggingHandler(LOG_NAME, options); + handler.setFormatter(new TestFormatter()); + + // no messages, nothing to flush. + handler.flush(); + + // send a message + handler.publish(newLogRecord(Level.INFO, MESSAGE)); + Thread flushWaiter = new Thread(new Runnable() { + @Override + public void run() { + handler.flush(); + } + }); + flushWaiter.start(); + + // flushWaiter should be waiting for mockRpc to complete. + flushWaiter.join(100); + assertTrue(flushWaiter.isAlive()); + + // With the RPC completed, flush should return, and the thread should terminate. + mockRpc.set(null); + flushWaiter.join(); + } + @Test public void testSyncWrite() { EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes(); From 91f3a3317c053cef12b9477bcb614640b8a6f0df Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 31 Mar 2017 14:41:14 +1100 Subject: [PATCH 6/6] pr comment --- .../java/com/google/cloud/logging/LoggingHandlerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java b/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java index 2d02627f86db..a9d31cbc726e 100644 --- a/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java +++ b/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import com.google.api.gax.core.ApiFutures; import com.google.api.gax.core.SettableApiFuture; @@ -408,12 +409,13 @@ public void run() { flushWaiter.start(); // flushWaiter should be waiting for mockRpc to complete. - flushWaiter.join(100); + flushWaiter.join(1000); assertTrue(flushWaiter.isAlive()); // With the RPC completed, flush should return, and the thread should terminate. mockRpc.set(null); - flushWaiter.join(); + flushWaiter.join(1000); + assertFalse(flushWaiter.isAlive()); } @Test