Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ private long onReceived(final boolean onError) {
MetricsUtil.timer(REQ_RT, method.getFullMethodName(), address).update(duration, TimeUnit.MILLISECONDS);

if (onError) {
MetricsUtil.meter(REQ_FAILED, method).mark();
MetricsUtil.meter(REQ_FAILED, method, address).mark();
MetricsUtil.meter(REQ_FAILED, method.getFullMethodName()).mark();
MetricsUtil.meter(REQ_FAILED, method.getFullMethodName(), address).mark();
}

return duration;
Expand Down
16 changes: 9 additions & 7 deletions ceresdb-protocol/src/main/java/io/ceresdb/QueryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,26 @@ private CompletableFuture<Result<SqlQueryOk, Err>> query0(final SqlQueryRequest
}

final Err err = r.getErr();
LOG.warn("Failed to read from {}, retries={}, err={}.", Utils.DB_NAME, retries, err);
if (retries > this.opts.getMaxRetries()) {
LOG.error("Retried {} times still failed.", retries);
return Utils.completedCf(r);
}
LOG.warn("Failed to read from {}, err={}.", Utils.DB_NAME, err);

// Should refresh route table
final Set<String> toRefresh = err.stream() //
.filter(Utils::shouldRefreshRouteTable) //
.flatMap(e -> e.getFailedTables().stream()) //
.collect(Collectors.toSet());
this.routerClient.clearRouteCacheBy(toRefresh);

if (toRefresh.isEmpty()) {
// Should not retry
if (Utils.shouldNotRetry(err)) {
return Utils.completedCf(r);
}

// Async to refresh route info
return this.routerClient.routeRefreshFor(req.getReqCtx(), toRefresh)
if (retries > this.opts.getMaxRetries()) {
LOG.error("Retried {} times still failed.", retries);
return Utils.completedCf(r);
}
return this.routerClient.routeFor(req.getReqCtx(), toRefresh)
.thenComposeAsync(routes -> query0(req, ctx, retries + 1), this.asyncPool);
}, this.asyncPool);
}
Expand Down
26 changes: 14 additions & 12 deletions ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,39 +234,41 @@ private CompletableFuture<Result<WriteOk, Err>> write0(final RequestContext reqC
}

final Err err = r.getErr();
LOG.warn("Failed to write to {}, retries={}, err={}.", Utils.DB_NAME, retries, err);
if (retries + 1 > this.opts.getMaxRetries()) {
LOG.error("Retried {} times still failed.", retries);
return Utils.completedCf(r);
}
LOG.warn("Failed to write to {}, err={}.", Utils.DB_NAME, err);

// Should refresh route table
final Set<String> toRefresh = err.stream() //
.filter(Utils::shouldRefreshRouteTable) //
.flatMap(e -> e.getFailedWrites().stream()) //
.map(Point::getTable) //
.collect(Collectors.toSet());
this.routerClient.clearRouteCacheBy(toRefresh);

// Should retry
final List<Point> pointsToRetry = err.stream() //
.filter(Utils::shouldRetry) //
.flatMap(e -> e.getFailedWrites().stream()) //
.collect(Collectors.toList());
if (pointsToRetry.isEmpty()) {
return Utils.completedCf(r);
}

// Should not retry
final Optional<Err> noRetryErr = err.stream() //
.filter(Utils::shouldNotRetry) //
.reduce(Err::combine);
if (retries + 1 > this.opts.getMaxRetries()) {
LOG.error("Retried {} times still failed.", retries);
return Utils.completedCf(r);
}

// Async refresh route info
final CompletableFuture<Result<WriteOk, Err>> rwf = this.routerClient
.routeRefreshFor(reqCtx, toRefresh)
final CompletableFuture<Result<WriteOk, Err>> rwf = this.routerClient.routeFor(reqCtx, toRefresh)
// Even for some data that does not require a refresh of the routing table,
// we still wait until the routing table is flushed successfully before
// retrying it, in order to give the server a break.
.thenComposeAsync(routes -> write0(reqCtx, pointsToRetry, ctx, retries + 1),
this.asyncPool);

// Should not retry
final Optional<Err> noRetryErr = err.stream() //
.filter(Utils::shouldNotRetry) //
.reduce(Err::combine);
return noRetryErr.isPresent() ?
rwf.thenApplyAsync(ret -> Utils.combineResult(noRetryErr.get().mapToResult(), ret),
this.asyncPool) :
Expand Down
7 changes: 3 additions & 4 deletions ceresdb-protocol/src/main/java/io/ceresdb/models/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
*/
public final class Result<Ok, Err> {

public static final int SUCCESS = 200;
public static final int INVALID_ROUTE = 302;
public static final int SHOULD_RETRY = 310;
public static final int FLOW_CONTROL = 503;
public static final int SUCCESS = 200;
public static final int BAD_REQUEST = 400;
public static final int FLOW_CONTROL = 503;

private final Ok ok;
private final Err err;
Expand Down
8 changes: 2 additions & 6 deletions ceresdb-protocol/src/main/java/io/ceresdb/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,11 @@ public static boolean shouldNotRetry(final Err err) {
}

public static boolean shouldRetry(final Err err) {
if (err == null) {
return false;
}
final int errCode = err.getCode();
return errCode == Result.INVALID_ROUTE || errCode == Result.SHOULD_RETRY;
return false;
}

public static boolean shouldRefreshRouteTable(final Err err) {
return err.getCode() == Result.INVALID_ROUTE;
return err != null;
}

public static <V> Observer<V> toUnaryObserver(final CompletableFuture<V> future) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void after() {

@Test(expected = IllegalStateException.class)
public void withoutInitTest() {
final List<Point> points = TestUtil.newTablePoints("test_table1_not_init");
final List<Point> points = TestUtil.newTableTwoPoints("test_table1_not_init");
this.client.write(new WriteRequest(points));
}

Expand All @@ -80,7 +80,7 @@ public void instancesTest() {
public void helloWorldTest() throws ExecutionException, InterruptedException {
initAndMockWriteClient();

final List<Point> points = TestUtil.newTablePoints("test_table1");
final List<Point> points = TestUtil.newTableTwoPoints("test_table1");

Mockito.when(this.writeClient.write(new WriteRequest(Mockito.anyList()), Mockito.any())) //
.thenReturn(Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult()));
Expand Down
Loading