From b8e3426b788e5e7a0f8764bb690bd42dd26e2d43 Mon Sep 17 00:00:00 2001 From: chase Date: Wed, 19 May 2021 22:18:31 +0900 Subject: [PATCH 1/5] Add CDCClient and update TableCodec Signed-off-by: chase --- pom.xml | 6 +- src/main/java/org/tikv/cdc/CDCClient.java | 205 ++++++++++++++++++ src/main/java/org/tikv/cdc/CDCEvent.java | 65 ++++++ .../java/org/tikv/cdc/RegionCDCClient.java | 149 +++++++++++++ .../org/tikv/common/codec/TableCodec.java | 10 + .../org/tikv/common/codec/TableCodecV1.java | 7 +- .../org/tikv/common/codec/TableCodecV2.java | 8 +- 7 files changed, 443 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/tikv/cdc/CDCClient.java create mode 100644 src/main/java/org/tikv/cdc/CDCEvent.java create mode 100644 src/main/java/org/tikv/cdc/RegionCDCClient.java diff --git a/pom.xml b/pom.xml index 4e4cff5be70..1e0b84e660c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.tikv tikv-client-java - 3.1.0-SNAPSHOT + 3.2.0-SNAPSHOT jar TiKV Java Client A Java Client for TiKV @@ -337,8 +337,8 @@ maven-compiler-plugin 3.7.0 - 1.8 - 1.8 + 9 + 9 UTF-8 true true diff --git a/src/main/java/org/tikv/cdc/CDCClient.java b/src/main/java/org/tikv/cdc/CDCClient.java new file mode 100644 index 00000000000..42aef4e2eff --- /dev/null +++ b/src/main/java/org/tikv/cdc/CDCClient.java @@ -0,0 +1,205 @@ +package org.tikv.cdc; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.TreeMultiset; +import io.grpc.ManagedChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiSession; +import org.tikv.common.key.Key; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.RangeSplitter; +import org.tikv.common.util.RangeSplitter.RegionTask; +import org.tikv.kvproto.Cdcpb.Event.Row; +import org.tikv.kvproto.Coprocessor.KeyRange; +import org.tikv.kvproto.Kvrpcpb; + +public class CDCClient implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class); + private static final int EVENT_BUFFER_SIZE = 10000; + + private final TiSession session; + private final KeyRange keyRange; + + private final BlockingQueue eventsBuffer = new ArrayBlockingQueue<>(EVENT_BUFFER_SIZE); + private final TreeMap regionClients = new TreeMap<>(); + private final Map regionToResolvedTs = new HashMap<>(); + private final TreeMultiset resolvedTsSet = TreeMultiset.create(); + + private boolean started = false; + + public CDCClient(final TiSession session, final KeyRange keyRange) { + Preconditions.checkState( + session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI), + "Unsupported Isolation Level"); // only support SI for now + this.session = session; + this.keyRange = keyRange; + } + + public synchronized void start(final long startTs) { + Preconditions.checkState(!started, "Client is already started"); + try { + applyKeyRange(keyRange, startTs); + } catch (final Throwable e) { + LOGGER.error("failed to start:", e); + } + started = true; + } + + public synchronized Row get() throws InterruptedException { + final CDCEvent event = eventsBuffer.poll(100, TimeUnit.MILLISECONDS); + if (event != null) { + switch (event.eventType) { + case ROW: + return event.row; + case RESOLVED_TS: + handleResolvedTs(event.regionId, event.resolvedTs); + break; + case ERROR: + handleErrorEvent(event.regionId, event.error); + break; + } + } + return null; + } + + public synchronized long getMinResolvedTs() { + return resolvedTsSet.firstEntry().getElement(); + } + + public synchronized long getMaxResolvedTs() { + return resolvedTsSet.lastEntry().getElement(); + } + + public synchronized void close() { + removeRegions(regionClients.keySet()); + } + + private synchronized void applyKeyRange(final KeyRange keyRange, final long timestamp) { + final RangeSplitter splitter = RangeSplitter.newSplitter(session.getRegionManager()); + + final Iterator newRegionsIterator = + splitter + .splitRangeByRegion(Arrays.asList(keyRange)) + .stream() + .map(RegionTask::getRegion) + .sorted((a, b) -> Long.compare(a.getId(), b.getId())) + .iterator(); + final Iterator oldRegionsIterator = regionClients.values().iterator(); + + final ArrayList regionsToAdd = new ArrayList<>(); + final ArrayList regionsToRemove = new ArrayList<>(); + + TiRegion newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + RegionCDCClient oldRegionClient = + oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + + while (newRegion != null && oldRegionClient != null) { + if (newRegion.getId() == oldRegionClient.getRegion().getId()) { + // check if should refresh region + if (!oldRegionClient.isRunning()) { + regionsToRemove.add(newRegion.getId()); + regionsToAdd.add(newRegion); + } + + newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + } else if (newRegion.getId() < oldRegionClient.getRegion().getId()) { + regionsToAdd.add(newRegion); + newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + } else { + regionsToRemove.add(oldRegionClient.getRegion().getId()); + oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + } + } + + while (newRegion != null) { + regionsToAdd.add(newRegion); + newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + } + + while (oldRegionClient != null) { + regionsToRemove.add(oldRegionClient.getRegion().getId()); + oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + } + + removeRegions(regionsToRemove); + addRegions(regionsToAdd, timestamp); + LOGGER.info("keyRange applied"); + } + + private synchronized void addRegions(final Iterable regions, final long timestamp) { + LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp); + for (final TiRegion region : regions) { + if (overlapWithRegion(region)) { + final String address = + session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress(); + final ManagedChannel channel = + session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping()); + try { + final RegionCDCClient client = + new RegionCDCClient(region, keyRange, channel, eventsBuffer::offer); + regionClients.put(region.getId(), client); + regionToResolvedTs.put(region.getId(), timestamp); + resolvedTsSet.add(timestamp); + + client.start(timestamp); + } catch (final Exception e) { + LOGGER.error("failed to add region(regionId: {}, reason: {})", region.getId(), e); + throw new RuntimeException(e); + } + } + } + } + + private synchronized void removeRegions(final Iterable regionIds) { + LOGGER.info("remove regions: {}", regionIds); + for (final long regionId : regionIds) { + final RegionCDCClient regionClient = regionClients.remove(regionId); + if (regionClient != null) { + try { + regionClient.close(); + } catch (final Exception e) { + LOGGER.error("failed to close region client, region id: {}, error: {}", regionId, e); + } finally { + resolvedTsSet.remove(regionToResolvedTs.remove(regionId)); + regionToResolvedTs.remove(regionId); + } + } + } + } + + private boolean overlapWithRegion(final TiRegion region) { + final Range regionRange = + Range.closedOpen(Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey())); + final Range clientRange = + Range.closedOpen(Key.toRawKey(keyRange.getStart()), Key.toRawKey(keyRange.getEnd())); + final Range intersection = regionRange.intersection(clientRange); + return !intersection.isEmpty(); + } + + private void handleResolvedTs(final long regionId, final long resolvedTs) { + LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId); + resolvedTsSet.remove(regionToResolvedTs.replace(regionId, resolvedTs)); + resolvedTsSet.add(resolvedTs); + } + + private void handleErrorEvent(final long regionId, final Throwable error) { + LOGGER.info("handle error: {}, regionId: {}", error, regionId); + final TiRegion region = regionClients.get(regionId).getRegion(); + session.getRegionManager().onRequestFail(region); // invalidate cache for corresponding region + + removeRegions(Arrays.asList(regionId)); + applyKeyRange(keyRange, getMinResolvedTs()); // reapply the whole keyRange + } +} diff --git a/src/main/java/org/tikv/cdc/CDCEvent.java b/src/main/java/org/tikv/cdc/CDCEvent.java new file mode 100644 index 00000000000..52013af2fe1 --- /dev/null +++ b/src/main/java/org/tikv/cdc/CDCEvent.java @@ -0,0 +1,65 @@ +package org.tikv.cdc; + +import java.util.Objects; +import org.tikv.kvproto.Cdcpb.Event.Row; + +class CDCEvent { + enum CDCEventType { + ROW, + RESOLVED_TS, + ERROR; + }; + + public final long regionId; + + public final CDCEventType eventType; + + public final long resolvedTs; + + public final Row row; + + public final Throwable error; + + private CDCEvent( + final long regionId, + final CDCEventType eventType, + final long resolvedTs, + final Row row, + final Throwable error) { + this.regionId = regionId; + this.eventType = eventType; + this.resolvedTs = resolvedTs; + this.row = row; + this.error = error; + } + + public static CDCEvent rowEvent(final long regionId, final Row row) { + return new CDCEvent(regionId, CDCEventType.ROW, 0, row, null); + } + + public static CDCEvent resolvedTsEvent(final long regionId, final long resolvedTs) { + return new CDCEvent(regionId, CDCEventType.RESOLVED_TS, resolvedTs, null, null); + } + + public static CDCEvent error(final long regionId, final Throwable error) { + return new CDCEvent(regionId, CDCEventType.ERROR, 0, null, error); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("CDCEvent[").append(eventType.toString()).append("] {"); + switch (eventType) { + case ERROR: + builder.append("error=").append(error.getMessage()); + break; + case RESOLVED_TS: + builder.append("resolvedTs=").append(resolvedTs); + break; + case ROW: + builder.append("row=").append(Objects.toString(row)); + break; + } + return builder.append("}").toString(); + } +} diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java new file mode 100644 index 00000000000..63454976b45 --- /dev/null +++ b/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -0,0 +1,149 @@ +package org.tikv.cdc; + +import com.google.common.base.Preconditions; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.region.TiRegion; +import org.tikv.kvproto.Cdcpb.ChangeDataEvent; +import org.tikv.kvproto.Cdcpb.ChangeDataRequest; +import org.tikv.kvproto.Cdcpb.Event.LogType; +import org.tikv.kvproto.Cdcpb.Header; +import org.tikv.kvproto.Cdcpb.ResolvedTs; +import org.tikv.kvproto.ChangeDataGrpc; +import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub; +import org.tikv.kvproto.Coprocessor.KeyRange; + +class RegionCDCClient implements AutoCloseable, StreamObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); + private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0); + private static final Set ALLOWED_LOGTYPE = + Set.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK); + + private final TiRegion region; + private final KeyRange keyRange; + private final KeyRange regionKeyRange; + private final ManagedChannel channel; + private final ChangeDataStub asyncStub; + private final Consumer eventConsumer; + private final AtomicBoolean running = new AtomicBoolean(false); + + private boolean started = false; + + public RegionCDCClient( + final TiRegion region, + final KeyRange keyRange, + final ManagedChannel channel, + final Consumer eventConsumer) { + this.region = region; + this.keyRange = keyRange; + this.channel = channel; + this.asyncStub = ChangeDataGrpc.newStub(channel); + this.eventConsumer = eventConsumer; + + this.regionKeyRange = + KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build(); + } + + public synchronized void start(final long startTs) { + Preconditions.checkState(!started, "RegionCDCClient has already started"); + running.set(true); + LOGGER.info("start streaming region: {}, running: {}", region.getId(), running.get()); + final ChangeDataRequest request = + ChangeDataRequest.newBuilder() + .setRequestId(REQ_ID_COUNTER.incrementAndGet()) + .setHeader(Header.newBuilder().setTicdcVersion("5.0.0").build()) + .setRegionId(region.getId()) + .setCheckpointTs(startTs) + .setStartKey(keyRange.getStart()) + .setEndKey(keyRange.getEnd()) + .setRegionEpoch(region.getRegionEpoch()) + .build(); + final StreamObserver requestObserver = asyncStub.eventFeed(this); + requestObserver.onNext(request); + } + + public TiRegion getRegion() { + return region; + } + + public KeyRange getKeyRange() { + return keyRange; + } + + public KeyRange getRegionKeyRange() { + return regionKeyRange; + } + + public boolean isRunning() { + return running.get(); + } + + @Override + public void close() throws Exception { + LOGGER.info("close (region: {})", region.getId()); + running.set(false); + synchronized (this) { + channel.shutdown(); + } + try { + LOGGER.debug("awaitTermination (region: {})", region.getId()); + channel.awaitTermination(60, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + LOGGER.error("Failed to shutdown channel(regionId: {})", region.getId()); + Thread.currentThread().interrupt(); + synchronized (this) { + channel.shutdownNow(); + } + } + LOGGER.info("terminated (region: {})", region.getId()); + } + + @Override + public void onCompleted() { + // should never been called + onError(new IllegalStateException("RegionCDCClient should never complete")); + } + + @Override + public void onError(final Throwable error) { + LOGGER.error("region CDC error: region: {}, error: {}", region.getId(), error); + running.set(false); + eventConsumer.accept(CDCEvent.error(region.getId(), error)); + } + + @Override + public void onNext(final ChangeDataEvent event) { + try { + if (running.get()) { + event + .getEventsList() + .stream() + .flatMap(ev -> ev.getEntries().getEntriesList().stream()) + .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) + .map(row -> CDCEvent.rowEvent(region.getId(), row)) + .forEach(this::submitEvent); + + if (event.hasResolvedTs()) { + final ResolvedTs resolvedTs = event.getResolvedTs(); + if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) { + submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs())); + } + } + } + } catch (final Exception e) { + onError(e); + } + } + + private void submitEvent(final CDCEvent event) { + LOGGER.info("submit event: {}", event); + eventConsumer.accept(event); + } +} diff --git a/src/main/java/org/tikv/common/codec/TableCodec.java b/src/main/java/org/tikv/common/codec/TableCodec.java index f904dbad97e..ed835f5391d 100644 --- a/src/main/java/org/tikv/common/codec/TableCodec.java +++ b/src/main/java/org/tikv/common/codec/TableCodec.java @@ -40,6 +40,16 @@ public static byte[] encodeRow( return TableCodecV1.encodeRow(columnInfos, values, isPkHandle); } + public static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) { + if (value.length == 0) { + throw new CodecException("Decode fails: value length is zero"); + } + if ((value[0] & 0xff) == org.tikv.common.codec.RowV2.CODEC_VER) { + return TableCodecV2.decodeObjects(value, handle, tableInfo); + } + return TableCodecV1.decodeObjects(value, handle, tableInfo); + } + public static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { if (value.length == 0) { throw new CodecException("Decode fails: value length is zero"); diff --git a/src/main/java/org/tikv/common/codec/TableCodecV1.java b/src/main/java/org/tikv/common/codec/TableCodecV1.java index 2945409a252..f72022de287 100644 --- a/src/main/java/org/tikv/common/codec/TableCodecV1.java +++ b/src/main/java/org/tikv/common/codec/TableCodecV1.java @@ -49,7 +49,7 @@ protected static byte[] encodeRow( return cdo.toBytes(); } - protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) { if (handle == null && tableInfo.isPkHandle()) { throw new IllegalArgumentException("when pk is handle, handle cannot be null"); } @@ -80,7 +80,10 @@ protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) res[i] = decodedDataMap.get(col.getId()); } } + return res; + } - return ObjectRowImpl.create(res); + protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + return ObjectRowImpl.create(decodeObjects(value, handle, tableInfo)); } } diff --git a/src/main/java/org/tikv/common/codec/TableCodecV2.java b/src/main/java/org/tikv/common/codec/TableCodecV2.java index 7536578a8d9..d9fa8efc47b 100644 --- a/src/main/java/org/tikv/common/codec/TableCodecV2.java +++ b/src/main/java/org/tikv/common/codec/TableCodecV2.java @@ -49,7 +49,7 @@ protected static byte[] encodeRow( return encoder.encode(columnInfoList, valueList); } - protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) { if (handle == null && tableInfo.isPkHandle()) { throw new IllegalArgumentException("when pk is handle, handle cannot be null"); } @@ -85,6 +85,10 @@ protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) TiColumnInfo col = tableInfo.getColumn(i); res[i] = decodedDataMap.get(col.getId()); } - return ObjectRowImpl.create(res); + return res; + } + + protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + return ObjectRowImpl.create(decodeObjects(value, handle, tableInfo)); } } From 7514d0fa08ce47cc5457d5f1728530bfe7fef5ca Mon Sep 17 00:00:00 2001 From: chase Date: Wed, 19 May 2021 23:16:23 +0900 Subject: [PATCH 2/5] Add convenient constructor for TwoPhaseCommitter Signed-off-by: chase --- .../java/org/tikv/cdc/RegionCDCClient.java | 6 ++-- .../java/org/tikv/txn/TwoPhaseCommitter.java | 36 ++++++++++--------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java index 63454976b45..fd28e5fe918 100644 --- a/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ b/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -19,6 +19,7 @@ import org.tikv.kvproto.ChangeDataGrpc; import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub; import org.tikv.kvproto.Coprocessor.KeyRange; +import org.tikv.kvproto.Kvrpcpb; class RegionCDCClient implements AutoCloseable, StreamObserver { private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); @@ -64,6 +65,7 @@ public synchronized void start(final long startTs) { .setStartKey(keyRange.getStart()) .setEndKey(keyRange.getEnd()) .setRegionEpoch(region.getRegionEpoch()) + .setExtraOp(Kvrpcpb.ExtraOp.ReadOldValue) .build(); final StreamObserver requestObserver = asyncStub.eventFeed(this); requestObserver.onNext(request); @@ -122,9 +124,7 @@ public void onError(final Throwable error) { public void onNext(final ChangeDataEvent event) { try { if (running.get()) { - event - .getEventsList() - .stream() + event.getEventsList().stream() .flatMap(ev -> ev.getEntries().getEntriesList().stream()) .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) .map(row -> CDCEvent.rowEvent(region.getId(), row)) diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java index 9dca512a839..8d7e8af42ad 100644 --- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java +++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java @@ -82,17 +82,21 @@ public class TwoPhaseCommitter { private final ExecutorService executorService; public TwoPhaseCommitter(TiSession session, long startTime) { - this.kvClient = session.createTxnClient(); - this.regionManager = kvClient.getRegionManager(); - this.startTs = startTime; - this.lockTTL = DEFAULT_BATCH_WRITE_LOCK_TTL; - this.retryCommitSecondaryKeys = true; - this.txnPrewriteBatchSize = TXN_COMMIT_BATCH_SIZE; - this.txnCommitBatchSize = TXN_COMMIT_BATCH_SIZE; - this.writeBufferSize = WRITE_BUFFER_SIZE; - this.writeThreadPerTask = 1; - this.prewriteMaxRetryTimes = 3; - this.executorService = createExecutorService(); + this(session, startTime, DEFAULT_BATCH_WRITE_LOCK_TTL); + } + + public TwoPhaseCommitter(TiSession session, long startTime, long lockTTL) { + this( + session, + startTime, + lockTTL, + TXN_COMMIT_BATCH_SIZE, + TXN_COMMIT_BATCH_SIZE, + WRITE_BUFFER_SIZE, + 1, + true, + 3, + createExecutorService(WRITE_BUFFER_SIZE)); } public TwoPhaseCommitter( @@ -104,7 +108,8 @@ public TwoPhaseCommitter( int writeBufferSize, int writeThreadPerTask, boolean retryCommitSecondaryKeys, - int prewriteMaxRetryTimes) { + int prewriteMaxRetryTimes, + ExecutorService executorService) { this.kvClient = session.createTxnClient(); this.regionManager = kvClient.getRegionManager(); this.startTs = startTime; @@ -115,13 +120,12 @@ public TwoPhaseCommitter( this.writeBufferSize = writeBufferSize; this.writeThreadPerTask = writeThreadPerTask; this.prewriteMaxRetryTimes = prewriteMaxRetryTimes; - this.executorService = createExecutorService(); + this.executorService = executorService; } - private ExecutorService createExecutorService() { + private static ExecutorService createExecutorService(int size) { return Executors.newFixedThreadPool( - writeThreadPerTask, - new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); + size, new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); } public void close() throws Exception { From a0c2529a8dcb8ec8728ed45f784a9bf1b6fb0f22 Mon Sep 17 00:00:00 2001 From: chase Date: Tue, 25 May 2021 22:26:34 +0900 Subject: [PATCH 3/5] Add CDCConfig Signed-off-by: chase --- pom.xml | 4 +-- src/main/java/org/tikv/cdc/CDCClient.java | 12 +++++-- src/main/java/org/tikv/cdc/CDCConfig.java | 31 +++++++++++++++++++ .../java/org/tikv/cdc/RegionCDCClient.java | 13 +++++--- 4 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/tikv/cdc/CDCConfig.java diff --git a/pom.xml b/pom.xml index 1e0b84e660c..d1d7e59c3cb 100644 --- a/pom.xml +++ b/pom.xml @@ -337,8 +337,8 @@ maven-compiler-plugin 3.7.0 - 9 - 9 + 1.8 + 1.8 UTF-8 true true diff --git a/src/main/java/org/tikv/cdc/CDCClient.java b/src/main/java/org/tikv/cdc/CDCClient.java index 42aef4e2eff..1617773d505 100644 --- a/src/main/java/org/tikv/cdc/CDCClient.java +++ b/src/main/java/org/tikv/cdc/CDCClient.java @@ -26,12 +26,12 @@ public class CDCClient implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class); - private static final int EVENT_BUFFER_SIZE = 10000; private final TiSession session; private final KeyRange keyRange; + private final CDCConfig config; - private final BlockingQueue eventsBuffer = new ArrayBlockingQueue<>(EVENT_BUFFER_SIZE); + private final BlockingQueue eventsBuffer; private final TreeMap regionClients = new TreeMap<>(); private final Map regionToResolvedTs = new HashMap<>(); private final TreeMultiset resolvedTsSet = TreeMultiset.create(); @@ -39,11 +39,17 @@ public class CDCClient implements AutoCloseable { private boolean started = false; public CDCClient(final TiSession session, final KeyRange keyRange) { + this(session, keyRange, new CDCConfig()); + } + + public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConfig config) { Preconditions.checkState( session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI), "Unsupported Isolation Level"); // only support SI for now this.session = session; this.keyRange = keyRange; + this.config = config; + eventsBuffer = new ArrayBlockingQueue<>(config.getEventBufferSize()); } public synchronized void start(final long startTs) { @@ -148,7 +154,7 @@ private synchronized void addRegions(final Iterable regions, final lon session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping()); try { final RegionCDCClient client = - new RegionCDCClient(region, keyRange, channel, eventsBuffer::offer); + new RegionCDCClient(region, keyRange, channel, eventsBuffer::offer, config); regionClients.put(region.getId(), client); regionToResolvedTs.put(region.getId(), timestamp); resolvedTsSet.add(timestamp); diff --git a/src/main/java/org/tikv/cdc/CDCConfig.java b/src/main/java/org/tikv/cdc/CDCConfig.java new file mode 100644 index 00000000000..9692a3bd0ae --- /dev/null +++ b/src/main/java/org/tikv/cdc/CDCConfig.java @@ -0,0 +1,31 @@ +package org.tikv.cdc; + +import org.tikv.kvproto.Kvrpcpb; + +public class CDCConfig { + private static final int EVENT_BUFFER_SIZE = 50000; + private static final boolean READ_OLD_VALUE = true; + + private int eventBufferSize = EVENT_BUFFER_SIZE; + private boolean readOldValue = READ_OLD_VALUE; + + public void setEventBufferSize(final int bufferSize) { + eventBufferSize = bufferSize; + } + + public void setReadOldValue(final boolean value) { + readOldValue = value; + } + + public int getEventBufferSize() { + return eventBufferSize; + } + + public boolean getReadOldValue() { + return readOldValue; + } + + Kvrpcpb.ExtraOp getExtraOp() { + return readOldValue ? Kvrpcpb.ExtraOp.ReadOldValue : Kvrpcpb.ExtraOp.Noop; + } +} diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java index fd28e5fe918..54bed736636 100644 --- a/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ b/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -19,7 +19,6 @@ import org.tikv.kvproto.ChangeDataGrpc; import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub; import org.tikv.kvproto.Coprocessor.KeyRange; -import org.tikv.kvproto.Kvrpcpb; class RegionCDCClient implements AutoCloseable, StreamObserver { private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); @@ -33,6 +32,8 @@ class RegionCDCClient implements AutoCloseable, StreamObserver private final ManagedChannel channel; private final ChangeDataStub asyncStub; private final Consumer eventConsumer; + private final CDCConfig config; + private final AtomicBoolean running = new AtomicBoolean(false); private boolean started = false; @@ -41,12 +42,14 @@ public RegionCDCClient( final TiRegion region, final KeyRange keyRange, final ManagedChannel channel, - final Consumer eventConsumer) { + final Consumer eventConsumer, + final CDCConfig config) { this.region = region; this.keyRange = keyRange; this.channel = channel; this.asyncStub = ChangeDataGrpc.newStub(channel); this.eventConsumer = eventConsumer; + this.config = config; this.regionKeyRange = KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build(); @@ -65,7 +68,7 @@ public synchronized void start(final long startTs) { .setStartKey(keyRange.getStart()) .setEndKey(keyRange.getEnd()) .setRegionEpoch(region.getRegionEpoch()) - .setExtraOp(Kvrpcpb.ExtraOp.ReadOldValue) + .setExtraOp(config.getExtraOp()) .build(); final StreamObserver requestObserver = asyncStub.eventFeed(this); requestObserver.onNext(request); @@ -124,7 +127,9 @@ public void onError(final Throwable error) { public void onNext(final ChangeDataEvent event) { try { if (running.get()) { - event.getEventsList().stream() + event + .getEventsList() + .stream() .flatMap(ev -> ev.getEntries().getEntriesList().stream()) .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) .map(row -> CDCEvent.rowEvent(region.getId(), row)) From f895a7cd52f3f5fa9814f42c0140c4aff7df8f1d Mon Sep 17 00:00:00 2001 From: chase Date: Mon, 31 May 2021 21:25:04 +0900 Subject: [PATCH 4/5] Resolve compile errors due to java version Signed-off-by: chase --- src/main/java/org/tikv/cdc/RegionCDCClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java index 54bed736636..2bdf58324b4 100644 --- a/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ b/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -1,6 +1,7 @@ package org.tikv.cdc; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import java.util.Set; @@ -24,7 +25,7 @@ class RegionCDCClient implements AutoCloseable, StreamObserver private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0); private static final Set ALLOWED_LOGTYPE = - Set.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK); + ImmutableSet.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK); private final TiRegion region; private final KeyRange keyRange; From 936ede3b06b610a96c7d21d14367589c39170ac8 Mon Sep 17 00:00:00 2001 From: chase Date: Thu, 3 Jun 2021 19:27:57 +0900 Subject: [PATCH 5/5] Revert lib version Signed-off-by: chase --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d1d7e59c3cb..4e4cff5be70 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.tikv tikv-client-java - 3.2.0-SNAPSHOT + 3.1.0-SNAPSHOT jar TiKV Java Client A Java Client for TiKV