diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java index 889252b..0f575e6 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/CeresDBClient.java @@ -293,7 +293,8 @@ private static RpcClient initRpcClient(final CeresDBOptions opts) { private static RouterClient initRouteClient(final CeresDBOptions opts, final RpcClient rpcClient) { final RouterOptions routerOpts = opts.getRouterOptions(); routerOpts.setRpcClient(rpcClient); - final RouterClient routerClient = new RouterClient(); + final RouterClient routerClient = routerOpts.getRouteMode().equals(RouteMode.CLUSTER) ? new RouterClient() : + new StandaloneRouterClient(); if (!routerClient.init(routerOpts)) { throw new IllegalStateException("Fail to start router client"); } diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java new file mode 100644 index 0000000..605bca5 --- /dev/null +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouteMode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.ceresdb; + +/** + * route mode + * @author lee + * @version : RouteMode.java, v 0.1 2023.01.17 14:23 lee Exp $ + */ +public enum RouteMode { + + /** + * In this mode, the client does not cache routing information and each request is proxied through the server to the correct server + */ + STANDALONE, + + /** + * In this mode, the client cache routing information. Client find the correct server firstly, and then request to the correct server directly. + */ + CLUSTER +} \ No newline at end of file diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java index 374d0b3..69a0dd0 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java @@ -80,10 +80,10 @@ public class RouterClient implements Lifecycle, Display, Iterable private ScheduledExecutorService cleaner; private ScheduledExecutorService refresher; - private RouterOptions opts; - private RpcClient rpcClient; - private RouterByMetrics router; - private InnerMetrics metrics; + protected RouterOptions opts; + protected RpcClient rpcClient; + protected RouterByMetrics router; + protected InnerMetrics metrics; private final ConcurrentMap routeCache = new ConcurrentHashMap<>(); diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java new file mode 100644 index 0000000..4f6e0cb --- /dev/null +++ b/ceresdb-protocol/src/main/java/io/ceresdb/StandaloneRouterClient.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.ceresdb; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * A route rpc client which cached nothing about table information, and return + * clusterAddress directly + * @author lee + */ +public class StandaloneRouterClient extends RouterClient { + + public CompletableFuture> routeFor(final Collection metrics) { + if (metrics == null || metrics.isEmpty()) { + return Utils.completedCf(Collections.emptyMap()); + } + + final Map routeMap = new HashMap<>(); + + metrics.forEach(metric -> { + Route route = new Route(); + route.setEndpoint(this.opts.getClusterAddress()); + route.setMetric(metric); + routeMap.put(metric, route); + }); + return Utils.completedCf(routeMap); + + } + +} diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java b/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java index f4fb1b7..b6e2bbd 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/options/CeresDBOptions.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import io.ceresdb.LimitedPolicy; +import io.ceresdb.RouteMode; import io.ceresdb.common.Copiable; import io.ceresdb.common.Endpoint; import io.ceresdb.common.Tenant; @@ -237,6 +238,11 @@ public static final class Builder { // all route tables are refreshed every 30 seconds. private long routeTableRefreshPeriodSeconds = 30; + /** Route mode for request + @see RouteMode + **/ + private RouteMode routeMode = RouteMode.CLUSTER; + public Builder(Endpoint clusterAddress) { this.clusterAddress = clusterAddress; } @@ -432,6 +438,18 @@ public Builder routeTableRefreshPeriodSeconds(final long routeTableRefreshPeriod return this; } + /** + * Route mode for request + * @see RouteMode + * + * @param routeMode route mode for request + * @return this builder + */ + public Builder routeMode(final RouteMode routeMode) { + this.routeMode = routeMode; + return this; + } + /** * A good start, happy coding. * @@ -449,6 +467,8 @@ public CeresDBOptions build() { opts.routerOptions.setMaxCachedSize(this.routeTableMaxCachedSize); opts.routerOptions.setGcPeriodSeconds(this.routeTableGcPeriodSeconds); opts.routerOptions.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds); + opts.routerOptions.setRouteMode(this.routeMode); + opts.writeOptions = new WriteOptions(); opts.writeOptions.setMaxWriteSize(this.maxWriteSize); opts.writeOptions.setMaxRetries(this.writeMaxRetries); diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java b/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java index 1e5bf92..3ec8e3a 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/options/RouterOptions.java @@ -16,6 +16,7 @@ */ package io.ceresdb.options; +import io.ceresdb.RouteMode; import io.ceresdb.common.Copiable; import io.ceresdb.common.Endpoint; import io.ceresdb.rpc.RpcClient; @@ -38,6 +39,8 @@ public class RouterOptions implements Copiable { // all route tables are refreshed every 30 seconds. private long refreshPeriodSeconds = 30; + private RouteMode routeMode = RouteMode.CLUSTER; + public RpcClient getRpcClient() { return rpcClient; } @@ -78,6 +81,14 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) { this.refreshPeriodSeconds = refreshPeriodSeconds; } + public RouteMode getRouteMode() { + return routeMode; + } + + public void setRouteMode(RouteMode routeMode) { + this.routeMode = routeMode; + } + @Override public RouterOptions copy() { final RouterOptions opts = new RouterOptions(); @@ -86,6 +97,7 @@ public RouterOptions copy() { opts.maxCachedSize = this.maxCachedSize; opts.gcPeriodSeconds = this.gcPeriodSeconds; opts.refreshPeriodSeconds = this.refreshPeriodSeconds; + opts.routeMode = this.routeMode; return opts; } @@ -97,6 +109,8 @@ public String toString() { ", maxCachedSize=" + maxCachedSize + // ", gcPeriodSeconds=" + gcPeriodSeconds + // ", refreshPeriodSeconds=" + refreshPeriodSeconds + // + ", routeMode=" + routeMode + // '}'; } + } diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java index 574a847..bf48c64 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java @@ -19,8 +19,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import io.ceresdb.CeresDBClient; - import org.junit.After; import org.junit.Assert; import org.junit.Before;