diff --git a/ceresdb-example/src/test/java/io/ceresdb/CeresDBTest.java b/ceresdb-example/src/test/java/io/ceresdb/CeresDBTest.java index 39e9e48..f4a4c4d 100644 --- a/ceresdb-example/src/test/java/io/ceresdb/CeresDBTest.java +++ b/ceresdb-example/src/test/java/io/ceresdb/CeresDBTest.java @@ -3,12 +3,10 @@ */ package io.ceresdb; -import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/Route.java b/ceresdb-protocol/src/main/java/io/ceresdb/Route.java index 5834bc8..2793367 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/Route.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/Route.java @@ -9,7 +9,7 @@ import io.ceresdb.common.util.Clock; /** - * Route info for metric. + * Route info for table. * */ public class Route { @@ -18,21 +18,21 @@ public class Route { private Object ext; private final AtomicLong lastHit = new AtomicLong(Clock.defaultClock().getTick()); - public static Route invalid(final String metric) { - throw new IllegalStateException("Unexpected, invalid route for metric: " + metric); + public static Route invalid(final String table) { + throw new IllegalStateException("Unexpected, invalid route for table: " + table); } public static Route of(final Endpoint endpoint) { return of(null, endpoint, null); } - public static Route of(final String metric, final Endpoint endpoint) { - return of(metric, endpoint, null); + public static Route of(final String table, final Endpoint endpoint) { + return of(table, endpoint, null); } - public static Route of(final String metric, final Endpoint endpoint, final Object ext) { + public static Route of(final String table, final Endpoint endpoint, final Object ext) { final Route r = new Route(); - r.table = metric; + r.table = table; r.endpoint = endpoint; r.ext = ext; return r; diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java index 20a6e11..267f0a9 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/RouterClient.java @@ -184,6 +184,9 @@ public CompletableFuture> routeFor(final RequestContext reqCt ret = remote; } else { local.putAll(remote); + for (String miss : misses) { + local.putIfAbsent(miss, Route.of(miss, opts.getClusterAddress())); + } ret = local; } return ret; @@ -197,7 +200,6 @@ public CompletableFuture> routeFor(final RequestContext reqCt public CompletableFuture> routeRefreshFor(final RequestContext reqCtx, final Collection tables) { - final long startCall = Clock.defaultClock().getTick(); return this.router.routeFor(reqCtx, tables).whenComplete((remote, err) -> { if (err == null) { this.routeCache.putAll(remote); diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java index 20f3d76..af2d27b 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java @@ -149,7 +149,7 @@ public StreamWriteBuf streamWrite(final RequestContext reqCtx, f final CompletableFuture respFuture = new CompletableFuture<>(); - return this.routerClient.routeFor(reqCtx, Collections.singleton(table)) + return this.routerClient.routeFor(finalReqCtx, Collections.singleton(table)) .thenApply(routes -> routes.values().stream().findFirst().orElseGet(() -> Route.invalid(table))) .thenApply(route -> streamWriteTo(route, finalReqCtx, ctx, Utils.toUnaryObserver(respFuture))) .thenApply(reqObserver -> new StreamWriteBuf() {