diff --git a/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java b/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java index 11ccddb..c2f54ba 100644 --- a/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java +++ b/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java @@ -262,8 +262,8 @@ private long onReceived(final boolean onError) { MetricsUtil.timer(REQ_RT, method.getFullMethodName(), address).update(duration, TimeUnit.MILLISECONDS); if (onError) { - MetricsUtil.meter(REQ_FAILED, method).mark(); - MetricsUtil.meter(REQ_FAILED, method, address).mark(); + MetricsUtil.meter(REQ_FAILED, method.getFullMethodName()).mark(); + MetricsUtil.meter(REQ_FAILED, method.getFullMethodName(), address).mark(); } return duration; diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/QueryClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/QueryClient.java index adceb11..e812fe9 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/QueryClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/QueryClient.java @@ -160,24 +160,26 @@ private CompletableFuture> query0(final SqlQueryRequest } final Err err = r.getErr(); - LOG.warn("Failed to read from {}, retries={}, err={}.", Utils.DB_NAME, retries, err); - if (retries > this.opts.getMaxRetries()) { - LOG.error("Retried {} times still failed.", retries); - return Utils.completedCf(r); - } + LOG.warn("Failed to read from {}, err={}.", Utils.DB_NAME, err); // Should refresh route table final Set toRefresh = err.stream() // .filter(Utils::shouldRefreshRouteTable) // .flatMap(e -> e.getFailedTables().stream()) // .collect(Collectors.toSet()); + this.routerClient.clearRouteCacheBy(toRefresh); - if (toRefresh.isEmpty()) { + // Should not retry + if (Utils.shouldNotRetry(err)) { return Utils.completedCf(r); } // Async to refresh route info - return this.routerClient.routeRefreshFor(req.getReqCtx(), toRefresh) + if (retries > this.opts.getMaxRetries()) { + LOG.error("Retried {} times still failed.", retries); + return Utils.completedCf(r); + } + return this.routerClient.routeFor(req.getReqCtx(), toRefresh) .thenComposeAsync(routes -> query0(req, ctx, retries + 1), this.asyncPool); }, this.asyncPool); } diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java b/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java index 21b9b91..e73153b 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java @@ -234,11 +234,7 @@ private CompletableFuture> write0(final RequestContext reqC } final Err err = r.getErr(); - LOG.warn("Failed to write to {}, retries={}, err={}.", Utils.DB_NAME, retries, err); - if (retries + 1 > this.opts.getMaxRetries()) { - LOG.error("Retried {} times still failed.", retries); - return Utils.completedCf(r); - } + LOG.warn("Failed to write to {}, err={}.", Utils.DB_NAME, err); // Should refresh route table final Set toRefresh = err.stream() // @@ -246,27 +242,33 @@ private CompletableFuture> write0(final RequestContext reqC .flatMap(e -> e.getFailedWrites().stream()) // .map(Point::getTable) // .collect(Collectors.toSet()); + this.routerClient.clearRouteCacheBy(toRefresh); // Should retry final List pointsToRetry = err.stream() // .filter(Utils::shouldRetry) // .flatMap(e -> e.getFailedWrites().stream()) // .collect(Collectors.toList()); + if (pointsToRetry.isEmpty()) { + return Utils.completedCf(r); + } - // Should not retry - final Optional noRetryErr = err.stream() // - .filter(Utils::shouldNotRetry) // - .reduce(Err::combine); + if (retries + 1 > this.opts.getMaxRetries()) { + LOG.error("Retried {} times still failed.", retries); + return Utils.completedCf(r); + } - // Async refresh route info - final CompletableFuture> rwf = this.routerClient - .routeRefreshFor(reqCtx, toRefresh) + final CompletableFuture> rwf = this.routerClient.routeFor(reqCtx, toRefresh) // Even for some data that does not require a refresh of the routing table, // we still wait until the routing table is flushed successfully before // retrying it, in order to give the server a break. .thenComposeAsync(routes -> write0(reqCtx, pointsToRetry, ctx, retries + 1), this.asyncPool); + // Should not retry + final Optional noRetryErr = err.stream() // + .filter(Utils::shouldNotRetry) // + .reduce(Err::combine); return noRetryErr.isPresent() ? rwf.thenApplyAsync(ret -> Utils.combineResult(noRetryErr.get().mapToResult(), ret), this.asyncPool) : diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/models/Result.java b/ceresdb-protocol/src/main/java/io/ceresdb/models/Result.java index d91e132..a3a6da8 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/models/Result.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/models/Result.java @@ -13,10 +13,9 @@ */ public final class Result { - public static final int SUCCESS = 200; - public static final int INVALID_ROUTE = 302; - public static final int SHOULD_RETRY = 310; - public static final int FLOW_CONTROL = 503; + public static final int SUCCESS = 200; + public static final int BAD_REQUEST = 400; + public static final int FLOW_CONTROL = 503; private final Ok ok; private final Err err; diff --git a/ceresdb-protocol/src/main/java/io/ceresdb/util/Utils.java b/ceresdb-protocol/src/main/java/io/ceresdb/util/Utils.java index 274c829..90895e2 100644 --- a/ceresdb-protocol/src/main/java/io/ceresdb/util/Utils.java +++ b/ceresdb-protocol/src/main/java/io/ceresdb/util/Utils.java @@ -331,15 +331,11 @@ public static boolean shouldNotRetry(final Err err) { } public static boolean shouldRetry(final Err err) { - if (err == null) { - return false; - } - final int errCode = err.getCode(); - return errCode == Result.INVALID_ROUTE || errCode == Result.SHOULD_RETRY; + return false; } public static boolean shouldRefreshRouteTable(final Err err) { - return err.getCode() == Result.INVALID_ROUTE; + return err != null; } public static Observer toUnaryObserver(final CompletableFuture future) { diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java index 8b65e7d..68bdfc8 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java @@ -57,7 +57,7 @@ public void after() { @Test(expected = IllegalStateException.class) public void withoutInitTest() { - final List points = TestUtil.newTablePoints("test_table1_not_init"); + final List points = TestUtil.newTableTwoPoints("test_table1_not_init"); this.client.write(new WriteRequest(points)); } @@ -80,7 +80,7 @@ public void instancesTest() { public void helloWorldTest() throws ExecutionException, InterruptedException { initAndMockWriteClient(); - final List points = TestUtil.newTablePoints("test_table1"); + final List points = TestUtil.newTableTwoPoints("test_table1"); Mockito.when(this.writeClient.write(new WriteRequest(Mockito.anyList()), Mockito.any())) // .thenReturn(Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult())); diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/WriteClientTest.java b/ceresdb-protocol/src/test/java/io/ceresdb/WriteClientTest.java index adc6f8f..7e975da 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/WriteClientTest.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/WriteClientTest.java @@ -112,263 +112,6 @@ public void writeAllSuccessTest() throws ExecutionException, InterruptedExceptio Assert.assertEquals(new Integer(0), ret.mapOr(-1, WriteOk::getFailed)); } - @Test - public void write3And1InvalidRoute() throws ExecutionException, InterruptedException { - final List data = TestUtil.newMultiTablePoints("write_client_test_table1", // - "write_client_test_table2", // - "write_client_test_table3"); - - final Endpoint ep1 = Endpoint.of("127.0.0.1", 8081); - final Endpoint ep2 = Endpoint.of("127.0.0.2", 8081); - final Endpoint ep3 = Endpoint.of("127.0.0.3", 8081); - final Endpoint ep4 = Endpoint.of("127.0.0.4", 8081); - - final Storage.WriteResponse resp = TestUtil.newSuccessWriteResp(2); - final Storage.WriteResponse errResp = TestUtil.newFailedWriteResp(Result.INVALID_ROUTE, 2); - - Mockito.when(this.routerClient.invoke(Mockito.eq(ep1), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep2), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep3), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(errResp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep4), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.routeFor(Mockito.any(), Mockito.eq(TestUtil.asSet("write_client_test_table1", // - "write_client_test_table2", "write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -8646902388192715970L; - - { - put("write_client_test_table1", Route.of("write_client_test_table1", ep1)); - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeRefreshFor(Mockito.any(), Mockito.any())) - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -3271323053870289591L; - - { - put("write_client_test_table3", Route.of("write_client_test_table3", ep4)); - } - })); - Mockito.when(this.routerClient.routeFor(Mockito.any(), Mockito.eq(TestUtil.asSet("write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = 4340010451723257789L; - - { - put("write_client_test_table3", Route.of("write_client_test_table3", ep4)); - } - })); - - final CompletableFuture> f = this.writeClient.write(new WriteRequest(data), - Context.newDefault()); - final Result ret = f.get(); - - Assert.assertTrue(ret.isOk()); - Assert.assertEquals(new Integer(6), ret.mapOr(0, WriteOk::getSuccess)); - Assert.assertEquals(new Integer(0), ret.mapOr(-1, WriteOk::getFailed)); - } - - @Test - public void write3And1InvalidRouteAndRetryFailed() throws ExecutionException, InterruptedException { - final List data = TestUtil.newMultiTablePoints("write_client_test_table1", // - "write_client_test_table2", // - "write_client_test_table3"); - - final Endpoint ep1 = Endpoint.of("127.0.0.1", 8081); - final Endpoint ep2 = Endpoint.of("127.0.0.2", 8081); - final Endpoint ep3 = Endpoint.of("127.0.0.3", 8081); - - final Storage.WriteResponse resp = TestUtil.newSuccessWriteResp(2); - final Storage.WriteResponse errResp = TestUtil.newFailedWriteResp(Result.INVALID_ROUTE, 2); - - Mockito.when(this.routerClient.invoke(Mockito.eq(ep1), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep2), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep3), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(errResp)); - Mockito.when(this.routerClient.routeFor(Mockito.any(), - Mockito.eq(TestUtil.asSet("write_client_test_table1", "write_client_test_table2", - "write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -7535390185627686991L; - - { - put("write_client_test_table1", Route.of("write_client_test_table1", ep1)); - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeRefreshFor(Mockito.any(), Mockito.any())) - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -3191375160670801662L; - - { - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeFor(Mockito.any(), Mockito.eq(TestUtil.asSet("write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = 1341458669202248824L; - - { - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - - final CompletableFuture> f = this.writeClient.write(new WriteRequest(data), - Context.newDefault()); - final Result ret = f.get(); - - final int success = ret.mapOrElse(err -> -1, WriteOk::getSuccess); - Assert.assertEquals(-1, success); - Assert.assertFalse(ret.isOk()); - Assert.assertEquals(4, ret.getErr().getSubOk().getSuccess()); - Assert.assertEquals(0, ret.getErr().getSubOk().getFailed()); - Assert.assertEquals(2, ret.getErr().getFailedWrites().size()); - } - - @Test - public void write3And2FailedAndRetryFailed() throws ExecutionException, InterruptedException { - final List data = TestUtil.newMultiTablePoints("write_client_test_table1", // - "write_client_test_table2", // - "write_client_test_table3"); - - final Endpoint ep1 = Endpoint.of("127.0.0.1", 8081); - final Endpoint ep2 = Endpoint.of("127.0.0.2", 8081); - final Endpoint ep3 = Endpoint.of("127.0.0.3", 8081); - - final Storage.WriteResponse resp = TestUtil.newSuccessWriteResp(2); - final Storage.WriteResponse errResp = TestUtil.newFailedWriteResp(Result.SHOULD_RETRY, 2); - - Mockito.when(this.routerClient.invoke(Mockito.eq(ep1), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep2), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(errResp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep3), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(errResp)); - Mockito.when(this.routerClient.routeFor(Mockito.any(), - Mockito.eq(TestUtil.asSet("write_client_test_table1", "write_client_test_table2", - "write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -5936788008084035345L; - - { - put("write_client_test_table1", Route.of("write_client_test_table1", ep1)); - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeRefreshFor(Mockito.any(), Mockito.any())) - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -4748944007591733357L; - - { - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeFor(Mockito.any(), - Mockito.eq(TestUtil.asSet("write_client_test_table2", "write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -1811964578845864624L; - - { - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeFor(Mockito.any(), - Mockito.eq(TestUtil.asSet("write_client_test_table3", "write_client_test_table2")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = 3940955382371644111L; - - { - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - - final CompletableFuture> f = this.writeClient.write(new WriteRequest(data), - Context.newDefault()); - final Result ret = f.get(); - - Assert.assertFalse(ret.isOk()); - Assert.assertEquals(2, ret.getErr().getSubOk().getSuccess()); - Assert.assertEquals(0, ret.getErr().getSubOk().getFailed()); - Assert.assertTrue(ret.getErr().stream().findFirst().isPresent()); - Assert.assertEquals(2, ret.getErr().stream().count()); - Assert.assertEquals(4, ret.getErr().stream().mapToInt(err -> err.getFailedWrites().size()).sum()); - - } - - @Test - public void write3And2FailedAndSomeNoRetry() throws ExecutionException, InterruptedException { - final List data = TestUtil.newMultiTablePoints("write_client_test_table1", // - "write_client_test_table2", // - "write_client_test_table3"); - - final Endpoint ep1 = Endpoint.of("127.0.0.1", 8081); - final Endpoint ep2 = Endpoint.of("127.0.0.2", 8081); - final Endpoint ep3 = Endpoint.of("127.0.0.3", 8081); - - final Storage.WriteResponse resp = TestUtil.newSuccessWriteResp(2); - final Storage.WriteResponse errResp1 = TestUtil.newFailedWriteResp(Result.SHOULD_RETRY, 2); - final Storage.WriteResponse errResp2 = TestUtil.newFailedWriteResp(400, 2); - - Mockito.when(this.routerClient.invoke(Mockito.eq(ep1), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(resp)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep2), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(errResp1)); - Mockito.when(this.routerClient.invoke(Mockito.eq(ep3), Mockito.any(), Mockito.any())) // - .thenReturn(Utils.completedCf(errResp2)); - Mockito.when(this.routerClient.routeFor(Mockito.any(), - Mockito.eq(TestUtil.asSet("write_client_test_table1", "write_client_test_table2", - "write_client_test_table3")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = 1040769477529210661L; - - { - put("write_client_test_table1", Route.of("write_client_test_table1", ep1)); - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeRefreshFor(Mockito.any(), Mockito.any())) - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -6892083230027668740L; - - { - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - put("write_client_test_table3", Route.of("write_client_test_table3", ep3)); - } - })); - Mockito.when(this.routerClient.routeFor(Mockito.any(), Mockito.eq(TestUtil.asSet("write_client_test_table2")))) // - .thenReturn(Utils.completedCf(new HashMap() { - private static final long serialVersionUID = -9174308983134252825L; - - { - put("write_client_test_table2", Route.of("write_client_test_table2", ep2)); - } - })); - - final CompletableFuture> f = this.writeClient.write(new WriteRequest(data), - Context.newDefault()); - final Result ret = f.get(); - - Assert.assertFalse(ret.isOk()); - Assert.assertEquals(2, ret.getErr().getSubOk().getSuccess()); - Assert.assertEquals(0, ret.getErr().getSubOk().getFailed()); - // TODO - // How to process multi err - //Assert.assertEquals(1, ret.getErr().getFailedWrites().size()); - //Assert.assertTrue(ret.getErr().stream().findFirst().isPresent()); - //Assert.assertEquals(1, ret.getErr().stream().findFirst().get().getFailedWrites().size()); - } - @Test public void writeSplitTest() throws ExecutionException, InterruptedException { writeSplit(1, 1); @@ -473,11 +216,11 @@ public void onCompleted() { final StreamWriteBuf writer = this.writeClient.streamWrite(testTable); final CompletableFuture ret = writer // - .write(TestUtil.newTablePoints(testTable)) // - .write(TestUtil.newTablePoints(testTable)) // - .write(TestUtil.newTablePoints(testTable)) // + .write(TestUtil.newTableTwoPoints(testTable)) // + .write(TestUtil.newTableTwoPoints(testTable)) // + .write(TestUtil.newTableTwoPoints(testTable)) // .flush() // - .write(TestUtil.newTablePoints(testTable)) // + .write(TestUtil.newTableTwoPoints(testTable)) // .flush() // .writeAndFlush(TestUtil.newMultiTablePoints(testTable, testTable)) // .completed(); diff --git a/ceresdb-protocol/src/test/java/io/ceresdb/util/TestUtil.java b/ceresdb-protocol/src/test/java/io/ceresdb/util/TestUtil.java index b02d612..05f85d8 100644 --- a/ceresdb-protocol/src/test/java/io/ceresdb/util/TestUtil.java +++ b/ceresdb-protocol/src/test/java/io/ceresdb/util/TestUtil.java @@ -18,7 +18,7 @@ public class TestUtil { - public static List newTablePoints(final String table) { + public static List newTableTwoPoints(final String table) { final long time = Clock.defaultClock().getTick() - 1; List data = new ArrayList<>(); @@ -39,7 +39,7 @@ public static List newTablePoints(final String table) { public static List newMultiTablePoints(final String... tables) { final List pointsList = new ArrayList<>(); for (final String table : tables) { - pointsList.addAll(newTablePoints(table)); + pointsList.addAll(newTableTwoPoints(table)); } return pointsList; }