Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/org/tikv/cdc/CDCConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@

public class CDCConfig {
private static final int EVENT_BUFFER_SIZE = 50000;
private static final int MAX_ROW_KEY_SIZE = 10240;
private static final boolean READ_OLD_VALUE = true;

private int eventBufferSize = EVENT_BUFFER_SIZE;
private int maxRowKeySize = MAX_ROW_KEY_SIZE;
private boolean readOldValue = READ_OLD_VALUE;

public void setEventBufferSize(final int bufferSize) {
eventBufferSize = bufferSize;
}

public void setMaxRowKeySize(final int rowKeySize) {
maxRowKeySize = rowKeySize;
}

public void setReadOldValue(final boolean value) {
readOldValue = value;
}
Expand All @@ -21,6 +27,10 @@ public int getEventBufferSize() {
return eventBufferSize;
}

public int getMaxRowKeySize() {
return maxRowKeySize;
}

public boolean getReadOldValue() {
return readOldValue;
}
Expand Down
31 changes: 30 additions & 1 deletion src/main/java/org/tikv/cdc/RegionCDCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.FastByteComparisons;
import org.tikv.common.util.KeyRangeUtils;
import org.tikv.kvproto.Cdcpb.ChangeDataEvent;
import org.tikv.kvproto.Cdcpb.ChangeDataRequest;
import org.tikv.kvproto.Cdcpb.Event.LogType;
import org.tikv.kvproto.Cdcpb.Event.Row;
import org.tikv.kvproto.Cdcpb.Header;
import org.tikv.kvproto.Cdcpb.ResolvedTs;
import org.tikv.kvproto.ChangeDataGrpc;
Expand All @@ -34,6 +38,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver<ChangeDataEvent>
private final ChangeDataStub asyncStub;
private final Consumer<CDCEvent> eventConsumer;
private final CDCConfig config;
private final Predicate<Row> rowFilter;

private final AtomicBoolean running = new AtomicBoolean(false);

Expand All @@ -54,6 +59,24 @@ public RegionCDCClient(

this.regionKeyRange =
KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build();

this.rowFilter =
regionEnclosed()
? ((row) -> true)
: new Predicate<Row>() {
final byte[] buffer = new byte[config.getMaxRowKeySize()];

final byte[] start = keyRange.getStart().toByteArray();
final byte[] end = keyRange.getEnd().toByteArray();

@Override
public boolean test(final Row row) {
final int len = row.getKey().size();
row.getKey().copyTo(buffer, 0);
return (FastByteComparisons.compareTo(buffer, 0, len, start, 0, start.length) >= 0)
&& (FastByteComparisons.compareTo(buffer, 0, len, end, 0, end.length) < 0);
}
};
}

public synchronized void start(final long startTs) {
Expand Down Expand Up @@ -87,6 +110,11 @@ public KeyRange getRegionKeyRange() {
return regionKeyRange;
}

public boolean regionEnclosed() {
return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd())
.encloses(KeyRangeUtils.makeRange(regionKeyRange.getStart(), regionKeyRange.getEnd()));
}

public boolean isRunning() {
return running.get();
}
Expand Down Expand Up @@ -133,6 +161,7 @@ public void onNext(final ChangeDataEvent event) {
.stream()
.flatMap(ev -> ev.getEntries().getEntriesList().stream())
.filter(row -> ALLOWED_LOGTYPE.contains(row.getType()))
.filter(this.rowFilter)
.map(row -> CDCEvent.rowEvent(region.getId(), row))
.forEach(this::submitEvent);

Expand All @@ -149,7 +178,7 @@ public void onNext(final ChangeDataEvent event) {
}

private void submitEvent(final CDCEvent event) {
LOGGER.info("submit event: {}", event);
LOGGER.debug("submit event: {}", event);
eventConsumer.accept(event);
}
}