Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ private class ForceWriteThread extends BookieCriticalThread {
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;

boolean shouldForceWrite = true;
int numReqInLastForceWrite = 0;
boolean forceWriteMarkerSent = false;

public ForceWriteThread(Thread threadToNotifyOnEx,
boolean enableGroupForceWrites,
StatsLogger statsLogger) {
Expand All @@ -508,90 +513,108 @@ public void run() {
}
}

boolean shouldForceWrite = true;
int numReqInLastForceWrite = 0;
long busyStartTime = System.nanoTime();
boolean forceWriteMarkerSent = false;

List<ForceWriteRequest> localRequests = new ArrayList<>();

while (running) {
ForceWriteRequest req = null;
try {
forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
req = forceWriteRequests.take();
busyStartTime = System.nanoTime();
// Force write the file and then notify the write completions
//
if (!req.isMarker) {
if (shouldForceWrite) {
// if we are going to force write, any request that is already in the
// queue will benefit from this force write - post a marker prior to issuing
// the flush so until this marker is encountered we can skip the force write
if (enableGroupForceWrites) {
ForceWriteRequest marker =
createForceWriteRequest(req.logFile, 0, 0, null, false, true);
forceWriteMarkerSent = forceWriteRequests.offer(marker);
if (!forceWriteMarkerSent) {
marker.recycle();
Counter failures = journalStats.getForceWriteGroupingFailures();
failures.inc();
LOG.error(
"Fail to send force write grouping marker,"
+ " Journal.forceWriteRequests queue(capacity {}) is full,"
+ " current failure counter is {}.",
conf.getJournalQueueSize(), failures.get());
}
}

// If we are about to issue a write, record the number of requests in
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
if (numReqInLastForceWrite > 0) {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);
numReqInLastForceWrite = 0;
}
}
int requestsCount = forceWriteRequests.drainTo(localRequests);
if (requestsCount == 0) {
ForceWriteRequest fwr = forceWriteRequests.take();
localRequests.add(fwr);
requestsCount = 1;
}
numReqInLastForceWrite += req.process(shouldForceWrite);

if (enableGroupForceWrites
// if its a marker we should switch back to flushing
&& !req.isMarker
// If group marker sending failed, we can't figure out which writes are
// grouped in this force write. So, abandon it even if other writes could
// be grouped. This should be extremely rare as, usually, queue size is
// large enough to accommodate high flush frequencies.
&& forceWriteMarkerSent
// This indicates that this is the last request in a given file
// so subsequent requests will go to a different file so we should
// flush on the next request
&& !req.shouldClose) {
shouldForceWrite = false;
} else {
shouldForceWrite = true;

for (int i = 0; i < requestsCount; i++) {
forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
processForceWriteRequest(localRequests.get(i));
busyStartTime = System.nanoTime();
}
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("ForceWrite thread interrupted");
// close is idempotent
if (null != req) {
if (!localRequests.isEmpty()) {
ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
req.shouldClose = true;
req.closeFileIfNecessary();
}
running = false;
} finally {
if (req != null) {
req.recycle();
}
}

localRequests.clear();
}
// Regardless of what caused us to exit, we should notify the
// the parent thread as it should either exit or be in the process
// of exiting else we will have write requests hang
threadToNotifyOnEx.interrupt();
}

private void processForceWriteRequest(ForceWriteRequest req) {
try {
// Force write the file and then notify the write completions
//
if (!req.isMarker) {
if (shouldForceWrite) {
// if we are going to force write, any request that is already in the
// queue will benefit from this force write - post a marker prior to issuing
// the flush so until this marker is encountered we can skip the force write
if (enableGroupForceWrites) {
ForceWriteRequest marker =
createForceWriteRequest(req.logFile, 0, 0, null, false, true);
forceWriteMarkerSent = forceWriteRequests.offer(marker);
if (!forceWriteMarkerSent) {
marker.recycle();
Counter failures = journalStats.getForceWriteGroupingFailures();
failures.inc();
LOG.error(
"Fail to send force write grouping marker,"
+ " Journal.forceWriteRequests queue(capacity {}) is full,"
+ " current failure counter is {}.",
conf.getJournalQueueSize(), failures.get());
}
}

// If we are about to issue a write, record the number of requests in
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
if (numReqInLastForceWrite > 0) {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);
numReqInLastForceWrite = 0;
}
}
}
numReqInLastForceWrite += req.process(shouldForceWrite);

if (enableGroupForceWrites
// if its a marker we should switch back to flushing
&& !req.isMarker
// If group marker sending failed, we can't figure out which writes are
// grouped in this force write. So, abandon it even if other writes could
// be grouped. This should be extremely rare as, usually, queue size is
// large enough to accommodate high flush frequencies.
&& forceWriteMarkerSent
// This indicates that this is the last request in a given file
// so subsequent requests will go to a different file so we should
// flush on the next request
&& !req.shouldClose) {
shouldForceWrite = false;
} else {
shouldForceWrite = true;
}
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
} finally {
if (req != null) {
req.recycle();
}
}
}

// shutdown sync thread
void shutdown() throws InterruptedException {
running = false;
Expand Down