From 455faf41882979bda7c1458da236fcb88e251d33 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Mon, 27 Feb 2023 18:53:04 +0800 Subject: [PATCH 1/7] implement UpdateServiceGCSafePoint Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 25 +++++++++++++++++++ .../org/tikv/common/ReadOnlyPDClient.java | 2 ++ 2 files changed, 27 insertions(+) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 43383bbda87..19cae0cdabf 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -104,6 +104,7 @@ import org.tikv.kvproto.Pdpb.Timestamp; import org.tikv.kvproto.Pdpb.TsoRequest; import org.tikv.kvproto.Pdpb.TsoResponse; +import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest; public class PDClient extends AbstractGRPCClient implements ReadOnlyPDClient { @@ -383,6 +384,17 @@ private Supplier buildGetAllStoresReq() { return () -> GetAllStoresRequest.newBuilder().setHeader(header).build(); } + private Supplier buildUpdateServiceGCSafePointRequest( + ByteString serverId, long ttl, long savePoint) { + return () -> + UpdateServiceGCSafePointRequest.newBuilder() + .setHeader(header) + .setSafePoint(savePoint) + .setServiceId(serverId) + .setTTL(ttl) + .build(); + } + private PDErrorHandler buildPDErrorHandler() { return new PDErrorHandler<>( r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this); @@ -419,6 +431,19 @@ public TiConfiguration.ReplicaRead getReplicaRead() { return conf.getReplicaRead(); } + @Override + public Long UpdateServiceGCSafePoint( + String serverId, long ttl, long savePoint, BackOffer backOffer) { + return callWithRetry( + backOffer, + PDGrpc.getUpdateServiceGCSafePointMethod(), + buildUpdateServiceGCSafePointRequest(ByteString.copyFromUtf8(serverId), ttl, savePoint), + new PDErrorHandler<>( + r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, + this)) + .getMinSafePoint(); + } + @Override public void close() throws InterruptedException { etcdClient.close(); diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index ddf1855e614..63e995c8939 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -72,4 +72,6 @@ List scanRegions( Long getClusterId(); RequestKeyCodec getCodec(); + + Long UpdateServiceGCSafePoint(String serverId, long ttl, long savePoint, BackOffer backOffer); } From f9a0ec7d360bc7fa68c520ff7985644d12e8ab12 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Mon, 27 Feb 2023 18:58:06 +0800 Subject: [PATCH 2/7] fmt Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 19cae0cdabf..3ee44d035ef 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -435,12 +435,12 @@ public TiConfiguration.ReplicaRead getReplicaRead() { public Long UpdateServiceGCSafePoint( String serverId, long ttl, long savePoint, BackOffer backOffer) { return callWithRetry( - backOffer, - PDGrpc.getUpdateServiceGCSafePointMethod(), - buildUpdateServiceGCSafePointRequest(ByteString.copyFromUtf8(serverId), ttl, savePoint), - new PDErrorHandler<>( - r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, - this)) + backOffer, + PDGrpc.getUpdateServiceGCSafePointMethod(), + buildUpdateServiceGCSafePointRequest(ByteString.copyFromUtf8(serverId), ttl, savePoint), + new PDErrorHandler<>( + r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, + this)) .getMinSafePoint(); } From fbf39fb38fcd511d4c0eb6168f5279d4682c798f Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Tue, 28 Feb 2023 13:52:48 +0800 Subject: [PATCH 3/7] typo Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 10 +++++----- src/main/java/org/tikv/common/ReadOnlyPDClient.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 3ee44d035ef..01fc2f8e2f5 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -385,12 +385,12 @@ private Supplier buildGetAllStoresReq() { } private Supplier buildUpdateServiceGCSafePointRequest( - ByteString serverId, long ttl, long savePoint) { + ByteString serviceId, long ttl, long safePoint) { return () -> UpdateServiceGCSafePointRequest.newBuilder() .setHeader(header) - .setSafePoint(savePoint) - .setServiceId(serverId) + .setSafePoint(safePoint) + .setServiceId(serviceId) .setTTL(ttl) .build(); } @@ -433,11 +433,11 @@ public TiConfiguration.ReplicaRead getReplicaRead() { @Override public Long UpdateServiceGCSafePoint( - String serverId, long ttl, long savePoint, BackOffer backOffer) { + String serviceId, long ttl, long safePoint, BackOffer backOffer) { return callWithRetry( backOffer, PDGrpc.getUpdateServiceGCSafePointMethod(), - buildUpdateServiceGCSafePointRequest(ByteString.copyFromUtf8(serverId), ttl, savePoint), + buildUpdateServiceGCSafePointRequest(ByteString.copyFromUtf8(serviceId), ttl, safePoint), new PDErrorHandler<>( r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this)) diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 63e995c8939..2bd82e5db09 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -73,5 +73,5 @@ List scanRegions( RequestKeyCodec getCodec(); - Long UpdateServiceGCSafePoint(String serverId, long ttl, long savePoint, BackOffer backOffer); + Long UpdateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); } From 5a41fef07140de2019569a788ea3334b25e04230 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Mon, 6 Mar 2023 11:41:20 +0800 Subject: [PATCH 4/7] add comment Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 2 +- src/main/java/org/tikv/common/ReadOnlyPDClient.java | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 01fc2f8e2f5..e74601af233 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -432,7 +432,7 @@ public TiConfiguration.ReplicaRead getReplicaRead() { } @Override - public Long UpdateServiceGCSafePoint( + public Long updateServiceGCSafePoint( String serviceId, long ttl, long safePoint, BackOffer backOffer) { return callWithRetry( backOffer, diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 2bd82e5db09..88dd389e8d9 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -73,5 +73,13 @@ List scanRegions( RequestKeyCodec getCodec(); - Long UpdateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); + /** + * Update ServiceGCSafePoint + * + * @param serviceId ServiceId + * @param ttl TTL in seconds + * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good practise + * @return the MinSafePoint of all services. If this value is greater than safePoint, it means update failedmvn + */ + Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); } From a2ba75f5842ae2f7fe0a9d9b7555c9ba27177baf Mon Sep 17 00:00:00 2001 From: shi yuhang <52435083+shiyuhang0@users.noreply.github.com> Date: Fri, 3 Mar 2023 23:55:40 +0800 Subject: [PATCH 5/7] fix pair equality check (#724) Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/util/Pair.java | 18 +++++ .../java/org/tikv/common/util/PairTest.java | 74 +++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 src/test/java/org/tikv/common/util/PairTest.java diff --git a/src/main/java/org/tikv/common/util/Pair.java b/src/main/java/org/tikv/common/util/Pair.java index 803880a9d14..65ae082e788 100644 --- a/src/main/java/org/tikv/common/util/Pair.java +++ b/src/main/java/org/tikv/common/util/Pair.java @@ -18,6 +18,7 @@ package org.tikv.common.util; import java.io.Serializable; +import java.util.Objects; public class Pair implements Serializable { public final F first; @@ -36,4 +37,21 @@ public static Pair create(F f, S s) { public String toString() { return String.format("[%s:%s]", first, second); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Pair pair = (Pair) o; + return Objects.equals(first, pair.first) && Objects.equals(second, pair.second); + } + + @Override + public int hashCode() { + return Objects.hash(first, second); + } } diff --git a/src/test/java/org/tikv/common/util/PairTest.java b/src/test/java/org/tikv/common/util/PairTest.java new file mode 100644 index 00000000000..b1fd0c6bc98 --- /dev/null +++ b/src/test/java/org/tikv/common/util/PairTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.util; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.junit.Test; +import org.tikv.common.PDMockServerTest; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Peer; + +public class PairTest extends PDMockServerTest { + + @Test + public void testPair() { + Metapb.Region r = + Metapb.Region.newBuilder() + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(2)) + .setId(233) + .setStartKey(ByteString.EMPTY) + .setEndKey(ByteString.EMPTY) + .addPeers(Peer.getDefaultInstance()) + .build(); + List s = + ImmutableList.of( + Metapb.Store.newBuilder() + .setAddress(LOCAL_ADDR + ":" + 4000) + .setVersion("5.0.0") + .setId(1) + .build()); + + TiRegion region = + new TiRegion( + session.getConf(), + r, + r.getPeers(0), + r.getPeersList(), + s.stream().map(TiStore::new).collect(Collectors.toList())); + TiStore store = new TiStore(s.get(0)); + + Map, List> groupKeyMap = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + Pair pair = Pair.create(region, store); + groupKeyMap + .computeIfAbsent(pair, e -> new ArrayList<>()) + .add(ByteString.copyFromUtf8("test")); + } + Pair pair = Pair.create(region, store); + assert (groupKeyMap.get(pair).size() == 10); + } +} From 3137615b44eb07238c97eaf11de6233a96ac1e39 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Mon, 6 Mar 2023 11:43:02 +0800 Subject: [PATCH 6/7] fmt Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/PDClient.java | 3 ++- src/main/java/org/tikv/common/ReadOnlyPDClient.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index e74601af233..e3695b69161 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -437,7 +437,8 @@ public Long updateServiceGCSafePoint( return callWithRetry( backOffer, PDGrpc.getUpdateServiceGCSafePointMethod(), - buildUpdateServiceGCSafePointRequest(ByteString.copyFromUtf8(serviceId), ttl, safePoint), + buildUpdateServiceGCSafePointRequest( + ByteString.copyFromUtf8(serviceId), ttl, safePoint), new PDErrorHandler<>( r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this)) diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 88dd389e8d9..983a1e09acf 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -78,8 +78,10 @@ List scanRegions( * * @param serviceId ServiceId * @param ttl TTL in seconds - * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good practise - * @return the MinSafePoint of all services. If this value is greater than safePoint, it means update failedmvn + * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good + * practise + * @return the MinSafePoint of all services. If this value is greater than safePoint, it means + * update failedmvn */ Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); } From 2cf61fb17dcd19b53658a188cb3ab5013e21c579 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Mon, 6 Mar 2023 11:49:06 +0800 Subject: [PATCH 7/7] typo Signed-off-by: shiyuhang <1136742008@qq.com> --- src/main/java/org/tikv/common/ReadOnlyPDClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/tikv/common/ReadOnlyPDClient.java b/src/main/java/org/tikv/common/ReadOnlyPDClient.java index 983a1e09acf..58ad9b2a626 100644 --- a/src/main/java/org/tikv/common/ReadOnlyPDClient.java +++ b/src/main/java/org/tikv/common/ReadOnlyPDClient.java @@ -79,9 +79,9 @@ List scanRegions( * @param serviceId ServiceId * @param ttl TTL in seconds * @param safePoint The TiTimestamp you want to set. Set to start_ts.getPrevious() is a good - * practise + * practice * @return the MinSafePoint of all services. If this value is greater than safePoint, it means - * update failedmvn + * update failed */ Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer); }