From fa893a83efcea965397d5287274204a3000645e4 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 17 Jan 2023 17:52:05 +0800 Subject: [PATCH 01/10] add ceresdb standalone route mode --- .../main/java/io/ceresdb/CeresDBClient.java | 55 ++++++++++--------- .../src/main/java/io/ceresdb/RouteMode.java | 26 +++++++++ .../main/java/io/ceresdb/RouterClient.java | 2 +- .../io/ceresdb/StandaloneRouterClient.java | 52 ++++++++++++++++++ .../io/ceresdb/options/RouterOptions.java | 14 +++++ .../java/io/ceresdb/CeresDBClientTest.java | 12 +++- 6 files changed, 130 insertions(+), 31 deletions(-) create mode 100644 ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java create mode 100644 ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java index 889252b..81f1667 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java @@ -60,7 +60,7 @@ * * @author jiachun.fjc */ -public class CeresDBClient implements Write, Query, Lifecycle, Display { +public class CeresDBClient implements io.ceresdb.Write, Query, Lifecycle, Display { private static final Logger LOG = LoggerFactory.getLogger(CeresDBClient.class); @@ -74,12 +74,12 @@ public class CeresDBClient implements Write, Query, Lifecycle, D private final AtomicBoolean started = new AtomicBoolean(false); private CeresDBOptions opts; - private RouterClient routerClient; - private WriteClient writeClient; - private QueryClient queryClient; + private io.ceresdb.RouterClient routerClient; + private io.ceresdb.WriteClient writeClient; + private io.ceresdb.QueryClient queryClient; // CeresDBClient is only intended to manage the instance and does not // intend to broker any of its behavior - private Management management; + private io.ceresdb.Management management; // Note: We do not close it to free resources, as we view it as shared private Executor asyncWritePool; private Executor asyncReadPool; @@ -88,9 +88,9 @@ public class CeresDBClient implements Write, Query, Lifecycle, D // load all signal handlers SignalHandlersLoader.load(); // register all rpc service - RpcServiceRegister.registerStorageService(); + io.ceresdb.RpcServiceRegister.registerStorageService(); // start scheduled metric reporter - MetricsUtil.startScheduledReporter(Utils.autoReportPeriodMin(), TimeUnit.MINUTES); + MetricsUtil.startScheduledReporter(io.ceresdb.Utils.autoReportPeriodMin(), TimeUnit.MINUTES); Runtime.getRuntime().addShutdownHook(new Thread(MetricsUtil::stopScheduledReporterAndDestroy)); } @@ -116,7 +116,7 @@ public boolean init(final CeresDBOptions opts) { INSTANCES.put(this.id, this); - Utils.scheduleDisplaySelf(this, new LogPrinter(LOG)); + io.ceresdb.Utils.scheduleDisplaySelf(this, new LogPrinter(LOG)); return true; } @@ -161,7 +161,7 @@ public CompletableFuture> write(final Collection data } @Override - public StreamWriteBuf streamWrite(final String metric, final Context ctx) { + public io.ceresdb.StreamWriteBuf streamWrite(final String metric, final Context ctx) { ensureInitialized(); return this.writeClient.streamWrite(metric, attachCtx(ctx)); } @@ -182,7 +182,7 @@ public boolean hasManagement() { return this.management != null; } - public Management management() { + public io.ceresdb.Management management() { return this.management; } @@ -211,15 +211,15 @@ public Executor asyncReadPool() { return this.asyncReadPool; } - public RouterClient routerClient() { + public io.ceresdb.RouterClient routerClient() { return this.routerClient; } - public WriteClient writeClient() { + public io.ceresdb.WriteClient writeClient() { return this.writeClient; } - public QueryClient queryClient() { + public io.ceresdb.QueryClient queryClient() { return this.queryClient; } @@ -290,51 +290,52 @@ private static RpcClient initRpcClient(final CeresDBOptions opts) { return rpcClient; } - private static RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { + private static io.ceresdb.RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { final RouterOptions routerOpts = opts.getRouterOptions(); routerOpts.setRpcClient(rpcClient); - final RouterClient routerClient = new RouterClient(); + + final io.ceresdb.RouterClient routerClient = routerOpts.getRouteMode().equals(io.ceresdb.RouteMode.STANDALONE)?new io.ceresdb.StandaloneRouterClient():new io.ceresdb.RouterClient(); if (!routerClient.init(routerOpts)) { throw new IllegalStateException("Fail to start router client"); } return routerClient; } - private static WriteClient initWriteClient(final CeresDBOptions opts, // - final RouterClient routerClient, // - final Executor asyncPool) { + private static io.ceresdb.WriteClient initWriteClient(final CeresDBOptions opts, // + final io.ceresdb.RouterClient routerClient, // + final Executor asyncPool) { final WriteOptions writeOpts = opts.getWriteOptions(); writeOpts.setRoutedClient(routerClient); writeOpts.setAsyncPool(asyncPool); - final WriteClient writeClient = new WriteClient(); + final io.ceresdb.WriteClient writeClient = new io.ceresdb.WriteClient(); if (!writeClient.init(writeOpts)) { throw new IllegalStateException("Fail to start write client"); } return writeClient; } - private static QueryClient initQueryClient(final CeresDBOptions opts, // - final RouterClient routerClient, // - final Executor asyncPool) { + private static io.ceresdb.QueryClient initQueryClient(final CeresDBOptions opts, // + final io.ceresdb.RouterClient routerClient, // + final Executor asyncPool) { final QueryOptions queryOpts = opts.getQueryOptions(); queryOpts.setRouterClient(routerClient); queryOpts.setAsyncPool(asyncPool); - final QueryClient queryClient = new QueryClient(); + final io.ceresdb.QueryClient queryClient = new io.ceresdb.QueryClient(); if (!queryClient.init(queryOpts)) { throw new IllegalStateException("Fail to start query client"); } return queryClient; } - private static Management initManagementClient(final CeresDBOptions opts, final RouterClient routerClient) { + private static io.ceresdb.Management initManagementClient(final CeresDBOptions opts, final io.ceresdb.RouterClient routerClient) { final ManagementOptions mOpts = opts.getManagementOptions(); if (mOpts == null) { return null; } - if (!CeresDBManagementProvider.hasManagement()) { + if (!io.ceresdb.CeresDBManagementProvider.hasManagement()) { return null; } - final Management management = CeresDBManagementProvider.createManagement(); + final io.ceresdb.Management management = io.ceresdb.CeresDBManagementProvider.createManagement(); mOpts.setRouterClient(routerClient); if (!management.init(mOpts)) { return null; @@ -344,7 +345,7 @@ private static Management initManagementClient(final CeresDBOptions opts, final private static String loadVersion() { try { - return Utils // + return io.ceresdb.Utils // .loadProperties(CeresDBClient.class.getClassLoader(), "client_version.properties") // .getProperty(VERSION_KEY, "Unknown version"); } catch (final Exception ignored) { diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java new file mode 100644 index 0000000..269dcbc --- /dev/null +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java @@ -0,0 +1,26 @@ +/** + * Alipay.com Inc. Copyright (c) 2004-2017 All Rights Reserved. CreatedBy: RouteMode.java CreatedDate: 2023年01月17日 14:23 ticketNumber: (aone + * link) comments: Description + *

+ * codeReviewBy: codeReviewDate: comments: + *

+ * modifiedBy: modifiedDate: comments: + */ +package io.ceresdb; + +/** + * define route mode + * @author lee + * @version : RouteMode.java, v 0.1 2023年01月17日 14:23 lee Exp $ + */ +public enum RouteMode{ + /** + * In this mode, client request to a server directly, and the server proxy the request to the correct server. + */ + STANDALONE, + + /** + * In this mode, the client will find the correct server first, and then request to the server. + */ + CLUSTER +} \ No newline at end of file diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java index 374d0b3..c3d8000 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java @@ -80,7 +80,7 @@ public class RouterClient implements Lifecycle, Display, Iterable private ScheduledExecutorService cleaner; private ScheduledExecutorService refresher; - private RouterOptions opts; + protected RouterOptions opts; private RpcClient rpcClient; private RouterByMetrics router; private InnerMetrics metrics; diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java new file mode 100644 index 0000000..d2d5d3a --- /dev/null +++ b/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.ceresdb; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * A route rpc client which cached nothing about table information, and return + * clusterAddress directly + * @author lee + */ +public class StandaloneRouterClient extends RouterClient { + + + public CompletableFuture> routeFor(final Collection metrics) { + if (metrics == null || metrics.isEmpty()) { + return Utils.completedCf(Collections.emptyMap()); + } + + final Map routeMap = new HashMap<>(); + + metrics.forEach(metric -> { + Route route = new Route(); + route.setEndpoint(this.opts.getClusterAddress()); + route.setMetric(metric); + routeMap.put(metric, route); + }); + return Utils.completedCf(routeMap); + + } + + + +} diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java b/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java index 1e5bf92..3ec8e3a 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java @@ -16,6 +16,7 @@ */ package io.ceresdb.options; +import io.ceresdb.RouteMode; import io.ceresdb.common.Copiable; import io.ceresdb.common.Endpoint; import io.ceresdb.rpc.RpcClient; @@ -38,6 +39,8 @@ public class RouterOptions implements Copiable { // all route tables are refreshed every 30 seconds. private long refreshPeriodSeconds = 30; + private RouteMode routeMode = RouteMode.CLUSTER; + public RpcClient getRpcClient() { return rpcClient; } @@ -78,6 +81,14 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) { this.refreshPeriodSeconds = refreshPeriodSeconds; } + public RouteMode getRouteMode() { + return routeMode; + } + + public void setRouteMode(RouteMode routeMode) { + this.routeMode = routeMode; + } + @Override public RouterOptions copy() { final RouterOptions opts = new RouterOptions(); @@ -86,6 +97,7 @@ public RouterOptions copy() { opts.maxCachedSize = this.maxCachedSize; opts.gcPeriodSeconds = this.gcPeriodSeconds; opts.refreshPeriodSeconds = this.refreshPeriodSeconds; + opts.routeMode = this.routeMode; return opts; } @@ -97,6 +109,8 @@ public String toString() { ", maxCachedSize=" + maxCachedSize + // ", gcPeriodSeconds=" + gcPeriodSeconds + // ", refreshPeriodSeconds=" + refreshPeriodSeconds + // + ", routeMode=" + routeMode + // '}'; } + } diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java index 574a847..2ecd2f6 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java @@ -21,6 +21,8 @@ import io.ceresdb.CeresDBClient; +import io.ceresdb.options.RouterOptions; +import io.ceresdb.rpc.RpcOptions; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -47,13 +49,13 @@ @RunWith(value = MockitoJUnitRunner.class) public class CeresDBClientTest { - private static final ReferenceFieldUpdater WC_UPDATER = Updaters // + private static final ReferenceFieldUpdater WC_UPDATER = Updaters // .newReferenceFieldUpdater(CeresDBClient.class, "writeClient"); private CeresDBClient client; private CeresDBOptions opts; @Mock - private WriteClient writeClient; + private io.ceresdb.WriteClient writeClient; @Before public void before() { @@ -62,6 +64,10 @@ public void before() { .writeMaxRetries(1) // .readMaxRetries(1) // .build(); + RouterOptions routerOptions = new RouterOptions(); + // set RouteMode to CLUSTER + routerOptions.setRouteMode(io.ceresdb.RouteMode.CLUSTER); + this.opts.setRouterOptions(routerOptions); this.client = new CeresDBClient(); } @@ -99,7 +105,7 @@ public void helloWorldTest() throws ExecutionException, InterruptedException { final Rows rows = TestUtil.newRow("test_metric1"); Mockito.when(this.writeClient.write(Mockito.anyList(), Mockito.any())) // - .thenReturn(Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult())); + .thenReturn(io.ceresdb.Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult())); final CompletableFuture> f = this.client.write(rows); final Result ret = f.get(); Assert.assertTrue(ret.isOk()); From d3373e397338fb3469fc2c14f6484dd5f8b28184 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 17 Jan 2023 18:14:01 +0800 Subject: [PATCH 02/10] format --- .../main/java/io/ceresdb/CeresDBClient.java | 61 ++++++++----------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java index 81f1667..da65be0 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java @@ -73,16 +73,16 @@ public class CeresDBClient implements io.ceresdb.Write, Query, Lifecycle> write(final Collection data, final Context ctx) { + @Override public CompletableFuture> write(final Collection data, final Context ctx) { ensureInitialized(); return this.writeClient.write(data, attachCtx(ctx)); } - @Override - public io.ceresdb.StreamWriteBuf streamWrite(final String metric, final Context ctx) { + @Override public io.ceresdb.StreamWriteBuf streamWrite(final String metric, final Context ctx) { ensureInitialized(); return this.writeClient.streamWrite(metric, attachCtx(ctx)); } - @Override - public CompletableFuture> query(final QueryRequest req, final Context ctx) { + @Override public CompletableFuture> query(final QueryRequest req, final Context ctx) { ensureInitialized(); return this.queryClient.query(req, attachCtx(ctx)); } - @Override - public void streamQuery(final QueryRequest req, final Context ctx, final Observer observer) { + @Override public void streamQuery(final QueryRequest req, final Context ctx, final Observer observer) { ensureInitialized(); this.queryClient.streamQuery(req, attachCtx(ctx), observer); } @@ -223,8 +216,7 @@ public io.ceresdb.QueryClient queryClient() { return this.queryClient; } - @Override - public void display(final Printer out) { + @Override public void display(final Printer out) { out.println("--- CeresDBClient ---") // .print("id=") // .println(this.id) // @@ -262,8 +254,7 @@ public void display(final Printer out) { out.println(""); } - @Override - public String toString() { + @Override public String toString() { return "CeresDBClient{" + // "id=" + id + // "version=" + version() + // @@ -294,7 +285,9 @@ private static io.ceresdb.RouterClient initRouteClient(final CeresDBOptions opts final RouterOptions routerOpts = opts.getRouterOptions(); routerOpts.setRpcClient(rpcClient); - final io.ceresdb.RouterClient routerClient = routerOpts.getRouteMode().equals(io.ceresdb.RouteMode.STANDALONE)?new io.ceresdb.StandaloneRouterClient():new io.ceresdb.RouterClient(); + final io.ceresdb.RouterClient routerClient = routerOpts.getRouteMode().equals(io.ceresdb.RouteMode.STANDALONE) ? + new io.ceresdb.StandaloneRouterClient() : + new io.ceresdb.RouterClient(); if (!routerClient.init(routerOpts)) { throw new IllegalStateException("Fail to start router client"); } @@ -327,7 +320,8 @@ private static io.ceresdb.QueryClient initQueryClient(final CeresDBOptions opts, return queryClient; } - private static io.ceresdb.Management initManagementClient(final CeresDBOptions opts, final io.ceresdb.RouterClient routerClient) { + private static io.ceresdb.Management initManagementClient(final CeresDBOptions opts, + final io.ceresdb.RouterClient routerClient) { final ManagementOptions mOpts = opts.getManagementOptions(); if (mOpts == null) { return null; @@ -358,22 +352,19 @@ static final class RpcConnectionObserver implements RpcClient.ConnectionObserver static final Counter CONN_COUNTER = MetricsUtil.counter("connection_counter"); static final Meter CONN_FAILURES = MetricsUtil.meter("connection_failures"); - @Override - public void onReady(final Endpoint ep) { + @Override public void onReady(final Endpoint ep) { CONN_COUNTER.inc(); MetricsUtil.counter("connection_counter", ep).inc(); } - @Override - public void onFailure(final Endpoint ep) { + @Override public void onFailure(final Endpoint ep) { CONN_COUNTER.dec(); CONN_FAILURES.mark(); MetricsUtil.counter("connection_counter", ep).dec(); MetricsUtil.meter("connection_failures", ep).mark(); } - @Override - public void onShutdown(final Endpoint ep) { + @Override public void onShutdown(final Endpoint ep) { CONN_COUNTER.dec(); MetricsUtil.counter("connection_counter", ep).dec(); } @@ -397,14 +388,12 @@ static final class LogPrinter implements Display.Printer { this.logger = logger; } - @Override - public synchronized Printer print(final Object x) { + @Override public synchronized Printer print(final Object x) { this.buf.append(x); return this; } - @Override - public synchronized Printer println(final Object x) { + @Override public synchronized Printer println(final Object x) { this.buf.append(x); this.logger.info(this.buf.toString()); truncateBuf(); From 6303ef94abd872eb238835300d7ecc3c1ee40bfd Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 17 Jan 2023 18:28:33 +0800 Subject: [PATCH 03/10] format --- .../main/java/io/ceresdb/CeresDBClient.java | 104 ++++++++++-------- .../main/java/io/ceresdb/RouterClient.java | 38 +++---- .../io/ceresdb/StandaloneRouterClient.java | 3 - 3 files changed, 72 insertions(+), 73 deletions(-) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java index da65be0..889252b 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java @@ -60,7 +60,7 @@ * * @author jiachun.fjc */ -public class CeresDBClient implements io.ceresdb.Write, Query, Lifecycle, Display { +public class CeresDBClient implements Write, Query, Lifecycle, Display { private static final Logger LOG = LoggerFactory.getLogger(CeresDBClient.class); @@ -73,24 +73,24 @@ public class CeresDBClient implements io.ceresdb.Write, Query, Lifecycle> write(final Collection data, final Context ctx) { + @Override + public CompletableFuture> write(final Collection data, final Context ctx) { ensureInitialized(); return this.writeClient.write(data, attachCtx(ctx)); } - @Override public io.ceresdb.StreamWriteBuf streamWrite(final String metric, final Context ctx) { + @Override + public StreamWriteBuf streamWrite(final String metric, final Context ctx) { ensureInitialized(); return this.writeClient.streamWrite(metric, attachCtx(ctx)); } - @Override public CompletableFuture> query(final QueryRequest req, final Context ctx) { + @Override + public CompletableFuture> query(final QueryRequest req, final Context ctx) { ensureInitialized(); return this.queryClient.query(req, attachCtx(ctx)); } - @Override public void streamQuery(final QueryRequest req, final Context ctx, final Observer observer) { + @Override + public void streamQuery(final QueryRequest req, final Context ctx, final Observer observer) { ensureInitialized(); this.queryClient.streamQuery(req, attachCtx(ctx), observer); } @@ -175,7 +182,7 @@ public boolean hasManagement() { return this.management != null; } - public io.ceresdb.Management management() { + public Management management() { return this.management; } @@ -204,19 +211,20 @@ public Executor asyncReadPool() { return this.asyncReadPool; } - public io.ceresdb.RouterClient routerClient() { + public RouterClient routerClient() { return this.routerClient; } - public io.ceresdb.WriteClient writeClient() { + public WriteClient writeClient() { return this.writeClient; } - public io.ceresdb.QueryClient queryClient() { + public QueryClient queryClient() { return this.queryClient; } - @Override public void display(final Printer out) { + @Override + public void display(final Printer out) { out.println("--- CeresDBClient ---") // .print("id=") // .println(this.id) // @@ -254,7 +262,8 @@ public io.ceresdb.QueryClient queryClient() { out.println(""); } - @Override public String toString() { + @Override + public String toString() { return "CeresDBClient{" + // "id=" + id + // "version=" + version() + // @@ -281,55 +290,51 @@ private static RpcClient initRpcClient(final CeresDBOptions opts) { return rpcClient; } - private static io.ceresdb.RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { + private static RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { final RouterOptions routerOpts = opts.getRouterOptions(); routerOpts.setRpcClient(rpcClient); - - final io.ceresdb.RouterClient routerClient = routerOpts.getRouteMode().equals(io.ceresdb.RouteMode.STANDALONE) ? - new io.ceresdb.StandaloneRouterClient() : - new io.ceresdb.RouterClient(); + final RouterClient routerClient = new RouterClient(); if (!routerClient.init(routerOpts)) { throw new IllegalStateException("Fail to start router client"); } return routerClient; } - private static io.ceresdb.WriteClient initWriteClient(final CeresDBOptions opts, // - final io.ceresdb.RouterClient routerClient, // - final Executor asyncPool) { + private static WriteClient initWriteClient(final CeresDBOptions opts, // + final RouterClient routerClient, // + final Executor asyncPool) { final WriteOptions writeOpts = opts.getWriteOptions(); writeOpts.setRoutedClient(routerClient); writeOpts.setAsyncPool(asyncPool); - final io.ceresdb.WriteClient writeClient = new io.ceresdb.WriteClient(); + final WriteClient writeClient = new WriteClient(); if (!writeClient.init(writeOpts)) { throw new IllegalStateException("Fail to start write client"); } return writeClient; } - private static io.ceresdb.QueryClient initQueryClient(final CeresDBOptions opts, // - final io.ceresdb.RouterClient routerClient, // - final Executor asyncPool) { + private static QueryClient initQueryClient(final CeresDBOptions opts, // + final RouterClient routerClient, // + final Executor asyncPool) { final QueryOptions queryOpts = opts.getQueryOptions(); queryOpts.setRouterClient(routerClient); queryOpts.setAsyncPool(asyncPool); - final io.ceresdb.QueryClient queryClient = new io.ceresdb.QueryClient(); + final QueryClient queryClient = new QueryClient(); if (!queryClient.init(queryOpts)) { throw new IllegalStateException("Fail to start query client"); } return queryClient; } - private static io.ceresdb.Management initManagementClient(final CeresDBOptions opts, - final io.ceresdb.RouterClient routerClient) { + private static Management initManagementClient(final CeresDBOptions opts, final RouterClient routerClient) { final ManagementOptions mOpts = opts.getManagementOptions(); if (mOpts == null) { return null; } - if (!io.ceresdb.CeresDBManagementProvider.hasManagement()) { + if (!CeresDBManagementProvider.hasManagement()) { return null; } - final io.ceresdb.Management management = io.ceresdb.CeresDBManagementProvider.createManagement(); + final Management management = CeresDBManagementProvider.createManagement(); mOpts.setRouterClient(routerClient); if (!management.init(mOpts)) { return null; @@ -339,7 +344,7 @@ private static io.ceresdb.Management initManagementClient(final CeresDBOptions o private static String loadVersion() { try { - return io.ceresdb.Utils // + return Utils // .loadProperties(CeresDBClient.class.getClassLoader(), "client_version.properties") // .getProperty(VERSION_KEY, "Unknown version"); } catch (final Exception ignored) { @@ -352,19 +357,22 @@ static final class RpcConnectionObserver implements RpcClient.ConnectionObserver static final Counter CONN_COUNTER = MetricsUtil.counter("connection_counter"); static final Meter CONN_FAILURES = MetricsUtil.meter("connection_failures"); - @Override public void onReady(final Endpoint ep) { + @Override + public void onReady(final Endpoint ep) { CONN_COUNTER.inc(); MetricsUtil.counter("connection_counter", ep).inc(); } - @Override public void onFailure(final Endpoint ep) { + @Override + public void onFailure(final Endpoint ep) { CONN_COUNTER.dec(); CONN_FAILURES.mark(); MetricsUtil.counter("connection_counter", ep).dec(); MetricsUtil.meter("connection_failures", ep).mark(); } - @Override public void onShutdown(final Endpoint ep) { + @Override + public void onShutdown(final Endpoint ep) { CONN_COUNTER.dec(); MetricsUtil.counter("connection_counter", ep).dec(); } @@ -388,12 +396,14 @@ static final class LogPrinter implements Display.Printer { this.logger = logger; } - @Override public synchronized Printer print(final Object x) { + @Override + public synchronized Printer print(final Object x) { this.buf.append(x); return this; } - @Override public synchronized Printer println(final Object x) { + @Override + public synchronized Printer println(final Object x) { this.buf.append(x); this.logger.info(this.buf.toString()); truncateBuf(); diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java index c3d8000..64cccb4 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java @@ -81,9 +81,9 @@ public class RouterClient implements Lifecycle, Display, Iterable private ScheduledExecutorService refresher; protected RouterOptions opts; - private RpcClient rpcClient; - private RouterByMetrics router; - private InnerMetrics metrics; + private RpcClient rpcClient; + private RouterByMetrics router; + private InnerMetrics metrics; private final ConcurrentMap routeCache = new ConcurrentHashMap<>(); @@ -130,8 +130,7 @@ Timer refreshTimer() { } } - @Override - public boolean init(final RouterOptions opts) { + @Override public boolean init(final RouterOptions opts) { this.opts = Requires.requireNonNull(opts, "RouterClient.opts").copy(); this.rpcClient = this.opts.getRpcClient(); @@ -160,8 +159,7 @@ public boolean init(final RouterOptions opts) { return true; } - @Override - public void shutdownGracefully() { + @Override public void shutdownGracefully() { if (this.rpcClient != null) { this.rpcClient.shutdownGracefully(); } @@ -176,8 +174,7 @@ public void shutdownGracefully() { clearRouteCache(); } - @Override - public Iterator iterator() { + @Override public Iterator iterator() { return this.routeCache.values().iterator(); } @@ -305,10 +302,10 @@ private int gc0(final int times) { } final List topK = TopKSelector.selectTopK( // - this.routeCache.entrySet(), // - itemsToGC, // - (o1, o2) -> -Long.compare(o1.getValue().getLastHit(), o2.getValue().getLastHit()) // - ) // + this.routeCache.entrySet(), // + itemsToGC, // + (o1, o2) -> -Long.compare(o1.getValue().getLastHit(), o2.getValue().getLastHit()) // + ) // .map(Map.Entry::getKey) // .collect(Collectors.toList()); @@ -342,13 +339,11 @@ public CompletableFuture invoke(final Endpoint endpoint, // try { this.rpcClient.invokeAsync(endpoint, request, ctx, new Observer() { - @Override - public void onNext(final Resp value) { + @Override public void onNext(final Resp value) { future.complete(value); } - @Override - public void onError(final Throwable err) { + @Override public void onError(final Throwable err) { future.completeExceptionally(err); } }, timeoutMs); @@ -391,8 +386,7 @@ private boolean checkConn(final Endpoint endpoint, final boolean create) { return this.rpcClient.checkConnection(endpoint, create); } - @Override - public void display(final Printer out) { + @Override public void display(final Printer out) { out.println("--- RouterClient ---") // .print("opts=") // .println(this.opts) // @@ -405,8 +399,7 @@ public void display(final Printer out) { } } - @Override - public String toString() { + @Override public String toString() { return "RouterClient{" + // "opts=" + opts + // ", rpcClient=" + rpcClient + // @@ -422,8 +415,7 @@ private RouterByMetrics(Endpoint endpoint) { this.endpoint = endpoint; } - @Override - public CompletableFuture> routeFor(final Collection request) { + @Override public CompletableFuture> routeFor(final Collection request) { if (request == null || request.isEmpty()) { return Utils.completedCf(Collections.emptyMap()); } diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java index d2d5d3a..4f6e0cb 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java @@ -29,7 +29,6 @@ */ public class StandaloneRouterClient extends RouterClient { - public CompletableFuture> routeFor(final Collection metrics) { if (metrics == null || metrics.isEmpty()) { return Utils.completedCf(Collections.emptyMap()); @@ -47,6 +46,4 @@ public CompletableFuture> routeFor(final Collection m } - - } From 655ad5b7737687e9046b7578a6739f64377f75ba Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 17 Jan 2023 18:32:28 +0800 Subject: [PATCH 04/10] format --- ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java index 64cccb4..3b657da 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java @@ -459,5 +459,6 @@ private Route toRouteObj(final Storage.Route r) { final Storage.Endpoint ep = Requires.requireNonNull(r.getEndpoint(), "CeresDB.Endpoint"); return Route.of(r.getMetric(), Endpoint.of(ep.getIp(), ep.getPort()), r.getExt()); } + } } From f4fef8c2e74aca0249926c2299cd0f4baae09ca0 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 17 Jan 2023 18:43:46 +0800 Subject: [PATCH 05/10] format --- .../main/java/io/ceresdb/RouterClient.java | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java index 3b657da..69a0dd0 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java @@ -81,9 +81,9 @@ public class RouterClient implements Lifecycle, Display, Iterable private ScheduledExecutorService refresher; protected RouterOptions opts; - private RpcClient rpcClient; - private RouterByMetrics router; - private InnerMetrics metrics; + protected RpcClient rpcClient; + protected RouterByMetrics router; + protected InnerMetrics metrics; private final ConcurrentMap routeCache = new ConcurrentHashMap<>(); @@ -130,7 +130,8 @@ Timer refreshTimer() { } } - @Override public boolean init(final RouterOptions opts) { + @Override + public boolean init(final RouterOptions opts) { this.opts = Requires.requireNonNull(opts, "RouterClient.opts").copy(); this.rpcClient = this.opts.getRpcClient(); @@ -159,7 +160,8 @@ Timer refreshTimer() { return true; } - @Override public void shutdownGracefully() { + @Override + public void shutdownGracefully() { if (this.rpcClient != null) { this.rpcClient.shutdownGracefully(); } @@ -174,7 +176,8 @@ Timer refreshTimer() { clearRouteCache(); } - @Override public Iterator iterator() { + @Override + public Iterator iterator() { return this.routeCache.values().iterator(); } @@ -302,10 +305,10 @@ private int gc0(final int times) { } final List topK = TopKSelector.selectTopK( // - this.routeCache.entrySet(), // - itemsToGC, // - (o1, o2) -> -Long.compare(o1.getValue().getLastHit(), o2.getValue().getLastHit()) // - ) // + this.routeCache.entrySet(), // + itemsToGC, // + (o1, o2) -> -Long.compare(o1.getValue().getLastHit(), o2.getValue().getLastHit()) // + ) // .map(Map.Entry::getKey) // .collect(Collectors.toList()); @@ -339,11 +342,13 @@ public CompletableFuture invoke(final Endpoint endpoint, // try { this.rpcClient.invokeAsync(endpoint, request, ctx, new Observer() { - @Override public void onNext(final Resp value) { + @Override + public void onNext(final Resp value) { future.complete(value); } - @Override public void onError(final Throwable err) { + @Override + public void onError(final Throwable err) { future.completeExceptionally(err); } }, timeoutMs); @@ -386,7 +391,8 @@ private boolean checkConn(final Endpoint endpoint, final boolean create) { return this.rpcClient.checkConnection(endpoint, create); } - @Override public void display(final Printer out) { + @Override + public void display(final Printer out) { out.println("--- RouterClient ---") // .print("opts=") // .println(this.opts) // @@ -399,7 +405,8 @@ private boolean checkConn(final Endpoint endpoint, final boolean create) { } } - @Override public String toString() { + @Override + public String toString() { return "RouterClient{" + // "opts=" + opts + // ", rpcClient=" + rpcClient + // @@ -415,7 +422,8 @@ private RouterByMetrics(Endpoint endpoint) { this.endpoint = endpoint; } - @Override public CompletableFuture> routeFor(final Collection request) { + @Override + public CompletableFuture> routeFor(final Collection request) { if (request == null || request.isEmpty()) { return Utils.completedCf(Collections.emptyMap()); } @@ -459,6 +467,5 @@ private Route toRouteObj(final Storage.Route r) { final Storage.Endpoint ep = Requires.requireNonNull(r.getEndpoint(), "CeresDB.Endpoint"); return Route.of(r.getMetric(), Endpoint.of(ep.getIp(), ep.getPort()), r.getExt()); } - } } From 26d1a63eaa016f257d1ea0f991d8c440ec079f07 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Tue, 17 Jan 2023 18:56:30 +0800 Subject: [PATCH 06/10] format --- .../src/main/java/io/ceresdb/RouteMode.java | 26 ++++++++++++------- .../java/io/ceresdb/CeresDBClientTest.java | 6 ++--- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java index 269dcbc..a2b3c5f 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java @@ -1,19 +1,27 @@ -/** - * Alipay.com Inc. Copyright (c) 2004-2017 All Rights Reserved. CreatedBy: RouteMode.java CreatedDate: 2023年01月17日 14:23 ticketNumber: (aone - * link) comments: Description - *

- * codeReviewBy: codeReviewDate: comments: - *

- * modifiedBy: modifiedDate: comments: +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.ceresdb; /** * define route mode * @author lee - * @version : RouteMode.java, v 0.1 2023年01月17日 14:23 lee Exp $ + * @version : RouteMode.java, v 0.1 2023锟斤拷01锟斤拷17锟斤拷 14:23 lee Exp $ */ -public enum RouteMode{ +public enum RouteMode { /** * In this mode, client request to a server directly, and the server proxy the request to the correct server. */ diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java index 2ecd2f6..e189626 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java @@ -49,13 +49,13 @@ @RunWith(value = MockitoJUnitRunner.class) public class CeresDBClientTest { - private static final ReferenceFieldUpdater WC_UPDATER = Updaters // + private static final ReferenceFieldUpdater WC_UPDATER = Updaters // .newReferenceFieldUpdater(CeresDBClient.class, "writeClient"); private CeresDBClient client; private CeresDBOptions opts; @Mock - private io.ceresdb.WriteClient writeClient; + private WriteClient writeClient; @Before public void before() { @@ -105,7 +105,7 @@ public void helloWorldTest() throws ExecutionException, InterruptedException { final Rows rows = TestUtil.newRow("test_metric1"); Mockito.when(this.writeClient.write(Mockito.anyList(), Mockito.any())) // - .thenReturn(io.ceresdb.Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult())); + .thenReturn(Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult())); final CompletableFuture> f = this.client.write(rows); final Result ret = f.get(); Assert.assertTrue(ret.isOk()); From 20629d55ddbbb9544c65b01cb62f183a4f3b2d09 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Wed, 1 Feb 2023 21:13:34 +0800 Subject: [PATCH 07/10] standalone mode --- .../main/java/io/ceresdb/CeresDBClient.java | 2 +- .../src/main/java/io/ceresdb/RouteMode.java | 9 ++++---- .../io/ceresdb/options/CeresDBOptions.java | 21 +++++++++++++++++++ 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java index 889252b..f5d9a79 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java @@ -293,7 +293,7 @@ private static RpcClient initRpcClient(final CeresDBOptions opts) { private static RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { final RouterOptions routerOpts = opts.getRouterOptions(); routerOpts.setRpcClient(rpcClient); - final RouterClient routerClient = new RouterClient(); + final RouterClient routerClient = routerOpts.getRouteMode().equals(RouteMode.CLUSTER)?new RouterClient():new StandaloneRouterClient(); if (!routerClient.init(routerOpts)) { throw new IllegalStateException("Fail to start router client"); } diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java index a2b3c5f..605bca5 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java @@ -17,18 +17,19 @@ package io.ceresdb; /** - * define route mode + * route mode * @author lee - * @version : RouteMode.java, v 0.1 2023锟斤拷01锟斤拷17锟斤拷 14:23 lee Exp $ + * @version : RouteMode.java, v 0.1 2023.01.17 14:23 lee Exp $ */ public enum RouteMode { + /** - * In this mode, client request to a server directly, and the server proxy the request to the correct server. + * In this mode, the client does not cache routing information and each request is proxied through the server to the correct server */ STANDALONE, /** - * In this mode, the client will find the correct server first, and then request to the server. + * In this mode, the client cache routing information. Client find the correct server firstly, and then request to the correct server directly. */ CLUSTER } \ No newline at end of file diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java b/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java index f4fb1b7..c301c5b 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import io.ceresdb.LimitedPolicy; +import io.ceresdb.RouteMode; import io.ceresdb.common.Copiable; import io.ceresdb.common.Endpoint; import io.ceresdb.common.Tenant; @@ -237,6 +238,12 @@ public static final class Builder { // all route tables are refreshed every 30 seconds. private long routeTableRefreshPeriodSeconds = 30; + + /** Route mode for request + @see RouteMode + **/ + private RouteMode routeMode = RouteMode.CLUSTER; + public Builder(Endpoint clusterAddress) { this.clusterAddress = clusterAddress; } @@ -432,6 +439,18 @@ public Builder routeTableRefreshPeriodSeconds(final long routeTableRefreshPeriod return this; } + /** + * Route mode for request + * @see RouteMode + * + * @param routeMode route mode for request + * @return this builder + */ + public Builder routeMode(final RouteMode routeMode) { + this.routeMode = routeMode; + return this; + } + /** * A good start, happy coding. * @@ -449,6 +468,8 @@ public CeresDBOptions build() { opts.routerOptions.setMaxCachedSize(this.routeTableMaxCachedSize); opts.routerOptions.setGcPeriodSeconds(this.routeTableGcPeriodSeconds); opts.routerOptions.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds); + opts.routerOptions.setRouteMode(this.routeMode); + opts.writeOptions = new WriteOptions(); opts.writeOptions.setMaxWriteSize(this.maxWriteSize); opts.writeOptions.setMaxRetries(this.writeMaxRetries); From 0230720a5d466b313385af88976fd1024391b1d8 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Thu, 2 Feb 2023 10:29:34 +0800 Subject: [PATCH 08/10] format --- ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java | 3 ++- .../src/main/java/io/ceresdb/options/CeresDBOptions.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java index f5d9a79..0f575e6 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java @@ -293,7 +293,8 @@ private static RpcClient initRpcClient(final CeresDBOptions opts) { private static RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { final RouterOptions routerOpts = opts.getRouterOptions(); routerOpts.setRpcClient(rpcClient); - final RouterClient routerClient = routerOpts.getRouteMode().equals(RouteMode.CLUSTER)?new RouterClient():new StandaloneRouterClient(); + final RouterClient routerClient = routerOpts.getRouteMode().equals(RouteMode.CLUSTER) ? new RouterClient() : + new StandaloneRouterClient(); if (!routerClient.init(routerOpts)) { throw new IllegalStateException("Fail to start router client"); } diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java b/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java index c301c5b..b6e2bbd 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java @@ -238,7 +238,6 @@ public static final class Builder { // all route tables are refreshed every 30 seconds. private long routeTableRefreshPeriodSeconds = 30; - /** Route mode for request @see RouteMode **/ From a2a2f72260d8454cdd088d321af2f98c35381397 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Thu, 2 Feb 2023 10:39:44 +0800 Subject: [PATCH 09/10] test case --- .../src/test/java/io/ceresdb/CeresDBClientTest.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java index e189626..138b124 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java @@ -19,10 +19,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import io.ceresdb.CeresDBClient; -import io.ceresdb.options.RouterOptions; -import io.ceresdb.rpc.RpcOptions; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -64,10 +61,6 @@ public void before() { .writeMaxRetries(1) // .readMaxRetries(1) // .build(); - RouterOptions routerOptions = new RouterOptions(); - // set RouteMode to CLUSTER - routerOptions.setRouteMode(io.ceresdb.RouteMode.CLUSTER); - this.opts.setRouterOptions(routerOptions); this.client = new CeresDBClient(); } From 9fe31ae1b4a8a94d6c1f35ba4027fda6e8460543 Mon Sep 17 00:00:00 2001 From: MachaelLee <690585471@qq.com> Date: Thu, 2 Feb 2023 10:40:47 +0800 Subject: [PATCH 10/10] format --- ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java index 138b124..bf48c64 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java @@ -19,7 +19,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import org.junit.After; import org.junit.Assert; import org.junit.Before;