Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
if (r.hasMasterKey()) {
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
}

r.recycle();
return buf;
} else if (r instanceof BookieProtocol.AuthRequest) {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage();
Expand Down Expand Up @@ -193,9 +193,9 @@ public Object decode(ByteBuf packet)
if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING
&& version >= 2) {
byte[] masterKey = readMasterKey(packet);
return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, masterKey);
return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, masterKey);
} else {
return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, null);
return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, null);
}
case BookieProtocol.AUTH:
BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,43 @@ public void recycle() {
* A Request that reads data.
*/
class ReadRequest extends Request {
ReadRequest(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);

static ReadRequest create(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
ReadRequest read = RECYCLER.get();
read.protocolVersion = protocolVersion;
read.opCode = READENTRY;
read.ledgerId = ledgerId;
read.entryId = entryId;
read.flags = flags;
read.masterKey = masterKey;
return read;
}

boolean isFencing() {
return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING;
}

private final Handle<ReadRequest> recyclerHandle;

private ReadRequest(Handle<ReadRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<ReadRequest> RECYCLER = new Recycler<ReadRequest>() {
@Override
protected ReadRequest newObject(Handle<ReadRequest> handle) {
return new ReadRequest(handle);
}
};

@Override
public void recycle() {
ledgerId = -1;
entryId = -1;
masterKey = null;
recyclerHandle.recycle(this);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, 0, (short) 0, null);
completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
} else {
Expand Down Expand Up @@ -933,7 +933,7 @@ private void readEntryInternal(final long ledgerId,
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, entryId, (short) flags, masterKey);
completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
} else {
Expand Down Expand Up @@ -1168,7 +1168,6 @@ private void writeAndFlush(final Channel channel,
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});

channel.writeAndFlush(request, promise);
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public String toString() {
}

private void recycle() {
request.recycle();
super.reset();
this.recyclerHandle.recycle(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void testAsynchronousRequest(boolean result, int errorCode) throws Excep

ExecutorService service = Executors.newCachedThreadPool();
long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
ReadEntryProcessor processor = ReadEntryProcessor.create(
request, requestHandler, requestProcessor, service, true);
Expand Down Expand Up @@ -150,7 +150,7 @@ private void testSynchronousRequest(boolean result, int errorCode) throws Except
}).when(channel).writeAndFlush(any(Response.class));

long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
fenceResult.complete(result);
Expand Down Expand Up @@ -180,7 +180,7 @@ public void testNonFenceRequest() throws Exception {
}).when(channel).writeAndFlush(any(Response.class));

long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, (short) 0, new byte[]{});
ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
processor.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ public void testAuthFail() throws Exception {

}

client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
1L, 1L, (short) 0, null));
ReadRequest read = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
1L, 1L, (short) 0, null);
client.sendRequest(read);
Response response = client.takeResponse();
assertEquals("Should have failed",
response.getErrorCode(), BookieProtocol.EUA);
Expand Down