diff --git a/pom.xml b/pom.xml index 3b7a796cb25..375265b7cc9 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,6 @@ 1.8 UTF-8 UTF-8 - 6.22.1.1 3.5.1 1.2.17 1.7.16 @@ -92,11 +91,6 @@ perfmark-traceviewer 0.24.0 - - org.rocksdb - rocksdbjni - ${rocksdb.version} - org.antlr antlr4-runtime diff --git a/src/main/java/org/tikv/br/BackupDecoder.java b/src/main/java/org/tikv/br/BackupDecoder.java deleted file mode 100644 index a6654c6ae8f..00000000000 --- a/src/main/java/org/tikv/br/BackupDecoder.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import java.io.Serializable; -import org.rocksdb.Options; -import org.rocksdb.ReadOptions; -import org.tikv.common.exception.SSTDecodeException; -import org.tikv.kvproto.Brpb; - -public class BackupDecoder implements Serializable { - private final Brpb.BackupMeta backupMeta; - private final boolean ttlEnabled; - private final KVDecoder kvDecoder; - - public BackupDecoder(Brpb.BackupMeta backupMeta) throws SSTDecodeException { - this.backupMeta = backupMeta; - this.ttlEnabled = false; - this.kvDecoder = initKVDecoder(); - } - - public BackupDecoder(Brpb.BackupMeta backupMeta, boolean ttlEnabled) throws SSTDecodeException { - this.backupMeta = backupMeta; - this.ttlEnabled = ttlEnabled; - this.kvDecoder = initKVDecoder(); - } - - private KVDecoder initKVDecoder() throws SSTDecodeException { - if (backupMeta.getIsRawKv()) { - if ("V1".equals(backupMeta.getApiVersion().name())) { - return new RawKVDecoderV1(ttlEnabled); - } else { - throw new SSTDecodeException( - "does not support decode APIVersion " + backupMeta.getApiVersion().name()); - } - } else { - throw new SSTDecodeException("TxnKV is not supported yet!"); - } - } - - public SSTDecoder decodeSST(String sstFilePath) { - return decodeSST(sstFilePath, new Options(), new ReadOptions()); - } - - public SSTDecoder decodeSST(String sstFilePath, Options options, ReadOptions readOptions) { - return new SSTDecoder(sstFilePath, kvDecoder, options, readOptions); - } - - public Brpb.BackupMeta getBackupMeta() { - return backupMeta; - } -} diff --git a/src/main/java/org/tikv/br/BackupMetaDecoder.java b/src/main/java/org/tikv/br/BackupMetaDecoder.java deleted file mode 100644 index 4ffbf8b53c9..00000000000 --- a/src/main/java/org/tikv/br/BackupMetaDecoder.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import org.tikv.kvproto.Brpb; - -public class BackupMetaDecoder { - private final Brpb.BackupMeta backupMeta; - - public BackupMetaDecoder(byte[] data) throws InvalidProtocolBufferException { - this.backupMeta = Brpb.BackupMeta.parseFrom(data); - } - - public Brpb.BackupMeta getBackupMeta() { - return backupMeta; - } - - public static BackupMetaDecoder parse(String backupMetaFilePath) throws IOException { - byte[] data = Files.readAllBytes(new File(backupMetaFilePath).toPath()); - return new BackupMetaDecoder(data); - } -} diff --git a/src/main/java/org/tikv/br/KVDecoder.java b/src/main/java/org/tikv/br/KVDecoder.java deleted file mode 100644 index 651d90f667f..00000000000 --- a/src/main/java/org/tikv/br/KVDecoder.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import com.google.protobuf.ByteString; -import java.io.Serializable; - -public interface KVDecoder extends Serializable { - ByteString decodeKey(byte[] key); - - ByteString decodeValue(byte[] value); -} diff --git a/src/main/java/org/tikv/br/RawKVDecoderV1.java b/src/main/java/org/tikv/br/RawKVDecoderV1.java deleted file mode 100644 index 02c009914e5..00000000000 --- a/src/main/java/org/tikv/br/RawKVDecoderV1.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import com.google.protobuf.ByteString; -import java.util.Arrays; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RawKVDecoderV1 implements KVDecoder { - private static final Logger logger = LoggerFactory.getLogger(SSTIterator.class); - - private final boolean ttlEnabled; - - public RawKVDecoderV1(boolean ttlEnabled) { - this.ttlEnabled = ttlEnabled; - } - - @Override - public ByteString decodeKey(byte[] key) { - if (key == null || key.length == 0) { - logger.warn( - "skip Key-Value pair because key == null || key.length == 0, key = " - + Arrays.toString(key)); - return null; - } else if (key[0] != 'z') { - logger.warn("skip Key-Value pair because key[0] != 'z', key = " + Arrays.toString(key)); - return null; - } - return ByteString.copyFrom(key, 1, key.length - 1); - } - - @Override - public ByteString decodeValue(byte[] value) { - if (!ttlEnabled) { - return ByteString.copyFrom(value); - } else { - return ByteString.copyFrom(value).substring(0, value.length - 8); - } - } -} diff --git a/src/main/java/org/tikv/br/SSTDecoder.java b/src/main/java/org/tikv/br/SSTDecoder.java deleted file mode 100644 index 8d235504cd3..00000000000 --- a/src/main/java/org/tikv/br/SSTDecoder.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import com.google.protobuf.ByteString; -import java.util.Iterator; -import org.rocksdb.Options; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDBException; -import org.rocksdb.SstFileReader; -import org.rocksdb.SstFileReaderIterator; -import org.tikv.common.util.Pair; - -public class SSTDecoder { - private final String filePath; - private final KVDecoder kvDecoder; - private final Options options; - private final ReadOptions readOptions; - - private SstFileReader sstFileReader; - private SstFileReaderIterator iterator; - - public SSTDecoder(String sstFilePath, KVDecoder kvDecoder) { - this.filePath = sstFilePath; - this.kvDecoder = kvDecoder; - this.options = new Options(); - this.readOptions = new ReadOptions(); - } - - public SSTDecoder( - String filePath, KVDecoder kvDecoder, Options options, ReadOptions readOptions) { - this.filePath = filePath; - this.kvDecoder = kvDecoder; - this.options = options; - this.readOptions = readOptions; - } - - public synchronized Iterator> getIterator() throws RocksDBException { - if (sstFileReader != null || iterator != null) { - throw new RocksDBException("File already opened!"); - } - - sstFileReader = new SstFileReader(new Options()); - sstFileReader.open(filePath); - iterator = sstFileReader.newIterator(new ReadOptions()); - return new SSTIterator(iterator, kvDecoder); - } - - public synchronized void close() { - try { - if (iterator != null) { - iterator.close(); - } - } finally { - iterator = null; - } - - try { - if (sstFileReader != null) { - sstFileReader.close(); - } - } finally { - sstFileReader = null; - } - } - - public String getFilePath() { - return filePath; - } - - public Options getOptions() { - return options; - } - - public ReadOptions getReadOptions() { - return readOptions; - } -} diff --git a/src/main/java/org/tikv/br/SSTIterator.java b/src/main/java/org/tikv/br/SSTIterator.java deleted file mode 100644 index 1dd55cbfa81..00000000000 --- a/src/main/java/org/tikv/br/SSTIterator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import com.google.protobuf.ByteString; -import java.util.Iterator; -import org.rocksdb.SstFileReaderIterator; -import org.tikv.common.util.Pair; - -public class SSTIterator implements Iterator> { - private final SstFileReaderIterator iterator; - private final KVDecoder kvDecoder; - - private Pair nextPair; - - public SSTIterator(SstFileReaderIterator iterator, KVDecoder kvDecoder) { - this.iterator = iterator; - this.kvDecoder = kvDecoder; - this.iterator.seekToFirst(); - this.nextPair = processNext(); - } - - @Override - public boolean hasNext() { - return nextPair != null; - } - - @Override - public Pair next() { - Pair result = nextPair; - nextPair = processNext(); - return result; - } - - private Pair processNext() { - if (iterator.isValid()) { - ByteString key = kvDecoder.decodeKey(iterator.key()); - ByteString value = kvDecoder.decodeValue(iterator.value()); - iterator.next(); - if (key != null) { - return Pair.create(key, value); - } else { - return processNext(); - } - } else { - return null; - } - } -} diff --git a/src/main/java/org/tikv/cdc/CDCClient.java b/src/main/java/org/tikv/cdc/CDCClient.java deleted file mode 100644 index 58fd20e97df..00000000000 --- a/src/main/java/org/tikv/cdc/CDCClient.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.cdc; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Range; -import com.google.common.collect.TreeMultiset; -import io.grpc.ManagedChannel; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.tikv.common.TiSession; -import org.tikv.common.key.Key; -import org.tikv.common.region.TiRegion; -import org.tikv.common.util.RangeSplitter; -import org.tikv.common.util.RangeSplitter.RegionTask; -import org.tikv.kvproto.Cdcpb.Event.Row; -import org.tikv.kvproto.Coprocessor.KeyRange; -import org.tikv.kvproto.Kvrpcpb; - -public class CDCClient implements AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class); - - private final TiSession session; - private final KeyRange keyRange; - private final CDCConfig config; - - private final BlockingQueue eventsBuffer; - private final TreeMap regionClients = new TreeMap<>(); - private final Map regionToResolvedTs = new HashMap<>(); - private final TreeMultiset resolvedTsSet = TreeMultiset.create(); - - private boolean started = false; - - public CDCClient(final TiSession session, final KeyRange keyRange) { - this(session, keyRange, new CDCConfig()); - } - - public CDCClient(final TiSession session, final KeyRange keyRange, final CDCConfig config) { - Preconditions.checkState( - session.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI), - "Unsupported Isolation Level"); // only support SI for now - this.session = session; - this.keyRange = keyRange; - this.config = config; - eventsBuffer = new ArrayBlockingQueue<>(config.getEventBufferSize()); - } - - public synchronized void start(final long startTs) { - Preconditions.checkState(!started, "Client is already started"); - try { - applyKeyRange(keyRange, startTs); - } catch (final Throwable e) { - LOGGER.error("failed to start:", e); - } - started = true; - } - - public synchronized Row get() throws InterruptedException { - final CDCEvent event = eventsBuffer.poll(100, TimeUnit.MILLISECONDS); - if (event != null) { - switch (event.eventType) { - case ROW: - return event.row; - case RESOLVED_TS: - handleResolvedTs(event.regionId, event.resolvedTs); - break; - case ERROR: - handleErrorEvent(event.regionId, event.error); - break; - } - } - return null; - } - - public synchronized long getMinResolvedTs() { - return resolvedTsSet.firstEntry().getElement(); - } - - public synchronized long getMaxResolvedTs() { - return resolvedTsSet.lastEntry().getElement(); - } - - public synchronized void close() { - removeRegions(regionClients.keySet()); - } - - private synchronized void applyKeyRange(final KeyRange keyRange, final long timestamp) { - final RangeSplitter splitter = RangeSplitter.newSplitter(session.getRegionManager()); - - final Iterator newRegionsIterator = - splitter - .splitRangeByRegion(Arrays.asList(keyRange)) - .stream() - .map(RegionTask::getRegion) - .sorted((a, b) -> Long.compare(a.getId(), b.getId())) - .iterator(); - final Iterator oldRegionsIterator = regionClients.values().iterator(); - - final ArrayList regionsToAdd = new ArrayList<>(); - final ArrayList regionsToRemove = new ArrayList<>(); - - TiRegion newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; - RegionCDCClient oldRegionClient = - oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; - - while (newRegion != null && oldRegionClient != null) { - if (newRegion.getId() == oldRegionClient.getRegion().getId()) { - // check if should refresh region - if (!oldRegionClient.isRunning()) { - regionsToRemove.add(newRegion.getId()); - regionsToAdd.add(newRegion); - } - - newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; - oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; - } else if (newRegion.getId() < oldRegionClient.getRegion().getId()) { - regionsToAdd.add(newRegion); - newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; - } else { - regionsToRemove.add(oldRegionClient.getRegion().getId()); - oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; - } - } - - while (newRegion != null) { - regionsToAdd.add(newRegion); - newRegion = newRegionsIterator.hasNext() ? newRegionsIterator.next() : null; - } - - while (oldRegionClient != null) { - regionsToRemove.add(oldRegionClient.getRegion().getId()); - oldRegionClient = oldRegionsIterator.hasNext() ? oldRegionsIterator.next() : null; - } - - removeRegions(regionsToRemove); - addRegions(regionsToAdd, timestamp); - LOGGER.info("keyRange applied"); - } - - private synchronized void addRegions(final Iterable regions, final long timestamp) { - LOGGER.info("add regions: {}, timestamp: {}", regions, timestamp); - for (final TiRegion region : regions) { - if (overlapWithRegion(region)) { - final String address = - session - .getRegionManager() - .getStoreById(region.getLeader().getStoreId()) - .getStore() - .getAddress(); - final ManagedChannel channel = - session.getChannelFactory().getChannel(address, session.getPDClient().getHostMapping()); - try { - final RegionCDCClient client = - new RegionCDCClient(region, keyRange, channel, eventsBuffer::offer, config); - regionClients.put(region.getId(), client); - regionToResolvedTs.put(region.getId(), timestamp); - resolvedTsSet.add(timestamp); - - client.start(timestamp); - } catch (final Exception e) { - LOGGER.error("failed to add region(regionId: {}, reason: {})", region.getId(), e); - throw new RuntimeException(e); - } - } - } - } - - private synchronized void removeRegions(final Iterable regionIds) { - LOGGER.info("remove regions: {}", regionIds); - for (final long regionId : regionIds) { - final RegionCDCClient regionClient = regionClients.remove(regionId); - if (regionClient != null) { - try { - regionClient.close(); - } catch (final Exception e) { - LOGGER.error("failed to close region client, region id: {}, error: {}", regionId, e); - } finally { - resolvedTsSet.remove(regionToResolvedTs.remove(regionId)); - regionToResolvedTs.remove(regionId); - } - } - } - } - - private boolean overlapWithRegion(final TiRegion region) { - final Range regionRange = - Range.closedOpen(Key.toRawKey(region.getStartKey()), Key.toRawKey(region.getEndKey())); - final Range clientRange = - Range.closedOpen(Key.toRawKey(keyRange.getStart()), Key.toRawKey(keyRange.getEnd())); - final Range intersection = regionRange.intersection(clientRange); - return !intersection.isEmpty(); - } - - private void handleResolvedTs(final long regionId, final long resolvedTs) { - LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId); - resolvedTsSet.remove(regionToResolvedTs.replace(regionId, resolvedTs)); - resolvedTsSet.add(resolvedTs); - } - - private void handleErrorEvent(final long regionId, final Throwable error) { - LOGGER.info("handle error: {}, regionId: {}", error, regionId); - final TiRegion region = regionClients.get(regionId).getRegion(); - session.getRegionManager().onRequestFail(region); // invalidate cache for corresponding region - - removeRegions(Arrays.asList(regionId)); - applyKeyRange(keyRange, getMinResolvedTs()); // reapply the whole keyRange - } -} diff --git a/src/main/java/org/tikv/cdc/CDCConfig.java b/src/main/java/org/tikv/cdc/CDCConfig.java deleted file mode 100644 index 7de56b04fb4..00000000000 --- a/src/main/java/org/tikv/cdc/CDCConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.cdc; - -import org.tikv.kvproto.Kvrpcpb; - -public class CDCConfig { - private static final int EVENT_BUFFER_SIZE = 50000; - private static final int MAX_ROW_KEY_SIZE = 10240; - private static final boolean READ_OLD_VALUE = true; - - private int eventBufferSize = EVENT_BUFFER_SIZE; - private int maxRowKeySize = MAX_ROW_KEY_SIZE; - private boolean readOldValue = READ_OLD_VALUE; - - public void setEventBufferSize(final int bufferSize) { - eventBufferSize = bufferSize; - } - - public void setMaxRowKeySize(final int rowKeySize) { - maxRowKeySize = rowKeySize; - } - - public void setReadOldValue(final boolean value) { - readOldValue = value; - } - - public int getEventBufferSize() { - return eventBufferSize; - } - - public int getMaxRowKeySize() { - return maxRowKeySize; - } - - public boolean getReadOldValue() { - return readOldValue; - } - - Kvrpcpb.ExtraOp getExtraOp() { - return readOldValue ? Kvrpcpb.ExtraOp.ReadOldValue : Kvrpcpb.ExtraOp.Noop; - } -} diff --git a/src/main/java/org/tikv/cdc/CDCEvent.java b/src/main/java/org/tikv/cdc/CDCEvent.java deleted file mode 100644 index 568f22a9219..00000000000 --- a/src/main/java/org/tikv/cdc/CDCEvent.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.cdc; - -import org.tikv.kvproto.Cdcpb.Event.Row; - -class CDCEvent { - enum CDCEventType { - ROW, - RESOLVED_TS, - ERROR - } - - public final long regionId; - - public final CDCEventType eventType; - - public final long resolvedTs; - - public final Row row; - - public final Throwable error; - - private CDCEvent( - final long regionId, - final CDCEventType eventType, - final long resolvedTs, - final Row row, - final Throwable error) { - this.regionId = regionId; - this.eventType = eventType; - this.resolvedTs = resolvedTs; - this.row = row; - this.error = error; - } - - public static CDCEvent rowEvent(final long regionId, final Row row) { - return new CDCEvent(regionId, CDCEventType.ROW, 0, row, null); - } - - public static CDCEvent resolvedTsEvent(final long regionId, final long resolvedTs) { - return new CDCEvent(regionId, CDCEventType.RESOLVED_TS, resolvedTs, null, null); - } - - public static CDCEvent error(final long regionId, final Throwable error) { - return new CDCEvent(regionId, CDCEventType.ERROR, 0, null, error); - } - - @Override - public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("CDCEvent[").append(eventType.toString()).append("] {"); - switch (eventType) { - case ERROR: - builder.append("error=").append(error.getMessage()); - break; - case RESOLVED_TS: - builder.append("resolvedTs=").append(resolvedTs); - break; - case ROW: - builder.append("row=").append(row); - break; - } - return builder.append("}").toString(); - } -} diff --git a/src/main/java/org/tikv/cdc/RegionCDCClient.java b/src/main/java/org/tikv/cdc/RegionCDCClient.java deleted file mode 100644 index 88146becea1..00000000000 --- a/src/main/java/org/tikv/cdc/RegionCDCClient.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.cdc; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.tikv.common.region.TiRegion; -import org.tikv.common.util.FastByteComparisons; -import org.tikv.common.util.KeyRangeUtils; -import org.tikv.kvproto.Cdcpb.ChangeDataEvent; -import org.tikv.kvproto.Cdcpb.ChangeDataRequest; -import org.tikv.kvproto.Cdcpb.Event.LogType; -import org.tikv.kvproto.Cdcpb.Event.Row; -import org.tikv.kvproto.Cdcpb.Header; -import org.tikv.kvproto.Cdcpb.ResolvedTs; -import org.tikv.kvproto.ChangeDataGrpc; -import org.tikv.kvproto.ChangeDataGrpc.ChangeDataStub; -import org.tikv.kvproto.Coprocessor.KeyRange; - -class RegionCDCClient implements AutoCloseable, StreamObserver { - private static final Logger LOGGER = LoggerFactory.getLogger(RegionCDCClient.class); - private static final AtomicLong REQ_ID_COUNTER = new AtomicLong(0); - private static final Set ALLOWED_LOGTYPE = - ImmutableSet.of(LogType.PREWRITE, LogType.COMMIT, LogType.COMMITTED, LogType.ROLLBACK); - - private final TiRegion region; - private final KeyRange keyRange; - private final KeyRange regionKeyRange; - private final ManagedChannel channel; - private final ChangeDataStub asyncStub; - private final Consumer eventConsumer; - private final CDCConfig config; - private final Predicate rowFilter; - - private final AtomicBoolean running = new AtomicBoolean(false); - - private final boolean started = false; - - public RegionCDCClient( - final TiRegion region, - final KeyRange keyRange, - final ManagedChannel channel, - final Consumer eventConsumer, - final CDCConfig config) { - this.region = region; - this.keyRange = keyRange; - this.channel = channel; - this.asyncStub = ChangeDataGrpc.newStub(channel); - this.eventConsumer = eventConsumer; - this.config = config; - - this.regionKeyRange = - KeyRange.newBuilder().setStart(region.getStartKey()).setEnd(region.getEndKey()).build(); - - this.rowFilter = - regionEnclosed() - ? ((row) -> true) - : new Predicate() { - final byte[] buffer = new byte[config.getMaxRowKeySize()]; - - final byte[] start = keyRange.getStart().toByteArray(); - final byte[] end = keyRange.getEnd().toByteArray(); - - @Override - public boolean test(final Row row) { - final int len = row.getKey().size(); - row.getKey().copyTo(buffer, 0); - return (FastByteComparisons.compareTo(buffer, 0, len, start, 0, start.length) >= 0) - && (FastByteComparisons.compareTo(buffer, 0, len, end, 0, end.length) < 0); - } - }; - } - - public synchronized void start(final long startTs) { - Preconditions.checkState(!started, "RegionCDCClient has already started"); - running.set(true); - LOGGER.info("start streaming region: {}, running: {}", region.getId(), running.get()); - final ChangeDataRequest request = - ChangeDataRequest.newBuilder() - .setRequestId(REQ_ID_COUNTER.incrementAndGet()) - .setHeader(Header.newBuilder().setTicdcVersion("5.0.0").build()) - .setRegionId(region.getId()) - .setCheckpointTs(startTs) - .setStartKey(keyRange.getStart()) - .setEndKey(keyRange.getEnd()) - .setRegionEpoch(region.getRegionEpoch()) - .setExtraOp(config.getExtraOp()) - .build(); - final StreamObserver requestObserver = asyncStub.eventFeed(this); - requestObserver.onNext(request); - } - - public TiRegion getRegion() { - return region; - } - - public KeyRange getKeyRange() { - return keyRange; - } - - public KeyRange getRegionKeyRange() { - return regionKeyRange; - } - - public boolean regionEnclosed() { - return KeyRangeUtils.makeRange(keyRange.getStart(), keyRange.getEnd()) - .encloses(KeyRangeUtils.makeRange(regionKeyRange.getStart(), regionKeyRange.getEnd())); - } - - public boolean isRunning() { - return running.get(); - } - - @Override - public void close() throws Exception { - LOGGER.info("close (region: {})", region.getId()); - running.set(false); - synchronized (this) { - channel.shutdown(); - } - try { - LOGGER.debug("awaitTermination (region: {})", region.getId()); - channel.awaitTermination(60, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - LOGGER.error("Failed to shutdown channel(regionId: {})", region.getId()); - Thread.currentThread().interrupt(); - synchronized (this) { - channel.shutdownNow(); - } - } - LOGGER.info("terminated (region: {})", region.getId()); - } - - @Override - public void onCompleted() { - // should never been called - onError(new IllegalStateException("RegionCDCClient should never complete")); - } - - @Override - public void onError(final Throwable error) { - LOGGER.error("region CDC error: region: {}, error: {}", region.getId(), error); - running.set(false); - eventConsumer.accept(CDCEvent.error(region.getId(), error)); - } - - @Override - public void onNext(final ChangeDataEvent event) { - try { - if (running.get()) { - event - .getEventsList() - .stream() - .flatMap(ev -> ev.getEntries().getEntriesList().stream()) - .filter(row -> ALLOWED_LOGTYPE.contains(row.getType())) - .filter(this.rowFilter) - .map(row -> CDCEvent.rowEvent(region.getId(), row)) - .forEach(this::submitEvent); - - if (event.hasResolvedTs()) { - final ResolvedTs resolvedTs = event.getResolvedTs(); - if (resolvedTs.getRegionsList().indexOf(region.getId()) >= 0) { - submitEvent(CDCEvent.resolvedTsEvent(region.getId(), resolvedTs.getTs())); - } - } - } - } catch (final Exception e) { - onError(e); - } - } - - private void submitEvent(final CDCEvent event) { - LOGGER.debug("submit event: {}", event); - eventConsumer.accept(event); - } -} diff --git a/src/test/java/org/tikv/br/BackupDecoderTest.java b/src/test/java/org/tikv/br/BackupDecoderTest.java deleted file mode 100644 index 4abda8604bb..00000000000 --- a/src/test/java/org/tikv/br/BackupDecoderTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2021 TiKV Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.br; - -import com.google.protobuf.ByteString; -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import org.junit.Assert; -import org.junit.Test; -import org.rocksdb.RocksDBException; -import org.tikv.common.util.Pair; -import org.tikv.kvproto.Brpb; - -public class BackupDecoderTest { - - private static final int TOTAL_COUNT = 500; - private static final String KEY_PREFIX = "test_"; - private static final String VALUE = - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; - - @Test - public void rawKVSSTDecoderTest() throws RocksDBException, IOException { - String backupmetaFilePath = "src/test/resources/sst/backupmeta"; - String sst1FilePath = - "src/test/resources/sst/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst"; - String sst2FilePath = - "src/test/resources/sst/4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst"; - - BackupMetaDecoder backupMetaDecoder = BackupMetaDecoder.parse(backupmetaFilePath); - Brpb.BackupMeta backupMeta = backupMetaDecoder.getBackupMeta(); - - BackupDecoder sstBackup = new BackupDecoder(backupMeta); - - decodeSST(sstBackup, sst1FilePath); - decodeSST(sstBackup, sst2FilePath); - } - - @Test - public void rawKVWithTTLSSTDecoderTest() throws RocksDBException, IOException { - String backupmetaFilePath = "src/test/resources/sst_ttl/backupmeta"; - String sst1FilePath = - "src/test/resources/sst_ttl/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst"; - String sst2FilePath = - "src/test/resources/sst_ttl/5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst"; - - BackupMetaDecoder backupMetaDecoder = BackupMetaDecoder.parse(backupmetaFilePath); - Brpb.BackupMeta backupMeta = backupMetaDecoder.getBackupMeta(); - - BackupDecoder sstBackup = new BackupDecoder(backupMeta, true); - - decodeSST(sstBackup, sst1FilePath); - decodeSST(sstBackup, sst2FilePath); - } - - private void decodeSST(BackupDecoder sstBackup, String sst) throws RocksDBException { - String fileName = new File(sst).getName(); - Brpb.File backupFile = - sstBackup - .getBackupMeta() - .getFilesList() - .stream() - .filter(a -> a.getName().equals(fileName)) - .findFirst() - .get(); - Assert.assertEquals(TOTAL_COUNT, backupFile.getTotalKvs()); - - SSTDecoder sstDecoder = sstBackup.decodeSST(sst); - Iterator> iterator = sstDecoder.getIterator(); - int count = 0; - while (iterator.hasNext()) { - Pair pair = iterator.next(); - Assert.assertEquals(VALUE, pair.second.toStringUtf8()); - Assert.assertTrue(pair.first.toStringUtf8().startsWith(KEY_PREFIX)); - count += 1; - } - sstDecoder.close(); - Assert.assertEquals(TOTAL_COUNT, count); - } -} diff --git a/src/test/resources/sst/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst b/src/test/resources/sst/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst deleted file mode 100644 index 8ca01d46781..00000000000 Binary files a/src/test/resources/sst/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst and /dev/null differ diff --git a/src/test/resources/sst/4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst b/src/test/resources/sst/4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst deleted file mode 100644 index 03bcd27c117..00000000000 Binary files a/src/test/resources/sst/4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst and /dev/null differ diff --git a/src/test/resources/sst/backupmeta b/src/test/resources/sst/backupmeta deleted file mode 100644 index abcca0b24ca..00000000000 --- a/src/test/resources/sst/backupmeta +++ /dev/null @@ -1,11 +0,0 @@ -턘a"5.3.0-alpha" -" -`4_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1633919546278_default.sst -婔1N7@h*2,L>c_Q?test"test_-009965169116504@HRdefaultX/" -`1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1633919546277_default.sst H]f;FjLV@7ytest_-009965169116504"u@HRdefaultX0@J -testudefaultR[]ZBR -Release Version: v5.2.1 -Git Commit Hash: cd8fb24c5f7ebd9d479ed228bb41848bd5e97445 -Git Branch: heads/refs/tags/v5.2.1 -Go Version: go1.16.4 -UTC Build Time: 2021-09-07 16:19:11 -Race Enabled: false \ No newline at end of file diff --git a/src/test/resources/sst_ttl/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst b/src/test/resources/sst_ttl/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst deleted file mode 100644 index 6bf2760fa08..00000000000 Binary files a/src/test/resources/sst_ttl/1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst and /dev/null differ diff --git a/src/test/resources/sst_ttl/5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst b/src/test/resources/sst_ttl/5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst deleted file mode 100644 index 9c582e58188..00000000000 Binary files a/src/test/resources/sst_ttl/5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst and /dev/null differ diff --git a/src/test/resources/sst_ttl/backupmeta b/src/test/resources/sst_ttl/backupmeta deleted file mode 100644 index 978ef5f61a9..00000000000 --- a/src/test/resources/sst_ttl/backupmeta +++ /dev/null @@ -1,11 +0,0 @@ -ȗa"5.3.0-alpha" -" -`5_8_2_9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08_1634199092587_default.sst I~Ѷ:8~0KFtest"test_-009965169116504@HRdefaultX1" -`1_2_2_7154800cc311f03afd1532e961b9a878dfbb119b104cf4daad5d0c7c0eacb502_1634199092593_default.sst w2l^9>6#Մލ)qar~test_-009965169116504"u@HRdefaultX1@J -testudefaultR[]ZBR -Release Version: v5.2.1 -Git Commit Hash: cd8fb24c5f7ebd9d479ed228bb41848bd5e97445 -Git Branch: heads/refs/tags/v5.2.1 -Go Version: go1.16.4 -UTC Build Time: 2021-09-07 16:19:11 -Race Enabled: false \ No newline at end of file