diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 69a0ac85663..a3e0086b69a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -487,6 +487,11 @@ private class ForceWriteThread extends BookieCriticalThread { // should we group force writes private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; + + boolean shouldForceWrite = true; + int numReqInLastForceWrite = 0; + boolean forceWriteMarkerSent = false; + public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites, StatsLogger statsLogger) { @@ -508,90 +513,108 @@ public void run() { } } - boolean shouldForceWrite = true; - int numReqInLastForceWrite = 0; long busyStartTime = System.nanoTime(); - boolean forceWriteMarkerSent = false; + + List localRequests = new ArrayList<>(); + while (running) { - ForceWriteRequest req = null; try { - forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); - req = forceWriteRequests.take(); - busyStartTime = System.nanoTime(); - // Force write the file and then notify the write completions - // - if (!req.isMarker) { - if (shouldForceWrite) { - // if we are going to force write, any request that is already in the - // queue will benefit from this force write - post a marker prior to issuing - // the flush so until this marker is encountered we can skip the force write - if (enableGroupForceWrites) { - ForceWriteRequest marker = - createForceWriteRequest(req.logFile, 0, 0, null, false, true); - forceWriteMarkerSent = forceWriteRequests.offer(marker); - if (!forceWriteMarkerSent) { - marker.recycle(); - Counter failures = journalStats.getForceWriteGroupingFailures(); - failures.inc(); - LOG.error( - "Fail to send force write grouping marker," - + " Journal.forceWriteRequests queue(capacity {}) is full," - + " current failure counter is {}.", - conf.getJournalQueueSize(), failures.get()); - } - } - // If we are about to issue a write, record the number of requests in - // the last force write and then reset the counter so we can accumulate - // requests in the write we are about to issue - if (numReqInLastForceWrite > 0) { - journalStats.getForceWriteGroupingCountStats() - .registerSuccessfulValue(numReqInLastForceWrite); - numReqInLastForceWrite = 0; - } - } + int requestsCount = forceWriteRequests.drainTo(localRequests); + if (requestsCount == 0) { + ForceWriteRequest fwr = forceWriteRequests.take(); + localRequests.add(fwr); + requestsCount = 1; } - numReqInLastForceWrite += req.process(shouldForceWrite); - - if (enableGroupForceWrites - // if its a marker we should switch back to flushing - && !req.isMarker - // If group marker sending failed, we can't figure out which writes are - // grouped in this force write. So, abandon it even if other writes could - // be grouped. This should be extremely rare as, usually, queue size is - // large enough to accommodate high flush frequencies. - && forceWriteMarkerSent - // This indicates that this is the last request in a given file - // so subsequent requests will go to a different file so we should - // flush on the next request - && !req.shouldClose) { - shouldForceWrite = false; - } else { - shouldForceWrite = true; + + for (int i = 0; i < requestsCount; i++) { + forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); + processForceWriteRequest(localRequests.get(i)); + busyStartTime = System.nanoTime(); } - } catch (IOException ioe) { - LOG.error("I/O exception in ForceWrite thread", ioe); - running = false; } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.info("ForceWrite thread interrupted"); // close is idempotent - if (null != req) { + if (!localRequests.isEmpty()) { + ForceWriteRequest req = localRequests.get(localRequests.size() - 1); req.shouldClose = true; req.closeFileIfNecessary(); } running = false; - } finally { - if (req != null) { - req.recycle(); - } } + + localRequests.clear(); } // Regardless of what caused us to exit, we should notify the // the parent thread as it should either exit or be in the process // of exiting else we will have write requests hang threadToNotifyOnEx.interrupt(); } + + private void processForceWriteRequest(ForceWriteRequest req) { + try { + // Force write the file and then notify the write completions + // + if (!req.isMarker) { + if (shouldForceWrite) { + // if we are going to force write, any request that is already in the + // queue will benefit from this force write - post a marker prior to issuing + // the flush so until this marker is encountered we can skip the force write + if (enableGroupForceWrites) { + ForceWriteRequest marker = + createForceWriteRequest(req.logFile, 0, 0, null, false, true); + forceWriteMarkerSent = forceWriteRequests.offer(marker); + if (!forceWriteMarkerSent) { + marker.recycle(); + Counter failures = journalStats.getForceWriteGroupingFailures(); + failures.inc(); + LOG.error( + "Fail to send force write grouping marker," + + " Journal.forceWriteRequests queue(capacity {}) is full," + + " current failure counter is {}.", + conf.getJournalQueueSize(), failures.get()); + } + } + + // If we are about to issue a write, record the number of requests in + // the last force write and then reset the counter so we can accumulate + // requests in the write we are about to issue + if (numReqInLastForceWrite > 0) { + journalStats.getForceWriteGroupingCountStats() + .registerSuccessfulValue(numReqInLastForceWrite); + numReqInLastForceWrite = 0; + } + } + } + numReqInLastForceWrite += req.process(shouldForceWrite); + + if (enableGroupForceWrites + // if its a marker we should switch back to flushing + && !req.isMarker + // If group marker sending failed, we can't figure out which writes are + // grouped in this force write. So, abandon it even if other writes could + // be grouped. This should be extremely rare as, usually, queue size is + // large enough to accommodate high flush frequencies. + && forceWriteMarkerSent + // This indicates that this is the last request in a given file + // so subsequent requests will go to a different file so we should + // flush on the next request + && !req.shouldClose) { + shouldForceWrite = false; + } else { + shouldForceWrite = true; + } + } catch (IOException ioe) { + LOG.error("I/O exception in ForceWrite thread", ioe); + running = false; + } finally { + if (req != null) { + req.recycle(); + } + } + } + // shutdown sync thread void shutdown() throws InterruptedException { running = false;