diff --git a/src/main/java/org/tikv/cdc/CDCClient.java b/src/main/java/org/tikv/cdc/CDCClient.java new file mode 100644 index 00000000000..1617773d505 --- /dev/null +++ b/src/main/java/org/tikv/cdc/CDCClient.java @@ -0,0 +1,211 @@ +package org.tikv.cdc; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.TreeMultiset; +import io.grpc.ManagedChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiSession; +import org.tikv.common.key.Key; +import org.tikv.common.region.TiRegion; +import org.tikv.common.util.RangeSplitter; +import org.tikv.common.util.RangeSplitter.RegionTask; +import org.tikv.kvproto.Cdcpb.Event.Row; +import org.tikv.kvproto.Coprocessor.KeyRange; +import org.tikv.kvproto.Kvrpcpb; + +public class CDCClient implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class); + + private final TiSession session; + private final KeyRange keyRange; + private final CDCConfig config; + + private final BlockingQueue eventsBuffer; + private final TreeMap regionClients = new TreeMap<>(); + private final Map regionToResolvedTs = new HashMap<>(); + private final TreeMultiset resolvedTsSet = TreeMultiset.create(); + + private boolean started = false; + + public CDCClient(final TiSession session, final KeyRange keyRange) { + this(session, keyRange, new CDCConfig()); + } + + public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConfig config) { + Preconditions.checkState( + session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI), + "Unsupported Isolation Level"); // only support SI for now + this.session = session; + this.keyRange = keyRange; + this.config = config; + eventsBuffer = new ArrayBlockingQueue<>(config.getEventBufferSize()); + } + + public synchronized void start(final long startTs) { + Preconditions.checkState(!started, "Client is already started"); + try { + applyKeyRange(keyRange, startTs); + } catch (final Throwable e) { + LOGGER.error("failed to start:", e); + } + started = true; + } + + public synchronized Row get() throws InterruptedException { + final CDCEvent event = eventsBuffer.poll(100, TimeUnit.MILLISECONDS); + if (event != null) { + switch (event.eventType) { + case ROW: + return event.row; + case RESOLVED_TS: + handleResolvedTs(event.regionId, event.resolvedTs); + break; + case ERROR: + handleErrorEvent(event.regionId, event.error); + break; + } + } + return null; + } + + public synchronized long getMinResolvedTs() { + return resolvedTsSet.firstEntry().getElement(); + } + + public synchronized long getMaxResolvedTs() { + return resolvedTsSet.lastEntry().getElement(); + } + + public synchronized void close() { + removeRegions(regionClients.keySet()); + } + + private synchronized void applyKeyRange(final KeyRange keyRange, final long timestamp) { + final RangeSplitter splitter = RangeSplitter.newSplitter(session.getRegionManager()); + + final Iterator newRegionsIterator = + splitter + .splitRangeByRegion(Arrays.asList(keyRange)) + .stream() + .map(RegionTask::getRegion) + .sorted((a, b) -> Long.compare(a.getId(), b.getId())) + .iterator(); + final Iterator oldRegionsIterator = regionClients.values().iterator(); + + final ArrayList regionsToAdd = new ArrayList<>(); + final ArrayList regionsToRemove = new ArrayList<>(); + + TiRegion newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + RegionCDCClient oldRegionClient = + oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + + while (newRegion != null && oldRegionClient != null) { + if (newRegion.getId() == oldRegionClient.getRegion().getId()) { + // check if should refresh region + if (!oldRegionClient.isRunning()) { + regionsToRemove.add(newRegion.getId()); + regionsToAdd.add(newRegion); + } + + newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + } else if (newRegion.getId() < oldRegionClient.getRegion().getId()) { + regionsToAdd.add(newRegion); + newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + } else { + regionsToRemove.add(oldRegionClient.getRegion().getId()); + oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + } + } + + while (newRegion != null) { + regionsToAdd.add(newRegion); + newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; + } + + while (oldRegionClient != null) { + regionsToRemove.add(oldRegionClient.getRegion().getId()); + oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; + } + + removeRegions(regionsToRemove); + addRegions(regionsToAdd, timestamp); + LOGGER.info("keyRange applied"); + } + + private synchronized void addRegions(final Iterable regions, final long timestamp) { + LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp); + for (final TiRegion region : regions) { + if (overlapWithRegion(region)) { + final String address = + session.getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress(); + final ManagedChannel channel = + session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping()); + try { + final RegionCDCClient client = + new RegionCDCClient(region, keyRange, channel, eventsBuffer::offer, config); + regionClients.put(region.getId(), client); + regionToResolvedTs.put(region.getId(), timestamp); + resolvedTsSet.add(timestamp); + + client.start(timestamp); + } catch (final Exception e) { + LOGGER.error("failed to add region(regionId: {}, reason: {})", region.getId(), e); + throw new RuntimeException(e); + } + } + } + } + + private synchronized void removeRegions(final Iterable regionIds) { + LOGGER.info("remove regions: {}", regionIds); + for (final long regionId : regionIds) { + final RegionCDCClient regionClient = regionClients.remove(regionId); + if (regionClient != null) { + try { + regionClient.close(); + } catch (final Exception e) { + LOGGER.error("failed to close region client, region id: {}, error: {}", regionId, e); + } finally { + resolvedTsSet.remove(regionToResolvedTs.remove(regionId)); + regionToResolvedTs.remove(regionId); + } + } + } + } + + private boolean overlapWithRegion(final TiRegion region) { + final Range regionRange = + Range.closedOpen(Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey())); + final Range clientRange = + Range.closedOpen(Key.toRawKey(keyRange.getStart()), Key.toRawKey(keyRange.getEnd())); + final Range intersection = regionRange.intersection(clientRange); + return !intersection.isEmpty(); + } + + private void handleResolvedTs(final long regionId, final long resolvedTs) { + LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId); + resolvedTsSet.remove(regionToResolvedTs.replace(regionId, resolvedTs)); + resolvedTsSet.add(resolvedTs); + } + + private void handleErrorEvent(final long regionId, final Throwable error) { + LOGGER.info("handle error: {}, regionId: {}", error, regionId); + final TiRegion region = regionClients.get(regionId).getRegion(); + session.getRegionManager().onRequestFail(region); // invalidate cache for corresponding region + + removeRegions(Arrays.asList(regionId)); + applyKeyRange(keyRange, getMinResolvedTs()); // reapply the whole keyRange + } +} diff --git a/src/main/java/org/tikv/cdc/CDCConfig.java b/src/main/java/org/tikv/cdc/CDCConfig.java new file mode 100644 index 00000000000..9692a3bd0ae --- /dev/null +++ b/src/main/java/org/tikv/cdc/CDCConfig.java @@ -0,0 +1,31 @@ +package org.tikv.cdc; + +import org.tikv.kvproto.Kvrpcpb; + +public class CDCConfig { + private static final int EVENT_BUFFER_SIZE = 50000; + private static final boolean READ_OLD_VALUE = true; + + private int eventBufferSize = EVENT_BUFFER_SIZE; + private boolean readOldValue = READ_OLD_VALUE; + + public void setEventBufferSize(final int bufferSize) { + eventBufferSize = bufferSize; + } + + public void setReadOldValue(final boolean value) { + readOldValue = value; + } + + public int getEventBufferSize() { + return eventBufferSize; + } + + public boolean getReadOldValue() { + return readOldValue; + } + + Kvrpcpb.ExtraOp getExtraOp() { + return readOldValue ? Kvrpcpb.ExtraOp.ReadOldValue : Kvrpcpb.ExtraOp.Noop; + } +} diff --git a/src/main/java/org/tikv/cdc/CDCEvent.java b/src/main/java/org/tikv/cdc/CDCEvent.java new file mode 100644 index 00000000000..52013af2fe1 --- /dev/null +++ b/src/main/java/org/tikv/cdc/CDCEvent.java @@ -0,0 +1,65 @@ +package org.tikv.cdc; + +import java.util.Objects; +import org.tikv.kvproto.Cdcpb.Event.Row; + +class CDCEvent { + enum CDCEventType { + ROW, + RESOLVED_TS, + ERROR; + }; + + public final long regionId; + + public final CDCEventType eventType; + + public final long resolvedTs; + + public final Row row; + + public final Throwable error; + + private CDCEvent( + final long regionId, + final CDCEventType eventType, + final long resolvedTs, + final Row row, + final Throwable error) { + this.regionId = regionId; + this.eventType = eventType; + this.resolvedTs = resolvedTs; + this.row = row; + this.error = error; + } + + public static CDCEvent rowEvent(final long regionId, final Row row) { + return new CDCEvent(regionId, CDCEventType.ROW, 0, row, null); + } + + public static CDCEvent resolvedTsEvent(final long regionId, final long resolvedTs) { + return new CDCEvent(regionId, CDCEventType.RESOLVED_TS, resolvedTs, null, null); + } + + public static CDCEvent error(final long regionId, final Throwable error) { + return new CDCEvent(regionId, CDCEventType.ERROR, 0, null, error); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("CDCEvent[").append(eventType.toString()).append("] {"); + switch (eventType) { + case ERROR: + builder.append("error=").append(error.getMessage()); + break; + case RESOLVED_TS: + builder.append("resolvedTs=").append(resolvedTs); + break; + case ROW: + builder.append("row=").append(Objects.toString(row)); + break; + } + return builder.append("}").toString(); + } +} diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java new file mode 100644 index 00000000000..2bdf58324b4 --- /dev/null +++ b/src/main/java/org/tikv/cdc/RegionCDCClient.java @@ -0,0 +1,155 @@ +package org.tikv.cdc; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.region.TiRegion; +import org.tikv.kvproto.Cdcpb.ChangeDataEvent; +import org.tikv.kvproto.Cdcpb.ChangeDataRequest; +import org.tikv.kvproto.Cdcpb.Event.LogType; +import org.tikv.kvproto.Cdcpb.Header; +import org.tikv.kvproto.Cdcpb.ResolvedTs; +import org.tikv.kvproto.ChangeDataGrpc; +import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub; +import org.tikv.kvproto.Coprocessor.KeyRange; + +class RegionCDCClient implements AutoCloseable, StreamObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); + private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0); + private static final Set ALLOWED_LOGTYPE = + ImmutableSet.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK); + + private final TiRegion region; + private final KeyRange keyRange; + private final KeyRange regionKeyRange; + private final ManagedChannel channel; + private final ChangeDataStub asyncStub; + private final Consumer eventConsumer; + private final CDCConfig config; + + private final AtomicBoolean running = new AtomicBoolean(false); + + private boolean started = false; + + public RegionCDCClient( + final TiRegion region, + final KeyRange keyRange, + final ManagedChannel channel, + final Consumer eventConsumer, + final CDCConfig config) { + this.region = region; + this.keyRange = keyRange; + this.channel = channel; + this.asyncStub = ChangeDataGrpc.newStub(channel); + this.eventConsumer = eventConsumer; + this.config = config; + + this.regionKeyRange = + KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build(); + } + + public synchronized void start(final long startTs) { + Preconditions.checkState(!started, "RegionCDCClient has already started"); + running.set(true); + LOGGER.info("start streaming region: {}, running: {}", region.getId(), running.get()); + final ChangeDataRequest request = + ChangeDataRequest.newBuilder() + .setRequestId(REQ_ID_COUNTER.incrementAndGet()) + .setHeader(Header.newBuilder().setTicdcVersion("5.0.0").build()) + .setRegionId(region.getId()) + .setCheckpointTs(startTs) + .setStartKey(keyRange.getStart()) + .setEndKey(keyRange.getEnd()) + .setRegionEpoch(region.getRegionEpoch()) + .setExtraOp(config.getExtraOp()) + .build(); + final StreamObserver requestObserver = asyncStub.eventFeed(this); + requestObserver.onNext(request); + } + + public TiRegion getRegion() { + return region; + } + + public KeyRange getKeyRange() { + return keyRange; + } + + public KeyRange getRegionKeyRange() { + return regionKeyRange; + } + + public boolean isRunning() { + return running.get(); + } + + @Override + public void close() throws Exception { + LOGGER.info("close (region: {})", region.getId()); + running.set(false); + synchronized (this) { + channel.shutdown(); + } + try { + LOGGER.debug("awaitTermination (region: {})", region.getId()); + channel.awaitTermination(60, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + LOGGER.error("Failed to shutdown channel(regionId: {})", region.getId()); + Thread.currentThread().interrupt(); + synchronized (this) { + channel.shutdownNow(); + } + } + LOGGER.info("terminated (region: {})", region.getId()); + } + + @Override + public void onCompleted() { + // should never been called + onError(new IllegalStateException("RegionCDCClient should never complete")); + } + + @Override + public void onError(final Throwable error) { + LOGGER.error("region CDC error: region: {}, error: {}", region.getId(), error); + running.set(false); + eventConsumer.accept(CDCEvent.error(region.getId(), error)); + } + + @Override + public void onNext(final ChangeDataEvent event) { + try { + if (running.get()) { + event + .getEventsList() + .stream() + .flatMap(ev -> ev.getEntries().getEntriesList().stream()) + .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) + .map(row -> CDCEvent.rowEvent(region.getId(), row)) + .forEach(this::submitEvent); + + if (event.hasResolvedTs()) { + final ResolvedTs resolvedTs = event.getResolvedTs(); + if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) { + submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs())); + } + } + } + } catch (final Exception e) { + onError(e); + } + } + + private void submitEvent(final CDCEvent event) { + LOGGER.info("submit event: {}", event); + eventConsumer.accept(event); + } +} diff --git a/src/main/java/org/tikv/common/codec/TableCodec.java b/src/main/java/org/tikv/common/codec/TableCodec.java index f904dbad97e..ed835f5391d 100644 --- a/src/main/java/org/tikv/common/codec/TableCodec.java +++ b/src/main/java/org/tikv/common/codec/TableCodec.java @@ -40,6 +40,16 @@ public static byte[] encodeRow( return TableCodecV1.encodeRow(columnInfos, values, isPkHandle); } + public static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) { + if (value.length == 0) { + throw new CodecException("Decode fails: value length is zero"); + } + if ((value[0] & 0xff) == org.tikv.common.codec.RowV2.CODEC_VER) { + return TableCodecV2.decodeObjects(value, handle, tableInfo); + } + return TableCodecV1.decodeObjects(value, handle, tableInfo); + } + public static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { if (value.length == 0) { throw new CodecException("Decode fails: value length is zero"); diff --git a/src/main/java/org/tikv/common/codec/TableCodecV1.java b/src/main/java/org/tikv/common/codec/TableCodecV1.java index 2945409a252..f72022de287 100644 --- a/src/main/java/org/tikv/common/codec/TableCodecV1.java +++ b/src/main/java/org/tikv/common/codec/TableCodecV1.java @@ -49,7 +49,7 @@ protected static byte[] encodeRow( return cdo.toBytes(); } - protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) { if (handle == null && tableInfo.isPkHandle()) { throw new IllegalArgumentException("when pk is handle, handle cannot be null"); } @@ -80,7 +80,10 @@ protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) res[i] = decodedDataMap.get(col.getId()); } } + return res; + } - return ObjectRowImpl.create(res); + protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + return ObjectRowImpl.create(decodeObjects(value, handle, tableInfo)); } } diff --git a/src/main/java/org/tikv/common/codec/TableCodecV2.java b/src/main/java/org/tikv/common/codec/TableCodecV2.java index 7536578a8d9..d9fa8efc47b 100644 --- a/src/main/java/org/tikv/common/codec/TableCodecV2.java +++ b/src/main/java/org/tikv/common/codec/TableCodecV2.java @@ -49,7 +49,7 @@ protected static byte[] encodeRow( return encoder.encode(columnInfoList, valueList); } - protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) { if (handle == null && tableInfo.isPkHandle()) { throw new IllegalArgumentException("when pk is handle, handle cannot be null"); } @@ -85,6 +85,10 @@ protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) TiColumnInfo col = tableInfo.getColumn(i); res[i] = decodedDataMap.get(col.getId()); } - return ObjectRowImpl.create(res); + return res; + } + + protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) { + return ObjectRowImpl.create(decodeObjects(value, handle, tableInfo)); } } diff --git a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java index 9dca512a839..8d7e8af42ad 100644 --- a/src/main/java/org/tikv/txn/TwoPhaseCommitter.java +++ b/src/main/java/org/tikv/txn/TwoPhaseCommitter.java @@ -82,17 +82,21 @@ public class TwoPhaseCommitter { private final ExecutorService executorService; public TwoPhaseCommitter(TiSession session, long startTime) { - this.kvClient = session.createTxnClient(); - this.regionManager = kvClient.getRegionManager(); - this.startTs = startTime; - this.lockTTL = DEFAULT_BATCH_WRITE_LOCK_TTL; - this.retryCommitSecondaryKeys = true; - this.txnPrewriteBatchSize = TXN_COMMIT_BATCH_SIZE; - this.txnCommitBatchSize = TXN_COMMIT_BATCH_SIZE; - this.writeBufferSize = WRITE_BUFFER_SIZE; - this.writeThreadPerTask = 1; - this.prewriteMaxRetryTimes = 3; - this.executorService = createExecutorService(); + this(session, startTime, DEFAULT_BATCH_WRITE_LOCK_TTL); + } + + public TwoPhaseCommitter(TiSession session, long startTime, long lockTTL) { + this( + session, + startTime, + lockTTL, + TXN_COMMIT_BATCH_SIZE, + TXN_COMMIT_BATCH_SIZE, + WRITE_BUFFER_SIZE, + 1, + true, + 3, + createExecutorService(WRITE_BUFFER_SIZE)); } public TwoPhaseCommitter( @@ -104,7 +108,8 @@ public TwoPhaseCommitter( int writeBufferSize, int writeThreadPerTask, boolean retryCommitSecondaryKeys, - int prewriteMaxRetryTimes) { + int prewriteMaxRetryTimes, + ExecutorService executorService) { this.kvClient = session.createTxnClient(); this.regionManager = kvClient.getRegionManager(); this.startTs = startTime; @@ -115,13 +120,12 @@ public TwoPhaseCommitter( this.writeBufferSize = writeBufferSize; this.writeThreadPerTask = writeThreadPerTask; this.prewriteMaxRetryTimes = prewriteMaxRetryTimes; - this.executorService = createExecutorService(); + this.executorService = executorService; } - private ExecutorService createExecutorService() { + private static ExecutorService createExecutorService(int size) { return Executors.newFixedThreadPool( - writeThreadPerTask, - new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); + size, new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build()); } public void close() throws Exception {