Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,11 @@
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;

@Slf4j
class CRC32CDigestManager extends DigestManager {

private static final FastThreadLocal<MutableInt> currentCrc = new FastThreadLocal<MutableInt>() {
@Override
protected MutableInt initialValue() throws Exception {
return new MutableInt(0);
}
};

public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
super(ledgerId, useV2Protocol, allocator);
}
Expand All @@ -45,16 +36,17 @@ int getMacCodeLength() {
}

@Override
void populateValueAndReset(ByteBuf buf) {
MutableInt current = currentCrc.get();
buf.writeInt(current.intValue());
current.setValue(0);
boolean isInt32Digest() {
return true;
}

@Override
void populateValueAndReset(int digest, ByteBuf buf) {
buf.writeInt(digest);
}

@Override
void update(ByteBuf data, int offset, int len) {
MutableInt current = currentCrc.get();
final int lastCrc = current.intValue();
current.setValue(Crc32cIntChecksum.resumeChecksum(lastCrc, data, offset, len));
int update(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,19 @@ int getMacCodeLength() {
}

@Override
void populateValueAndReset(ByteBuf buf) {
void populateValueAndReset(int digest, ByteBuf buf) {
buf.writeLong(crc.get().getValueAndReset());
}

@Override
void update(ByteBuf data, int offset, int len) {
int update(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
// This is stored as 8 bytes
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ public abstract class DigestManager {

abstract int getMacCodeLength();

void update(byte[] data) {
update(Unpooled.wrappedBuffer(data), 0, data.length);
}
abstract int update(int digest, ByteBuf buffer, int offset, int len);

abstract void update(ByteBuf buffer, int offset, int len);
abstract void populateValueAndReset(int digest, ByteBuf buffer);

abstract void populateValueAndReset(ByteBuf buffer);
abstract boolean isInt32Digest();

final int macCodeLength;

Expand Down Expand Up @@ -112,7 +110,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
headersBuffer.writeLong(lastAddConfirmed);
headersBuffer.writeLong(length);

update(headersBuffer, 0, METADATA_LENGTH);
int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
Expand All @@ -121,11 +119,15 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> update(b, b.readerIndex(), b.readableBytes()));
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
update(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
populateValueAndReset(headersBuffer);
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
}
Expand All @@ -147,8 +149,8 @@ public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(lac);

update(headersBuffer, 0, LAC_METADATA_LENGTH);
populateValueAndReset(headersBuffer);
int digest = update(0, headersBuffer, 0, LAC_METADATA_LENGTH);
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer);
}
Expand Down Expand Up @@ -183,18 +185,26 @@ private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryI
this.getClass().getName(), dataReceived.readableBytes());
throw new BKDigestMatchException();
}
update(dataReceived, 0, METADATA_LENGTH);
int digest = update(0, dataReceived, 0, METADATA_LENGTH);

int offset = METADATA_LENGTH + macCodeLength;
update(dataReceived, offset, dataReceived.readableBytes() - offset);

ByteBuf digest = DIGEST_BUFFER.get();
digest.clear();
populateValueAndReset(digest);

if (!ByteBufUtil.equals(digest, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
digest = update(digest, dataReceived, offset, dataReceived.readableBytes() - offset);

if (isInt32Digest()) {
int receivedDigest = dataReceived.getInt(METADATA_LENGTH);
if (receivedDigest != digest) {
logger.error("Digest mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
}
} else {
ByteBuf digestBuf = DIGEST_BUFFER.get();
digestBuf.clear();
populateValueAndReset(digest, digestBuf);

if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
}
}

long actualLedgerId = dataReceived.readLong();
Expand Down Expand Up @@ -223,17 +233,25 @@ public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchE
throw new BKDigestMatchException();
}

update(dataReceived, 0, LAC_METADATA_LENGTH);
int digest = update(0, dataReceived, 0, LAC_METADATA_LENGTH);

ByteBuf digest = DIGEST_BUFFER.get();
digest.clear();

populateValueAndReset(digest);

if (!ByteBufUtil.equals(digest, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
if (isInt32Digest()) {
int receivedDigest = dataReceived.getInt(LAC_METADATA_LENGTH);
if (receivedDigest != digest) {
logger.error("Digest mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
}
} else {
ByteBuf digestBuf = DIGEST_BUFFER.get();
digestBuf.clear();
populateValueAndReset(digest, digestBuf);

if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
}
}

long actualLedgerId = dataReceived.readLong();
long lac = dataReceived.readLong();
if (actualLedgerId != ledgerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,15 @@ int getMacCodeLength() {
}

@Override
void update(ByteBuf buffer, int offset, int len) {}
int update(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

@Override
void populateValueAndReset(ByteBuf buffer) {}
void populateValueAndReset(int digest, ByteBuf buffer) {}

@Override
boolean isInt32Digest() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ int getMacCodeLength() {


@Override
void populateValueAndReset(ByteBuf buffer) {
void populateValueAndReset(int digest, ByteBuf buffer) {
buffer.writeBytes(mac.get().doFinal());
}

@Override
void update(ByteBuf data, int offset, int len) {
int update(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}


@Override
boolean isInt32Digest() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ public DigestManager getDigestManager(Digest digest) {
public void digestManager(MyState state) {
final ByteBuf buff = state.getByteBuff(state.bufferType);
final DigestManager dm = state.getDigestManager(state.digest);
dm.update(buff, 0, buff.readableBytes());
int digest = dm.update(0, buff, 0, buff.readableBytes());
state.digestBuf.clear();
dm.populateValueAndReset(state.digestBuf);
dm.populateValueAndReset(digest, state.digestBuf);
}

}